🦕

kaggle HMS-HBAC ResNet34d Baseline [LB 0.49] 解説

2024/04/11に公開

概要

https://www.kaggle.com/code/ttahara/hms-hbac-resnet34d-baseline-training
https://www.kaggle.com/code/ttahara/hms-hbac-resnet34d-baseline-inference
ResNetを使用した脳波分類タスクのソリューション
リンク先でUpvote(高評価)を忘れずに!

コンペ説明

患者の脳波が示す活動の種類を分類する。専門家の評価を正解として予測を行うコンペ

このコンペでは、生の脳波波形(eeg)とスペクトログラム(spectrogram)の両方が提供されており、スペクトログラムは10分、脳波波形は50秒です、またこれらのデータの中央の50秒は同じデータであり、この期間は同じデータを二つの表現方法で提示しています。

eeg: 一般に想像する波形データ(株価など)
spectrogram: 信号を[横軸:時間,縦軸:周波数,強さ:色]で表すデータ形式

より詳しく知りたい方は、コンペの概要と以下のデータ説明を読むと分かりやすいと思います。
https://www.kaggle.com/competitions/hms-harmful-brain-activity-classification/discussion/468010

ResNetとは

ResNet(Residual Networkは、2015年にMicrosoftによって提案された深層ネットワークです。R残差ブロック(residual blocks)と呼ばれるメカニズムの導入により、ネットワークを遥かに深くできるようになりました。

残差ブロックは、通常のCNNにshort cut connectionを追加した構造をとっており、モデルが入力前の値を直接学習に利用できるように(恒等写像を学習できるように)なりました。
これにより勾配消失などの問題が大幅に改善し、モデルを深くすることを可能にしました。

残差ブロックを使用してモデルを深くしたことにより、大幅な性能向上と汎用性の向上が得られました。これは、ILSVRC2015の優勝という輝かしい成績により証明されています。

コード解説 学習編

1. スペクトログラムの正規化

# 画像ごとにスペクトログラムの正規化
img = np.load(path)  # shape: (Hz, Time) = (400, 300)
eps = 1e-6
img_mean = img.mean(axis=(0, 1))
img = img - img_mean
img_std = img.std(axis=(0, 1))
img = img / (img_std + eps)

ここではスペクトログラムの正規化を行っています。

  • img = np.load(path) # shape: (Hz, Time) = (400, 300)
    周波数軸400, 時間軸300のスペクトログラムをnumpyで読み込む
  • eps = 1e-6
    ゼロ除算を防ぐための微小値
  • img_mean = img.mean(axis=(0, 1))
    縦軸と横軸を含めた(axis=(0, 1))平均値を求める
  • img = img - img_mean
    画像から平均値を引いてスペクトログラムを0近辺の値にする
  • img_std = img.std(axis=(0, 1))
    0近辺の値にしたスペクトログラムの標準偏差を求める
  • img = img / (img_std + eps)
    0近辺のスペクトログラムを標準偏差で割ることで、標準偏差1かつ0近辺の正規化を行う(epsでゼロ除算を防ぐ)

2. 初期設定

import sys
import os
import gc
import copy
import yaml
import random
import shutil
from time import time
import typing as tp
from pathlib import Path

import numpy as np
import pandas as pd

from tqdm.notebook import tqdm
from sklearn.model_selection import StratifiedGroupKFold

import torch
from torch import nn
from torch import optim
from torch.optim import lr_scheduler
from torch.cuda import amp

import timm

import albumentations as A
from albumentations.pytorch import ToTensorV2

os.environ["CUDA_VISIBLE_DEVICES"] = "0"

ライブラリのインポートと設定を行います。

  • os.environ["CUDA_VISIBLE_DEVICES"] = "0"
    最初のGPU以外使わないようにする(複数のGPUによる複雑さを考慮しなくて良い)
ROOT = Path.cwd().parent
INPUT = ROOT / "input"
OUTPUT = ROOT / "output"
SRC = ROOT / "src"

DATA = INPUT / "hms-harmful-brain-activity-classification"
TRAIN_SPEC = DATA / "train_spectrograms"
TEST_SPEC = DATA / "test_spectrograms"

TMP = ROOT / "tmp"
TRAIN_SPEC_SPLIT = TMP / "train_spectrograms_split"
TEST_SPEC_SPLIT = TMP / "test_spectrograms_split"
TMP.mkdir(exist_ok=True)
TRAIN_SPEC_SPLIT.mkdir(exist_ok=True)
TEST_SPEC_SPLIT.mkdir(exist_ok=True)


RANDAM_SEED = 1086
CLASSES = ["seizure_vote", "lpd_vote", "gpd_vote", "lrda_vote", "grda_vote", "other_vote"]
N_CLASSES = len(CLASSES)
FOLDS = [0, 1, 2, 3, 4]
N_FOLDS = len(FOLDS)

ファイルパスの設定とseed値、分類先クラス、FOLD数の設定を行います。

  • ROOT = Path.cwd().parent
    pathlibモジュールのPathを使用して現在の作業ディレクトリを取得し、その親ディレクトリをROOTに設定
  • TMP.mkdir(exist_ok=True)
    mkdirによりディレクトリを作成(すでに存在していても良い)

3. データの読み込み

train = pd.read_csv(DATA / "train.csv")

# convert vote to probability
train[CLASSES] /= train[CLASSES].sum(axis=1).values[:, None]

print(train.shape)
## (106800, 15)

データを読み込み、形状を確認します。106800行15列のcsv形式のデータであることが分かります。

  • train[CLASSES] /= train[CLASSES].sum(axis=1).values[:, None]
    分類先クラスの列に対して、各行の合計値を求めて配列にして、形状を縦方向に変更します。そして/=で各列を除算することにより、CLASSES列を正規化します。
train = train.groupby("spectrogram_id").head(1).reset_index(drop=True)
print(train.shape)

学習高速化のために、各spectrogram_idに割り当てられているsub_spectrogram_idは最初の一行のものを使用する

  • train = train.groupby("spectrogram_id").head(1).reset_index(drop=True)
    groupby操作は、指定した列の一意な要素ごとにdfをグループに分割します。そのグループに対して
    head(1)で一行目を取り出すことで、各spectrogram_idの一行目の要素を取り出しています。(reset_indexは取り出しによってバラバラになったインデックスを修正します)

4. K-fold設定

sgkf = StratifiedGroupKFold(n_splits=N_FOLDS, shuffle=True, random_state=RANDAM_SEED)

train["fold"] = -1

for fold_id, (_, val_idx) in enumerate(
    sgkf.split(train, y=train["expert_consensus"], groups=train["patient_id"])
):
    train.loc[val_idx, "fold"] = fold_id

StratifiedGroupKFoldは複雑なのでこちらで解説しています。
ここではTrain dataに"fold"列を追加して、評価に使用する行をグループ化しています。

train.groupby("fold")[CLASSES].sum()

各foldで使用されるテストデータについて、それぞれのfoldの正解ラベルの分布割合が等しい時、各foldに含まれる投票結果の割合はどの程度なのか出力。
要するにうまく分割できているか確認している。

5. スペクトログラムファイルの前処理

for spec_id, df in tqdm(train.groupby("spectrogram_id")):
    spec = pd.read_parquet(TRAIN_SPEC / f"{spec_id}.parquet")
    
    spec_arr = spec.fillna(0).values[:, 1:].T.astype("float32")  # (Hz, Time) = (400, 300)
    
    for spec_offset, label_id in df[
        ["spectrogram_label_offset_seconds", "label_id"]
    ].astype(int).values:
        spec_offset = spec_offset // 2
        split_spec_arr = spec_arr[:, spec_offset: spec_offset + 300]
        np.save(TRAIN_SPEC_SPLIT / f"{label_id}.npy" , split_spec_arr)

スペクトログラムファイルを縦軸時間、横軸周波数にして、offset timeを中心とした時間軸300固定のnpyデータとして保存します。

  • for spec_id, df in tqdm(train.groupby("spectrogram_id")):
    Groupbyオブジェクトをイテレーションすると、集計したkeyとそれに対応するdataframeが返される
  • spec_arr = spec.fillna(0).values[:, 1:].T.astype("float32") # (Hz, Time) = (400, 300)
    スペクトログラムの欠損値を0埋め、valuesでnumpy.arrayに変更(スライスや転置はpdでは行えない)し、インデックスやタイムスタンプである可能性のある最初の列を除外し、転置して周波数を横軸、時間を縦軸にする
  • spec_offset = spec_offset // 2
    切り捨て除算
  • split_spec_arr = spec_arr[:, spec_offset: spec_offset + 300]
    オフセット時間の半分から開始して、300秒後までスペクトログラムを取得。各スペクトログラムデータには、オフセットを含めた余剰時間があるため、index errorは起きない
  • np.save(TRAIN_SPEC_SPLIT / f"{label_id}.npy" , split_spec_arr)
    label_idに基づいた名前でnpyファイルとして保存。データ型と形状の情報を記憶できる形式

6. モデル、データセット、損失関数の定義

・モデル定義

class HMSHBACSpecModel(nn.Module):

    def __init__(
            self,
            model_name: str,
            pretrained: bool,
            in_channels: int,
            num_classes: int,
        ):
        super().__init__()
        self.model = timm.create_model(
            model_name=model_name, pretrained=pretrained,
            num_classes=num_classes, in_chans=in_channels)

    def forward(self, x):
        h = self.model(x)      

        return h
  • class HMSHBACSpecModel(nn.Module):
    pytorchのnn.Moduleを継承してモデルを定義
  • self.model = timm.create_model(model_name=model_name, pretrained=pretrained, num_classes=num_classes, in_chans=in_channels)
    画像認識ライブラリのtimmからモデルを選択して定義
  • def forward(self, x): \ h = self.model(x) \ return h
    modelに順伝播を通して結果を返す

・データセット定義

FilePath = tp.Union[str, Path]
Label = tp.Union[int, float, np.ndarray]

class HMSHBACSpecDataset(torch.utils.data.Dataset):

    def __init__(
        self,
        image_paths: tp.Sequence[FilePath],
        labels: tp.Sequence[Label],
        transform: A.Compose,
    ):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index: int):
        img_path = self.image_paths[index]
        label = self.labels[index]

        img = np.load(img_path)  # shape: (Hz, Time) = (400, 300)
        
        # log transform
        img = np.clip(img,np.exp(-4), np.exp(8))
        img = np.log(img)
        
        # normalize per image
        eps = 1e-6
        img_mean = img.mean(axis=(0, 1))
        img = img - img_mean
        img_std = img.std(axis=(0, 1))
        img = img / (img_std + eps)

        img = img[..., None] # shape: (Hz, Time) -> (Hz, Time, Channel)
        img = self._apply_transform(img)

        return {"data": img, "target": label}

    def _apply_transform(self, img: np.ndarray):
        """apply transform to image and mask"""
        transformed = self.transform(image=img)
        img = transformed["image"]
        return img

pytorchのモデルで使用するデータセットを定義しています。

  • FilePath = tp.Union[str, Path] \ Label = tp.Union[int, float, np.ndarray]
    FilePath,Labelという型エイリアスを定義しています。例えばFilePathはstrかPathのテータ形式を取ることを示します。
  • class HMSHBACSpecDataset(torch.utils.data.Dataset):
    pytorchのDatasetを継承してデータセットを定義
  • def __len__(self): \ return len(self.image_paths)
    image fileの総数を返す関数。pytorchではデータの総数を返す関数を__len__として定義する必要がある。
  • def __getitem__(self, index: int):
    pytorchではindexを指定して、特徴量(data)と正解ラベル(target)を返す関数を__getitem__として定義する必要がある。
  • img = np.clip(img,np.exp(-4), np.exp(8))
    画像を指定の範囲内に丸める
  • img = np.log(img)
    対数変換を行う。データの特に高い値の影響を低減し、微細な特徴を捉えられるようになる。画像データや信号データ等、大きな値の範囲を持つデータに有効
  • # normalize per image
    最初に紹介した画像データの正規化
  • img = img[..., None] # shape: (Hz, Time) -> (Hz, Time, Channel)
    軸の追加。Channel用の軸を追加する
  • img = self._apply_transform(img)
    self.transformに指定された変換を行う

・損失関数の定義

class KLDivLossWithLogits(nn.KLDivLoss):

    def __init__(self):
        super().__init__(reduction="batchmean")

    def forward(self, y, t):
        y = nn.functional.log_softmax(y,  dim=1)
        loss = super().forward(y, t)

        return loss


class KLDivLossWithLogitsForVal(nn.KLDivLoss):
    
    def __init__(self):
        """"""
        super().__init__(reduction="batchmean")
        self.log_prob_list  = []
        self.label_list = []

    def forward(self, y, t):
        y = nn.functional.log_softmax(y, dim=1)
        self.log_prob_list.append(y.numpy())
        self.label_list.append(t.numpy())
        
    def compute(self):
        log_prob = np.concatenate(self.log_prob_list, axis=0)
        label = np.concatenate(self.label_list, axis=0)
        final_metric = super().forward(
            torch.from_numpy(log_prob),
            torch.from_numpy(label)
        ).item()
        self.log_prob_list = []
        self.label_list = []
        
        return final_metric

使用する損失関数を定義します。今回のコンペでは予測と教師データの分布を比較するKLダイバージェンスが評価指標として設定されています。

  • class KLDivLossWithLogits(nn.KLDivLoss):
    nnライブラリのKLDivLossを継承して損失関数を定義
  • def init(self): \ super().init(reduction="batchmean")
    親クラスのinitを呼び出す。reduction="batchmean "引数は、損失がバッチサイズで平均化されることを指定する(数学的にはこれが正しい)。
  • def forward(self, y, t):
    損失関数使用時の呼び出し。yは予測された確率分布、tは教師データの確率分布
  • y = nn.functional.log_softmax(y, dim=1)
    モデルが出力する生の予測。yは通常、正規化されていない各クラスのスコアなので、softmaxで予測確率分布に変更(形状が[batch_size, num_classes]であるためdim=1)
  • loss = super().forward(y, t)
    親クラスのforwardメソッドでyとtのKLDivを計算
  • class KLDivLossWithLogitsForVal(nn.KLDivLoss):
    評価時に使用するための損失関数。forwardでバッチごとの損失を蓄積し、conputeで蓄積した損失をまとめて計算して返します。

7. モデル学習設定

・設定

class CFG:
    model_name = "resnet34d"  # 使用モデルにResNet-34の改良版を指定
    img_size = 512            # 入力画像のサイズ。この設定では画像を512x512ピクセルにリサイズする
    max_epoch = 9             # 訓練を行う最大エポック数。1エポックは訓練データセットを一周することを意味する
    batch_size = 32           # バッチサイズ。1回の訓練ステップでネットワークに渡されるサンプルの数
    lr = 1.0e-03              # 学習率。モデルの重みを更新する際のステップサイズを決定する
    weight_decay = 1.0e-02    # 重み減衰。過学習を防ぐために正則化項を追加
    es_patience = 5           # Early Stoppingを行うタイミング。このエポック数内で改善しない場合、学習を早期終了する
    seed = 1086               # 乱数シード値
    deterministic = True      # 決定論的挙動の有効/無効。有効の場合、同じ初期条件と入力から始めた場合に、毎回同じ結果を出力する
    enable_amp = True         # 自動混合精度(Automatic Mixed Precision)の有効/無効。浮動小数点等の最適化
    device = "cuda"           # 訓練に使用するデバイス。"cuda"はNVIDIA GPU

・訓練用の関数

def set_random_seed(seed: int = 42, deterministic: bool = False):
    """Set seeds"""
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)  # type: ignore
    torch.backends.cudnn.deterministic = deterministic  # type: ignore
    
def to_device(
    tensors: tp.Union[tp.Tuple[torch.Tensor], tp.Dict[str, torch.Tensor]],
    device: torch.device, *args, **kwargs
):
    if isinstance(tensors, tuple):
        return (t.to(device, *args, **kwargs) for t in tensors)
    elif isinstance(tensors, dict):
        return {
            k: t.to(device, *args, **kwargs) for k, t in tensors.items()}
    else:
        return tensors.to(device, *args, **kwargs)
  • def set_random_seed(seed: int = 42, deterministic: bool = False):
    乱数シード値の設定
  • def to_device(tensors, device):
    pytorchテンソルを、高速で計算するためにGPU環境に移動させる関数
def get_path_label(val_fold, train_all: pd.DataFrame):
    """Get file path and target info."""
    
    train_idx = train_all[train_all["fold"] != val_fold].index.values
    val_idx   = train_all[train_all["fold"] == val_fold].index.values
    img_paths = []
    labels = train_all[CLASSES].values
    for label_id in train_all["label_id"].values:
        img_path = TRAIN_SPEC_SPLIT / f"{label_id}.npy"
        img_paths.append(img_path)

    train_data = {
        "image_paths": [img_paths[idx] for idx in train_idx],
        "labels": [labels[idx].astype("float32") for idx in train_idx]}

    val_data = {
        "image_paths": [img_paths[idx] for idx in val_idx],
        "labels": [labels[idx].astype("float32") for idx in val_idx]}
    
    return train_data, val_data, train_idx, val_idx


def get_transforms(CFG):
    train_transform = A.Compose([
        A.Resize(p=1.0, height=CFG.img_size, width=CFG.img_size),
        ToTensorV2(p=1.0)
    ])
    val_transform = A.Compose([
        A.Resize(p=1.0, height=CFG.img_size, width=CFG.img_size),
        ToTensorV2(p=1.0)
    ])
    return train_transform, val_transform

画像のpathと正解ラベルの分布を取得する関数と、データ拡張の定義を取得する関数を定義します。

  • def get_path_label(val_fold, train_all: pd.DataFrame):
    foldの値とモデル学習用(kfold評価用も含む)のdataframeを受け取り、辞書型の訓練データとkfold評価データ、及びそれらデータの元のdataframeにおけるインデックスを返す関数
  • train_idx = train_all[train_all["fold"] != val_fold].index.values
    元のdataframeから今回のfoldでない行のみを抜き出し、その行のインデックスを取得してnumpyに変換。df[条件式]で、特定の行のみを抜き出せる
  • val_idx = train_all[train_all["fold"] == val_fold].index.values
    元のdataframeから今回のfoldの行を抜き出し、その行のインデックスを取得してnumpyに変換
  • labels = train_all[CLASSES].values
    dataframeから分類先クラスの列のみを抜き出し
  • for label_id in train_all["label_id"].values: ...
    5. のコードで保存した縦軸時間300、横軸周波数400のスペクトログラムデータの画像データへのpathを全てimg_pathsに追加
  • train_data = {...}
    kfoldで分割された、訓練側のインデックスを使用して、画像データのpathのリストと、それに対応する分類先クラスの分布をリスト化して、"image_paths","labels"をkeyとする辞書を作成
  • val_data = {...}
    kfoldで分割された、評価側のインデックスを使用して、画像データのpathのリストと、それに対応する分類先クラスの分布をリスト化して、"image_paths","labels"をkeyとする辞書を作成

※label_idはtrain_allに含まれる108600行のデータに対して一意のidを提供している。つまり、10秒毎ズレのサブシーケンス一つ一つに対応している

  • def get_transforms(CFG):
    7. で定義したCFGクラスの設定で、Albumentaio(A)ライブラリによる画像データの拡張を定義し、その定義を返す関数
  • train_transform = A.Compose([...])
    Albumentationライブラリで、複数の操作を一度に行う際の記述方法
  • A.Resize(p=1.0, height=CFG.img_size, width=CFG.img_size),
    CFG.img_sizeのサイズにリサイズする。pはこの操作が適用される可能性を示し、p=1.0で全ての画像に適用される。
  • ToTensorV2(p=1.0) 画像データをPyTorchのテンソルに変換します。p=1.0はこの変換が全ての画像に対して適用されることを意味します。
def train_one_fold(CFG, val_fold, train_all, output_path):
    """Main"""
    torch.backends.cudnn.benchmark = True
    set_random_seed(CFG.seed, deterministic=CFG.deterministic)
    device = torch.device(CFG.device)
    
    train_path_label, val_path_label, _, _ = get_path_label(val_fold, train_all)
    train_transform, val_transform = get_transforms(CFG)
    
    train_dataset = HMSHBACSpecDataset(**train_path_label, transform=train_transform)
    val_dataset = HMSHBACSpecDataset(**val_path_label, transform=val_transform)
    
    train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=True, drop_last=True)
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)
    
    model = HMSHBACSpecModel(
        model_name=CFG.model_name, pretrained=True, num_classes=6, in_channels=1)
    model.to(device)
    
    optimizer = optim.AdamW(params=model.parameters(), lr=CFG.lr, weight_decay=CFG.weight_decay)
    scheduler = lr_scheduler.OneCycleLR(
        optimizer=optimizer, epochs=CFG.max_epoch,
        pct_start=0.0, steps_per_epoch=len(train_loader),
        max_lr=CFG.lr, div_factor=25, final_div_factor=4.0e-01
    )
    
    loss_func = KLDivLossWithLogits()
    loss_func.to(device)
    loss_func_val = KLDivLossWithLogitsForVal()
    
    use_amp = CFG.enable_amp
    scaler = amp.GradScaler(enabled=use_amp)
    
    best_val_loss = 1.0e+09
    best_epoch = 0
    train_loss = 0
    
    for epoch in range(1, CFG.max_epoch + 1):
        epoch_start = time()
        model.train()
        for batch in train_loader:
            batch = to_device(batch, device)
            x, t = batch["data"], batch["target"]
                
            optimizer.zero_grad()
            with amp.autocast(use_amp):
                y = model(x)
                loss = loss_func(y, t)
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
            scheduler.step()
            train_loss += loss.item()
            
        train_loss /= len(train_loader)
            
        model.eval()
        for batch in val_loader:
            x, t = batch["data"], batch["target"]
            x = to_device(x, device)
            with torch.no_grad(), amp.autocast(use_amp):
                y = model(x)
            y = y.detach().cpu().to(torch.float32)
            loss_func_val(y, t)
        val_loss = loss_func_val.compute()        
        if val_loss < best_val_loss:
            best_epoch = epoch
            best_val_loss = val_loss
            # print("save model")
            torch.save(model.state_dict(), str(output_path / f'snapshot_epoch_{epoch}.pth'))
        
        elapsed_time = time() - epoch_start
        print(
            f"[epoch {epoch}] train loss: {train_loss: .6f}, val loss: {val_loss: .6f}, elapsed_time: {elapsed_time: .3f}")
        
        if epoch - best_epoch > CFG.es_patience:
            print("Early Stopping!")
            break
            
        train_loss = 0
            
    return val_fold, best_epoch, best_val_loss
  • def train_one_fold(CFG, val_fold, train_all, output_path):
    foldを指定して学習を行い、最も良いepochとその評価を返す関数
  • torch.backends.cudnn.benchmark = True
    -Trueの場合、cuDNNに複数の畳み込みアルゴリズムをベンチマークさせ、最も高速なものを選択させる
  • set_random_seed(CFG.seed, deterministic=CFG.deterministic)
    -seed値設定関数の呼び出し
  • device = torch.device(CFG.device)
    -CFGで定義したデバイスを、pytorchの使用デバイスに設定
  • train_path_label, val_path_label, _, _ = get_path_label(val_fold, train_all)
    -訓練及び評価用の画像pathとラベルを含む辞書を取得
  • train_transform, val_transform = get_transforms(CFG)
    -データ拡張のオブジェクトを取得
  • train_dataset = HMSHBACSpecDataset(**train_path_label, transform=train_transform)
    -画像pathのリストと正解ラベルのリストを辞書で、データ拡張をオブジェクトで渡し、pytorchのdatasetを定義
  • train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=True, drop_last=True)
    -pytorchのdataloaderの定義。dataloaderは、datasetから効率的にデータを取得し、学習に使用するためのクラスです。
    • 引数
      batch_size: 1度のバッチで学習に使用されるデータの数
      num_wokers: メインの処理系から独立して並列に動くサブプロセスの数。値は2、もしくは物理コアの数に設定するのが良さそうです。
      shuffle: データのロード時にdatasetがシャッフルされる
      drop_last: Trueで、datasetのサイズがバッチサイズで割りきれない場合、最後のデータは切り捨てられる
  • model=HMSHBACSpecModel(model_name=CFG.model_name,pretrained=True, num_classes=6, in_channels=1)
    -CFGで定義したモデルをtimmから取得して、modelを定義。
    • 引数
      pretarined: Trueで事前学習済みモデルを使用
      num_classes: 分類先の数
      in_chans: 今回は強さが単一のスペクトログラムなのでチャネルは1
  • model.to(device)
    -取得したモデルをGPU環境にロード
  • optimizer = optim.AdamW(params=model.parameters(), lr=CFG.lr, weight_decay=CFG.weight_decay)
    -重みを更新する最適化関数を定義。
    • 引数
      params: 初期パラメータは取得したモデルのものを使用
      lr: 学習率
      weight_decay: 正則化項の強さを調整するパラメータ
  • scheduler = lr_scheduler.OneCycleLR(
    optimizer=optimizer, epochs=CFG.max_epoch,
    pct_start=0.0, steps_per_epoch=len(train_loader),
    max_lr=CFG.lr, div_factor=25, final_div_factor=4.0e-01
    )
    初期学習率と最終学習率を決定し、onecycleポリシーに従って指定されたエポック数に渡って学習率を上げ、その後徐々に下げることで、局所最適解に陥るリスクを低減するスケジューラーを定義。この関数はpytorchより提供されている。
    • 引数
      optimizer: 最適化関数
      epochs: 学習の全エポック数
      pct_start: 学習の初期フェーズ(学習率が増加するフェーズ)が全体の何割を占めるか。(通常は0.3などが指定される)
      steps_per_epoch: 1エポックで何度バッチ学習が行われるか。1エポックあたりのステップ(バッチ)数。訓練用データセットのサイズをバッチサイズで割った値に相当。
      max_lr: 学習率の最大値
      div_factor: 初期学習率を最大学習率の何分の1とするかを指定。max_lr / div_factorが初期学習率となる。
      final_div_factor: 最終的な学習率を最大学習率の何分の1とするかを指定。サイクルの最後には max_lr / final_div_factor が最終学習率となる。
  • loss_func = KLDivLossWithLogits()
    -損失関数を定義
  • loss_func.to(device)
    -損失関数をGPU環境にロード
  • loss_func_val = KLDivLossWithLogitsForVal()
    -評価用の損失関数を定義
  • use_amp = CFG.enable_amp
    -自動混合精度の設定を取得
  • scaler = amp.GradScaler(enabled=use_amp)
    -GPUの自動混合制度の設定を管理するオブジェクトを定義。pytorchより提供
  • best_val_loss = 1.0e+09
    -大きな値を、損失の初期値に設定

以下学習

  • for epoch in range(1, CFG.max_epoch + 1):
    -max_epochの回数学習を繰り返す
  • epoch_start = time()
    -時間を計測
  • model.train()
    -モデルの学習中であることをモデルに伝える。これはDropoutやBatchNormのような、学習時と評価時で異なる動作をするように設計されたレイヤーに情報を与えるのに有用である。例えば、学習時には、BatchNormは新しいバッチごとに移動平均を更新するが、評価モードではこれらの更新は行われない。
  • for batch in train_loader:
    -dataloaderをイテレートすると、バッチサイズ分の訓練用データと正解データが得られる
  • batch = to_device(batch, device)
    -取得したバッチデータをGPU環境にロード
  • x, t = batch["data"], batch["target"]
    -バッチ(辞書)から訓練データと正解ラベルを取得。
    -形状は[32, 1, 512, 512](32枚の画像データ)と、[32, 6](32個のクラス分類分布(正解データ))
  • optimizer.zero_grad()
    -最適化関数の保持する重みの勾配情報を0にする
  • with amp.autocast(use_amp):
    -この内部の計算は、精度を保持しながら浮動小数点が自動で調整される
  • y = model(x)
    -バッチまとめて予測を行う
  • loss = loss_func(y, t)
    -予測結果と、正解ラベルを比較して損失を計算
  • scaler.scale(loss).backward()
    -逆伝播を行い、重みの勾配を求める(scalerによって浮動小数点が調整されている)
  • scaler.step(optimizer)
    -最適化関数によって重みの更新を行う
  • scaler.update()
    -GradScalerのスケールを更新。スケールの係数を調整し、次のイテレーションでの勾配のオーバーフローやアンダーフローのリスクを最小化する
    -scheduler.step()
    -学習率スケジューラーを更新。定義されたポリシーに従ってエポックごとに学習率を調整する。OneCycleLRやCosineAnnealingLRなど、さまざまなスケジューリング戦略が存在する。
  • train_loss += loss.item()
    -エポック全体の損失を累積
  • train_loss /= len(train_loader)
    -エポック全体の予測における平均損失を取得

以下評価

  • model.eval()
    -モデルを評価モードに変更
  • with torch.no_grad(), amp.autocast(use_amp):
    -このスコープ内の計算は、勾配情報を保持せず、浮動小数点が自動で調整される
  • y = y.detach().cpu().to(torch.float32)
    出力結果の計算グラフを切り(これ以上勾配を計算しない)、CPU環境にロード
  • loss_func_val(y, t)
    評価用の損失関数で1エポック分損失を累積
  • val_loss = loss_func_val.compute()
    累積した損失をまとめて計算
  • if val_loss < best_val_loss: ...
    エポックの損失が過去のものより小さい場合、エポック数と損失、モデルを保存
  • torch.save(model.state_dict(), str(output_path / f'snapshot_epoch_{epoch}.pth'))
    モデルの保存(モデル, 保存先path)
  • elapsed_time = time() - epoch_start
    学習の経過時間を計算
  • if epoch - best_epoch > CFG.es_patience:
    指定のエポック数、スコアが更新されなかった場合は早期終了
  • train_loss = 0
    学習時の累積損失を初期化
  • return val_fold, best_epoch, best_val_loss
    評価に使用したfold、最高スコアのエポック、最高スコアの損失を返す

8. モデルの学習

score_list = []
for fold_id in FOLDS:
    output_path = Path(f"fold{fold_id}")
    output_path.mkdir(exist_ok=True)
    print(f"[fold{fold_id}]")
    score_list.append(train_one_fold(CFG, fold_id, train, output_path))
  • output_path = Path(f"fold{fold_id}")
    出力が保存されるディレクトリのpath
  • score_list.append(train_one_fold(CFG, fold_id, train, output_path))
    fold毎に、評価に使用したfold、最高スコアのエポック、最高スコアの損失のタプル をリストに追加

9. 学習結果(Out Of Fold)

print(score_list)
# [(0, 3, 0.7446164488792419), (1, 6, 0.703400194644928), (2, 5, 0.7020435333251953), (3, 5, 0.7091140747070312), (4, 4, 0.7230966091156006)]

評価に使用したfold、最高スコアのエポック、最高スコアの損失 を確認

best_log_list = []
for (fold_id, best_epoch, _) in score_list:
    
    exp_dir_path = Path(f"fold{fold_id}")
    best_model_path = exp_dir_path / f"snapshot_epoch_{best_epoch}.pth"
    copy_to = f"./best_model_fold{fold_id}.pth"
    shutil.copy(best_model_path, copy_to)
    
    for p in exp_dir_path.glob("*.pth"):
        p.unlink()

出力の整理を行います

  • for (fold_id, best_epoch, _) in score_list:
    最終出力からfold番号とベストエポックを取得
  • exp_dir_path = Path(f"fold{fold_id}")
    データが出力されているディレクトリのpath
  • best_model_path = exp_dir_path / f"snapshot_epoch_{best_epoch}.pth"
    ベストエポックのモデルのpath
  • copy_to = f"./best_model_fold{fold_id}.pth"
    ベストエポックのモデルを移動させるディレクトリ
  • shutil.copy(best_model_path, copy_to)
    ベストエポックのモデルを移動
  • for p in exp_dir_path.glob("*.pth"): \ p.unlink()
    出力されていたモデルを全て削除
def run_inference_loop(model, loader, device):
    model.to(device)
    model.eval()
    pred_list = []
    with torch.no_grad():
        for batch in tqdm(loader):
            x = to_device(batch["data"], device)
            y = model(x)
            pred_list.append(y.softmax(dim=1).detach().cpu().numpy())
        
    pred_arr = np.concatenate(pred_list)
    del pred_list
    return pred_arr

実際のtestデータに対して推論を行う関数を定義しています。

  • run_inference_loop(model, loader, device):
    モデル、データローダー、デバイスを受け取って予測結果の配列を返す関数
  • model.to(device)
    モデルをGPU環境にロード
  • model.eval()
    モデルを評価モードに変更
  • with torch.no_grad():
    スコープ内では勾配を計算しない
  • for batch in tqdm(loader):
    データローダーからバッチデータを取得
  • x = to_device(batch["data"], device)
    バッチから取得した学習用dataのテンソルをGPU環境にロード
  • y = model(x)
    モデルで予測
  • pred_list.append(y.softmax(dim=1).detach().cpu().numpy())
    予測結果の行方向にsoftmax関数を適用して、勾配計算を切り、CPU環境にロード、numpyに変更した予測の分布をpred_listに追加
  • pred_arr = np.concatenate(pred_list)
    二重配列のpred_listを縦に結合し、二次元配列(dataframeのような形状)に変更
  • return pred_arr
    予測結果の二次元配列を返す
label_arr = train[CLASSES].values
oof_pred_arr = np.zeros((len(train), N_CLASSES))
score_list = []

for fold_id in range(N_FOLDS):
    print(f"\n[fold {fold_id}]")
    device = torch.device(CFG.device)

    # # get_dataloader
    _, val_path_label, _, val_idx = get_path_label(fold_id, train)
    _, val_transform = get_transforms(CFG)
    val_dataset = HMSHBACSpecDataset(**val_path_label, transform=val_transform)
    val_loader = torch.utils.data.DataLoader(
        val_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)
    
    # # get model
    model_path = f"./best_model_fold{fold_id}.pth"
    model = HMSHBACSpecModel(
        model_name=CFG.model_name, pretrained=False, num_classes=6, in_channels=1)
    model.load_state_dict(torch.load(model_path, map_location=device))
    
    # # inference
    val_pred = run_inference_loop(model, val_loader, device)
    oof_pred_arr[val_idx] = val_pred
    
    del val_idx, val_path_label
    del model, val_loader
    torch.cuda.empty_cache()
    gc.collect()
  • label_arr = train[CLASSES].values
    分類先クラスの列のみを取得
  • oof_pred_arr = np.zeros((len(train), N_CLASSES))
    予測値を格納する(データ列数, 分類先クラス数)の0配列
  • for fold_id in range(N_FOLDS):
    foldの数だけ繰り返し
  • device = torch.device(CFG.device)
    デバイスを定義

以下dataset, dataloader定義

  • _, val_path_label, _, val_idx = get_path_label(fold_id, train)
    評価用データの、画像pathと正解ラベルの辞書、及びそのインデックス(numpy配列)を取得
  • _, val_transform = get_transforms(CFG)
    評価用データ用のデータ拡張オブジェクトを取得
  • val_dataset = HMSHBACSpecDataset(**val_path_label, transform=val_transform)
    評価用データセットを定義
  • val_loader = torch.utils.data.DataLoader(val_dataset,batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)
    評価用データローダーを定義。datasetはdataloaderに内包される。

以下model定義

  • model_path = f"./best_model_fold{fold_id}.pth"
    fold毎に最も性能の良いモデルのpathを取得
  • model = HMSHBACSpecModel(model_name=CFG.model_name, pretrained=False, num_classes=6,in_channels=1)
    モデルをtimmから読み込んで定義
  • model.load_state_dict(torch.load(model_path, map_location=device))
    最も性能の良いモデルの重みをロードして上書き

以下推論

  • val_pred = run_inference_loop(model, val_loader, device)
    dataloaderに内包されたdatasetに対して、modelを使用して推論を行い、データセットの各行のデータに対応する分類先クラスの分布を二次元配列として受け取る

  • oof_pred_arr[val_idx] = val_pred
    推論用データのindex配列(val_idx)の行に、対応する推論結果を格納

  • del model, val_loader, torch.cuda.empty_cache(), gc.collect()
    リソースを解放し、GPUメモリとPythonのガベージコレクションを解放する

import sys
sys.path.append('/kaggle/input/kaggle-kl-div')
from kaggle_kl_div import score

true = train[["label_id"] + CLASSES].copy()

oof = pd.DataFrame(oof_pred_arr, columns=CLASSES)
oof.insert(0, "label_id", train["label_id"])

cv_score = score(solution=true, submission=oof, row_id_column_name='label_id')
print('CV Score KL-Div for ResNet34d',cv_score)
# CV Score KL-Div for ResNet34d 0.715760646315781

KLダイバージェンスを計算して出力します。

  • sys.path.append('/kaggle/input/kaggle-kl-div'):
    pythonのsysモジュールのpathリストに新しいパスを追加。この操作により、カスタムモジュールやライブラリが含まれているディレクトリをインポートパスに追加し、その後のインポートで使用できるようなる
  • from kaggle_kl_div import score
    kaggleから取得したライブラリをインポート
  • true = train[["label_id"] + CLASSES].copy()
    各行のidと正しいラベルを含むdataframeを作成
  • oof = pd.DataFrame(oof_pred_arr, columns=CLASSES)
    予測結果を格納しているNumPy配列oof_pred_arrから、クラス名を列名とするDataFrame oofを作成
  • oof.insert(0, "label_id", train["label_id"])
    作成したoofの最初の列に"label_id"列を追加。これにより、各行がどのデータポイントに対応するかを識別できるようになる
  • cv_score = score(solution=true, submission=oof, row_id_column_name='label_id')
    予測結果と真の正解ラベルの分布を比較してKLDivスコアを求める

コード解説 推論編

1. 初期設定

import sys
import os
import gc
import copy
import yaml
import random
import shutil
from time import time
import typing as tp
from pathlib import Path

import numpy as np
import pandas as pd

from tqdm.notebook import tqdm
from sklearn.model_selection import StratifiedGroupKFold

import torch
from torch import nn
from torch import optim
from torch.optim import lr_scheduler
from torch.cuda import amp

import timm

import albumentations as A
from albumentations.pytorch import ToTensorV2

os.environ["CUDA_VISIBLE_DEVICES"] = "0"
ROOT = Path.cwd().parent
INPUT = ROOT / "input"
OUTPUT = ROOT / "output"
SRC = ROOT / "src"

DATA = INPUT / "hms-harmful-brain-activity-classification"
TRAIN_SPEC = DATA / "train_spectrograms"
TEST_SPEC = DATA / "test_spectrograms"

TRAINED_MODEL = INPUT / "hms-hbac-resnet34d-baseline-exp02"

TMP = ROOT / "tmp"
TRAIN_SPEC_SPLIT = TMP / "train_spectrograms_split"
TEST_SPEC_SPLIT = TMP / "test_spectrograms_split"
TMP.mkdir(exist_ok=True)
TRAIN_SPEC_SPLIT.mkdir(exist_ok=True)
TEST_SPEC_SPLIT.mkdir(exist_ok=True)


RANDAM_SEED = 1086
CLASSES = ["seizure_vote", "lpd_vote", "gpd_vote", "lrda_vote", "grda_vote", "other_vote"]
N_CLASSES = len(CLASSES)
FOLDS = [0, 1, 2, 3, 4]
N_FOLDS = len(FOLDS)

2. データの読み込み

test = pd.read_csv(DATA / "test.csv")
test.head()
#   spectrogram_id	eeg_id	patient_id
# 0	853520	3911565283	6885

データの読み込みと確認を行います。リークを防ぐため、ここでは一行しか表示されませんが、ファイルの提出時は自動的に正しいtest.csvに置き換えられて採点されます。

for spec_id in test["spectrogram_id"]:
    spec = pd.read_parquet(TEST_SPEC / f"{spec_id}.parquet")
    
    spec_arr = spec.fillna(0).values[:, 1:].T.astype("float32")  # (Hz, Time) = (400, 300)
    
    np.save(TEST_SPEC_SPLIT / f"{spec_id}.npy", spec_arr)
  • for spec_id in test["spectrogram_id"]:
    spectrogram_idの数だけ繰り返し
  • spec = pd.read_parquet(TEST_SPEC / f"{spec_id}.parquet")
    parquetファイルをdataframeとして読み込み
  • spec_arr = spec.fillna(0).values[:, 1:].T.astype("float32") # (Hz, Time) = (400, 300)
    欠損値を0埋め、式別情報などの可能性がある最初の列を落とす、転置して周波数を縦、時間を横軸にする

3. モデルの定義

class HMSHBACSpecModel(nn.Module):

    def __init__(
            self,
            model_name: str,
            pretrained: bool,
            in_channels: int,
            num_classes: int,
        ):
        super().__init__()
        self.model = timm.create_model(
            model_name=model_name, pretrained=pretrained,
            num_classes=num_classes, in_chans=in_channels)

    def forward(self, x):
        h = self.model(x)      

        return h

4. dataset定義

FilePath = tp.Union[str, Path]
Label = tp.Union[int, float, np.ndarray]

class HMSHBACSpecDataset(torch.utils.data.Dataset):

    def __init__(
        self,
        image_paths: tp.Sequence[FilePath],
        labels: tp.Sequence[Label],
        transform: A.Compose,
    ):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, index: int):
        img_path = self.image_paths[index]
        label = self.labels[index]

        img = np.load(img_path)  # shape: (Hz, Time) = (400, 300)
        
        # log transform
        img = np.clip(img,np.exp(-4), np.exp(8))
        img = np.log(img)
        
        # normalize per image
        eps = 1e-6
        img_mean = img.mean(axis=(0, 1))
        img = img - img_mean
        img_std = img.std(axis=(0, 1))
        img = img / (img_std + eps)

        img = img[..., None] # shape: (Hz, Time) -> (Hz, Time, Channel)
        img = self._apply_transform(img)

        return {"data": img, "target": label}

    def _apply_transform(self, img: np.ndarray):
        """apply transform to image and mask"""
        transformed = self.transform(image=img)
        img = transformed["image"]
        return img

5. 学習設定

class CFG:
    model_name = "resnet34d"
    img_size = 512
    max_epoch = 9
    batch_size = 32
    lr = 1.0e-03
    weight_decay = 1.0e-02
    es_patience =  5
    seed = 1086
    deterministic = True
    enable_amp = True
    device = "cuda"

6. 推論用関数

def to_device(
    tensors: tp.Union[tp.Tuple[torch.Tensor], tp.Dict[str, torch.Tensor]],
    device: torch.device, *args, **kwargs
):
    if isinstance(tensors, tuple):
        return (t.to(device, *args, **kwargs) for t in tensors)
    elif isinstance(tensors, dict):
        return {
            k: t.to(device, *args, **kwargs) for k, t in tensors.items()}
    else:
        return tensors.to(device, *args, **kwargs)

    
def get_test_path_label(test: pd.DataFrame):
    """Get file path and dummy target info."""
    
    img_paths = []
    labels = np.full((len(test), 6), -1, dtype="float32")
    for spec_id in test["spectrogram_id"].values:
        img_path = TEST_SPEC_SPLIT / f"{spec_id}.npy"
        img_paths.append(img_path)
        
    test_data = {
        "image_paths": img_paths,
        "labels": [l for l in labels]}
    
    return test_data

def get_test_transforms(CFG):
    test_transform = A.Compose([
        A.Resize(p=1.0, height=CFG.img_size, width=CFG.img_size),
        ToTensorV2(p=1.0)
    ])
    return test_transform

get_test_path_label関数が学習時から変更されています

  • to_device():
    tensorとdeviceを受け取り、deviceにロードしたtensorを返す
  • get_test_path_label():
    testデータのdataframeを受け取り、辞書{"image_paths":イメージパスを含むリスト, "labels": 分布格納用の初期化されたnumpy配列を含むリスト}を返す
  • labels = np.full((len(test), 6), -1, dtype="float32")
    形状(testデータの長さ, 6)で値が全て1のnumpy配列。正解データの分布を可能する配列を定義
  • get_test_transforms():
    推論用のデータ拡張オブジェクトを返す
def run_inference_loop(model, loader, device):
    model.to(device)
    model.eval()
    pred_list = []
    with torch.no_grad():
        for batch in tqdm(loader):
            x = to_device(batch["data"], device)
            y = model(x)
            pred_list.append(y.softmax(dim=1).detach().cpu().numpy())
        
    pred_arr = np.concatenate(pred_list)
    del pred_list
    return pred_arr
  • run_inference_loop()
    model, loader, deviceを受け取り、loaderに含まれるdatasetをdevice上のmodelで推論し、結果をnumpyの二次元配列で返す
test_preds_arr = np.zeros((N_FOLDS, len(test), N_CLASSES))

test_path_label = get_test_path_label(test)
test_transform = get_test_transforms(CFG)
test_dataset = HMSHBACSpecDataset(**test_path_label, transform=test_transform)
test_loader = torch.utils.data.DataLoader(
    test_dataset, batch_size=CFG.batch_size, num_workers=4, shuffle=False, drop_last=False)

device = torch.device(CFG.device)

for fold_id in range(N_FOLDS):
    print(f"\n[fold {fold_id}]")
    
    # # get model
    model_path = TRAINED_MODEL / f"best_model_fold{fold_id}.pth"
    model = HMSHBACSpecModel(
        model_name=CFG.model_name, pretrained=False, num_classes=6, in_channels=1)
    model.load_state_dict(torch.load(model_path, map_location=device))
    
    # # inference
    test_pred = run_inference_loop(model, test_loader, device)
    test_preds_arr[fold_id] = test_pred
    
    del model
    torch.cuda.empty_cache()
    gc.collect()

学習編で学習したモデルの重みをロードして推論を行います。

  • test_preds_arr = np.zeros((N_FOLDS, len(test), N_CLASSES))
    推論結果を保持する三次元配列(fold数、データ数、分類先クラス数)

以下モデルのロード

  • model_path = TRAINED_MODEL / f"best_model_fold{fold_id}.pth"
    学習したモデルの保存先
  • model = HMSHBACSpecModel(model_name=CFG.model_name, pretrained=False, num_classes=6, in_channels=1)
    timmからモデルを定義
  • model.load_state_dict(torch.load(model_path, map_location=device))
    保存先からモデルの重みをロード

以下推論

  • test_pred = run_inference_loop(model, test_loader, device)
    指定のモデル、ローダー(dataset)、デバイスで学習を行う

  • test_preds_arr[fold_id] = test_pred
    予測結果の二次元配列を三次元配列にまとめて保存

  • del model, torch.cuda.empty_cache(), gc.collect()
    ガベージコレクションの解放

7. 提出用ファイルの作成

test_pred = test_preds_arr.mean(axis=0)

test_pred_df = pd.DataFrame(
    test_pred, columns=CLASSES
)

test_pred_df = pd.concat([test[["eeg_id"]], test_pred_df], axis=1)

提出用ファイルを作成します。

  • test_pred = test_preds_arr.mean(axis=0)
    foldの次元で平均を取り、5foldの結果を平均する。(三次元配列→二次元配列)
  • test_pred_df = pd.DataFrame(test_pred, columns=CLASSES)
    numpyの二次元配列をdataframeに変更して列名を追加
  • test_pred_df = pd.concat([test[["eeg_id"]], test_pred_df], axis=1)
    列方向にtestデータの"eeg_id"の列を追加
smpl_sub = pd.read_csv(DATA / "sample_submission.csv")

sub = pd.merge(
    smpl_sub[["eeg_id"]], test_pred_df, on="eeg_id", how="left")

sub.to_csv("submission.csv", index=False)

sub.head()
#   eeg_id	seizure_vote	lpd_vote	gpd_vote	lrda_vote	grda_vote	other_vote
# 0	3911565283	0.020114	0.113317	0.000733	0.307661	0.01935	0.538826

提出用ファイルを作成し、データを確認しています。

  • smpl_sub = pd.read_csv(DATA / "sample_submission.csv")
    提供されているsample_submission.csvのデータ保存先
  • sub = pd.merge(smpl_sub[["eeg_id"]], test_pred_df, on="eeg_id", how="left")
    sample_submissionの持つeeg_idの列と推論結果をマージ(互換性のある行のみが残る)

解説は以上です。

今回のコンペでは、出力にはsubmission.csvという名前のファイルが必要であり(評価対象)、提出後にコードは正しいtestデータにより再実行され、そのデータで推論が行われます。
評価対象はcsvファイルですが、実質コードコンペティションとも言えます。

Discussion