クラウド基盤mdxに構築したKubernetesクラスタ上で簡単に分散学習を行う
この記事について
こんにちは、東京大学鈴村研究室で、インフラエンジニアとしてお手伝いさせていただいています、福田と申します。
これまで、クラウド基盤mdxの上で構築したKubernetes環境で、サーバレスWebアプリケーションを立ち上げるための手順などを中心に説明してきました。
今回は少し話題を変えて、このKubernetes環境で、複数のGPUインスタンスを使った分散学習をする方法について説明します。
Kubernetesを使用しない一般的な分散学習は、ノード間の通信のための設定が色々と大変ですが、Kubernetes上で動作するKubeflow Trainerというフレームワークを使うと、この辺りの設定を抽象化してくれて、とても簡単に分散学習が実現できます。
本記事では、このKubeflow Trainerを使った分散学習の方法について説明していきます。
前提
この記事ではクラウド基盤mdxの上に、Kubernetesクラスタが構築されていることを前提とするため、mdxの上で仮想マシンを構築する方法や、Kubernetesクラスタ自体の構築方法については説明しません。
これらの手順を知りたい場合は、こちらの過去記事を参照ください。
踏み台サーバへのログイン
まずは、いつもの通り、以下のコマンドで踏み台サーバにログインします。
ssh-agentを設定しておくと、踏み台サーバから、各Workerインスタンスに秘密鍵などを指定することなくsshアクセスできるようになるので、便利です。
また、port6443をport forwardして、Lensが接続できるようにしておきます。
この辺りの手順はこちらの過去記事を参照ください。
eval `ssh-agent`
ssh-add ~/.ssh/mdx_access_key
ssh -L 6443:(master nodeのPrivate IPアドレス):6443 -A mdxuser@(踏み台サーバのGlobal IPアドレス)
Kubeflow Trainerのインストール
踏み台サーバにて、以下のコマンドを実行します。
kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/manager?ref=master"
以下のコマンドを実行し、JobSetとTrainerのcontroller managerが起動していることを確認します。
kubectl get pods -n kubeflow-system
NAME READY STATUS RESTARTS AGE
jobset-controller-manager-54968bd57b-88dk4 2/2 Running 0 65s
kubeflow-trainer-controller-manager-cc6468559-dblnw 1/1 Running 0 65s
次に以下のコマンドを実行し、Kubeflow Trainerのruntimeをインストールします。
kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/runtimes?ref=master"
Kubeflow Trainerでの分散学習実行
さて、ここまでの手順で、Kubeflow TrainerをKubernetes環境に構築することができました。
ここからは、構築したKubeflow Trainerの上で、実際の分散学習を実施していきます。
Kubeconfigの設定
まずは、手元のPCから、Kubeflow TrainerにJobを投げることができるように、以下の手順に従って、Kubeconfigの設定を行います。
すでにLensをインストールする際に、kubeconfigを設定済みの場合は、この手順はスキップできます。
Kubeflow Trainer Clientのインストール
以下のコマンドを実行し、PC端末にKubeflow Trainerのクライアントライブラリをインストールします。
pip install git+https://github.com/kubeflow/trainer.git@master#subdirectory=sdk
学習用コードの作成
PC端末で、以下のような分散学習用のコードを作成します。
この例では、PyTorchで、Fashion-MNISTのデータセットをシンプルな畳み込みニューラルネットワークで学習させることを行っています。
なお、このコードはKubernetesクラスタ内で実行されるので、PC端末にtorch
、torchvision
をインストールすることは必須ではありませんが、エディタのエラーが気になる方は、これらのライブラリをpip installすることも、もちろんOKです。
また、コード内で、LOCAL_RANK
という環境変数が登場しますが、これはユーザーが指定しなくとも、Kubeflowのフレームワークの中で自動的にこの環境変数を設定してくれます。
def train_pytorch_model():
import os
import torch
from torch import nn
import torch.nn.functional as F
from torchvision import datasets, transforms
import torch.distributed as dist
from torch.utils.data import DataLoader, DistributedSampler
# [1] Configure CPU/GPU device and distributed backend.
# Kubeflow Trainer will automatically configure the distributed environment.
device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
dist.init_process_group(backend=backend)
# このLOCAL_RANKという環境変数はKubeflowのフレームワーク内で自動で設定される
local_rank = int(os.getenv("LOCAL_RANK", 0))
print(
"Distributed Training with WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}.".format(
dist.get_world_size(),
dist.get_rank(),
local_rank,
)
)
# [2] Define PyTorch CNN Model to be trained.
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 20, 5, 1)
self.conv2 = nn.Conv2d(20, 50, 5, 1)
self.fc1 = nn.Linear(4 * 4 * 50, 500)
self.fc2 = nn.Linear(500, 10)
def forward(self, x):
x = F.relu(self.conv1(x))
x = F.max_pool2d(x, 2, 2)
x = F.relu(self.conv2(x))
x = F.max_pool2d(x, 2, 2)
x = x.view(-1, 4 * 4 * 50)
x = F.relu(self.fc1(x))
x = self.fc2(x)
return F.log_softmax(x, dim=1)
# [3] Attach model to the correct device.
device = torch.device(f"{device}:{local_rank}")
model = nn.parallel.DistributedDataParallel(Net().to(device))
model.train()
optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)
# [4] Get the Fashion-MNIST dataset and distributed it across all available devices.
dataset = datasets.FashionMNIST(
"./data",
train=True,
download=True,
transform=transforms.Compose([transforms.ToTensor()]),
)
train_loader = DataLoader(
dataset,
batch_size=100,
sampler=DistributedSampler(dataset),
)
# [5] Define the training loop.
for epoch in range(1000):
for batch_idx, (inputs, labels) in enumerate(train_loader):
# Attach tensors to the device.
inputs, labels = inputs.to(device), labels.to(device)
# Forward pass
outputs = model(inputs)
loss = F.nll_loss(outputs, labels)
# Backward pass
optimizer.zero_grad()
loss.backward()
optimizer.step()
if batch_idx % 10 == 0 and dist.get_rank() == 0:
print(
"Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
epoch,
batch_idx * len(inputs),
len(train_loader.dataset),
100.0 * batch_idx / len(train_loader),
loss.item(),
)
)
# Wait for the training to complete and destroy to PyTorch distributed process group.
dist.barrier()
if dist.get_rank() == 0:
print("Training is finished")
dist.destroy_process_group()
分散学習の開始
次に、上記で定義した学習用のコードをKubeflow Trainerで実行するためのコードを定義します。
ファイル名は、start_training_job.py
等としておきます。
if __name__ == '__main__':
from kubeflow.trainer import TrainerClient, CustomTrainer
job_id = TrainerClient().train(
trainer=CustomTrainer(
func=train_pytorch_model,
num_nodes=2, # 分散学習のnode数を指定する
),
runtime_ref="torch-distributed",
)
print(f"Job id: {job_id}")
上記のコードをpythonコマンドで実行すれば、分散学習のJobが開始します。
python start_training_job.py
実行中のJob一覧のStatusの確認
以下のコマンドを実行することで、実行中のJobの状態を確認できます。
list_jobs関数でJobの一覧を出力し、TrainJobクラスの持つ、componentsのpropertyを出力しています。
if __name__ == '__main__':
from kubeflow.trainer import TrainerClient
jobs = TrainerClient().list_jobs()
for job in jobs:
print(f"Job ID: {job.name}, Job Status: {job.status}")
job_id = job.name
for c in TrainerClient().get_job(name=job_id).components:
print(f"Component: {c.name}, Status: {c.status}")
このコマンドを実行すると、以下のような結果が得られます。
ここで、ComponentのStatusがRunningになっていれば、Jobの実行に成功しています。
Job ID: u76495fea884, Job Status: Created
Component: trainer-node-0, Status: Running
Component: trainer-node-1, Status: Running
Job実行中ログの確認
実行中のJobのログを確認したい場合、以下のコマンドを実行します。
follow引数をTrueにしているので、自動で最新のログが更新て出力されます。
if __name__ == '__main__':
from kubeflow.trainer import TrainerClient
job_id = "xxxxxx" # 対象のJob IDを指定します。
logs = TrainerClient().get_job_logs(name=job_id, follow=True)
print(logs)
Jobの停止
実行中のJobを停止させたい場合、以下のコマンドを実行します。
if __name__ == '__main__':
from kubeflow.trainer import TrainerClient
job_id = "xxxxxx" # 対象のJob IDを指定します。
TrainerClient().delete_job(name=job_id)
今回のまとめ
今回の記事では、クラウド基盤mdxに構築したKubernetesクラスタで、Kubeflow Trainerというライブラリを使って、分散学習を実行する方法を説明しました。
通常の分散学習は、ノード間の通信の設定など何かと設定が面倒ですが、Kubeflow Trainerを使うことで、これらの設定を自動化してくれるので、非常に簡単に分散学習が実行出来ることを体感できたと思います。
東京大学鈴村研究室について
Discussion