Zenn
📌

クラウド基盤mdxに構築したKubernetesクラスタ上で簡単に分散学習を行う

2025/03/24に公開

この記事について

こんにちは、東京大学鈴村研究室で、インフラエンジニアとしてお手伝いさせていただいています、福田と申します。

https://sites.google.com/view/toyolab/鈴村研究室概要

これまで、クラウド基盤mdxの上で構築したKubernetes環境で、サーバレスWebアプリケーションを立ち上げるための手順などを中心に説明してきました。

https://zenn.dev/suzumura_lab/articles/627b5063d6884d

今回は少し話題を変えて、このKubernetes環境で、複数のGPUインスタンスを使った分散学習をする方法について説明します。
Kubernetesを使用しない一般的な分散学習は、ノード間の通信のための設定が色々と大変ですが、Kubernetes上で動作するKubeflow Trainerというフレームワークを使うと、この辺りの設定を抽象化してくれて、とても簡単に分散学習が実現できます。

https://www.kubeflow.org/docs/components/trainer/

本記事では、このKubeflow Trainerを使った分散学習の方法について説明していきます。

前提

この記事ではクラウド基盤mdxの上に、Kubernetesクラスタが構築されていることを前提とするため、mdxの上で仮想マシンを構築する方法や、Kubernetesクラスタ自体の構築方法については説明しません。
これらの手順を知りたい場合は、こちらの過去記事を参照ください。

踏み台サーバへのログイン

まずは、いつもの通り、以下のコマンドで踏み台サーバにログインします。
ssh-agentを設定しておくと、踏み台サーバから、各Workerインスタンスに秘密鍵などを指定することなくsshアクセスできるようになるので、便利です。
また、port6443をport forwardして、Lensが接続できるようにしておきます。
この辺りの手順はこちらの過去記事を参照ください。

eval `ssh-agent`
ssh-add ~/.ssh/mdx_access_key
ssh -L 6443:(master nodeのPrivate IPアドレス):6443 -A mdxuser@(踏み台サーバのGlobal IPアドレス)

Kubeflow Trainerのインストール

踏み台サーバにて、以下のコマンドを実行します。

kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/manager?ref=master"

以下のコマンドを実行し、JobSetとTrainerのcontroller managerが起動していることを確認します。

kubectl get pods -n kubeflow-system
NAME                                                  READY   STATUS    RESTARTS   AGE
jobset-controller-manager-54968bd57b-88dk4            2/2     Running   0          65s
kubeflow-trainer-controller-manager-cc6468559-dblnw   1/1     Running   0          65s

次に以下のコマンドを実行し、Kubeflow Trainerのruntimeをインストールします。

kubectl apply --server-side -k "https://github.com/kubeflow/trainer.git/manifests/overlays/runtimes?ref=master"

Kubeflow Trainerでの分散学習実行

さて、ここまでの手順で、Kubeflow TrainerをKubernetes環境に構築することができました。
ここからは、構築したKubeflow Trainerの上で、実際の分散学習を実施していきます。

Kubeconfigの設定

まずは、手元のPCから、Kubeflow TrainerにJobを投げることができるように、以下の手順に従って、Kubeconfigの設定を行います。
すでにLensをインストールする際に、kubeconfigを設定済みの場合は、この手順はスキップできます。

Kubeconfigの設定

Kubeflow Trainer Clientのインストール

以下のコマンドを実行し、PC端末にKubeflow Trainerのクライアントライブラリをインストールします。

pip install git+https://github.com/kubeflow/trainer.git@master#subdirectory=sdk

学習用コードの作成

PC端末で、以下のような分散学習用のコードを作成します。
この例では、PyTorchで、Fashion-MNISTのデータセットをシンプルな畳み込みニューラルネットワークで学習させることを行っています。

なお、このコードはKubernetesクラスタ内で実行されるので、PC端末にtorchtorchvisionをインストールすることは必須ではありませんが、エディタのエラーが気になる方は、これらのライブラリをpip installすることも、もちろんOKです。

また、コード内で、LOCAL_RANKという環境変数が登場しますが、これはユーザーが指定しなくとも、Kubeflowのフレームワークの中で自動的にこの環境変数を設定してくれます。

def train_pytorch_model():
    import os

    import torch
    from torch import nn
    import torch.nn.functional as F

    from torchvision import datasets, transforms
    import torch.distributed as dist
    from torch.utils.data import DataLoader, DistributedSampler

    # [1] Configure CPU/GPU device and distributed backend.
    # Kubeflow Trainer will automatically configure the distributed environment.
    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    dist.init_process_group(backend=backend)

    # このLOCAL_RANKという環境変数はKubeflowのフレームワーク内で自動で設定される
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    print(
        "Distributed Training with WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}.".format(
            dist.get_world_size(),
            dist.get_rank(),
            local_rank,
        )
    )

    # [2] Define PyTorch CNN Model to be trained.
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 20, 5, 1)
            self.conv2 = nn.Conv2d(20, 50, 5, 1)
            self.fc1 = nn.Linear(4 * 4 * 50, 500)
            self.fc2 = nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # [3] Attach model to the correct device.
    device = torch.device(f"{device}:{local_rank}")
    model = nn.parallel.DistributedDataParallel(Net().to(device))
    model.train()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

    # [4] Get the Fashion-MNIST dataset and distributed it across all available devices.
    dataset = datasets.FashionMNIST(
        "./data",
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor()]),
    )
    train_loader = DataLoader(
        dataset,
        batch_size=100,
        sampler=DistributedSampler(dataset),
    )

    # [5] Define the training loop.
    for epoch in range(1000):
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            # Attach tensors to the device.
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = F.nll_loss(outputs, labels)

            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                print(
                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                        epoch,
                        batch_idx * len(inputs),
                        len(train_loader.dataset),
                        100.0 * batch_idx / len(train_loader),
                        loss.item(),
                    )
                )

    # Wait for the training to complete and destroy to PyTorch distributed process group.
    dist.barrier()
    if dist.get_rank() == 0:
        print("Training is finished")
    dist.destroy_process_group()

分散学習の開始

次に、上記で定義した学習用のコードをKubeflow Trainerで実行するためのコードを定義します。
ファイル名は、start_training_job.py等としておきます。

if __name__ == '__main__':
    from kubeflow.trainer import TrainerClient, CustomTrainer
    job_id = TrainerClient().train(
        trainer=CustomTrainer(
            func=train_pytorch_model,
            num_nodes=2, # 分散学習のnode数を指定する
        ),
        runtime_ref="torch-distributed",
    )
    print(f"Job id: {job_id}")

上記のコードをpythonコマンドで実行すれば、分散学習のJobが開始します。

python start_training_job.py

実行中のJob一覧のStatusの確認

以下のコマンドを実行することで、実行中のJobの状態を確認できます。
list_jobs関数でJobの一覧を出力し、TrainJobクラスの持つ、componentsのpropertyを出力しています。

if __name__ == '__main__':
    from kubeflow.trainer import TrainerClient
    jobs = TrainerClient().list_jobs()
    for job in jobs:
        print(f"Job ID: {job.name}, Job Status: {job.status}")
        job_id = job.name
        for c in TrainerClient().get_job(name=job_id).components:
            print(f"Component: {c.name}, Status: {c.status}")

このコマンドを実行すると、以下のような結果が得られます。
ここで、ComponentのStatusがRunningになっていれば、Jobの実行に成功しています。

Job ID: u76495fea884, Job Status: Created
Component: trainer-node-0, Status: Running
Component: trainer-node-1, Status: Running

Job実行中ログの確認

実行中のJobのログを確認したい場合、以下のコマンドを実行します。
follow引数をTrueにしているので、自動で最新のログが更新て出力されます。

if __name__ == '__main__':
    from kubeflow.trainer import TrainerClient

    job_id = "xxxxxx" # 対象のJob IDを指定します。
    logs = TrainerClient().get_job_logs(name=job_id, follow=True)
    print(logs)

Jobの停止

実行中のJobを停止させたい場合、以下のコマンドを実行します。

if __name__ == '__main__':
    from kubeflow.trainer import TrainerClient
    job_id = "xxxxxx" # 対象のJob IDを指定します。
    TrainerClient().delete_job(name=job_id)

今回のまとめ

今回の記事では、クラウド基盤mdxに構築したKubernetesクラスタで、Kubeflow Trainerというライブラリを使って、分散学習を実行する方法を説明しました。
通常の分散学習は、ノード間の通信の設定など何かと設定が面倒ですが、Kubeflow Trainerを使うことで、これらの設定を自動化してくれるので、非常に簡単に分散学習が実行出来ることを体感できたと思います。

東京大学鈴村研究室について

https://sites.google.com/view/toyolab/鈴村研究室概要

Discussion

ログインするとコメントできます