♻️

Pytorchによるテーブルデータのmixup

2023/12/22に公開

こんにちは、tonic(@tonic3561)です。この記事はマケデコ Advent Calendar 2023の22日目への寄稿です。今年こそは何かアウトプットしたいと思っていたので、参加することができてとてもうれしいです。

はじめに

いもすさん(@imos)がマケデコのAMAで金融データのmixupはいいぞ、とおっしゃっていたので、ディープラーニングのPythonライブラリであるPytorchを使って実装してみました。いもすさんがおっしゃる通り、結構いい感じかもしれないです。
本記事では、PytorchのDatasetを用いて実装を行います。Pytorchの基礎的な知識(MNISTを解く簡単なCNNを組める程度)があれば読みやすいと思いますが、多くの方に読んでいただけるよう、できる限り詳細に解説しています。
なお、本記事では実装方法のみを取り扱っており、実データでの検証は行っていません。いずれJ-Quants APIを利用した検証結果等も共有できればと思っています。また、実装を通してPytorchのtensorの扱いについて理解が深まったので、そういった意味でも取り組んでみる価値があると感じました。みなさまもぜひトライしてみてください。

mixupとは?

mixupはデータ拡張手法の一つで、画像認識の分野でよく使われます。学習データからランダムに2つのサンプルを取り出し、特徴量(画像)とラベルを一定の割合で混ぜ合わせることで、新たなサンプルを生成します。具体的には、抽出した教師ありサンプル (X_1, y_1), (X_2, y_2) とある定数 \alpha \in (0, 1) に対し、

X = \alpha X_1 + (1 - \alpha)X_2, \space\space y = \alpha y_1 + (1 - \alpha)y_2

を満たす新たなサンプルを生成します。
mixupは単純な手法であり、テーブルデータにも柔軟に適用することができます。特に金融データの場合はサンプル数の制限に悩まされることが多いため、汎化性の向上が期待できます。ただし、Pytorchの実装済みのmixupライブラリは画像分類用のものしかないようです(もし汎用な実装があれば教えてください)。そこで本記事では、テーブルデータにおけるmixupをPytorchのDatasetを用いて一から実装していきます。

Pytorchにおける基本的なDataset

以下では、1行が1サンプルに対応するような、基本的なテーブルデータを扱います。また、タスクは回帰とし、特徴量もすべて数値データとします。日足のOHLCVから翌日のリターンを数値で予測するような問題を想像してもらえばよいと思います。
Pytorchでは、データをミニバッチごとに抽出するDatasetとDataLoaderを実装し、逐次的にデータを読み込みながら学習を行います。まず、上記のような簡単な構造を持つpandasのDataFrameから、最も基本的なDatasetを実装してみましょう。torch.utils.data.Datasetクラスを継承し、データのサンプル数を示す__len__メソッドと、与えられたインデックスから単一のサンプルを返す__getitem__メソッドを実装することで、自前のDatasetを作ることができます。先に全体像を示します。

import pandas as pd
import torch

class MyDataset(torch.utils.data.Dataset):
    def __init__(
        self,
        # ラベル、特徴量列を含むDataFrame
        df: pd.DataFrame,
        # targetとして使用する列名
        target_column="y",
    ):
        # おまじない
        super().__init__()
        # targetと特徴量をnumpy配列で保持
        self.features = df.drop(columns=[target_column]).values
        self.targets = df[[target_column]].values
        # gpuが使える場合はデバイスに設定
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def __len__(self):
        # len()で返されるDatasetの長さを定義
        return len(self.features)

    def __getitem__(self, idx):
        # batch生成時に指定されるindexに対し、データを取得してtorch.tensorに変換
        features = torch.tensor(self.features[idx, ...], dtype=torch.float)
        # 並列処理のためにnon_blocking=Trueを指定し、デバイスに転送
        features = features.to(self.device, non_blocking=True)
        
        target = torch.tensor(self.targets[idx, ...], dtype=torch.float)
        target = target.to(self.device, non_blocking=True)
        
        return features, target

__init__では、複数の特徴量およびラベルが格納されたDataFrameを受け取り、Datasetのセットアップを行います。バッチ生成時の処理速度を高めるため、特徴量とラベルをnumpy配列で保持します。また、利用可能なデバイスを判定しておきます。

    def __init__(
        self,
        # ラベル、特徴量列を含むDataFrame
        df: pd.DataFrame,
        # targetとして使用する列名
        target_column="y",
    ):
        # おまじない
        super().__init__()
        # targetと特徴量をnumpy配列で保持
        self.features = df.drop(columns=[target_column]).values
        self.targets = df[[target_column]].values
        # gpuが使える場合はデバイスに設定
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

続いて__len__メソッドでは、データのサンプル数を返すように実装を行います。今回の例では行数がそのままサンプル数ですので、特徴量行列の長さを返せばよいです。

    def __len__(self):
        # len()で返されるDatasetの長さを定義
        return len(self.features)

最後に__getitem__メソッドでは、ランダムに抽出されるインデックスを受け取り、該当する単一のサンプル(特徴量群とラベルのtuple)を返すように実装を行います。今回の例では、そのまま該当するインデックスのサンプルを取り出し、torch.tensorに変換した後、GPU等のデバイスに転送するだけでOKです。なお、torch.tensorは、Pytorchで扱う行列のオブジェクトであり、Pytorch用のnumpy配列みたいなもの、と捉えていただければ問題ありません。

   def __getitem__(self, idx):
        # batch生成時に指定されるindexに対し、データを取得してtorch.tensorに変換
        features = torch.tensor(self.features[idx, ...], dtype=torch.float)
        # 並列処理のためにnon_blocking=Trueを指定し、デバイスに転送
        features = features.to(self.device, non_blocking=True)
        
        target = torch.tensor(self.targets[idx, ...], dtype=torch.float)
        target = target.to(self.device, non_blocking=True)
        
        return features, target

これで手元のデータをPytorchで扱うためのDatasetが完成しました。実際にDatasetからミニバッチごとにデータを読み込むには、torch.utils.data.DataLoaderにDatasetを渡し、DataLoaderオブジェクトを作成します。DataLoaderオブジェクトからは、forループで各ミニバッチを取得することができます。

def get_data_loader(ds, batch_size=128, shuffle=True):
    return torch.utils.data.DataLoader(
        # Dataset
        ds,
        # バッチサイズ
        batch_size=batch_size,
        # データをランダムな順番で抽出するかどうか
        shuffle=shuffle,
    )

if __name__ == '__main__':    
    df = pd.read_csv("hoge.csv")
    ds = MyDataset(df)
    dl = get_data_loader(ds)
    
    for i, (features, targets) in enumerate(dl):
        print(i, features.shape, targets.shape)

mixupの実装

さて、ここからはいよいよ本題のmixupの実装に取り掛かっていきます。

実装の方針

ここまででミニバッチを逐次的に生成するDatasetの基本形が完成しました。ここからは、このDatasetにmixupを行う機能を追加していきます。
一般的なmixupでは、イテレーションごとにミニバッチ内のデータからサンプリングを行い、逐次的にデータを生成します。また、PytorchのDataLoaderでは、ミニバッチを抽出した後に任意の処理を行う関数をcollete_fn引数で指定することができます。本記事ではこの機能を利用して実装を行っていきます。
ミニバッチごとに逐次的にデータを生成するとは言っても、特定のミニバッチ内での処理は行列演算で効率的に行いたいです。そこで、以下の手順を踏むことで、行列演算のみを用いてmixupを実装します。

  1. ミニバッチ内の全データを表す(バッチサイズ, 特徴量)の形状を持つ行列(tensor)をバッチ方向にシャッフルして複製する
  2. mixupの適用割合を行(サンプル)ごとにランダムに抽出したベクトルを生成する(p_1
  3. p_2 = 1 - p_1 をみたす p_2 を求める
  4. 「3」の2つのベクトルを(バッチサイズ, 特徴量)の行列に(同じ行の特徴量に同じ値が適用されるように)引き延ばす
  5. 元のミニバッチ行列と「1」で生成したミニバッチ行列に、「4」の2つの行列をそれぞれ要素ごとに掛け合わせる
  6. 「5」の2つのミニバッチ行列を要素ごとに足し合わせる

なお、ラベルについては、「1」の要領で同じように複製を行い、「3」までで求めた p_1, p_2 をそのまま適用すればよいです。
少し難しいかもしれませんが、行列演算的な頭の使い方の良いトレーニングになるので、頑張って実装していきましょう!

いざ実装

前節までで実装したMyDatasetを拡張していきましょう。まず、バッチ内でmixupを適用する行の割合を、Datasetの初期化時に指定できるようにします。すべてのデータを混合してしまうと実際のデータを学習できなくなりますので、少しは生データも残しておきたいという気持ちです。MyDataset.__init__メソッドに引数を追加しておきましょう。

class MyDataset(torch.utils.data.Dataset):
    def __init__(
        self,
        df: pd.DataFrame,
        mixup_rate=0.5,
        ...
    ):
        ...
        self.mixup_rate = mixup_rate

次に、抽出したミニバッチに適用する関数collete_fnの本体をMyDatasetのメソッドとして実装していきます。この関数には、Dataset.__getitem__の返り値をバッチサイズ分だけ格納したリストが渡されますので、これを組み込み関数のlist, zip関数とtorch.stack関数でtensorの形に復元し、mixupを適用して返します。

    def collate_fn(self, batch):
        # batchはlist[(features, targets), ...]の形で渡される
        # これを(batch_size, feature_dim), (batch_size, target_dim(=1))のtensorに変換
        ## 特徴量、ラベルそれぞれのリストに分解
        features, targets = list(zip(*batch))
        ## リストを結合してtorch.tensorに変換
        features = torch.stack(features)
        targets = torch.stack(targets)
        # mixupを適用(未実装)
        features, targets = self.mixup(features, targets)
        return features, targets

さて、いよいよ本題のmixupメソッドの実装に取り掛かりましょう。先ほど述べた実装の方針に従い、行列演算によってmixupを行います。文章で説明するよりコードを読んだ方がわかりやすいと思うので、ここではコメントで随時解説する形を取ります。もし途中で詰まってしまった場合は、実際にそこまでのコードを動かし、各tensorの形状やメソッドの挙動を確認してみると、理解が深まると思います。

    def mixup(self, features, targets):
        # バッチサイズと特徴量次元を取得
        batch_size, feature_dim = features.shape
        # batch方向にindexをシャッフル
        ## torch.randperm…0からbatch_size-1までの整数をランダムに並べた1次元のtensorを返す
        shuffled_indices = torch.randperm(batch_size)
        # batch内でシャッフルされた特徴量、targetを複製
        shuffled_features = features[shuffled_indices]
        shuffled_targets = targets[shuffled_indices]
        # 各行をmixする係数をランダムに生成
        ## torch.rand…0から1までの乱数を指定した形状で生成する
        ## 生成したtensorはデバイスに転送する
        p_1 = torch.rand(size=(batch_size,)).to(self.device)
        # mixupを適用しない行をランダムに抽出する
        # p_1に対し、(1 - self.mixup_rate)の確率で要素が0となる{0, 1}のmaskを生成
        ## 適用割合が0ということはmixupを適用しないということ
        ### torch.full…指定した値で一様に埋められたtensorを生成する
        mask = torch.full(size=(batch_size,), fill_value=self.mixup_rate)
        ### torch.bernoulli…tensorの各要素に指定された確率で1、それ以外で0を生成する
        mask = torch.bernoulli(mask).to(self.device)
        # maskを適用し、2つのデータのmixupの係数を計算
        p_1 = p_1 * mask
        p_2 = 1 - p_1
        # p_1, p_2を特徴量方向の次元に拡張(同じ行の特徴量は同じ係数になるように)
        ## tensor.unsqueeze(1)…特徴量方向の次元を追加((batch_size,) -> (batch_size, 1))
        p_1 = p_1.unsqueeze(1)
        p_2 = p_2.unsqueeze(1)
        ## tensor.repeat(a, b)…1次元目をa回、2次元目をb回繰り返す
        ## → batch方向には繰り返しを行わず、特徴量方向に同じ適用係数のベクトルを並べる
        feature_p_1 = p_1.repeat(1, feature_dim)
        feature_p_2 = p_2.repeat(1, feature_dim)
        # mixupを適用
        features = feature_p_1 * features + feature_p_2 * shuffled_features
        # targetは1次元なので、特徴量方向の次元の拡張は不要
        targets = p_1 * targets + p_2 * shuffled_targets

        return features, targets

これでmixupを適用する準備が整いました。ミニバッチごとにmixupを適用するために、DataLoaderに実装した関数を渡してあげましょう。

def get_data_loader(ds, batch_size=128, shuffle=True):
    return torch.utils.data.DataLoader(
        ds,
        batch_size=batch_size,
        shuffle=shuffle,
        # 実装した関数をcollete_fn引数に渡す
        collate_fn=ds.collate_fn,
    )

コードの全体像

やっと完成しましたね!ここまで読んでくださった方々、お疲れさまでした。実装したコードの全体像をまとめておきましょう。

import pandas as pd
import torch

class MyDataset(torch.utils.data.Dataset):
    def __init__(
        self,
        # target、特徴量列を含むDataFrame
        df: pd.DataFrame,
        # mixupの適用割合
        mixup_rate=0.5,
        # targetとして使用する列名
        target_column="y",
    ):
        # おまじない
        super().__init__()
        # targetと特徴量をnumpy配列で保持
        self.features = df.drop(columns=[target_column]).values
        self.targets = df[[target_column]].values
        # gpuが使える場合はデバイスに設定
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # mixupの適用割合を保持
        self.mixup_rate = mixup_rate
        
    def __len__(self):
        # len()で返されるDatasetの長さを定義
        return len(self.features)

    def __getitem__(self, idx):
        # batch生成時に指定されるindexに対し、データを取得してtorch.tensorに変換
        features = torch.tensor(self.features[idx, ...], dtype=torch.float)
        # 並列処理のためにnon_blocking=Trueを指定し、デバイスに転送
        features = features.to(self.device, non_blocking=True)
        
        target = torch.tensor(self.targets[idx, ...], dtype=torch.float)
        target = target.to(self.device, non_blocking=True)
        
        return features, target

    def collate_fn(self, batch):
        # batchはlist[(features, targets), ...]の形で渡される
        # これを(batch_size, feature_dim), (batch_size, target_dim(=1))のtensorに変換
        ## 特徴量、ラベルそれぞれのリストに分解
        features, targets = list(zip(*batch))
        ## リストを結合してtorch.tensorに変換
        features = torch.stack(features)
        targets = torch.stack(targets)
        # mixupを適用
        features, targets = self.mixup(features, targets)
        return features, targets

    def mixup(self, features, targets):
        # バッチサイズと特徴量次元を取得
        batch_size, feature_dim = features.shape
        # batch方向にindexをシャッフル
        ## torch.randperm…0からbatch_size-1までの整数をランダムに並べた1次元のtensorを返す
        shuffled_indices = torch.randperm(batch_size)
        # batch内でシャッフルされた特徴量、targetを複製
        shuffled_features = features[shuffled_indices]
        shuffled_targets = targets[shuffled_indices]
        # 各行をmixする係数をランダムに生成
        ## torch.rand…0から1までの乱数を指定した形状で生成する
        ## 生成したtensorはデバイスに転送する
        p_1 = torch.rand(size=(batch_size,)).to(self.device)
        # mixupを適用しない行をランダムに抽出する
        # p_1に対し、(1 - self.mixup_rate)の確率で要素が0となる{0, 1}のmaskを生成
        ## 適用割合が0ということはmixupを適用しないということ
        ### torch.full…指定した値で一様に埋められたtensorを生成する
        mask = torch.full(size=(batch_size,), fill_value=self.mixup_rate)
        ### torch.bernoulli…tensorの各要素に指定された確率で1、それ以外で0を生成する
        mask = torch.bernoulli(mask).to(self.device)
        # maskを適用し、2つのデータのmixupの係数を計算
        p_1 = p_1 * mask
        p_2 = 1 - p_1
        # p_1, p_2を特徴量方向の次元に拡張(同じ行の特徴量は同じ係数になるように)
        ## tensor.unsqueeze(1)…特徴量方向の次元を追加((batch_size,) -> (batch_size, 1))
        p_1 = p_1.unsqueeze(1)
        p_2 = p_2.unsqueeze(1)
        ## tensor.repeat(a, b)…1次元目をa回、2次元目をb回繰り返す
        ## → batch方向には繰り返しを行わず、特徴量方向に同じ適用係数のベクトルを並べる
        feature_p_1 = p_1.repeat(1, feature_dim)
        feature_p_2 = p_2.repeat(1, feature_dim)
        # mixupを適用
        features = feature_p_1 * features + feature_p_2 * shuffled_features
        # targetは1次元なので、特徴量方向の次元の拡張は不要
        targets = p_1 * targets + p_2 * shuffled_targets

        return features, targets


def get_data_loader(ds, batch_size=128, shuffle=True):
    return torch.utils.data.DataLoader(
        # Dataset
        ds,
        # バッチサイズ
        batch_size=batch_size,
        # データをランダムな順番で抽出するかどうか
        shuffle=shuffle,
        # 実装した関数をcollete_fn引数に渡す
        collate_fn=ds.collate_fn,
    )

    
    
if __name__ == '__main__':    
    df = pd.read_csv("hoge.csv")
    ds = MyDataset(df)
    dl = get_data_loader(ds)
    
    for i, (features, targets) in enumerate(dl):
        print(i, features.shape, targets.shape)

まとめ

本記事では、PytorchのDataset, DataLoaderを使って、基本的なテーブルデータにおけるmixupを実装しました。本記事の少しコードを改変すれば任意の形状を持つデータに適用可能なので、ぜひお手元の問題設計でお試しください。さまざまな問題設計やデータ形状を扱えるのがNNの醍醐味ですよね!
今回は実データでの検証結果はまとめていませんが、今後J-Quants APIを活用した実験をまとめて記事にできたらいいなと考えています。個人的には、mixupだけで精度が爆上がりするわけではないものの、かなり手ごたえがある印象を持ちました。皆様もぜひご活用ください!

Discussion