TFRecordとWebDatasetを使った分散並列学習とパフォーマンス調査

2023/07/28に公開2

はじめに

Turing株式会社の自動運転MLチームでエンジニアをしている越智 (@chizu_potato)と塩塚 (@shiboutyoshoku) です。

Turingが目指す自動運転は、大量のデータで学習された非常に賢い機械学習モデルを活用することです。そのために、走行パートナーの方たちと協力して創業時からこれまで大量の走行データを取得してきました。走行データは車両に取り付けられた複数カメラによる360度をカバーした動画と、そのときの速度やGPSなどの走行ログを含んでいます。データサイズは80TBを超え、時間換算で3500時間程度です。

これだけのデータサイズでモデルを学習するためには、1枚のGPUだけで頑張るには限界があり複数のGPU (multi-GPU) による分散並列学習が必要となってきます。しかし、ただ分散並列学習を行うだけではmulti-GPUに対し、データの入出力 (Input/Output; IO) が追いつかず効率よくGPUを使って学習を行うことができません。

そこで本記事では、PyTorchによる分散並列学習のやり方を説明した後、複数のデータローダーを使ったパフォーマンス調査の結果を紹介します。

3行でまとめると

  • PyTorchデータローダー、TFRecord、WebDatasetで分散並列学習を行った。
  • 1GPU→4GPUにするとそれぞれ3倍程度、学習時間が短縮された。
  • TFRecordとWebDatasetはそれぞれPyTorchのデータローダーに比べ20%程度の向上があったが、実験条件次第ではもう少し向上する可能性がありそう。

実験条件

はじめに、実験条件を確認します。AWSを使って実験を行っています。

  • 計算機 : EC2 p3.8xlarge (公式ページ)
    • GPUs : V100x4
    • GPUメモリ : 64GB
    • vCPU : 32
    • メモリ : 244GB
    • ネットワーク帯域幅10Gbps
  • データストレージ : S3 bucket

本記事で頻出する用語として、GPUは文字通り一枚のGPUを指すのに対し、nodeはGPUを搭載したコンピューターのことを指します。また、LOCAL_RANKはGPUごとに振られた識別値であり、world_sizeは全ノードで足したGPUの数です。

本実験環境の場合、1node、4GPUです。そのため、world_sizeは4で、各GPUにはLOCAL_RANK 0、1、2、3が振られます。分散並列学習の用語の説明は弊社のテックブログ『弊社のテックブログ『大規模モデルを支える分散並列学習のしくみ Part1』でよくまとまっています。

データストレージであるS3バケットをgoofysを使ってEC2インスタンスからマウントしました。

それでは、本題に入っていきます。

1章 PyTorchで分散並列学習

まず、PyTorchで分散並列学習を行うための方法を説明します。ただし、もうDDPについては知っているよ、という方はこの章は呼び飛ばしてもらっても大丈夫です!2章の逐次的なデータロードによるIO律速の軽減まで飛んでください。

PyTorchで分散並列学習は、DataParallel (DP)DistributedDataParallel (DDP) を使うことで実現できます。どちらを使うべきかと言われるとDDPを使うべきです。公式のTutorialsページには以下のような記述があります。DPはDDPに比べオーバーヘッドが大きかったり、single-nodeしか対応していなかったり、そもそもDDPのほうが速いようです。

DataParallel is an older approach to data parallelism. DP is trivially simple (with just one extra line of code) but it is much less performant. DDP improves upon the architecture in a few ways:

DataParallel DistributedDataParallel
More overhead; model is replicated and destroyed at each forward pass Model is replicated only once
Only supports single-node parallelism Supports scaling to multiple machines
Slower; uses multithreading on a single process and runs into Global Interpreter Lock (GIL) contention Faster (no GIL contention) because it uses multiprocessing

DistributedDataParallel(DDP)とは、GPUを複数台使用してグローバルなバッチサイズを上げることで訓練時間を短縮するアプローチです。

PyTorchのDDPでは、GPUごとに学習するプロセスを作り、そのプロセスごとにDataLoaderやモデルを定義して学習を進めます。各プロセス間は、逆伝播後の勾配を得たタイミングで通信します。各プロセスは他のプロセスの勾配値を集め、最終的な勾配値を得ることで、モデルのパラメータを更新します。集計された最終的な勾配値は、各プロセスで同じ値になるので、DDPを使用すると実質的にグローバルなバッチサイズが大きくなるように見えます。

PyTorchで分散並列学習をするやり方については、 Distributed Data Parallel in PyTorch - Video Tutorials が公式の動画が非常にわかりやすかったので、一連の動画を観ることをおすすめします。

1-a. 実行手順

それでは、具体的にsingle-GPUで動いていた訓練コードをmulti-GPUに対応するために必要なことを書いていきます。

プロセスの初期化

プロセス自身がどの LOCAL_RANK で動いているかを教えてあげます。ここはおまじないみたいなものです。

def ddp_setup():
    init_process_group(backend="nccl")
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))

if __name__ == "__main__":
    ddp_setup()

DDPモデルの構築

モデルを定義したら、 DDP でラップします。DDPを使うと、逆伝播時にデフォルトでプロセス間の勾配の共有されるので、forwardからbackwardする実装はsingle-GPUのままで良いです。

しかし、損失を取得する時点では、そのプロセス自体のlossであって全体のlossにはならないことに注意してください。

from torch.nn.parallel import DistributedDataParallel as DDP

rank = int(os.environ["LOCAL_RANK"]
model = DDP(model.to(rank), device_ids=[rank])

プロセスごとのデータの分配

詳しくは後ろの章で話します。single-GPUのコードをそのまま使うと、全GPUで同じデータで学習することになり、意味を成さないのでプロセスごとに学習するデータを配分します。

dataset = torch.utils.data.Dataset()  # ここは適当なPyTorchのデータセットです
sampler = torch.utils.data.distributed.DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False)
loader = torch.utils.data.DataLoader(dataset, shuffle=(sampler is None), sampler=sampler)

モデルの重みの保存

DDPモデルを構築した場合、stateを取得するには model.module.state_dict() を指定する必要があります。

# single-GPUの場合
torch.save(
    {
        "state_dict": model.state_dict(),
    },
    save_path,
)

# multi-GPU(DDPを使う)の場合
torch.save(
    {
        "state_dict": model.module.state_dict(),
    },
    save_path,
)

生成物をログ、ファイルに吐き出すのは一つのプロセスから

前提として、DDPでは全てのプロセスで勾配を共有しているため、同じ重みを保持しています。したがって、lossの値、モデルの重みは全プロセスで等しいです。

次のように、1つのプロセスからのみログを出したり、重みを保存したりすることが多いです。

rank = int(os.environ["LOCAL_RANK"])
if rank == 0:
    save_model(model, "model.pt")  # モデルの重みの保存
    print(f"train: loss(step): {loss.item()}")

torchrunで実行

torchrunとは、PyTorchが提供している分散並列学習のためのツールです。

torchrunは次のようにコマンドラインから実行することができます。

torchrun \
    --nproc-per-node=4  \ # 1ノードあたりの使用するGPU数
    TRAINING_SCRIPT.py (--args ...)

# single-gpuで実行する場合
# python TRAINING_SCRIPT.py (--args ...)

python で実行すると、シングルプロセスでプログラムが実行されますが、 torchrun を実行することで、GPUごとにプロセスが立ち上がり、そのプロセスにGPUを識別する環境変数群がセットされ、プログラムが実行されます。

これらによって、single-gpuで実行されていた学習コードをDDPで分散並列学習することができます。作業としてはここまでなのですが、どのようにして各プロセス(GPU)で計算した結果を全体で共有しているのでしょうか。またデータの分配は各プロセスにどのように行われているのでしょうか。詳しく観ていきましょう。

1-b. プロセス間の勾配の共有

DDPでは各プロセス同士が勾配を共有し、その勾配から重みを更新することで、全プロセスで同じ重みを共有します。ここで重要なのが、どうやって各プロセス同士が勾配を共有するかです。

DDPデフォルトの勾配の共有にはRing-AllReduceという手法が使われます。これは、全てのプロセスが持っているデータを集約し、各プロセスが等しい結果を取得する手法の一つです。All-Reduceではプロセス0 → プロセス 1 → プロセス2 → プロセス3 → プロセス0というように、円を描くように自分のデータと受け取ったデータを渡していきます。


https://www.youtube.com/watch?v=Cvdhwx-OBBo より

PyTorchのDDPを使用する場合は、デフォルトで Ring-AllReduce を使ったアルゴリズムで勾配を共有し、合計された結果の勾配を取得します。各プロセスは同じ勾配を用いて逆伝播します。

1-c. プロセスごとのデータの分配

torchrunを使うことで、各GPUごとにプロセスを立ち上げてスクリプトを実行することができるところまではわかりました。ここからは、各GPUに割り当てられたプロセスの中で何のデータを使って学習するかという話題に入ります。

single-GPUで動かすコードをそのまま使うと、全てのGPUプロセスで同じデータを同じように学習してしまいます。そこで上述したようにDistributedSamplerという仕組みを使います。

PyTorchには、プロセスごとにデータを分割してくれる DistributedSampler という機能があります。 DistributedSampler は、Samplerの実装の一種です。SamplerはDataLoaderの引数に渡し、データセットからバッチを作るロジックをカスタムできるものです。

DistributedSampler がやることを簡単に図示しました。(デフォルトの場合は)データセットをn飛びで参照するようなイテレータを返します。

DistributedSampler の本質部分は次の2行に詰まっていて、インデックスの列を飛び飛びで参照するだけです。

self.rank はLocal rankを、 self.total_size は最後のインデックスを、 self.num_replicas はworld_sizeを表しています。

indices = list(range(len(self.dataset)))
# subsample
indices = indices[self.rank:self.total_size:self.num_replicas]

DistributedSampler は特段難しいことをしていなくて、ただプロセスごとにDatasetに対するインデックスを割り当てているだけです。逆に捉えると、 DistributedSampler を使わなくても分散並列学習はできます。これは後述する内容でも生きてきますのでぜひ覚えていてください。

ちなみに、データ数がGPU数で割り切れなかった場合には、余ったデータを切り捨てたり、同じデータを参照することで全てのGPUに均等に割り振ります。

ここまでで、PyTorchによるDDPを使った分散並列学習のやりかたとその仕組みについて触れました。これで、大量のデータを複数のGPUに分配して高速学習を達成!できれば良かったのですが、実際はそうなりません。

データを供給する仕組みと供給先 (multi-GPU) が確保できても、そもそも供給量が足りなければGPUを効率的に使うことはできません。しかし、画像のようなデータを扱う場合しばしばIO律速が発生します。次章ではシーケンシャルにデータを読み込むことでこの問題に対処する方法を紹介します。

2章 逐次的なデータロードによるIO律速の軽減

本章に入る前にもう一度ここで、問題設定を確認します。

  • 学習データは一度にメモリに乗らないような大規模なものである。
  • 学習データは計算機のローカルストレージではなく、クラウド上に存在しそこから読み出して学習を行う。
  • 計算機とストレージはAWS上に存在しそのネットワーク帯域は10Gbpsとそれなりに高速である。

つまり、multi-GPUで訓練する仕組みはできたが、それに見合うデータロードができていない状況です。そこで、ここではTFRecordDatasetWebDatsetを使って、データロードが高速になるか試しました。これら2つは、データセットを複数のシャード (shard) と呼ばれる単位に分割し、シーケンシャルにデータを読むことで高速なデータロードを行います。

2-a. TFRecordを使った高速化

まずは、TFRecordと呼ばれる、TensorFlowが提供している、構造化データをシリアライズし、保存する形式を使った分散並列学習についてお話します。

TFRecord形式でデータを保存する場合は、データを均等に分割して保存します。この分割の操作をシャーディングと呼び、1つのファイルをshardと呼びます。

TFRecord形式のshardはシーケンシャルに保存されているので、TensorFlowのAPIのストリーミングによるアクセスによって高速なデータ処理を実現します。

TFRecord : single-GPU

そもそも、TFRecordはどのようにしてデータセットを書き出したり読み出したりするのでしょうか。

ここでは、任意の形状の配列が入ることを想定し、tf.io.serialize_tensorを使用して配列をバイナリ列に変換します。これをFeatureに詰めた後Exampleに詰めてライターに渡して書き込みます。

import tensorflow as tf
import numpy as np

filename = "0000.tfrecord"
shard_size = 10
input_imgs = np.load("input_imgs.npy")  # (長さ, 縦, 横)
label = np.load("label.npy")  # (長さ, ラベル長)

with tf.io.TFRecordWriter(filename) as writer:
    for i in range(shard_size)):
        features={
            "input_imgs": tf.train.Feature(
                bytes_list=tf.train.BytesList(
                    value=[tf.io.serialize_tensor(input_imgs[i]).numpy()]
                )
            ),
            "label": tf.train.Feature(
                bytes_list=tf.train.BytesList(
                    value=[tf.io.serialize_tensor(label[i]).numpy()]
                )
            ),
        }

        example = tf.train.Example(features=tf.train.Features(feature=features))
        writer.write(example.SerializeToString())

この一連のコードで画像とラベルのデータが10個、が 0000.tfrecord というTFRecord形式のファイルに書き込むことができました。

次は、このTFRecord形式のファイルを読み出して配列を取得します。 decode_trfrecord_data 関数は、シリアライズされたデータをデシリアライズする役割を担っています。

Exampleの定義時点では型に string (バイナリ)を指定していますが、バイナリをデコードするtf.io.parse_tensor関数を呼び出すときに float に変換するように指定しています。この処理を通してシリアライズされたデータを構造化データに変換することができます。

データセットをイテレーションする際にも注目すべきポイントがあります。TFRecordのデータセットは .as_numpy_iterator() をつけてあげることで、TensorFlowではなくNumPyの形式で出力してくれるようになります。

from tensorflow.python.ops.numpy_ops import np_config

tf.config.experimental.set_visible_device([], "GPU")
np_config.enable_numpy_behavior()

def decode_tfrecord_data(serialized_string):
    # exampleの定義
    feature = {
        "input_imgs": tf.io.FixedLenFeature([], tf.string),
        "label": tf.io.FixedLenFeature([], tf.string)

    }
    example = tf.io.parse_single_example(serialized_string, feature)

    # tfrecordから読み込んだデータをdecode
    input_imgs = tf.io.parse_tensor(example["input_imgs"], out_type=tf.float)
    label = tf.io.parse_tensor(example["label"], out_type=tf.float)

    # 出力データ
    output = {
        "input_imgs": array,
	"label": label,
    }
    return output

filename = "0000.tfrecord"
dataset = tf.data.TFRecordDataset(
    filenames=[filename], num_parallel_reads=tf.data.experimental.AUTOTUNE  # AUTOTUNEは筆者の環境では-1
)
dataset = dataset.map(decode_tfrecord_data)  # ここでデシリアライズする
dataloader = dataset.as_numpy_iterator()  # NumPyの形式でバッチを出力するイテレータで、DataLoaderと同じ役割を担う

もちろんバッチ処理もできるので、single-GPUであればPyTorchのDataLoaderと同じように学習に使うことができます。バッチ処理やデータセット内の前処理に関する話はTensorFlow公式のtf.dataのチュートリアルを読むと良いと思います。

ここまでは、1つのTFRecordファイルを作る方法を紹介しました。実際はデータセットをシャーディング(分割)して各TFRecordのファイルサイズが数百MB程度になるように調整すると良いとされています。

TensorFlowにはシャーディングを実現するAPIは用意されていないので、自分で実装する必要があります。自分でシャードサイズ(ファイルに含めるインデックスの数)を決めてインデックシングするのがシンプルなシャーディングの方法だと思います。

idx_list = list(range(30))  # indexのリスト
length_per_shard = 10  # 1つのファイルに入れるindexの数。ここは人が決める。
length = len(idx_list)
num_shards = length // length_per_shard  # ファイルの数
for i in range(num_shards):
    filename = f"{str(i).zfill(4)}.tfrecord"
    i_idx = idx_list[i * length_per_shard:(i + 1) * length_per_shard]
    print(filename, i_idx)
    # ここからは1つのファイルに書き込む処理を書く

""" 出力結果
0000.tfrecord [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
0001.tfrecord [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
0002.tfrecord [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
"""

読み出すときは、 TFRecordDatasetfilenames 引数にパスのリストを渡すとシーケンシャルにデータを読み出してくれます。ここは1ファイルでも複数ファイルでもやり方は同じです。

filenames = ["0000.tfrecord", "0001.tfrecord", "0002.tfrecord"]
dataset = tf.data.TFRecordDataset(
    filenames=filenames, num_parallel_reads=tf.data.experimental.AUTOTUNE  # AUTOTUNEは筆者の環境では-1
)

TFRecord : multi-GPU

では、tfrecordを使ってmulti-GPUで学習する場合はどのようにしたら良いでしょうか。

multi-GPUで学習する場合、プロセスごとに読み込むデータセットを分配しなければなりません。

ここでは、「プロセスごとに分配されたTFRecordパスからTensorFlowのデータセットを定義し、学習に使う」アプローチを取ります。例えば、0番目のプロセスには00, 04, 08, …、1番目のプロセスには01, 05, 09, …といったように読み込ませるTFRecordのパスを変えます。

これを実装したのが次です。各プロセスにファイルを順番に割り当て、足りなければ最初のファイルを追加するようにしています。

def get_tfrecordpaths(tfrecord_paths: List[str], local_rank: int, world_size: int) -> List[str]:
    """.tfrecordのファイルを均等に分割する"""
    max_length = -(-len(tfrecord_paths) // world_size)  # 切り上げ、1GPUあたりのshard数
    result_paths = tfrecord_paths[local_rank::world_size]
    if len(result_paths) != max_length:  # repaetの処理
        result_paths.append(tfrecord_paths[local_rank])
    return result_paths

動かすと、各プロセスごとにtfrecordファイルが割り当てられていることがわかります。各プロセスからこの関数を叩き、パスをTFRecordDatasetに入力し、学習に使います。

world_size = 4
for local_rank in range(4):
    print(f"Process {i}:", get_tfrecordpaths(paths, i, world_size))
# === output ===
# Process 0: ['00.tfrecord', '04.tfrecord', '08.tfrecord', '12.tfrecord', '16.tfrecord']
# Process 1: ['01.tfrecord', '05.tfrecord', '09.tfrecord', '13.tfrecord', '17.tfrecord']
# Process 2: ['02.tfrecord', '06.tfrecord', '10.tfrecord', '14.tfrecord', '02.tfrecord']
# Process 3: ['03.tfrecord', '07.tfrecord', '11.tfrecord', '15.tfrecord', '03.tfrecord']

余談ですが、上手くいかなかった2つ目のアプローチを紹介します。

2つ目のアプローチは、TFRecordを読むPyTorchのdatasetを定義し、Distributedsamplerを使うことでプロセスごとにデータを分配する方法です。

tfrecordライブラリという、TFRecord形式のデータを読んでPyTorchのIterableDatasetとしてラップするライブラリがあります。このライブラリを使った実装はできたのですが、datasetにlengthが無いことによるsamplerが使えないことと、メモリリークの2つの問題に悩まされてしまったので、2つ目のアプローチの実装はスキップしました。

そもそもDistributedSamplerは、プロセスごとに飛び飛びのインデックスのデータを割り当てるような実装をしているので、シーケンシャルにデータを読み込むTFRecordとの相性は良くないと思われます。

もし、PyTorchのdatasetを使った良い実装方法があれば教えてください!

2-b. WebDatasetを使った高速化

次に、WebDatasetを使った方法を試します。

WebDatasetの詳しい説明は弊社の過去のテックブログを参照してください。

https://zenn.dev/turing_motors/articles/petabyte_webdataset

WebDataset : Single GPU

まずはsingle-GPUを例に基本的な使い方について書きます。

インストール

$ pip install webdataset

または

$ pip install git+https://github.com/tmbdev/webdataset.git

データの準備

WebDatasetを使うためのデータを準備します。通常、shardとよばれる複数のtarファイルによって構成されます。

$ ls data | head
000000.tar
000001.tar
000002.tar
000003.tar
000004.tar
000005.tar
000006.tar
000007.tar
000008.tar
000009.tar

またひとつひとつのtarファイルの中身は以下のようになっています。

tar -tf 000000.tar | head
0000.input_imgs.npy
0000.label.npy
0001.input_imgs.npy
0001.label.npy
0002.input_imgs.npy
0002.label.npy
0003.input_imgs.npy
0003.label.npy
0004.input_imgs.npy
0004.label.npy

通常のtarファイルですので、tarコマンドでも作成できます。

ここではPythonから作成する例を記載します。

import os
import numpy as np
from glob import glob
import webdataset as wbs

INPUT_DIR = "data/npy"
OUTPUT_DIR = "data/web"

os.makedirs(OUTPUT_DIR, exist_ok=True)
writer = wds.ShardWriter(os.path.join(OUTPUT_DIR, "%04d.tar"))

for path in tqdm(glob(INPUT_DIR)):
    input_imgs_path = os.path.join(path, "input_imgs.npy")
    label_path = os.path.join(path, "label.npy")
    input_imgs = np.load(input_imgs_path)
    label = np.load(label_path)

    data = {
        "__key__": path,
        "input_imgs.npy": input_imgs,
        "label.npy": label,
    }
    writer.write(data)

	writer.close()

wds.ShardWriter でshardを書き込むディレクトリを指定します。引数で maxsize を指定することでshardひとつあたりの容量を定義することができます。デフォルトでは約3GBになっています。また今回は拡張子に.npyを使っていますが、以下の拡張子ならどれでも扱うことができます。

decoders = {
    "txt": lambda data: data.decode("utf-8"),
    "text": lambda data: data.decode("utf-8"),
    "transcript": lambda data: data.decode("utf-8"),
    "cls": lambda data: int(data),
    "cls2": lambda data: int(data),
    "index": lambda data: int(data),
    "inx": lambda data: int(data),
    "id": lambda data: int(data),
    "json": lambda data: json.loads(data),
    "jsn": lambda data: json.loads(data),
    "pyd": lambda data: pickle.loads(data),
    "pickle": lambda data: pickle.loads(data),
    "pth": lambda data: torch_loads(data),
    "ten": tenbin_loads,
    "tb": tenbin_loads,
    "mp": msgpack_loads,
    "msg": msgpack_loads,
    "npy": npy_loads,
    "npz": lambda data: np.load(io.BytesIO(data)),
    "cbor": cbor_loads,
}

ここまでで、データの準備ができました。

データを読み込む

data_dir = "path_to_your_shards/%04d.tar"
dataset = wds.WebDataset(data_dir).decode().to_tuple('input_imgs.npy', 'label.npy').shuffle(shuffle_buffer)

.decode()の中身に拡張子を指定しなければ、自動的に適したdecodeをしてくれます。npyなどのバイナリ形式ではなくpngなどで用意している場合は、画像用にdecoderが定義されているのでそこからpilrgbなどを指定してください。

dataset = wds.WebDataset(data_dir).decode("pil").to_tuple('input_imgs.png', 'label.cls').shuffle(shuffle_buffer)

あとは、PyTorchのデータローダーに渡せばOKです。.

dataloader = torch.utils.data.DataLoader(dataset, batch_size=64, num_workers=4)

WebDataset : Multi-GPU

次にMulti-GPUを使ってDDPをする方法を紹介です。とWebDatasetでDDPする方法は簡単で、datasetを作成するタイミングで、引数にnodesplitter=wds.split_by_nodeを指定するだけです。

data_dir = "path_to_your_shards/%04d.tar"
dataset = wds.WebDataset(data_dir, nodesplitter=wds.split_by_node).decode().to_tuple('input_imgs.npy', 'label.npy').shuffle(shuffle_buffer)

中身の実装も、src (shardのlist)を, world_size(GPUの数)スキップして、それぞれのGPUにshardを割り振っているだけです。

def split_by_node(src, group=None):
    rank, world_size, worker, num_workers = utils.pytorch_worker_info(group=group)
    if world_size > 1:
        for s in islice(src, rank, None, world_size):
            yield s
    else:
        for s in src:
            yield s

WebDatasetで分散並列学習する方法は以上です。では次の章で実験しましょう。

3章 3つのデータローダーで学習時のパフォーマンス測定

ここまでは、torchrunを使った分散並列学習、プロセスごとにデータを分配させる仕組みについて紹介しました。

ここからは、データローダーごとに訓練時間がどれだけ変わるかというのを実験を通して検証します。

3-a 比較対象

3つのデータローダで比較しました。PyTorchのナイーブなデータセットとTFRecord, WebDatasetです。

ナイーブなデータセット(ベースライン)

データは.npy形式で保存されていて、インデックスごとにデータをロードしててバッチ処理するPyTorchのデータセットを作り、DistributedSamplerを使ってプロセスごとにデータを分配して学習します。

TFRecord, WebDataset

シャードサイズは8と50のものを用意しました。また、学習時のデータ数の条件を等しくするため、シャードサイズ8のレコードを100個、シャードサイズ50のレコードを16個使うようにして、トータルのデータサイズを揃えます。

  • シャードサイズ8: 約600MB
  • シャードサイズ50: 約3.8GB

プロセスごとにデータを分配する方法は2章で説明した方法を使います。

3-b 調査内容

ナイーブなデータセット、TFRecord、WebDatasetをそれぞれ使った場合の学習時間を1GPUと4GPUとで訓練した場合で比較してみます。

TFRecordとWebDatasetについては、シャードサイズ(1ファイルに入れるデータのインデックス数)を8と50の場合について検証しました。

学習時の条件は次の通りです。

  • 入力は画像の回帰タスク
  • backboneには efficientnet_b2 を使用
  • エポック数 = 1
  • バッチサイズ = 16 で固定
  • Data Augmentationはしない

3-c 調査結果

まずは1GPUで学習させたときの訓練時間を計測しました。

ベースラインの、ナイーブなデータセットを使った場合は18.45分でした。

シャードサイズ TFRecord [分] (改善率) WebDataset [分] (改善率)
8 18.17 (x0.985) 17.42 (x0.944)
50 18.10 (x0.981) 16.87 (x0.914)

わかったこと

  • パフォーマンス: ナイーブな実装 < TFRecord < WebDataset
  • シャードサイズを上げると、パフォーマンスも良くなった

次に、4GPUで学習させたときの訓練時間を計測しました。

ベースラインの、ナイーブなデータセットを使った場合は7.13分でした。

シャードサイズ TFRecord [分] (改善率) WebDataset [分] (改善率)
8 6.00 (x0.842) 6.18 (x0.867)
50 5.31 (x0.745) 5.82 (x0.816)

わかったこと

  • パフォーマンス: ナイーブな実装 < WebDataset < TFRecord
  • シャードサイズを上げると、パフォーマンスも良くなった
  • 分散並列学習のによる高速化 (1GPU → 4GPUの学習時間の変化)
    • TFRecord: 3.4倍
    • WebDataset: 2.9倍

3-d ディスカッション

実験結果でわかったことをまとめます。今回の実験では、

  • 1GPUの場合、WebDatasetを使うのが一番速い
  • 4GPUの場合、TFRecordを使うのが一番速い
  • 分散並列学習の効果はWebDatasetよりTFRecordのほうが高かった
  • シャードサイズを8→50にしたことで、パフォーマンスが良くなった

分散並列学習の有無によってTFRecordとWebDatasetのパフォーマンスが異なっている

実験結果では、1GPUのときはWebDataset、4GPUではTFRecordのほうがパフォーマンスが良いことを確認しました。

これについては原因はよくわかっていませんが、考えられるものとして、プリフェッチの効率さがパフォーマンスに影響を及ぼした可能性があります。

プリフェッチとは、使うデータを予め取ってきてメモリに乗っける事を指します。データローダーにおけるプリフェッチでは、GPUの計算中にCPUが次に渡すバッチを準備します。これにより、データのロードの高速化が期待できます。TFRecordやWebDataset、PyTorchのDataLoaderはプリフェッチをサポートしています。

今回の実験において、データセットはS3にあり、毎回S3にリクエストを送ってデータを取得しています。データセット自体が大きく、分散並列学習によって複数プロセスがデータを取得し、ネットワーク帯域が律速になることはよくあります。このようなときにプリフェッチが活躍します。

パフォーマンスの測定をしてないので、言い切りはできませんが、プリフェッチの性能による差が、TFRecordを使った訓練時間を縮めた可能性があります。

シャードサイズを上げたらパフォーマンスが向上した

一般には1つのファイルが数百MBになるようにシャードサイズを調整するのが良いとされていますが、本実験では数GBになるようにシャードサイズを調整したほうがパフォーマンスが良くなりました。

今回の条件では、インスタンス上にメモリが十分にあり、またネットワーク帯域も10Gbpsと比較的大きいため、細切れでデータを読むよりも一気にデータを取ってくるほうが効率的だったと解釈しています。

本実験では、ナイーブなデータセットではランダムなクセスではなく、シーケンシャルにデータを取ってくるような実装で比較を行いましたが、別のデータセットでTFRecordやWebDatasetと同様にランダムにアクセスした場合では、社内の別の実験で25倍程度の高速化が確認できました。

まとめ

本記事では、PyTorchで分散並列学習をする方法、大容量の学習データに対する効率的なデータロードを紹介し、データロードと訓練時間のパフォーマンスの測定をしました。

弊社では他にも分散並列学習について解説していますのでぜひ読んでみてください!

データ並列に限らない、一般の分散並列学習に関する包括的な解説

https://zenn.dev/turing_motors/articles/0e6e2baf72ebbc

LLMをDeepSpeedというライブラリを使って80台のGPUで分散並列学習した話

https://zenn.dev/turing_motors/articles/ce20c5202e107e#abciの80台のgpuを使ったマルチノード分散学習

Turingはこれまで大量のデータで機械学習モデルを学習することで、自動運転AIを開発してきました。今後も走行データは増え続け、そう遠くない未来にペタバイドスケールのデータとそれを使用した深層学習を行っていきます。そのためには、データを機械学習で扱うだけでなく、データ自体の管理やクリーニングなど多くの面白い課題が残っています。ぜひペタバイト級のデータを使って、最高の自動運転モデルを開発しませんか。興味がある方は、以下の弊社求人情報をご覧ください。また、気軽にオフィスに遊びに来てもらえるイベントも月イチくらいでやっているのぜひ!

https://www.turing-motors.com/jobs

Tech Blog - Turing

Discussion

LaniakeaLaniakea

大変興味深い記事をありがとうございます!

tensorflowのDatasetは.shard()メソッドで分割できたと思います。さらにはtf.interleave()関数でスマートに読み込みもできたはず…ですが、PyTorchのDDP学習時に正常にデータを渡せるかまでは確認できておりません。

また、これはPyTorch公式のものではありませんが、NVIDIAのDALIというライブラリはtfrecordを読めてかつPyTorch tensorを吐き出せます。ただ、こちらもDDPで使えるのかまではよくわかりません…

WebDataset、名前は聞いたことありましたが、この分だとローカルの1GPUとかでもかな〜り使えそうですね。いいことを聞きました!早速ちょっと遊んでみようと思います!

chizuchizuchizuchizu

早速コメントしてくださり、ありがとうございます!

tensorflowのDatasetは.shard()メソッドで分割できたと思います。

なるほどです。.shard(n, i) でプロセスに分配するのはできそうです。使ってみます!!

さらにはtf.interleave()関数でスマートに読み込みもできたはず

ちょうどさっき、 .interleave() の仕様について調べていて、複数のデータソースから同時に読み込めるようですね。これで整合性のあるバッチ処理ができるのか、パフォーマンスが改善するのか、試してみたいです!

NVIDIAのDALIというライブラリはtfrecordを読めてかつPyTorch tensorを吐き出せます。

おお、 知りませんでした。ロード、前処理の様々な事を高速化できるライブラリなんですね。シャーディングのサポートも確認できたので、使えそうです。

Multiple GPU Support — NVIDIA DALI 1.28.0 documentation

大変勉強になりました!!