🚀

TensorFlow・PyTorchによる分散学習

2021/05/19に公開

深層学習モデルの学習は、学習データの一部を抽出・勾配を計算するミニバッチ学習によって行われることが一般的です。勾配のばらつきを抑えるためには、ある程度のバッチサイズを保持する必要があります。一方で、バッチサイズの上限は利用するマシン(GPUやTPU)のメモリによって制約を受けるため、大規模なモデルや高解像度画像などを用いる際には、バッチサイズを小さくせざるを得ない場合があります。

これに対して複数のGPUや計算ノードを利用できる場合には、並列化によって単一GPUの時よりも大規模な学習を行うことができます。複数の計算機を用いた並列学習(分散学習)には大きく分けてデータ並列とモデル並列が存在しており、合わせて利用することもできます。

  • データ並列(Data Parallel):ミニバッチを複数の計算機に分散する方法
  • モデル並列(Model Parallel):一つのモデルを複数の計算機に分散する方法

分散深層学習に関する日本語の資料としては以下が参考になります。

https://www.slideshare.net/iwiwi/nips17-86470238

ここではTensorFlowとPyTorchそれぞれを用いた際のデータ並列の方法を確認します。コードは以下に配置しました。NVIDIA A100 GPU×8の計算ノードによって動作を確認しました。マルチノード学習の検証は8つのGPUを4GPU×2に分割することで疑似的に検証を行いました。

https://github.com/daigo0927/blog/tree/master/multi-gpu-training

※TF・PyTorchそれぞれのデータ読み込みや学習設計まで厳密には揃えていません。そのためこれらの精度や速度比較はしていない点に注意してください。

TensorFlowによる分散学習

基本的に公式ドキュメントが最も参考になります。TensorFlowでの分散学習はtf.distribute.StrategyというAPIによって提供されています。ここでは最もシンプルなMirroredStrategyと複数ノードを用いるためのMultiWorkerMirroredStrategyの利用方法を確認します。

https://www.tensorflow.org/guide/distributed_training?hl=ja#examples_and_tutorials

MirroredStrategy

最もシンプルには、MirroredStrategyオブジェクトは以下のように構築できます。

strategy = tf.distribute.MirroredStrategy()

この場合、現在の環境で見えている全てのGPUを利用することになります。

今回の検証スクリプトでは次のように書きました。この関数では使用するGPUの数を引数n_gpusとして指定し、tf.config.set_visible_devicesによって対象とするGPUを絞っています。例えば8つのGPUがある場合に、n_gpus=4とすると、物理的なGPU(gpus)は8つですが、コードが扱う論理的なGPU(logical_gpus)は4つとなります。

train_tf_mirroredstrategy.py
def build_strategy(n_gpus):
    gpus = tf.config.list_physical_devices('GPU')
    print('Available GPUs:', gpus)
    tf.config.set_visible_devices(gpus[:n_gpus], 'GPU')
    logical_gpus = tf.config.list_logical_devices('GPU')
    print('Visible logical gpus:', logical_gpus)

    strategy = tf.distribute.MirroredStrategy(logical_gpus)
    return strategy

strategy.scope()内で、モデルやオプティマイザを構築することで、これらが各GPUにコピーされます。

train_tf_mirroredstrategy.py
...

def build_network(image_size, n_classes):
    image = layers.Input([*image_size, 3], dtype=tf.float32, name='input')

    effnet = EfficientNetB2(include_top=False,
                            weights='imagenet',
                            pooling='avg')
    feature = effnet(image)
    feature = layers.Dropout(0.5)(feature)
    logit = layers.Dense(n_classes)(feature)
    model = tf.keras.Model(inputs=image, outputs=logit)
    return model
    
...

def run(n_gpus, epochs, batch_size, learning_rate):
    strategy = build_strategy(n_gpus)
    ...
    with strategy.scope():
        model = build_network(IMAGE_SIZE, N_CLASSES)
        optimizer = tf.keras.optimizers.Adam(learning_rate)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        metrics = tf.keras.metrics.SparseCategoricalAccuracy()

    model.compile(optimizer=optimizer,
                  loss=loss,
                  metrics=metrics)

    model.fit(...)

非常にストレスなく実装できました。手元のPCなどGPUが存在しない環境でも自動的にCPUのみで動作してくれるので使いやすいと思います。

MultiWorkerMirroredStrategy

MirroredStrategyではあくまで1台のマシン上の複数のGPUを用いて分散学習を行いましたが、MultiWorkerMirroredStrategyでは複数のワーカーの複数のGPUを利用して、さらに大規模な分散学習を行うことができます。一つのマシンに複数のワーカーを定義することもできますが、簡単のために1ワーカー=1マシン(計算ノード)として説明します。

https://www.tensorflow.org/tutorials/distribute/multi_worker_with_keras

MultiWorker*では複数のマシンによる分散学習を行うために、TF_CONFIGという環境変数を通じて各マシンのタスクタイプとタスクidを設定する必要があります。例えば今回の検証では以下のように設定し、strategyを構築しています。

train_tf_multiworker_mirroredstrategy.py
def build_strategy(worker_index):
    tf_config = {
        'cluster': {
            'worker': ['localhost:12345', 'localhost:23456']
            },
        'task': {'type': 'worker', 'index': worker_index}
        }
    # Set environment variables for multi-worker communication
    os.environ['TF_CONFIG'] = json.dumps(tf_config)
    
    strategy = tf.distribute.MultiWorkerMirroredStrategy()    
    return strategy
  • 'cluster': {'worker': ['localhost:12345', 'localhost:23456']}が分散学習環境の全体を表します。今回は8GPU×1ノードの環境を仮想的に4GPU×2ノードとして扱って検証を行ったため、workerlocalhost12345ポートと23456ポートを指定しています。
  • 'task': {'type': 'worker', 'index': worker_index}は各ノード(worker)が、clusterの構成の中のどの役割を担当するかを表します。例えばここでworker_index=0とすると、cluster['worker'][0] (つまり'localhost:12345')を指定したことになります。

このようにして構築したコンフィグはJSON文字列としてダンプし、os.environ['TF_CONFIG']に格納することで、そのワーカーが担当する役割を定義したことになります。あとはMirroredStrategyと同様にstrategyを構築し、学習を行うことができます。注意すべき点としてMultiWorker*を用いる際には、他のTensorFlowオペレーションよりも先にstrategyを構築する必要があります。

train_tf_multiworker_mirroredstrategy.py
def run(worker_index, epochs, batch_size, learning_rate):
    strategy = build_strategy(worker_index)
    ...
    with strategy.scope():
        model = build_network(IMAGE_SIZE, N_CLASSES)
        optimizer = tf.keras.optimizers.Adam(learning_rate)
        loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
        metrics = tf.keras.metrics.SparseCategoricalAccuracy()

    model.compile(optimizer=optimizer,
                  loss=loss,
                  metrics=metrics)

    model.fit(...)

実行時にはクラスターを構成する各ワーカーを起動する必要があります。今回の検証では次のように実行時引数-wiによってworker_indexを指定し、2ワーカーからなる分散学習を実行しました。学習の開始にはクラスターを構成する全てのワーカーを起動する必要があるため、一つ目のワーカーはバックグラウンドで実行しています。

run_tf_mwms.sh
#!/bin/sh

CUDA_VISIBLE_DEVICES=0,1,2,3 python train_tf_multiworker_mirroredstrategy.py -wi 0 > job_tf_mwms_0.log &
CUDA_VISIBLE_DEVICES=4,5,6,7 python train_tf_multiworker_mirroredstrategy.py -wi 1
unset CUDA_VISIBLE_DEVICES

その他

TensorFlowではMirroredStrategy, MultiWorker*の他にもいくつかの分散学習APIを提供しています。代表的なものとしてはTPUStrategyがあり、TPUを用いた高速な学習が期待できます。

他にParameterServerStrategyでは、パラメータサーバーにモデルの重みを配置し、各ワーカーがこれを逐次的に読み取る形で学習を行います。この場合には例えば以下のように、様々な役割からなるクラスターを構成する必要があります。

os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {
        "worker": ["host1:port", "host2:port", "host3:port"],
        "ps": ["host4:port", "host5:port"],
        "chief": ["host6:port"]
    },
    "task": {"type": "worker", "index": 1}
})

その他の分散学習方法は以下のドキュメントで紹介されています。

https://www.tensorflow.org/guide/distributed_training?hl=ja#examples_and_tutorials

PyTorchによる分散学習

PyTorchはラッパーライブラリを用いない場合、データやモデルの重みを手動でGPUに載せる必要があります。この処理を理解するのに少し時間がかかりましたが、結果的にドキュメントと公式のサンプルコードを見れば理解することができました。

https://pytorch.org/tutorials/beginner/dist_overview.html#data-parallel-training

DataParallel

torch.nn.DataParallelは、単一マシンの複数GPUを用いて分散学習を行うためのAPIです。モデルと並列化に用いるGPUのidを渡すことで、モデルをコピーすることができます。注意点として、DataParallelとは別に明示的にmodel.to(device)を行い、モデルをどれかのGPUに載せる必要があるようです。

train_torch_dp.py
def run(datadir, n_gpus, epochs, batch_size, learning_rate):
    n_max_gpus = torch.cuda.device_count()
    print(f'{n_max_gpus} GPUs available')
    n_gpus = min(n_gpus, n_max_gpus)
    print(f'Using {n_gpus} GPUs')

    device_ids = list(range(n_gpus))
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    ...
    model = EfficientNet(backbone='efficientnet_b2', n_classes=N_CLASSES)
    model = nn.DataParallel(model, device_ids=device_ids)
    model.to(device)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    for e in range(epochs):
        model.train()
        for i, (images, labels) in enumerate(dl_train):
            optimizer.zero_grad()
            images, labels = images.to(device), labels.to(device)
            logits = model(images)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
    ...

device_idsを指定しない場合には実行時に利用できる全てのGPUを利用します。CPUで動かす際には条件分岐などでDataParallelを実行しないようにする必要があるようです。これらのポイントがあるものの、既存のPyTorch実装への導入は簡単に感じました。

DistributedDataParallel

DistributedDataParallel (以降DDP)はマルチノードで分散学習を行うためのAPIであり、TensorFlowのMultiWorkerMirroredStrategyに相当すると言えます。またDDPによる分散学習は以下の二通りで実行できます。

  • torch.multiprocessing.spawnによってPythonスクリプト内から分散処理を立ち上げる方法
  • torch.distributed.launch.pyをモジュールとして分散処理を立ち上げる方法

https://github.com/pytorch/examples/tree/master/distributed/ddp

ここではTensorFlowのMultiWorker*と同様に、コマンドラインから分散処理を立ち上げる後者の方法を検証しました。実装上の特徴的な点を抜粋すると以下のようになります。

train_torch_ddp.py
def run(datadir, local_rank, epochs, batch_size, learning_rate):
    pid = os.getpid()
    env_dict = {
        key: os.environ[key]
        for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
    }
    print(f"[{pid}] Initializing process group with: {env_dict}")
    dist.init_process_group("nccl")

    world_size = dist.get_world_size()  # Total GPUs over nodes
    n_gpus = torch.cuda.device_count()  # GPUs at current node
    print(f'[{pid}] Node info: GPUs: {n_gpus}, local_rank: {local_rank}')

    ...
    
    bs_per_gpu = batch_size//world_size
    # num_replicas and rank arguments are automatically assigned the global ones via dist.get_*
    sampler_train = DistributedSampler(ds_train, shuffle=True)
    sampler_val = DistributedSampler(ds_val, shuffle=False)
    dl_train = DataLoader(ds_train, batch_size=bs_per_gpu, sampler=sampler_train)
    dl_val = DataLoader(ds_val, batch_size=bs_per_gpu, sampler=sampler_val)

    # Build model and setup training
    model = EfficientNet(backbone='efficientnet_b2', n_classes=N_CLASSES).to(local_rank)
    ddp_model = DDP(model, device_ids=[local_rank])
    
    for e in range(epochs):
        ...

    dist.destroy_process_group()

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='PyTorch DDP training')
    parser.add_argument('--local_rank', type=int)
    ...
    args = parser.parse_args()
    run(**vars(args))
  • local_rank:同じノード内のプロセス中何番目かを表す番号。スクリプトの実行時引数として設定することで、launch.pyを経由してプロセスごとに指定される。
  • 分散学習のクラスターに関する環境変数。torch.distributed.launch.pyによる実行時に指定する。
    • MASTER_ADDR:プロセスに対応したipアドレス
    • MASTER_PORT:プロセスに対応したのポート
    • RANK:分散学習の全プロセス中何番目かを表す番号
    • WORLD_SIZE:分散学習のプロセス数
    • dist.get_*からも参照できる
  • dist.init_process_group("nccl"):各プロセスが通信するための初期化。
    • ncclはプロセス間通信用のバックエンド。GPUを利用する際には基本的にncclで良い。
    • 参考:Which backend to use?
  • ミニバッチの分割と抽出
    • bs_per_gpu = batch_size//world_size:1GPUのバッチサイズの計算
    • sampler_train = DistributedSampler(ds_train, shuffle=True):各GPUでデータの重複なくミニバッチを抽出するためのサンプラー。全体のGPU数などは指定しない場合dist.get_*によって自動的に参照される。
    • dl_train = DataLoader(ds_train, batch_size=bs_per_gpu, sampler=sampler_train):データローダーの構築
  • モデルの並列化
    • model = EfficientNet(...).to(local_rank):モデルの構築とGPUへの配置
    • ddp_model = DDP(model, device_ids=[local_rank]):モデルの並列化
  • dist.destroy_process_group():各プロセスの終了

検証ではTensorFlowのMultiWorker*と同様に4GPU×2ノードとして以下のように実行しました。使用するノードの数(--nnodes)、各ノードの番号(--node_rank)、各ノードのプロセス数(--nproc_per_node, 基本的にはGPUの数と同じ)を指定しています。--master_addr, --master_portもここで指定できます。

run_torch_ddp.sh
#!/bin/sh

CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node=4 train_torch_ddp.py ../../datasets/stanford_dogs > job_torch_ddp_0.log &
CUDA_VISIBLE_DEVICES=4,5,6,7 python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node=4 train_torch_ddp.py ../../datasets/stanford_dogs
unset CUDA_VISIBLE_DEVICES

2つ目のノードにおけるprint文の出力は以下のようになっていました。WORLD_SIZEから合計8個のプロセスが起動されていることが分かります。各プロセスにはノードを跨いだグローバルなRANKとして4〜7、ノード内部のlocal_rankとして0〜3が割り振られていることが分かります。

...
[304404]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[304397]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[304399]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[304401]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[304401]: Node info: GPU: 4, local_rank: 2
[304397]: Node info: GPU: 4, local_rank: 0
[304404]: Node info: GPU: 4, local_rank: 3
[304399]: Node info: GPU: 4, local_rank: 1
...

面白い点として、DDPは単一マシン・マルチGPUの場合でもDataParallelより高速になると公式ドキュメントに記載があります。これはDataParallelがシングルプロセス・マルチスレッドによる並列化であり、GIL(グローバルインタプリタロック)による制約を受けるためのようです。対してDDPはマルチプロセスにより並列化を行っており、GILの影響を受けないため比較的高速に動作するとされています。

https://pytorch.org/tutorials/beginner/dist_overview.html#torch-nn-parallel-distributeddataparallel

その他

PyTorchをそのまま使う場合はデータやモデルをGPUに載せる処理や、学習ループを明示的にコーディングする必要があります。一方でラッパーライブラリを用いることで、これらの記述を簡潔にすることもできます。例えばPyTorchの代表的なラッパーであるPyTorch LightningではTrainerオブジェクトの生成時に分散学習の方法を指定できるようです。

https://pytorch-lightning.readthedocs.io/en/stable/advanced/multi_gpu.html

感想

TensorFlowとPyTorchのデータ並列による分散学習を検証しました。PyTorchのDDPはやや時間がかかりましたが、他はかなり簡単に導入できるように感じました。ただしバッチサイズはただ大きくすればよくなるわけではないとも報告されており、利用する際には注意が必要そうです。

また近年は巨大な深層学習モデルを得るためにモデル並列が利用している論文もたびたび見かけます。例えば1750億パラメータからなるNLPモデルであるGPT-3は、各行列積の並列化と各層の並列化を併用しているという記述があります。機会があれば触ってみたいですね。

関連資料

https://arxiv.org/abs/1706.02677

https://openreview.net/forum?id=Syx4wnEtvH

https://github.com/tensorflow/mesh

https://aws.amazon.com/jp/sagemaker/distributed-training/

https://arxiv.org/abs/2105.04663

Discussion