TensorFlow・PyTorchによる分散学習
深層学習モデルの学習は、学習データの一部を抽出・勾配を計算するミニバッチ学習によって行われることが一般的です。勾配のばらつきを抑えるためには、ある程度のバッチサイズを保持する必要があります。一方で、バッチサイズの上限は利用するマシン(GPUやTPU)のメモリによって制約を受けるため、大規模なモデルや高解像度画像などを用いる際には、バッチサイズを小さくせざるを得ない場合があります。
これに対して複数のGPUや計算ノードを利用できる場合には、並列化によって単一GPUの時よりも大規模な学習を行うことができます。複数の計算機を用いた並列学習(分散学習)には大きく分けてデータ並列とモデル並列が存在しており、合わせて利用することもできます。
- データ並列(Data Parallel):ミニバッチを複数の計算機に分散する方法
- モデル並列(Model Parallel):一つのモデルを複数の計算機に分散する方法
分散深層学習に関する日本語の資料としては以下が参考になります。
ここではTensorFlowとPyTorchそれぞれを用いた際のデータ並列の方法を確認します。コードは以下に配置しました。NVIDIA A100 GPU×8の計算ノードによって動作を確認しました。マルチノード学習の検証は8つのGPUを4GPU×2に分割することで疑似的に検証を行いました。
※TF・PyTorchそれぞれのデータ読み込みや学習設計まで厳密には揃えていません。そのためこれらの精度や速度比較はしていない点に注意してください。
TensorFlowによる分散学習
基本的に公式ドキュメントが最も参考になります。TensorFlowでの分散学習はtf.distribute.Strategy
というAPIによって提供されています。ここでは最もシンプルなMirroredStrategy
と複数ノードを用いるためのMultiWorkerMirroredStrategy
の利用方法を確認します。
MirroredStrategy
最もシンプルには、MirroredStrategy
オブジェクトは以下のように構築できます。
strategy = tf.distribute.MirroredStrategy()
この場合、現在の環境で見えている全てのGPUを利用することになります。
今回の検証スクリプトでは次のように書きました。この関数では使用するGPUの数を引数n_gpus
として指定し、tf.config.set_visible_devices
によって対象とするGPUを絞っています。例えば8つのGPUがある場合に、n_gpus=4
とすると、物理的なGPU(gpus
)は8つですが、コードが扱う論理的なGPU(logical_gpus
)は4つとなります。
def build_strategy(n_gpus):
gpus = tf.config.list_physical_devices('GPU')
print('Available GPUs:', gpus)
tf.config.set_visible_devices(gpus[:n_gpus], 'GPU')
logical_gpus = tf.config.list_logical_devices('GPU')
print('Visible logical gpus:', logical_gpus)
strategy = tf.distribute.MirroredStrategy(logical_gpus)
return strategy
strategy.scope()
内で、モデルやオプティマイザを構築することで、これらが各GPUにコピーされます。
...
def build_network(image_size, n_classes):
image = layers.Input([*image_size, 3], dtype=tf.float32, name='input')
effnet = EfficientNetB2(include_top=False,
weights='imagenet',
pooling='avg')
feature = effnet(image)
feature = layers.Dropout(0.5)(feature)
logit = layers.Dense(n_classes)(feature)
model = tf.keras.Model(inputs=image, outputs=logit)
return model
...
def run(n_gpus, epochs, batch_size, learning_rate):
strategy = build_strategy(n_gpus)
...
with strategy.scope():
model = build_network(IMAGE_SIZE, N_CLASSES)
optimizer = tf.keras.optimizers.Adam(learning_rate)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metrics = tf.keras.metrics.SparseCategoricalAccuracy()
model.compile(optimizer=optimizer,
loss=loss,
metrics=metrics)
model.fit(...)
非常にストレスなく実装できました。手元のPCなどGPUが存在しない環境でも自動的にCPUのみで動作してくれるので使いやすいと思います。
MultiWorkerMirroredStrategy
MirroredStrategy
ではあくまで1台のマシン上の複数のGPUを用いて分散学習を行いましたが、MultiWorkerMirroredStrategy
では複数のワーカーの複数のGPUを利用して、さらに大規模な分散学習を行うことができます。一つのマシンに複数のワーカーを定義することもできますが、簡単のために1ワーカー=1マシン(計算ノード)として説明します。
MultiWorker*
では複数のマシンによる分散学習を行うために、TF_CONFIG
という環境変数を通じて各マシンのタスクタイプとタスクidを設定する必要があります。例えば今回の検証では以下のように設定し、strategy
を構築しています。
def build_strategy(worker_index):
tf_config = {
'cluster': {
'worker': ['localhost:12345', 'localhost:23456']
},
'task': {'type': 'worker', 'index': worker_index}
}
# Set environment variables for multi-worker communication
os.environ['TF_CONFIG'] = json.dumps(tf_config)
strategy = tf.distribute.MultiWorkerMirroredStrategy()
return strategy
-
'cluster': {'worker': ['localhost:12345', 'localhost:23456']}
が分散学習環境の全体を表します。今回は8GPU×1ノードの環境を仮想的に4GPU×2ノードとして扱って検証を行ったため、worker
はlocalhost
の12345
ポートと23456
ポートを指定しています。 -
'task': {'type': 'worker', 'index': worker_index}
は各ノード(worker)が、cluster
の構成の中のどの役割を担当するかを表します。例えばここでworker_index=0
とすると、cluster['worker'][0] (つまり'localhost:12345')
を指定したことになります。
このようにして構築したコンフィグはJSON文字列としてダンプし、os.environ['TF_CONFIG']
に格納することで、そのワーカーが担当する役割を定義したことになります。あとはMirroredStrategy
と同様にstrategy
を構築し、学習を行うことができます。注意すべき点としてMultiWorker*
を用いる際には、他のTensorFlowオペレーションよりも先にstrategy
を構築する必要があります。
def run(worker_index, epochs, batch_size, learning_rate):
strategy = build_strategy(worker_index)
...
with strategy.scope():
model = build_network(IMAGE_SIZE, N_CLASSES)
optimizer = tf.keras.optimizers.Adam(learning_rate)
loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
metrics = tf.keras.metrics.SparseCategoricalAccuracy()
model.compile(optimizer=optimizer,
loss=loss,
metrics=metrics)
model.fit(...)
実行時にはクラスターを構成する各ワーカーを起動する必要があります。今回の検証では次のように実行時引数-wi
によってworker_index
を指定し、2ワーカーからなる分散学習を実行しました。学習の開始にはクラスターを構成する全てのワーカーを起動する必要があるため、一つ目のワーカーはバックグラウンドで実行しています。
#!/bin/sh
CUDA_VISIBLE_DEVICES=0,1,2,3 python train_tf_multiworker_mirroredstrategy.py -wi 0 > job_tf_mwms_0.log &
CUDA_VISIBLE_DEVICES=4,5,6,7 python train_tf_multiworker_mirroredstrategy.py -wi 1
unset CUDA_VISIBLE_DEVICES
その他
TensorFlowではMirroredStrategy, MultiWorker*
の他にもいくつかの分散学習APIを提供しています。代表的なものとしてはTPUStrategy
があり、TPUを用いた高速な学習が期待できます。
他にParameterServerStrategy
では、パラメータサーバーにモデルの重みを配置し、各ワーカーがこれを逐次的に読み取る形で学習を行います。この場合には例えば以下のように、様々な役割からなるクラスターを構成する必要があります。
os.environ["TF_CONFIG"] = json.dumps({
"cluster": {
"worker": ["host1:port", "host2:port", "host3:port"],
"ps": ["host4:port", "host5:port"],
"chief": ["host6:port"]
},
"task": {"type": "worker", "index": 1}
})
その他の分散学習方法は以下のドキュメントで紹介されています。
PyTorchによる分散学習
PyTorchはラッパーライブラリを用いない場合、データやモデルの重みを手動でGPUに載せる必要があります。この処理を理解するのに少し時間がかかりましたが、結果的にドキュメントと公式のサンプルコードを見れば理解することができました。
DataParallel
torch.nn.DataParallel
は、単一マシンの複数GPUを用いて分散学習を行うためのAPIです。モデルと並列化に用いるGPUのidを渡すことで、モデルをコピーすることができます。注意点として、DataParallel
とは別に明示的にmodel.to(device)
を行い、モデルをどれかのGPUに載せる必要があるようです。
def run(datadir, n_gpus, epochs, batch_size, learning_rate):
n_max_gpus = torch.cuda.device_count()
print(f'{n_max_gpus} GPUs available')
n_gpus = min(n_gpus, n_max_gpus)
print(f'Using {n_gpus} GPUs')
device_ids = list(range(n_gpus))
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
...
model = EfficientNet(backbone='efficientnet_b2', n_classes=N_CLASSES)
model = nn.DataParallel(model, device_ids=device_ids)
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
for e in range(epochs):
model.train()
for i, (images, labels) in enumerate(dl_train):
optimizer.zero_grad()
images, labels = images.to(device), labels.to(device)
logits = model(images)
loss = criterion(logits, labels)
loss.backward()
optimizer.step()
...
device_ids
を指定しない場合には実行時に利用できる全てのGPUを利用します。CPUで動かす際には条件分岐などでDataParallel
を実行しないようにする必要があるようです。これらのポイントがあるものの、既存のPyTorch実装への導入は簡単に感じました。
DistributedDataParallel
DistributedDataParallel (以降DDP)
はマルチノードで分散学習を行うためのAPIであり、TensorFlowのMultiWorkerMirroredStrategy
に相当すると言えます。またDDP
による分散学習は以下の二通りで実行できます。
-
torch.multiprocessing.spawn
によってPythonスクリプト内から分散処理を立ち上げる方法 -
torch.distributed.launch.py
をモジュールとして分散処理を立ち上げる方法
ここではTensorFlowのMultiWorker*
と同様に、コマンドラインから分散処理を立ち上げる後者の方法を検証しました。実装上の特徴的な点を抜粋すると以下のようになります。
def run(datadir, local_rank, epochs, batch_size, learning_rate):
pid = os.getpid()
env_dict = {
key: os.environ[key]
for key in ("MASTER_ADDR", "MASTER_PORT", "RANK", "WORLD_SIZE")
}
print(f"[{pid}] Initializing process group with: {env_dict}")
dist.init_process_group("nccl")
world_size = dist.get_world_size() # Total GPUs over nodes
n_gpus = torch.cuda.device_count() # GPUs at current node
print(f'[{pid}] Node info: GPUs: {n_gpus}, local_rank: {local_rank}')
...
bs_per_gpu = batch_size//world_size
# num_replicas and rank arguments are automatically assigned the global ones via dist.get_*
sampler_train = DistributedSampler(ds_train, shuffle=True)
sampler_val = DistributedSampler(ds_val, shuffle=False)
dl_train = DataLoader(ds_train, batch_size=bs_per_gpu, sampler=sampler_train)
dl_val = DataLoader(ds_val, batch_size=bs_per_gpu, sampler=sampler_val)
# Build model and setup training
model = EfficientNet(backbone='efficientnet_b2', n_classes=N_CLASSES).to(local_rank)
ddp_model = DDP(model, device_ids=[local_rank])
for e in range(epochs):
...
dist.destroy_process_group()
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='PyTorch DDP training')
parser.add_argument('--local_rank', type=int)
...
args = parser.parse_args()
run(**vars(args))
-
local_rank
:同じノード内のプロセス中何番目かを表す番号。スクリプトの実行時引数として設定することで、launch.py
を経由してプロセスごとに指定される。 - 分散学習のクラスターに関する環境変数。
torch.distributed.launch.py
による実行時に指定する。-
MASTER_ADDR
:プロセスに対応したipアドレス -
MASTER_PORT
:プロセスに対応したのポート -
RANK
:分散学習の全プロセス中何番目かを表す番号 -
WORLD_SIZE
:分散学習のプロセス数 -
dist.get_*
からも参照できる
-
-
dist.init_process_group("nccl")
:各プロセスが通信するための初期化。-
nccl
はプロセス間通信用のバックエンド。GPUを利用する際には基本的にnccl
で良い。 - 参考:Which backend to use?
-
- ミニバッチの分割と抽出
-
bs_per_gpu = batch_size//world_size
:1GPUのバッチサイズの計算 -
sampler_train = DistributedSampler(ds_train, shuffle=True)
:各GPUでデータの重複なくミニバッチを抽出するためのサンプラー。全体のGPU数などは指定しない場合dist.get_*
によって自動的に参照される。 -
dl_train = DataLoader(ds_train, batch_size=bs_per_gpu, sampler=sampler_train)
:データローダーの構築
-
- モデルの並列化
-
model = EfficientNet(...).to(local_rank)
:モデルの構築とGPUへの配置 -
ddp_model = DDP(model, device_ids=[local_rank])
:モデルの並列化
-
-
dist.destroy_process_group()
:各プロセスの終了
検証ではTensorFlowのMultiWorker*
と同様に4GPU×2ノードとして以下のように実行しました。使用するノードの数(--nnodes
)、各ノードの番号(--node_rank
)、各ノードのプロセス数(--nproc_per_node
, 基本的にはGPUの数と同じ)を指定しています。--master_addr, --master_port
もここで指定できます。
#!/bin/sh
CUDA_VISIBLE_DEVICES=0,1,2,3 python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node=4 train_torch_ddp.py ../../datasets/stanford_dogs > job_torch_ddp_0.log &
CUDA_VISIBLE_DEVICES=4,5,6,7 python -m torch.distributed.launch --nnodes=2 --node_rank=1 --nproc_per_node=4 train_torch_ddp.py ../../datasets/stanford_dogs
unset CUDA_VISIBLE_DEVICES
2つ目のノードにおけるprint文の出力は以下のようになっていました。WORLD_SIZE
から合計8個のプロセスが起動されていることが分かります。各プロセスにはノードを跨いだグローバルなRANK
として4〜7、ノード内部のlocal_rank
として0〜3が割り振られていることが分かります。
...
[304404]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '7', 'WORLD_SIZE': '8'}
[304397]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '4', 'WORLD_SIZE': '8'}
[304399]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '5', 'WORLD_SIZE': '8'}
[304401]: Initializing process group with {'MASTER_ADDR': 127.0.0.1, 'MASTER_PORT': '29500', 'RANK': '6', 'WORLD_SIZE': '8'}
[304401]: Node info: GPU: 4, local_rank: 2
[304397]: Node info: GPU: 4, local_rank: 0
[304404]: Node info: GPU: 4, local_rank: 3
[304399]: Node info: GPU: 4, local_rank: 1
...
面白い点として、DDP
は単一マシン・マルチGPUの場合でもDataParallel
より高速になると公式ドキュメントに記載があります。これはDataParallel
がシングルプロセス・マルチスレッドによる並列化であり、GIL(グローバルインタプリタロック)による制約を受けるためのようです。対してDDP
はマルチプロセスにより並列化を行っており、GILの影響を受けないため比較的高速に動作するとされています。
その他
PyTorchをそのまま使う場合はデータやモデルをGPUに載せる処理や、学習ループを明示的にコーディングする必要があります。一方でラッパーライブラリを用いることで、これらの記述を簡潔にすることもできます。例えばPyTorchの代表的なラッパーであるPyTorch LightningではTrainer
オブジェクトの生成時に分散学習の方法を指定できるようです。
感想
TensorFlowとPyTorchのデータ並列による分散学習を検証しました。PyTorchのDDP
はやや時間がかかりましたが、他はかなり簡単に導入できるように感じました。ただしバッチサイズはただ大きくすればよくなるわけではないとも報告されており、利用する際には注意が必要そうです。
また近年は巨大な深層学習モデルを得るためにモデル並列が利用している論文もたびたび見かけます。例えば1750億パラメータからなるNLPモデルであるGPT-3は、各行列積の並列化と各層の並列化を併用しているという記述があります。機会があれば触ってみたいですね。
関連資料
Discussion