본문 바로가기
머신러닝, 딥러닝/파이토치

[파이토치 스터디] 준지도 학습 (Semi-Supervised Learning)

by 장찐 2022. 2. 24.

📚 준지도 학습 (Semi-supervised learning) 

 

 정답 라벨이 있는 데이터와 없는 데이터를 함께 사용해서 모델을 학습시키는 방법이다. 더 많은 데이터를 확보해서 성능을 향상할 수 있다는 장점이 있다. 

 


📚 의사 라벨링 (Pseudo labeling) 

 

 사전 학습 모델로 라벨링이 되지 않은 데이터를 예측하고, 그 예측값을 기준으로 라벨링을 해서 기존의 학습 데이터와 함께 사용하는 방식. 단, 예측으로 생성한 라벨 값을 학습에 사용하기 때문에 데이터가 정확하지 않을 수 있다. 의사 라벨링 방식은 크게 두 가지가 있다. 의사 라벨링을 이용하기 전 데이터를 준비하는 과정은 다음과 같이 동일하다. 

 

📘공통 전처리 과정 

 

📌라이브러리 불러오기 

import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset

import numpy as np 
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm  #for 문의 진행상황 알려줌 

# 현재 가능한 장치를 확인한다.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

 

📌데이터 로딩 함수 정의 

#데이터 가져오거 정의하는 양식 그대로 사용, 전처리 작업 위한 transform 추가 
class MyDataset(Dataset):
    
    def __init__(self, x_data, y_data, transform=None):
        self.x_data = x_data   # torch.floattensor로 들어옴
        self.y_data = y_data   #.view(-1,1) # torch.longtensor로 들어옴
        self.transform = transform
        self.len = len(y_data)
    
    def __getitem__(self, index):
        sample = self.x_data[index], self.y_data[index]
        
        if self.transform:
            sample = self.transform(sample)   #self.transform이 None이 아니라면 전처리를 작업한다.
        return sample 
    
    def __len__(self):
        return self.len

 이전에 사용한 함수와 동일 

 

 

📌데이터 전처리 함수 정의 

#데이터 전처리 방식 정의 
class TrainTransform:
    def __call__(self, sample):
        inputs, labels = sample
        #labels = labels.float()

        transf = transforms.Compose([
                    transforms.ToPILImage(),
                    transforms.RandomHorizontalFlip(),
                    transforms.ToTensor()
                    ])
        final_output = transf(inputs)      
        
        return final_output, labels

 이전에 사용한 함수와 동일 

 

 

📌데이터셋 균등 배분 함수 정의 

def balanced_subset(data, labels, num_cls, num_data): # 데이터 / 라벨 / 클래스 수 / 나눠지는 목표 데이터 개수
    num_data_per_class = num_data // num_cls
    data1 = torch.tensor([ ], dtype=torch.float)
    data2 = torch.tensor([ ], dtype=torch.float)
    labels1 = torch.tensor([ ], dtype=torch.long)
    labels2 = torch.tensor([ ], dtype=torch.long)

    #각 클래스마다 정의된 데이터 개수만큼 무작위로 뽑아서 저장
    for cls in range(num_cls):
        idx = np.where(labels.numpy() == cls)[0]    #np.where 사용 위해서 numpy로 변경 
        shuffled_idx = np.random.choice(len(idx), len(idx), replace=False)  #0~len(idx) 중에서 len(idx) 만큼 비복원 추출 

        data1 = torch.cat([data1, data[shuffled_idx[:num_data_per_class]]], dim=0)
        data2 = torch.cat([data2, data[shuffled_idx[num_data_per_class:]]], dim=0) 
            
        labels1 = torch.cat([labels1, labels[shuffled_idx[:num_data_per_class]]], dim=0)
        labels2 = torch.cat([labels2, labels[shuffled_idx[num_data_per_class:]]], dim=0)

    return data1, data2, labels1, labels2

balanced_subset(데이터, 라벨, 클래스 수 , 나눠지는 목표 데이터 개수) 를 입력받는다. 

 

라인2 : num_data_per_class 는 각 클래스에 속하는 데이터의 수를 의미한다. 

 

data1~labels2 에서 나눠지는 두 개의 세트를 저장하기 위해서 빈 데이터와 라벨 텐서를 생성한다. 그리고 for 루프를 돌면서 정의된 데이터 개수만큼 무작위로 뽑아서 저장한다. 

 

 

📌데이터 불러오기 

trainset = torchvision.datasets.MNIST(root='./data', train=True, download=True) # 50000

#라벨 vs 미 라벨 나누기 
labeled_data, unlabeled_data, labels, unlabels = balanced_subset(trainset.data, trainset.targets, num_cls=10, num_data=2000)
#라벨링 된 데이터 중에서 train vs val 나누기 
train_images, val_images, train_labels, val_labels = balanced_subset(labeled_data, labels, num_cls=10, num_data=1000)
print(labeled_data.shape)
print(unlabeled_data.shape)
print(labels.shape)
print(unlabels.shape)

print(train_images.shape)
print(val_images.shape)
print(train_labels.shape)
print(val_labels.shape)

MNIST 손글씨 데이터를 사용한다. 데이터를 불러오고 balanced_subset 함수를 이용해서 데이터셋을 분리한다. 우선 라벨 vs 미 라벨 데이터를 분리하고, 라벨링 된 데이터에 대해서 train vs val로 다시 분류한다. 실제 데이터는 6만개 이지만 여기서는 라벨링 된 데이터가 2000개만 있다고 가정한다. 그리고 train, val 데이터를 각각 1000개씩 사용하고 나머지는 모두 라벨링이 안 된 데이터라고 가정한다. 

 

train_images = train_images.unsqueeze(1) #unsquezze : 해당 위치에 1차원 추가 
val_images = val_images.unsqueeze(1)

trainset = MyDataset(train_images, train_labels, transform=TrainTransform())
trainloader = torch.utils.data.DataLoader(trainset, batch_size=128, shuffle=True) 

validationset = MyDataset(val_images, val_labels)
valloader = torch.utils.data.DataLoader(validationset, batch_size=128, shuffle=False)
print(train_images.shape)
print(val_images.shape)

 데이터로더를 정의한다. CNN은 입력 데이터가 4차원이어야 하는데 MNIST는 흑백이라서 (배치사이즈, 이미지 너비, 이미지 높이) 로 3차원 형태이다. 따라서 unsqeeze(1)를 통해서 인덱스 1에 해당하는 자리에 1차원을 추가한다. 학습 데이터에는 TrainTransform()을 적용하고 val 에는 적용하지 않는다. 

 

 

# 데이터 불러오기 및 전처리 작업
transform = transforms.Compose([transforms.ToTensor()])
testset = torchvision.datasets.MNIST(root='./data', train=False, download=True, transform=transform)
testloader = torch.utils.data.DataLoader(testset, batch_size=100,shuffle=False)

 동일한 방식으로 테스트 데이터도 불러오고 testloader에 담는다. 

 

 

 

📌모델 정의 

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.features = nn.Sequential(
                        nn.Conv2d(1, 64, 3), nn.ReLU(),
                        nn.MaxPool2d(2, 2),
                        nn.Conv2d(64, 192, 3, padding=1), nn.ReLU(),
                        nn.MaxPool2d(2, 2))       
        self.classifier = nn.Sequential(
                        nn.Dropout(0.5),
                        nn.Linear(192*6*6, 1024), nn.ReLU(),
                        nn.Dropout(0.5),
                        nn.Linear(1024, 512), nn.ReLU(),
                        nn.Linear(512, 10))          
    def forward(self, x):
        x = self.features(x)
        x = x.view(-1, 192*6*6)
        x = self.classifier(x)    
        return x

model = Net().to(device) # 모델 선언
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)
#scheduler = optim.lr_scheduler.MultiStepLR(optimizer, milestones=[100,200], gamma=0.1)

임의의 모델을 정의한다. 

 

 

📌정확도 평가 함수 정의 

def accuracy(dataloader):
    correct = 0
    total = 0
    with torch.no_grad():
        model.eval()
        for data in dataloader:
            images, labels = data[0].to(device), data[1].to(device)       
            outputs = model(images)
            _, predicted = torch.max(outputs.detach(), 1)
            total += labels.size(0)      
            correct += (predicted == labels).sum().item()

    acc = 100*correct/total
    model.train()
    return acc

 

📌라벨링된 데이터로 벤치마크 성능 확인 

best_acc = 0
for epoch in range(501):
    correct = 0
    total = 0
    for traindata in trainloader: 
       
        inputs, labels = traindata[0].to(device), traindata[1].to(device)     
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)      
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs.detach(), 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    #성능 가장 좋은 모델 저장 
    val_acc = accuracy(valloader)
    if val_acc >= best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'cifar_model_for_pseudo_baseline.pth')  
        print('[%d] train acc: %.2f, validation acc: %.2f - Saved the best model' %(epoch, 100*correct/total, val_acc))  
    elif epoch % 10 == 0:
        print('[%d] train acc: %.2f, validation acc: %.2f' %(epoch, 100*correct/total, val_acc))
model.load_state_dict(torch.load('./models/cifar_model_for_pseudo_baseline.pth'))
accuracy(testloader)

라벨링이 되어 있는 1000개의 데이터로 학습을 실시해서 벤치마크 성능을 확인할 수 있다. 

 

 


 

✅ 의사라벨링 방법 1 

 

 의사 라벨링을 포함해서 학습할 경우에는 부정확한 라벨리 섞여 있어서 모델의 성능을 저하시킬 수 있다. 따라서 원래 라벨이 붙어있는 '훈련 라벨'과 '의사 라벨'을 구분하여 별도의 손실 함수 L_tL_p 를 계산한다. 그리고 이 두 개의 손실 함수를 가중치를 곱해서 더해서 최종 손실 함수를 계산한다. L = L_t + (alpht * L_p)  가중치 alpha가 0일 경우 훈련 라벨로만 모델을 최적화한다는 뜻이고, alpha가 커질수록 의사 라벨의 영향도가 커진다는 의미이다. 

 

 

📌모델 재정의 

#파라미터 초기화하고 모델 재정의
model = Net().to(device) # 모델 선언
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

 파라미터를 초기화하고 모델을 재정의 한다. 

 

 

📌모델 학습 

alpha = 0    # 의사라벨 손실함수에 대한 가중치 
alpha_t = 1e-4
T1 = 100
T2 = 450
best_acc = 0

for epoch in range(501):
    correct = 0
    total = 0
    for traindata, pseudodata in zip(trainloader, unlabeledloader): 
       
        inputs, labels = traindata[0].to(device), traindata[1].to(device)     
        pinputs = pseudodata[0].to(device) 
        optimizer.zero_grad()
        outputs = model(inputs)

        if alpha > 0:    #alpha>0이면 의사 라벨 포함해서 손실함수 계산         
            poutputs = model(pinputs)  
            _, plabels = torch.max(poutputs.detach(), 1)     
            loss = criterion(outputs, labels)  + alpha*criterion(poutputs, plabels)   
        
        else:    #alpha=0이면 학습 데이터로만 손실함수 계산 
            loss = criterion(outputs, labels)    
              
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs.detach(), 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    if (epoch > T1) and (epoch < T2):  #epoch이 100부터 450까지일 때 
        alpha = alpha_t*(epoch - T1)/(T2 - T1)

    elif epoch >= T2:    #epoch이 450이상일 때 
        alpha = alpha_t

    val_acc = accuracy(valloader)
    if val_acc >= best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), 'cifar_model_for_pseudo_label.pth')    
        print('[%d] train acc: %.2f, validation acc: %.2f - Saved the best model' %(epoch, 100*correct/total, val_acc))  

    elif epoch % 10 == 0:
        print('[%d] train acc: %.2f, validation acc: %.2f' %(epoch, 100*correct/total, val_acc))

 여기서는 alpha를 0부터 1e-4까지 점차 높여가면서 학습을 진행한다.  epoch=100 까지는 alpha=0으로 학습을 진행하고, 100회~450회 까지는 alpha를 일정하게 크게 만들어 가면서 학습을 진행한다. 450회가 지나면 alpha_t 로 고정해서 학습을 진행하고 마무리한다. 

 

model.load_state_dict(torch.load('./models/cifar_model_for_pseudo_label.pth'))
accuracy(testloader)

모델 성능을 평가한다. 벤치마크 모델보다 성능이 어느 정도 증가한 것을 확인할 수 있다. 

 

 


 

✅ 의사라벨링 방법 2

 

 사전 모델을 통해서 의사 라벨을 생성하고, 이를 실제 데이터로 이용하여 라벨링을 실시한다. 

model = Net().to(device) # 모델 선언
model.load_state_dict(torch.load('cifar_model_for_pseudo_baseline.pth'))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

 

📌의사 라벨 생성 

pseudo_threshold = 0.99
pseudo_images = torch.tensor([], dtype=torch.float)
pseudo_labels = torch.tensor([], dtype=torch.long)

with torch.no_grad():
    for data in tqdm(unlabeledloader):
        model.eval()
        images = data[0].to(device)
        outputs = model(images)
        #print(outputs.size())
        outputs = torch.nn.functional.softmax(outputs, dim=1)
        
        max_val, predicted = torch.max(outputs.detach(), 1)

        idx = np.where(max_val.cpu().numpy() >= pseudo_threshold)[0]
        
        if len(idx) > 0:
            pseudo_images = torch.cat((pseudo_images, images.cpu()[idx]), 0) 
            pseudo_labels = torch.cat((pseudo_labels, predicted.cpu()[idx]), 0)

#print(pseudo_images.size(), pseudo_labels.size())

라인 11에서 출력 벡터를 0과 1 사이 값으로 만들기 위해서 소프트맥스 함수를 적용한다. 

마지막 부분에서 기준으로 설정한 0.99보다 큰 데이터가 있으면 그 데이터를 빈 텐서 리스트에 저장한다. 

 

print(pseudo_images.size(), pseudo_labels.size())

 

📌데이터 로더 정의 

pseudo_dataset = MyDataset(pseudo_images, pseudo_labels)
pseudoloader = torch.utils.data.DataLoader(pseudo_dataset, batch_size=256, shuffle=True)

 

📌모델 학습 

alpha = 0
alpha_t = 1e-4
T1 = 20
T2 = 450
best_acc = 0

for epoch in range(501):
    correct = 0
    total = 0
    for traindata, pseudodata in zip(trainloader, pseudoloader): 
       
        inputs, labels = traindata[0].to(device), traindata[1].to(device)     
        pinputs, plabels = pseudodata[0].to(device), pseudodata[1].to(device)    
        optimizer.zero_grad()
        outputs = model(inputs)
        poutputs = model(pinputs)
        loss = criterion(outputs, labels) + alpha*criterion(poutputs, plabels)         
        loss.backward()
        optimizer.step()
        _, predicted = torch.max(outputs.detach(), 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    #scheduler.step()
    if (epoch > T1) and (epoch < T2):
        alpha = alpha_t*(epoch - T1)/(T2 - T1)
        
    elif epoch >= T2:    
        alpha = alpha_t

    val_acc = accuracy(valloader)
    if val_acc >= best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), './models/cifar_model_for_pseudo_label2.pth') 
        print('[%d] train acc: %.2f, validation acc: %.2f - Saved the best model' %(epoch, 100*correct/total, val_acc))  

    elif epoch % 10 == 0:
        print('[%d] train acc: %.2f, validation acc: %.2f' %(epoch, 100*correct/total, val_acc))

 

 

model.load_state_dict(torch.load('./models/cifar_model_for_pseudo_label2.pth'))
accuracy(testloader)

 

 


📚  Reference

 

 

댓글