본문 바로가기
머신러닝, 딥러닝/파이토치

[파이토치 스터디] 신경망으로 회귀분석 + Cross Validation

by 장찐 2022. 1. 29.

◈ 모델 구조 확인 

 

파이토치에서 기본적인 모델 구조와 파라미터를 확인하는 방법 

import torch
from torch import nn
import torch.nn.functional as F
from torchsummary import summary

class Regressor(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(13, 50, bias=True) #첫 번째 레이어 
        self.fc2 = nn.Linear(50, 30, bias=True) #두 번째 레이어
        self.fc3 = nn.Linear(30, 1, bias=True) #출력 레이어 
        self.dropout = nn.Dropout(0.5) #연산 마다 50%의 노드를 랜덤하게 없앤다
        
    def forward(self, x):
        x = F.relu(self.fc1(x)) #활성화 함수 적용 
        x = self.dropout(F.relu(self.fc2(x))) #은닉층에서 전달할 때, 50% 를 dropout 
        x = F.relu(self.fc3(x))
        return x

model = Regressor()

 

✔ 전체 모델 구조 확인 

#모델 구조 확인 
print(model)

 

 

✔ 개별 파라미터 이름와 수 확인 

#개별 파라미터 확인 : 개별 변수명과 변수 갯수 한번에 확인 
for name, param in model.named_parameters():
    print(name, param.size())

 

✔ 모델 summary 

#전체 요약 : input_size 입력해야 함 
summary(model, (10,13))

두 번째 인자로 반드시 input 크기를 입력해 줘야 한다. : summary(model, (행, 열)) 

열 크기가 안 맞으면 에러 발생함. 

 

 


 

◈ 단층 퍼셉트론 - 선형회귀 모델 구현 (5.1) 

 

 선형 회귀모델은 입력층과 출력층만 있는데 이를 단층 퍼셉트론 이라고 한다. 이러한 선형 모델은 XOR 문제를 해결할 수가 없었는데, 은닉층을 추가한 다층 퍼셉트론을 통해서 이러한 문제를 해결할 수 있었다. 

import torch
import torch.nn as nn
from matplotlib import pyplot as plt

#선형 회귀모델을 파이토치로 구현 
x = torch.FloatTensor(range(5)).unsqueeze(1) #1차원 형태로 
y = 2*x + torch.rand(5,1)

필요한 라이브러리를 불러오고 임의의 데이터를 생성한다. 

 

 

def LinearRegressor(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc = nn.Linear(1, 1, bias=True)
        
    def forward(self, x):
        y = self.fc(x)
        
        return y

라인 4 : 

선형 회귀식 y = wx + b 는 nn.Linear(N, M, bias=True)로 표현할 수 있다. 여기서 N = 입력 노드 수, M = 출력 노드 수 이다. 

 

라인 6 : 

forward는 x값이 들어와서 연산이 진행되어서 y로 가게끔 순서와 방법을 정하는 곳이다. 

self.fc로 위에서 정의된 선형식으로 x값을 받아서 y 를 반환하도록 함 

 

 

model = LinearRegressor()
learning_rate = 1e-3
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

모델을 선언하고 학습률, 손실함수, 옵티마이저를 설정한다. 

 

 

loss_stack = [] #에폭마다 손실함수 값 저장 

for epoch in range(1001):
    optimizer.zero_grad() #매 에폭마다 누적된 값을 초기화 
    
    y_hat = model(x) 
    loss = criterion(y_hat,y)
    
    loss.backward() #역전파 기준을 손실함수로 설정 
    optimizer.step() #옵티마이저로 최적화 실시 
    loss_stack.append(loss.item()) #손실함수 값 그리기 위해서 저장 

    if epoch % 100 ==0:
        print(f'Epoch {epoch}:{loss.item()}')

전반적으로 텐서플로우, 케라스와는 코드 다른 부분들이 많기 때문에 유의. 

 

 

with torch.no_grad():
    y_hat = model(x)

모델 학습을 실시한다. 예측을 할 때는 라인1 을 통해서 requries_grad를 비활성화해야한다. 

 

 

plt.figure(figsize=(10,5))
plt.subplot(121)
plt.plot(loss_stack)
plt.title('Loss')

#예측 결과 그림 
plt.subplot(122)
plt.plot(x,y,'.b')
plt.plot(x,y_hat,'r-')
plt.legend(['ground truth','prediction'])
plt.title('prediction result')
plt.show()

 

 

 

 


 

다층 신경망 : 집값 예측하기 

 비선형 데이터 구조를 학습하기 위한 다층 신경망 모델을 구축한다. 

 

✅ 전처리 

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

import torch 
from torch import nn,optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F

df = pd.read_csv('C:/Users/Yeong/Desktop/파이토치스터디/data/reg.csv',index_col=[0])

df.shape

#넘파이 배열로 만들기
X = df.drop('Price', axis=1).to_numpy()
Y = df['Price'].to_numpy().reshape(-1,1)

x_train, x_test, y_train, y_test = train_test_split(X,Y, test_size=0.5)

데이터를 불러오고 train/test로 분리한다. 

 

class TensorData(Dataset):
    
    def __init__(self, x_data, y_data):
        self.x_data = torch.FloatTensor(x_data) #데이터를 받아서 텐서로 변환 
        self.y_data = torch.FloatTensor(y_data)
        self.len = self.y_data.shape[0]
    
    def __getitem__(self,index):
        #dataloder로 샘플이 요청되면, 인덱스에 해당하는 샘플을 찾아서 반환함 
        return self.x_data[index], self.y_data[index] 
    
    def __len__(self):
        return self.len #크기 반환 

trainsets= TensorData(x_train, y_train)
trainloader = torch.utils.data.DataLoader(trainsets, batch_size=32, shuffle=True) #데이터를 배치 형태로 사용할 수 있도록

testsets = TensorData(x_test, y_test)
testloader = torch.utils.data.DataLoader(testsets, batch_size=32, shuffle=False)

데이터를 배치 형태로 학습할 수 있도록 변경한다. 

각 loader는 배치 반복 수가 들어간다. 

라인 19에서 테스트셋에 대해서는 shuffle 을 실시하지 않으므로 유의. 

 

✅ 모델 구축 

class Regressor(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(13, 50, bias=True) #첫 번째 레이어 
        self.fc2 = nn.Linear(50, 30, bias=True) #두 번째 레이어
        self.fc3 = nn.Linear(30, 1, bias=True) #출력 레이어 
        self.dropout = nn.Dropout(0.5) #연산 마다 50%의 노드를 랜덤하게 없앤다
        
    def forward(self, x):
        x = F.relu(self.fc1(x)) #활성화 함수 적용 
        x = self.dropout(F.relu(self.fc2(x))) #은닉층에서 전달할 때, 50% 를 dropout 
        x = F.relu(self.fc3(x))
        return x

모델을 정의한다. 레이어의 수, 노드를 설정하는 부분과 활성화 함수를 설정하는 부분은 별도의 함수로 설정한다. 

dropout 은 학습할 때 과적합 방지를 위해서 일정 비율의 노드를 사용하지 않는 것인데, 여기서는 랜덤으로 50%를 배제한다. 사용자가 원하는 위치에 설정할 수 있지만, 절대로 output layer에 사용해서는 안된다. 

 

✅ 모델 학습 

model = Regressor()
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-7)
#weight decay : L2 정규화에서 패널티 계수를 의미함. 클수록 제약조건 강함

모델을 학습한다. 여기서 weight_decay는 L2 정규화의 패널티 계수를 의미한다. 값이 클수록 제약조건이 강하다. 

 

loss_ = []
n = len(trainloader)
for epoch in range(400): #400번 반복 학습 
    running_loss =0.0 #매 에폭의 평균 loss 구하기 위해서 초기값 0으로 
    for data in trainloader: #각 배치를 불러온다 
        inputs, values = data
        optimizer.zero_grad()
        outputs = model(inputs) #예측값 산출 
        #손실함수 계산 및 최적화 
        loss = criterion(outputs, values) 
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
    loss_.append(running_loss/n)

에폭을 반복하면서 모델을 학습한다. 

 

✅ trainset 손실함수 확인 

plt.plot(loss_)
plt.title('손실함수')
plt.xlabel('epoch')
plt.show()

epoch에 따라서 loss가 줄어드는 것을 확인할 수 있다. 

 

 

 

 

✅ Testset 에서 모델 평가하기 

def evaluation(dataloader):
    
    predictions = torch.tensor([], dtype=torch.float) #예측값 저장을 위한 빈 텐서 
    actual = torch.tensor([], dtype=torch.float) #실제값 저장을 위한 빈 텐서 
    with torch.no_grad(): #requires_grad 비활성화 
        model.eval() #dropout과 같은 모델 학습시에만 사용하는 기법들을 비활성화 
        
        #배치 단위로 데이터를 예측하고 예측값과 실제값을 누적해서 저장 
        for data in dataloader:
            inputs, values = data
            outputs = model(inputs)
            
            #0차원으로 누적한다는 의미
            predictions = torch.cat((predictions, outputs), 0)  
            actual = torch.cat((actual, values), 0)
    
    predictions = predictions.numpy()
    actual = actual.numpy()
    rmse = np.sqrt(mean_squared_error(predictions, actual))
    
    return rmse
train_rmse = evaluation(trainloader)
test_rmse = evaluation(testloader)

print('학습용 셋 : ', train_rmse)
print('테스트 셋 : ', test_rmse)

모델의 성능을 평가하는 함수를 정의한다. 

라인5에서 평가 단계에서는 모델 파라미터에 대한 업데이트가 필요 없으므로 requires_grad를 비활성화 한다. 

라인6에서는 평가를 위해서 모델을 eval 모드로 변경한다. 

 

라인9 부터 배치 단위로 데이터를 받아서 예측값과 실제 값을 저장하고 누적시킨다.

라인14~15의 torch.cat 부분에서 0차원으로 누적한다. 예를 들어, 10x2 와 10x2 의 두 텐서가 있을 때, 0이면 20x2 로 1이면 10x4로 저장된다.

 

 


  Cross Validation 실시 

 

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
import torch 
from torch import nn,optim
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
#CF 라이브러리 (추가)
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error


df = pd.read_csv('C:/Users/Yeong/Desktop/파이토치스터디/data/reg.csv',index_col=[0])

#넘파이 배열로 만들기
X = df.drop('Price', axis=1).to_numpy()
Y = df['Price'].to_numpy().reshape(-1,1)

위 일반 예제와 동일하게 필요한 데이터와 라이브러리를 불러온다.

 

#데이터 loader 불러오기 
class TensorData(Dataset):
    
    def __init__(self, x_data, y_data):
        self.x_data = torch.FloatTensor(x_data) #데이터를 받아서 텐서로 변환 
        self.y_data = torch.FloatTensor(y_data)
        self.len = self.y_data.shape[0]
    
    def __getitem__(self,index):
        #dataloder로 샘플이 요청되면, 인덱스에 해당하는 샘플을 찾아서 반환함 
        return self.x_data[index], self.y_data[index] 
    
    def __len__(self):
        return self.len #크기 반환 
    
x_train, x_test, y_train, y_test = train_test_split(X,Y, test_size=0.7)
    
trainsets= TensorData(x_train, y_train)
#trainset은 교차검증을 실시하기 때문에 dataloader로 정의하지 않는다. 
#trainloader = torch.utils.data.DataLoader(trainsets, batch_size=32, shuffle=True) 

testsets = TensorData(x_test, y_test)
testloader = torch.utils.data.DataLoader(testsets, batch_size=32, shuffle=False)

텐서 데이터를 만드는데, 라인20은 trainset에 대해서 CV를 실시해야 하기 때문에 실행하지 않는다. 

 

 

class Regressor(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(13, 50, bias=True) #첫 번째 레이어 
        self.fc2 = nn.Linear(50, 30, bias=True) #두 번째 레이어
        self.fc3 = nn.Linear(30, 1, bias=True) #출력 레이어 
        self.dropout = nn.Dropout(0.5) #연산 마다 50%의 노드를 랜덤하게 없앤다
        
    def forward(self, x):
        x = F.relu(self.fc1(x)) #활성화 함수 적용 
        x = self.dropout(F.relu(self.fc2(x))) #은닉층에서 전달할 때, 50% 를 dropout 
        x = F.relu(self.fc3(x))
        return x

모델 구조는 이전과 동일하다. 

 

 

kfold = KFold(n_splits=3, shuffle=True)
criterion = nn.MSELoss()

손실함수와 교차 검증 방식을 정의한다. 여기서는 3-fold로 데이터를 나눈다. 

위 예제에서는 여기서 바로 모델 학습을 진행하지만 교차 검증시에는 루프를 돌리면서 별도로 학습을 진행한다. 

 

 

def evaluation(dataloader):
    
    predictions = torch.tensor([], dtype=torch.float) #예측값 저장을 위한 빈 텐서 
    actual = torch.tensor([], dtype=torch.float) #실제값 저장을 위한 빈 텐서 
    with torch.no_grad(): #requires_grad 비활성화 
        model.eval() #dropout과 같은 모델 학습시에만 사용하는 기법들을 비활성화 
        
        #배치 단위로 데이터를 예측하고 예측값과 실제값을 누적해서 저장 
        for data in dataloader:
            inputs, values = data
            outputs = model(inputs)
            
            #0차원으로 누적한다는 의미
            predictions = torch.cat((predictions, outputs), 0)  
            actual = torch.cat((actual, values), 0)
    
    predictions = predictions.numpy()
    actual = actual.numpy()
    rmse = np.sqrt(mean_squared_error(predictions, actual))
    model.train()
    
    return rmse

모델 평가 함수는 동일하게 사용 

 

 

validation_loss = []
for fold, (train_idx, val_idx) in enumerate(kfold.split(trainset)):
    train_subsampler = torch.utils.data.SubsetRandomSampler(train_idx)
    val_subsampler = torch.utils.data.SubsetRandomSampler(val_idx)
    
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, sampler=train_subsampler)
    valloader = torch.utils.data.DataLoader(trainset, batch_size=32, sampler=val_subsampler)
    
    model=Regressor()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-7)
    
    for epoch in range(400):
        for data in trainloader:
            inputs, values = data
            optimizer.zero_grad()
            
            outputs = model(inputs)
            loss = criterion(outputs, values)
            loss.backward()
            optimizer.step()

    train_rmse = evaluation(trainloader)
    val_rmse = evaluation(valloader)
    print('k-fold', fold, 'Train Loss : %.4f, Validation Loss: %.4f' %(train_rmse, val_rmse))

    validation_loss.append(val_rmse)

교차 검증을 실시해서 각 폴더별 정확도를 확인한다. 

 

validation_loss = np.array(validation_loss)
mean = np.mean(validation_loss)
std = np.std(validation_loss)

train_rmse = evaluation(trainloader)
test_rmse = evaluation(testloader)
print("validation score : %.4f, +-%.4f"%(mean,std))

validation 셋의 평균 정확도와 표준편차를 확인한다. 

 

 

 

 

trainloader = torch.utils.data.DataLoader(trainset, batch_size=32, shuffle=True)
train_rmse = evaluation(trainloader)
test_rmse = evaluation(testloader)

print('학습용 셋 : ', train_rmse)
print('테스트 셋 : ', test_rmse)

다시 train set 데이터를 학습하고, test set에 대해서 정확도를 확인한다. 

 

 

 

 


Reference

 

딥러닝을 위한 파이토치 입문, 딥러닝호형 저, 영진닷컴, 2022년 01월 20일

 

댓글