일상/컴퓨터

Fake News Detection 모델 파이썬으로 구현해보기

미적미적달팽이 2024. 9. 1. 17:56

 

2024.08.11 - [일상/컴퓨터] - [논문 리뷰] 트랜스포머 transformer 기반 가짜 뉴스 탐지

 

[논문 리뷰] 트랜스포머 transformer 기반 가짜 뉴스 탐지

논문 링크: https://rdcu.be/dQBgs    최근 인터넷 커뮤니티의 발달로 인하여 생기는 부작용들이 많다. 특히나 틱톡, 유튜브, 인스타그램, 포털 사이트, 커뮤니티 사이트 등 다양한 플랫폼들의 탄생으

gunrestaurant.tistory.com

 

지난번에 읽었던 논문을 참고해서 가짜 뉴스 탐지 모델을 파이썬으로 구현을 해보려고 한다.

논문에서도 구글 코랩으로 구현했다고 하지만 결제로 할 수 있는 더 큰 리소스가 필요할 것 같아서 좀 경량화시켜서 시도를 해보려고 한다.

사용한 데이터는 캐글에서 찾아서 사용하였다.

https://www.kaggle.com/datasets/ruchi798/source-based-news-classification?select=news_articles.csv

 

Source based Fake News Classification

Classification of news by type and labels

www.kaggle.com

뉴스데이터와 함께 사용할 만한 소셜 컨텍스트 데이터는 없지만 캐글에서도 잘 찾아보면 단순 타이틀 텍스트뿐만이 아닌 뉴스 데이터도 꽤 있다. 구동은 캐글에서 진행했다.

 

일단 데이터를 불러왔다.

import pandas as pd

# 데이터셋 로드
df = pd.read_csv('/content/news_articles.csv')

# 데이터셋의 첫 몇 행 표시
df.head()

데이터에는 12개의 열과 2096개의 열이 존재한다.

# 데이터셋 구조 확인
print(df.info())

# 결측값 확인
print(df.isnull().sum())

그 다음에는 결측치를 확인했는데 텍스트 열에 결측치와 다른 곳에 결측치가 존재해서 이를 전처리 하고자 결측치가 없는 행을 제거하고자 한다.

# 나머지 결측값이 있는 행 삭제
df = df.dropna()

# 결측값이 제거된 후 데이터셋 확인
print(df.info())

# 결측값 확인
print(df.isnull().sum())

결측치 제거 후 잘 제거 됐는지 확인이 필요하다면 다시 isnull 메소드를 사용해주자.

텍스트 데이터를 사용하는 것이기에 텍스트 전처리를 해줘야한다.

import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import string
import re

# nltk 리소스 다운로드 (한 번만 실행)
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

전처리를 위해 nltk 리소스를 불러와준다.

# 표제어 추출기 및 불용어 초기화
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))

def preprocess_text(text):
    # 텍스트를 소문자로 변환
    text = text.lower()
    
    # 구두점 및 특수 문자 제거
    text = re.sub(f"[{re.escape(string.punctuation)}]", "", text)
    
    # 토큰화
    words = word_tokenize(text)
    
    # 불용어 제거 및 표제어 추출
    words = [lemmatizer.lemmatize(word) for word in words if word not in stop_words]
    
    return " ".join(words)

# 'text' 및 'title' 열에 전처리 적용
df['text'] = df['text'].apply(preprocess_text)
df['title'] = df['title'].apply(preprocess_text)

이후 불용어와 특수문자 등을 제거해준다. 우리 머신러닝님이 알아들으실 수 있도록 자연어를 기계어로 만들어서 먹여줘야한다.

from sklearn.preprocessing import LabelEncoder

# 'label' 열에 레이블 인코딩 적용
label_encoder = LabelEncoder()
df['label'] = label_encoder.fit_transform(df['label'])

# 다른 범주형 열에도 필요 시 적용
df['type'] = label_encoder.fit_transform(df['type'])

label이 REAL과 FAKE로 되어 있으므로 숫자 데이터로 인코딩을 해줘야 한다. 따로 추가적으로 사용하고 싶은 범주형 데이터에도 똑같이 인코딩을 적용해준다.

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from transformers import BartTokenizer, BartForConditionalGeneration
import pandas as pd

class NewsDataset(Dataset):
    def __init__(self, texts, titles, labels):
        self.texts = texts
        self.titles = titles
        self.labels = labels
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return {
            'input_ids': self.texts['input_ids'][idx],
            'attention_mask': self.texts['attention_mask'][idx],
            'decoder_input_ids': self.titles['input_ids'][idx],
            'decoder_attention_mask': self.titles['attention_mask'][idx],
            'labels': torch.tensor(self.labels[idx], dtype=torch.long)
        }

데이터셋을 BART 모델에 넣기 위해 데이터를 토큰화하고 필요한 입력 형식으로 전환하기 위해 class를 지정해준다.

# 필요한 전처리된 데이터만 사용
texts = df['text']
titles = df['title']
labels = df['label']

# BART 토크나이저 초기화
tokenizer = BartTokenizer.from_pretrained('facebook/bart-large')

# 토크나이징 및 인코딩
encoded_texts = tokenizer(texts.tolist(), padding=True, truncation=True, return_tensors="pt", max_length=512)
encoded_titles = tokenizer(titles.tolist(), padding=True, truncation=True, return_tensors="pt", max_length=512)

앞에서 전처리한 데이터를 불러와서 BART 토크나이저에 넣어준다.

# BART 모델 구성 설정
config = BartConfig(
    vocab_size=50265,  # 어휘 크기
    d_model=1024,  # 레이어의 차원 크기
    encoder_layers=8,  # 인코더 레이어 수 증가
    decoder_layers=8,  # 디코더 레이어 수 증가
    encoder_attention_heads=8,  # 인코더의 어텐션 헤드 수
    decoder_attention_heads=8,  # 디코더의 어텐션 헤드 수
    encoder_ffn_dim=2048,  # 인코더 피드포워드 레이어의 차원 증가
    decoder_ffn_dim=2048,  # 디코더 피드포워드 레이어의 차원 증가
    activation_function='gelu',  # 활성화 함수
    max_position_embeddings=1024,  # 포지션 임베딩의 최대 크기
    num_labels=2,  # 클래스 수 (fake, real)
    dropout=0.2  # 드롭아웃 확률 증가
)

 

BART 모델 하이퍼 파라미터를 설정하기 위해 미리 config로 지정을 한다. 똑같이 사용할 필요가 없고 자기가 원하는 대로 설정한다. 

class FNDNSModel(nn.Module):
    def __init__(self, config):
        super(FNDNSModel, self).__init__()
        # 전달된 config를 사용하여 BART 모델 초기화
        self.bart = BartModel(config)
        self.classification_head = nn.Linear(config.d_model, config.num_labels)  # 2개의 클래스 (fake, real)
    
    def forward(self, input_ids, attention_mask, decoder_input_ids, decoder_attention_mask, labels=None):
        # BART의 출력 (디코더의 마지막 숨겨진 상태)
        outputs = self.bart(
            input_ids=input_ids,
            attention_mask=attention_mask,
            decoder_input_ids=decoder_input_ids,
            decoder_attention_mask=decoder_attention_mask,
        )
        
        # 디코더의 마지막 토큰의 숨겨진 상태 가져오기
        hidden_states = outputs.last_hidden_state
        
        # 각 샘플에 대해 마지막 토큰의 숨겨진 상태만 가져옴
        decoder_output = hidden_states[:, -1, :]  # [batch_size, d_model]
        
        # 분류 레이어로 전달
        logits = self.classification_head(decoder_output)
        
        # 손실 계산
        loss = None
        if labels is not None:
            loss_fn = nn.CrossEntropyLoss()
            loss = loss_fn(logits, labels)
        
        return loss, logits

최대한 논문에서 사용한 대로 구현을 하기 위해 FND-NS 모델로 함수를 지정해준다. config를 전달해주기 위해 미리 인자를 설정해준다.

from tqdm import tqdm
from sklearn.metrics import accuracy_score
from torch.amp import autocast, GradScaler  # torch.amp 모듈 사용

# 모델 초기화 및 디바이스 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = FNDNSModel(config).to(device)
model.train()

# 옵티마이저 설정
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# 학습 루프 설정
num_epochs = 10  # 논문에서 사용한 에폭 수

# 혼합 정밀도 스케일러 초기화
scaler = GradScaler()

for epoch in range(num_epochs):
    total_loss = 0
    all_predictions = []
    all_labels = []
    
    # tqdm을 사용하여 데이터 로더를 래핑하여 진행 상황 표시
    progress_bar = tqdm(dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch in progress_bar:
        optimizer.zero_grad()
        
        with autocast(device_type='cuda'):  # 혼합 정밀도 사용
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            decoder_input_ids = batch['decoder_input_ids'].to(device)
            decoder_attention_mask = batch['decoder_attention_mask'].to(device)
            labels = batch['labels'].to(device)
            
            # 모델에 전달하여 출력 및 손실 계산
            loss, logits = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                decoder_input_ids=decoder_input_ids,
                decoder_attention_mask=decoder_attention_mask,
                labels=labels
            )
        
        # 역전파 및 최적화
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        total_loss += loss.item()
        
        # 예측값과 실제값을 수집
        predictions = torch.argmax(logits, dim=-1)
        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        
        # tqdm 진행 상황 업데이트
        progress_bar.set_postfix(loss=loss.item())

    # 에폭 단위로 평균 손실 및 정확도 계산
    avg_loss = total_loss / len(dataloader)
    accuracy = accuracy_score(all_labels, all_predictions)
    print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")

모델에 config를 전달해주고 진행 상황을 확인하게 tqdm을 불러 와서 루프에 넣어준다. 

 

그런데 성능이

Epoch 1/10: 100%|██████████| 2/2 [00:00<00:00,  6.09it/s, loss=7.11]
Epoch 1/10, Average Loss: 3.8566, Accuracy: 0.5625
Epoch 2/10: 100%|██████████| 2/2 [00:00<00:00,  5.13it/s, loss=4.13]
Epoch 2/10, Average Loss: 2.9444, Accuracy: 0.6875
Epoch 3/10: 100%|██████████| 2/2 [00:00<00:00,  6.19it/s, loss=1.75]
Epoch 3/10, Average Loss: 2.5118, Accuracy: 0.3125
Epoch 4/10: 100%|██████████| 2/2 [00:00<00:00,  5.14it/s, loss=1.64]
Epoch 4/10, Average Loss: 1.4093, Accuracy: 0.3125
Epoch 5/10: 100%|██████████| 2/2 [00:00<00:00,  5.16it/s, loss=1.72]
Epoch 5/10, Average Loss: 1.1094, Accuracy: 0.6875
Epoch 6/10: 100%|██████████| 2/2 [00:00<00:00,  5.10it/s, loss=0.946]
Epoch 6/10, Average Loss: 0.6521, Accuracy: 0.6875
Epoch 7/10: 100%|██████████| 2/2 [00:00<00:00,  5.10it/s, loss=0.729]
Epoch 7/10, Average Loss: 0.6850, Accuracy: 0.6250
Epoch 8/10: 100%|██████████| 2/2 [00:00<00:00,  5.09it/s, loss=0.733]
Epoch 8/10, Average Loss: 0.8878, Accuracy: 0.3125
Epoch 9/10: 100%|██████████| 2/2 [00:00<00:00,  5.09it/s, loss=0.821]
Epoch 9/10, Average Loss: 0.7134, Accuracy: 0.6875
Epoch 10/10: 100%|██████████| 2/2 [00:00<00:00,  5.10it/s, loss=1.07]
Epoch 10/10, Average Loss: 0.7381, Accuracy: 0.6875

 나쁘지 않은데 나쁘다. 사실 논문 그대로 하이퍼 파라미터를 사용해봤는데 성능이 너무 늘쑥 날쑥에 50%의 정확도가 나와서 이게 최대였다. 정확도는 나쁘지 않은데 loss가 너무 구려서 미리 사전에 학습된 모델을 사용해보자.

from transformers import BartTokenizer, BartForSequenceClassification

# BART 모델과 토크나이저 초기화 (사전 학습된 모델 로드)
model_name = "facebook/bart-large-mnli"  # MNLI 데이터셋으로 사전 학습된 BART 모델
tokenizer = BartTokenizer.from_pretrained(model_name)
model = BartForSequenceClassification.from_pretrained(model_name, num_labels=2, ignore_mismatched_sizes=True)  # 크기 불일치 무시

# 디바이스 설정 (GPU 사용 가능 시)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

모델을 불러와서 GPU를 사용할 수 있게 전달해준다.

from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset

# 텍스트와 레이블을 리스트로 변환
texts = df['text'].tolist()
labels = df['label'].tolist()

# 학습 데이터셋과 평가 데이터셋으로 분리
train_texts, eval_texts, train_labels, eval_labels = train_test_split(texts, labels, test_size=0.2, random_state=42)

# 텍스트 데이터를 모델 입력 형식으로 인코딩
train_encodings = tokenizer(
    train_texts,
    max_length=512,  # 필요한 최대 시퀀스 길이로 조정
    padding='max_length',
    truncation=True,
    return_tensors='pt'
)

eval_encodings = tokenizer(
    eval_texts,
    max_length=512,
    padding='max_length',
    truncation=True,
    return_tensors='pt'
)

# PyTorch 데이터셋 생성
class NewsDataset(Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        item = {key: val[idx] for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

# 학습 데이터셋 및 평가 데이터셋 생성
train_dataset = NewsDataset(train_encodings, train_labels)
eval_dataset = NewsDataset(eval_encodings, eval_labels)

# 데이터 로더 생성
train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True)
eval_dataloader = DataLoader(eval_dataset, batch_size=8, shuffle=False)

데이터셋을 새로운 형식으로 전달해주게 데이터로더를 새로 만들어주고 평가용으로 evaluate 데이터셋도 새로 만들어준다.

from transformers import AdamW
from torch.cuda.amp import autocast, GradScaler

# 옵티마이저 설정 (AdamW 사용)
optimizer = AdamW(model.parameters(), lr=5e-5)

# 혼합 정밀도 스케일러 초기화
scaler = GradScaler()

# 학습 루프
num_epochs = 10  # 에폭 수
for epoch in range(num_epochs):
    total_loss = 0
    all_predictions = []
    all_labels = []

    model.train()
    progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{num_epochs}")

    for batch in progress_bar:
        optimizer.zero_grad()
        
        # 데이터 로드 및 디바이스로 이동
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        labels = batch['labels'].to(device)
        
        with autocast():  # 혼합 정밀도 사용
            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            logits = outputs.logits
        
        # 역전파 및 최적화
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        total_loss += loss.item()
        
        # 예측값과 실제값 수집
        predictions = torch.argmax(logits, dim=-1)
        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
        
        # tqdm 진행 상황 업데이트
        progress_bar.set_postfix(loss=loss.item())

    # 에폭 단위로 평균 손실 및 정확도 계산
    avg_loss = total_loss / len(train_dataloader)
    accuracy = accuracy_score(all_labels, all_predictions)
    print(f"Epoch {epoch+1}/{num_epochs}, Average Loss: {avg_loss:.4f}, Accuracy: {accuracy:.4f}")

이후 데이터를 전달해주고 성능을 확인해보니

Epoch 10/10: 100%|██████████| 205/205 [01:25<00:00,  2.39it/s, loss=0.855]Epoch 10/10, Average Loss: 0.6686, Accuracy: 0.6271

왜 성능이 더 떨어진 느낌이 드는 것일까? 뭐가 잘못되도 대단히 잘못된 것 같다. 심지어 저 성능이 10 epoch동안 계속 나왔다....

 

반응형