관리 메뉴

나는 문어~ 꿈을 꾸는 문어~

[Image Retrieval] 캐글 - Shopee (2) 학습~검증 본문

대회 & 프로젝트

[Image Retrieval] 캐글 - Shopee (2) 학습~검증

harrykur139 2022. 4. 24. 17:22

대회 링크 : https://www.kaggle.com/c/shopee-product-matching

코드 링크 : https://github.com/euiraekim/kaggle-shopee/blob/main/train.ipynb

 

 

개요

딥러닝을 활용하여 특정 이미지가 어떤 class인지 알아낼 때 가장 일반적인 방법은 classification이다. 각 class마다 여러장의 사진을 준비하여 학습을 시키면, 모델은 특정한 이미지가 들어왔을 때 어떤 class에 가장 가까운 지 알아내는 능력을 갖게 된다.

 

이러한 방식을 언제나 사용할 수 있는 것은 아니다. 이 대회의 주최 측의 경우와 같이 class가 계속 추가될 때가 그렇다. 예를 들어 기존에 1000가지 종류의 제품이 올라와 있어서 이 제품들을 분류하기 위해 1000개의 class로 모델을 학습시켰다고 가정해보자. 1001번째 종류의 새로운 제품이 올라오면 어떻게 될까? 해당 제품은 학습이 되어있지 않기 때문에 1001번째 제품임에도 불구하고 모델은 1000개의 class 중에 어느 것에 가장 가까운지 찾아서 반환을 할 것이다. 그렇다면 새로운 제품이 들어올 때마다 새롭게 모델을 학습시켜서 배포하면 되지 않을까? 딱봐도 말이 안된다. 하루에 새로운 제품이 수백 수천개씩 쏟아질 것이기 때문이다.

 

그렇다면 한 번 학습시켜 놓고 새로운 class의 이미지에 대하여도 작동하는 방식의 모델이 필요하다. 이것이 바로 이 대회에서 사용할 유사도 검색이다. 예를 들면 얼굴 인식이 대표적인 유사도 검색 방식이다. 분명히 얼굴 인식 기계는 나를 처음 볼텐데, 사진 한방만 찍어놓으면 언제든 내가 나인지 아닌지 판단한다.

 

이미지가 CNN이나 SWin와 같은 backbone을 학습시켜 특정 길이의 벡터인 embedding을 얻을 수 있다. 어떤 방법을 사용하냐에 따라 이 embedding의 분포가 유사도 검색에 적합하도록 학습될 수 있다. 사실 일반적인 classification을 하기 위해 softmax로 학습한 모델에서도 마지막 classification layer만 떼어내면 특정 길이의 embedding을 얻을 수 있고 그를 통해 유사도 검색이 가능하다. 하지만 이는 어찌보면 Softmax로 얻은 부산물에 불과하다고 생각한다. 왜냐하면 Softmax는 유사도 검색 기능을 만들도록 직접적으로 설계되지 않았기 때문이다. 따라서 새로운 class에 대응은 되지만 성능이 좋지는 않다고 한다. 그렇기 때문에 이를 위한 loss 함수에 대한 연구가 끊임없이 이루어졌고 현재 가장 흔히 쓰이는 것이 Triplet Loss와 ArcFace Loss 등이다. 본 대회에서 나는 ArcFace Loss를 사용할 것이고 이 엄청난 기술에 대하여 공부한 내용을 아래 링크에 정리한 적이 있다.

 

논문 정리 : https://harrykur139.tistory.com/19?category=1045621 

 

ArcFace 논문 정리

논문 원본 링크 : https://arxiv.org/pdf/1801.07698.pdf ArcFace: Additive Angular Margin Loss for Deep Face Recognition Abstract DCNN(Deep Convolutional Neural Network)을 사용해서 대규모 얼굴 인식을..

harrykur139.tistory.com

구현 : https://harrykur139.tistory.com/21?category=1045621 

 

ArcFace 구현

ArcFace를 공부하던 와중 Kaggle에 ArcFace의 기본을 구현하는 너무 좋은 코드가 있었다. 포스팅을 통해 이 코드를 내 것으로 만들도록 하자. 코드 링크 : https://www.kaggle.com/code/slawekbiel/arcface-explai..

harrykur139.tistory.com

 

서론이 길었는데, 앞으로 할 것은 간략히 다음과 같다.

  1. Group K Fold를 사용하여 데이터를 train set과 valid set으로 나눈다.
  2. ArcFace Loss를 사용하여 모델을 학습한다.
  3. 적절한 threshold 값을 찾 valid set에 대하여 검증한다.
  4. valid data에 대하여 유사도 검색을 하고 시각화하여 눈으로 확인해본다.

4번은 아마 다음 포스팅에서 진행할 것 같다.

 

학습 코드

학습은 코랩에서 진행했다.

 

먼저 필요한 라이브러리들을 가져오고, gpu를 사용하여 학습할 것이므로 device에 cuda를 등록해주자.

import pandas as pd
import numpy as np
import os
import cv2
from tqdm.notebook import tqdm

import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import CosineAnnealingLR
import albumentations
import timm

import matplotlib.pyplot as plt
%matplotlib inline
import seaborn as sns
from pylab import rcParams

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import GroupKFold

from warnings import filterwarnings
filterwarnings("ignore")

device = torch.device('cuda')

 

 

학습 과정에 사용되는 설정 값들을 정해주자.

image_size = 512
batch_size = 16
n_worker = 4
init_lr = 3e-4
n_epochs = 6
fold_id = 0
valid_every = 1
save_after = 0
margin = 0.5

# 0.3부터 1까지 0.1간격으로 threshold를 검증하기 위해 만든 배열
search_space = np.arange(0.3, 1, 0.1)

backbone_name = 'resnet18'
model_dir = './weights/'
data_dir = './data/'

# model weight 폴더 생성
!mkdir $model_dir

 

 

train data를 로드하고, 학습할 때 데이터 로더에서 이미지 데이터를 가져와야하기 때문에 file_path 컬럼을 만들어주자.

df_train_all = pd.read_csv(os.path.join(data_dir, 'train.csv'))
df_train_all['file_path'] = df_train_all.image.apply(lambda x: os.path.join(data_dir, 'train_images', x))
df_train_all.head(5)

 

 

사이킷런의 GroupKFold를 이용하여 데이터셋을 나눠주자. Group K Fold를 사용하면 train과 valid셋에 겹치는 class가 없게 된다. 예를 들어 클래스가 A, B, C, D, E가 있다고 하면 Stratified K Fold의 경우 A, B, C, D, E 각각의 특정 비율이 train으로, 나머지가 valid로 들어간다. 즉 train, valid 모두 ABCDE를 갖는 것이다. 하지만 Group K Fold는 train에는 ABC, valid에는 DE가 들어가는 식이다. 이렇게 하는 이유는 해당 모델을 검증할 때 새로운 class에 대한 대응 능력을 판단해야하기 때문이다.

gkf = GroupKFold(n_splits=5)
df_train_all['fold'] = -1
for fold, (train_idx, valid_idx) in enumerate(gkf.split(df_train_all, None, df_train_all.label_group)):
    df_train_all.loc[valid_idx, 'fold'] = fold

 

 

하나의 fold에 대하여만 진행할 예정이므로 미리 설정해둔 fold_id = 0에 대하여 train과 valid를 나눠준다. 그리고 사이킷런의 LabelEncoder를 사용하여 label_group(class)를 0부터 클래스 개수-1 개로 바꿔준다.

df_train = df_train_all[df_train_all['fold'] != fold_id]
df_valid = df_train_all[df_train_all['fold'] == fold_id]

le = LabelEncoder()
df_train.label_group = le.fit_transform(df_train.label_group)
df_train.head()

 

 

augmentation 설정을 한다. 기본적으로 resize, flip, nomalize 정도만 해준다.

transforms_train = albumentations.Compose([
    albumentations.Resize(image_size, image_size),
    albumentations.HorizontalFlip(p=0.5),
    albumentations.Normalize()
])

transforms_valid = albumentations.Compose([
    albumentations.Resize(image_size, image_size),
    albumentations.Normalize()
])

 

 

데이터셋을 만든다. mode가 test일 경우 이미지만 리턴하고, 그렇지 않을 경우 이미지와 label을 리턴한다. test mode는 valid set이 들어가는 경우 사용하는데 이는 임베딩을 얻기 위해서기 때문에 굳이 label 값을 리턴할 필요가 없다.

class SHOPEEDataset(Dataset):
    def __init__(self, df, mode, transform=None):
        self.df = df.reset_index(drop=True)
        self.mode = mode
        self.transform = transform
        
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self, index):
        row = self.df.loc[index]
        img = cv2.imread(row.file_path)
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        
        if self.transform is not None:
            res = self.transform(image=img)
            img = res['image'].transpose(2,0,1)
        
        if self.mode == 'test':
            return torch.tensor(img).float()
        else:
            return torch.tensor(img).float(), torch.tensor(row.label_group)

 

 

dataset이 잘 동작하는지 보기 위해 이미지를 몇 개 꺼내 시각화해보자. 아름답게 꺼내지는 것을 볼 수 있다.

dataset = SHOPEEDataset(df_train_all, 'train', transform = transforms_train)
rcParams['figure.figsize'] = 15,5
for i in range(2):
    f, axarr = plt.subplots(1,5)
    for p in range(5):
        idx = i*5 + p
        img, label = dataset[idx]
        axarr[p].imshow(img.transpose(0,1).transpose(1,2).squeeze())
        axarr[p].set_title(label.item())

 

 

ArcFace의 모델과 Loss 함수를 정의한다. backbone은 기본적으로 resnet18을 사용했고 잘 동작한다. 지금은 resnet18보다 성능 좋은 모델이 넘쳐나기 때문에 다른 backbone을 사용하면 더 좋은 성능을 기대할 수 있다. 실제로 해봤는데 그렇다. timm 라이브러리를 통해 모델을 가져왔고, 해당 라이브러리에는 고전 모델부터 최신 모델까지 알려진 거의 모든 모델을 지원한다. 

모델 ResnetArcFace는 train일 경우 classifier까지 거치고, valid일 경우 embedding을 출력할 수 있게 설계했다.

 

def arcface_loss(cosine, targ, m=.5, s=30, output_classes=11014):
    cosine = cosine.clip(-1+1e-7, 1-1e-7)
    arcosine = cosine.arccos()
    arcosine += F.one_hot(targ, num_classes = output_classes) * m
    cosine2 = arcosine.cos()
    cosine2 *= s
    return F.cross_entropy(cosine2, targ)

class ArcFaceClassifier(nn.Module):
    def __init__(self, in_features, output_classes):
        super().__init__()
        self.W = nn.Parameter(torch.Tensor(in_features, output_classes))
        nn.init.kaiming_uniform_(self.W)
    def forward(self, x):
        x_norm = F.normalize(x)
        W_norm = F.normalize(self.W, dim=0)
        return x_norm @ W_norm
    
class ResnetArcFace(nn.Module):
    def __init__(self):
        super().__init__()
        self.backbone = timm.create_model(backbone_name, pretrained=True)
        embedding_size = self.backbone.get_classifier().in_features
        self.after_conv=nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.BatchNorm1d(embedding_size))   
        self.classifier = ArcFaceClassifier(embedding_size, df_train.label_group.nunique())
    
    def forward(self, x, output_embs=False):
        embeddings = self.after_conv(self.backbone.forward_features(x))
        if output_embs:
            return F.normalize(embeddings)
        return self.classifier(embeddings)

 

 

모델과 optimizer, 데이터로더 등 학습에 필요한 것들을 정의하자.

model = ResnetArcFace()
model.to(device)

optimizer = optim.Adam(model.parameters(), lr = init_lr)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, n_epochs)

dataset_train = SHOPEEDataset(df_train, 'train', transform = transforms_train)
dataset_valid = SHOPEEDataset(df_valid, 'test', transform = transforms_valid)

train_loader = torch.utils.data.DataLoader(dataset_train, batch_size=batch_size, shuffle=True, num_workers = n_worker)
valid_loader = torch.utils.data.DataLoader(dataset_valid, batch_size=batch_size, shuffle=False, num_workers = n_worker)

 

 

학습과 검증에 필요한 함수들을 정의한다. 아래에서 각 함수에 대하여 설명한다.

def train_func(train_loader):
    model.train()
    bar = tqdm(train_loader)
    losses = []
    for batch_idx, (images, targets) in enumerate(bar):

        images, targets = images.to(device), targets.to(device)            
        
        cosine = model(images)
        loss = arcface_loss(cosine, targets, m=.5, s=30, output_classes=df_train.label_group.nunique())
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        
        losses.append(loss.item())

        bar.set_description(f'loss: {loss.item():.5f}')

    loss_train = np.mean(losses)
    return loss_train

def get_embeddings(data_loader):
    model.eval()
    embs = []
    with torch.no_grad():
        for batch_idx, (images) in enumerate(tqdm(data_loader)):
            images = images.to(device)
            features = model(images, output_embs=True)
            embs += [features.detach().cpu()]
    embs = torch.cat(embs).cpu().numpy()
    return embs
    
def row_wise_f1_score(labels, preds):
    scores = []
    for label, pred in zip(labels, preds):
        n = len(np.intersect1d(label, pred))
        score = 2 * n / (len(label)+len(pred))
        scores.append(score)
    return scores, np.mean(scores)

def find_threshold(df, embs, search_space):
    score_by_threshold = []
    best_score = 0
    best_thres = -1
    for thres in tqdm(search_space):
        sim_matrix = ((embs@embs.T) > thres).cpu().numpy()

        pred = []
        for row in sim_matrix:
            pred.append(df.iloc[row].posting_id.tolist())

        tmp = df.groupby('label_group').posting_id.agg('unique').to_dict()
        target = df.label_group.map(tmp)
        scores, score = row_wise_f1_score(target, pred)
        
        score_by_threshold.append(score)
        if score > best_score:
            best_score = score
            best_thres = thres
            
    plt.figure(figsize=(15,5))
    plt.plot(score_by_threshold)
    plt.xlabel('threshold')
    plt.ylabel('score')
    plt.show()
    print(f'Best score is {best_score} and best threshold is {best_thres}')
  • train_func : train data loader에 대하여 1 epoch 학습하는 함수
  • get_embeddings : valid data loader에 대하여 모든 데이터를 모델에 입력하여 embedding 리스트를 출력
  • find_threshold : 위에서 정의한 search_space에 있는 모든 값들을 threshold로 하여 score를 계산하고 가장 높은 score를 반환하는 threshold를 찾는 함수
  • row_wise_f1_score : find_threshold에서 사용하는 함수. 대회에서 요구하는 대로 각 제품마다 f1 score를 계산하여 평균낸다.

 

 

준비는 끝났다. 드디어 학습을 해보자. valid_every에 해당하는 epoch마다 검증을 진행한다.

for epoch in range(n_epochs):
    scheduler.step()
    loss_train = train_func(train_loader)

    if epoch % valid_every == 0: 
        embs = get_embeddings(valid_loader)
        embs = torch.tensor(embs).cuda()
        find_threshold(df_valid, embs, search_space)

        if epoch >= save_after:
            torch.save(model.state_dict(), os.path.join(model_dir, f'{backbone_name}_{image_size}_epoch{epoch}.pth'))

총 6 epoch를 학습했고 각 epoch마다 최적의 threshold를 찾는 것을 볼 수 있다. 5 epoch째에서 가장 높은 score를 기록했다. 이 때 threshold는 0.5다.

 

학습을 진행하면 weights 폴더에 모델의 가중치 파일이 저장되는 것을 확인할 수 있다.

 

다음 포스팅에서는 해당 모델을 이용하여 검증 데이터셋에서 실제로 유사도 검색을 해보고 결과를 시각화해보겠다.

Comments