
Deep Learning Implementation Journey: VGG

Published 2022/02/10

To properly understand the model's internal structure, I'll re-implement it from the paper.
I'll do it in Google Colab, using torch.
Here I'll implement VGG (https://arxiv.org/abs/1409.1556).

The code I write is kept here:

https://github.com/yoyoyo-yo/DeepLearningReImples

Installing libraries

The only extra install needed is albumentations, which makes image data augmentation easy.

!pip install -q albumentations==0.4.6
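
Just in case, a quick sanity check that the pinned version is the one actually imported (optional):

import albumentations
print(albumentations.__version__)  # should print 0.4.6, matching the pin above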

Importing libraries

import os
import time

from tqdm.notebook import tqdm
import numpy as np

from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
import albumentations as A
from albumentations.pytorch import ToTensorV2

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Model definition

The VGG model is a simple stack of 3x3 convolutions.
A few convs are chained together, then max-pooling halves the height and width of the feature map, and this pattern repeats.
Here I define one such run of consecutive convs as a VGGBlock.
The paper describes several configurations; the two best-known are VGG16 and VGG19, which look like this:

VGG16       VGG19
Conv x 2    Conv x 2
MaxPool     MaxPool
Conv x 2    Conv x 2
MaxPool     MaxPool
Conv x 3    Conv x 4
MaxPool     MaxPool
Conv x 3    Conv x 4
MaxPool     MaxPool
Conv x 3    Conv x 4
MaxPool     MaxPool
MLP x 2     MLP x 2
MLP         MLP

class VGGBlock(nn.Module):
    def __init__(self, dim, repeat=3):
        super().__init__()

        module = []
        for _ in range(repeat):
            # 3x3 conv with padding=1 keeps the spatial size; BatchNorm is an addition over the original paper
            module += [nn.LazyConv2d(dim, kernel_size=3, padding=1, stride=1), nn.ReLU(), nn.LazyBatchNorm2d()]

        self.module = nn.Sequential(*module)

    def forward(self, x):
        return self.module(x)


class VGG(nn.Module):
    def __init__(self, out_dim, conv_dim_base=64, hidden_dim=4096, dropratio=0.2, mode=16):
        super().__init__()
        # VGG16 uses 3 convs in each of the three deeper blocks, VGG19 uses 4
        repeat = 3 if mode == 16 else 4

        # five conv blocks; channels double each block (capped at 512) and each MaxPool halves H and W
        self.block1 = VGGBlock(conv_dim_base, repeat=2)
        self.pool1  = nn.MaxPool2d([2, 2], padding=0, stride=2)
        self.block2 = VGGBlock(conv_dim_base * 2, repeat=2)
        self.pool2  = nn.MaxPool2d([2, 2], padding=0, stride=2)
        self.block3 = VGGBlock(conv_dim_base * 4, repeat=repeat)
        self.pool3  = nn.MaxPool2d([2, 2], padding=0, stride=2)
        self.block4 = VGGBlock(conv_dim_base * 8, repeat=repeat)
        self.pool4  = nn.MaxPool2d([2, 2], padding=0, stride=2)
        self.block5 = VGGBlock(conv_dim_base * 8, repeat=repeat)
        self.pool5  = nn.MaxPool2d([2, 2], padding=0, stride=2)

        self.flatten = nn.Flatten()

        # the "MLP x 2" part: two fully-connected layers, each followed by dropout
        self.top = nn.Sequential(
            nn.LazyLinear(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropratio),
            nn.LazyLinear(hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropratio),
        )

        self.clf = nn.LazyLinear(out_dim)

    def forward(self, x):
        x = self.forward_conv(x)
        x = self.flatten(x)
        x = self.top(x)
        x = self.clf(x)
        return x

    def forward_conv(self, x):
        x = self.block1(x)
        x = self.pool1(x)
        x = self.block2(x)
        x = self.pool2(x)
        x = self.block3(x)
        x = self.pool3(x)
        x = self.block4(x)
        x = self.pool4(x)
        x = self.block5(x)
        x = self.pool5(x)
        return x
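
The Lazy* modules only materialize their weights on the first forward pass, so a smoke test with a dummy batch is a handy way to confirm the output shapes (a minimal sketch; the batch and input sizes here are arbitrary):

# dummy batch of 2 RGB images at CIFAR size; the Lazy layers infer in_channels here
model = VGG(out_dim=100, mode=16)
x = torch.zeros(2, 3, 32, 32)
print(model(x).shape)               # torch.Size([2, 100])
print(model.forward_conv(x).shape)  # torch.Size([2, 512, 1, 1]) after five 2x poolings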

Defining the DataLoader

Here I define the Dataset class that the CIFAR-100 DataLoaders will wrap.

class TrainDataset(Dataset):
    def __init__(self, xs, ys, transforms=None):
        self.xs = xs
        self.ys = ys
        self.transforms = transforms
        self.data_num = len(xs)

    def __len__(self):
        return self.data_num

    def __getitem__(self, idx):
        x = self.xs[idx]
        y = self.ys[idx]

        # albumentations takes and returns a dict; the transformed image sits under "image"
        if self.transforms:
            x = self.transforms(image=x)["image"]

        return x, y
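
With transforms=None the dataset simply returns the raw numpy image and its label, which makes indexing easy to check before wiring up augmentations (a tiny sketch with made-up data):

# two fake 32x32 RGB images with integer labels, no augmentation applied
xs = np.zeros((2, 32, 32, 3), dtype=np.uint8)
ys = np.array([0, 1])
ds = TrainDataset(xs, ys)
x, y = ds[0]
print(x.shape, y)  # (32, 32, 3) 0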

Defining the image augmentations

transforms_train = A.Compose([
    A.Resize(32, 32),
    A.HorizontalFlip(p=0.5),
    A.VerticalFlip(p=0.5),
    A.Normalize(max_pixel_value=255.0, p=1.0),
    ToTensorV2(p=1.0),
])

transforms_valid = A.Compose([
    A.Resize(32, 32),
    A.Normalize(max_pixel_value=255.0, p=1.0),
    ToTensorV2(p=1.0),
])
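
Running one image through the train pipeline shows exactly what the DataLoader will feed the model: Normalize scales the uint8 HWC image and ToTensorV2 converts it to a CHW float tensor (a quick check on a dummy image):

img = np.random.randint(0, 256, (32, 32, 3), dtype=np.uint8)  # fake HWC uint8 image
out = transforms_train(image=img)["image"]
print(out.shape, out.dtype)  # torch.Size([3, 32, 32]) torch.float32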

The training function

I'll skip over the fine details.
Here I split CIFAR-100 into train:validation = 8:2 and give it a try.

The final epoch lands around
epoch:60/60 [train]loss:1.5480 acc:0.5677 [val]loss:2.0474 acc:0.4775 [time:s]total:18 train:16 val:2
so the model isn't discriminating all that well.

def train():
    train_ds = torchvision.datasets.CIFAR100(root="./", train=True, download=True, transform=None)
    train_xs = train_ds.data
    train_ts = np.array(train_ds.targets)

    xs_train, xs_valid, ts_train, ts_valid = train_test_split(train_xs, train_ts, test_size=0.2, random_state=42, stratify=train_ts)

    dl_train = DataLoader(TrainDataset(xs_train, ts_train, transforms=transforms_train), batch_size=256, num_workers=os.cpu_count(), shuffle=True)
    dl_valid = DataLoader(TrainDataset(xs_valid, ts_valid, transforms=transforms_valid), batch_size=256 * 2, num_workers=os.cpu_count(), shuffle=False)

    model = VGG(out_dim=100, mode=16)
    model = model.to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.002)
    criterion = nn.CrossEntropyLoss(reduction="mean")

    EPOCH = 60

    for epoch in range(EPOCH):
        model.train()
        
        # train
        _labels, _preds = [], []
        tr_loss = 0

        train_time_start = time.time()
        
        for step, batch in tqdm(enumerate(dl_train), total=len(dl_train), leave=False):
            optimizer.zero_grad()

            xs = batch[0].to(device)
            ts = batch[1].to(device)
        
            ys = model(xs)
            
            loss = criterion(ys, ts)
            loss.backward()
            optimizer.step()

            tr_loss += loss.item() / len(dl_train)

            _, preds = torch.max(ys, 1)
            _labels.extend(ts.detach().cpu().numpy().tolist())
            _preds.extend(preds.detach().cpu().numpy().tolist())
        
        train_accuracy = accuracy_score(_labels, _preds)

        train_time_end = time.time()

        # val
        model.eval()

        val_loss = 0
        _labels, _preds = [], []
        
        val_time_start = time.time()
        
        with torch.no_grad():
            for step, batch in tqdm(enumerate(dl_valid), total=len(dl_valid), leave=False):
                xs = batch[0].to(device)
                ts = batch[1].to(device)
                
                ys = model(xs)

                loss = criterion(ys, ts)
                val_loss += loss.item() / len(dl_valid)
                
                _, preds = torch.max(ys, 1)
                _labels.extend(ts.detach().cpu().numpy().tolist())
                _preds.extend(preds.detach().cpu().numpy().tolist())

        val_time_end = time.time()
        train_time_total = train_time_end - train_time_start
        val_time_total = val_time_end - val_time_start
        total_time = train_time_total + val_time_total

        val_accuracy = accuracy_score(_labels, _preds)

        print(f"epoch:{epoch + 1}/{EPOCH} [train]loss:{tr_loss:.4f} acc:{train_accuracy:.4f} [val]loss:{val_loss:.4f} acc:{val_accuracy:.4f}  [time:s]total:{total_time:.0f} train:{train_time_total:.0f} val:{val_time_total:.0f}")

    print("train finished")
