🐈

databrickでマルチノード/GPUを扱ってみる

2025/01/27に公開

要検証と記載されている部分は動作確認中の部分です。参考程度に

nodeのタイプ

driver ノード (1つ)
→ worker ノードに指示するところ

worker ノード (複数)
→ 今回で言うとGPUを使ってtrainを回すところ

注意事項

  • CPUとGPU、それぞれがどのノードにあるのかを意識してプログラムする

Apache Spark変数

アタッチされたノートブックには、Apache Spark変数が定義されています。
たとえば、SparkContextというクラスが変数名scで定義されています。

参考:
https://docs.databricks.com/ja/notebooks/notebook-compute.html

SparkContextを使って、workerのGPUをチェックする。

import os
# sc.parallelizeでRDDを作成する。そのRDDに対して命令する感じ
# "nvidia-smi --query-gpu=index,uuid --format=csv"
gpu_info = sc.parallelize([0,1]).map(lambda _: os.popen("nvidia-smi").read()).collect()

# 結果を表示
for info in gpu_info:
    print(info)

RDD(耐障害性分散データセット)

RDD は、クラスタ内の複数のノードに配置されたデータ要素の不変の集合体であり、変換その他の操作のための基礎的な API と並行して使用することが可能です。
https://www.databricks.com/jp/glossary/what-is-rdd

rdd =spark.sparkContext.parallelize(ここにコレクション)

上記のようにして作成することができます。

torchのバージョンの確認

%pip show torch

databricksのモデルのロードの速度に関して

unitycatalogなどにモデルがあり、そこからロードすると遅いです。

なのでモデルのパスをlocal_disk0等に移動させてから、実行するのが良いかなと思っていたりします。
1nodeの場合は、気楽に試せるのでこちらをオススメします。

HFモデルの分散時の問題点

device_map="auto" はGPUを自動で分散して利用してくれるが、1nodeの対応であることに留意する。

TorchDistributorによるDDP/FSDP

起動に関して

def 自作関数(引数):
    ...

from pyspark.ml.torch.distributor import TorchDistributor

distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(<自作関数>, <ここに自作関数の引数>)
  • local_mode

トレーニングにドライバーノードを使用するかどうかを決定するブール値です。デフォルトは false

  • num_processes

異なる並列タスクが許可される数

引用:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.torch.distributor.TorchDistributor.html

それぞれのノードでモデルをロードさせる。

前提: 1 modelが1nodeに乗る前提の場合

from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy
from transformers import AutoTokenizer,AutoModelForCausalLM,AutoConfig

def 自作関数():
    import torch.distributed as dist

    dist.init_process_group("nccl",rank=rank, world_size=world_size)
  
    model = AutoModelForCausalLM.from_pretrained(LOCAL_PATH,
                                            local_files_only=True,
                                            low_cpu_mem_usage=True,
                                            torch_dtype=torch.bfloat16)
    model.to(f"cuda:{local_rank}")

    # ddpの場合
    ddp_model = DDP(
        model,
        device_ids=[local_rank],
        output_device=local_rank
    )

    # fsdpの場合
    fsdp_model = FSDP(
        model,
        sharding_strategy=ShardingStrategy.FULL_SHARD,  # フルシャーディング
        device_id=local_rank,
    )

この関数はそれぞれのワーカーを通ります

この時cuda:{local_rank}にモデルがあることに留意する。

DDPはモデルの各パラメータに対してオートグラッドフックを登録します。バックワードパスが実行されると、このフックが発火し、すべてのプロセス間で勾配の同期が行われます。これにより、各プロセスが同じ勾配を持ちます。

この処理はそれぞれのワーカーを通るため、モデルを読み込むためのLOCAL_PATHはワーカーから参照できる場所であることに注意する必要があります。

FSDPでそれでもOOMが出る場合

cpu_offload=CPUOffload(offload_params=True)を追加する。

 FSDP(
        model,
        sharding_strategy=ShardingStrategy.FULL_SHARD,  # フルシャーディング
        device_id=local_rank,
        cpu_offload=CPUOffload(offload_params=True)
  )

tokenizerの設定に関して

元のデータは基本文章であったりするのでtokenizerを用います。

paddingの設定は?

if not tokenizer.pad_token_id:
        tokenizer.pad_token_id = tokenizer.eos_token_id

https://github.com/meta-llama/llama-cookbook/blob/main/src/llama_cookbook/finetuning.py

paddingの方向は?

tokenizer.padding_side="right"

deecoder only model
(現状推論時に、leftを選択して実行するとnanになってしまうので、とりあえずright)

データセットの読み込み

torch.utils.data.distributed.DistributedSampler

DistributedSampler

バッチの固め方を分散環境で定めるために利用
自作関数内で定義しています。

  from datasets import load_dataset

  # データセットの準備
  dataset = load_dataset("llm-jp/databricks-dolly-15k-ja", split="train")
  tokenized_datasets = dataset.map(tokenize_function, batched=False)

 # バッチの固め方を決める
  train_sampler = DistributedSampler(tokenized_datasets, shuffle=False)

load_datasetはネットからデータセットをロードしてきています。
datasetはトークン化するする必要があるのでtokenize_functionという関数を別に自作で定義しています。batched=Falseになっているのは、ラムダ式で、tokenize_functionが1データセットにしか対応していない作りになっているためで、batched=Trueにして作っておいても問題ありません。

DistributedSampleは分散環境で自動で割りあてを設定してくれるので必ずかませます。

データセットのマスク化()

先ほどtokenize_functionという関数を別に自作で定義

  • データセットの種類によって実装を変える。
  • データセットの作り方によっても実装は変わる
    上記のことに留意です。

データセットをトークナイザーでトークン化するクラスです。
アテンションマスクをもセットしておきます。

実装に関しては下記↓
https://zenn.dev/timoneko/articles/70fde2fa81bb5d

max_lengthを大きくすれば、より大きなデータセットを作れます。

DataLoader

  • バッチサイズを決定する
  • collate_fn→torch.Tensorの形にする
  def collate_fn(batch):
    input_ids = torch.stack([torch.tensor(item["input_ids"], dtype=torch.long) for item in batch])
    labels = torch.stack([torch.tensor(item["labels"], dtype=torch.long) for item in batch])
    attention_mask = torch.stack([torch.tensor(item["attention_mask"], dtype=torch.long) for item in batch])

    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": labels
    }

py
 train_dataloader = DataLoader(
        tokenized_datasets, batch_size=batch_size, shuffle=None, sampler=train_sampler,collate_fn=collate_fn
  )

batch_size

各バッチに含まれるサンプル数。バッチサイズが大きいほど、勾配が安定する傾向がありますが、メモリを多く消費します。(ここに関しては後述)

DataLoaderの戻り値

PyTorch の DataLoader は、デフォルトでバッチを辞書の形 { 'input_ids': ..., 'attention_mask': ... } で返します。
また、ddp_modelはGPU側にあるのでtensorとして入れてdeviceに送信するためcollate_fnを利用します。

collate_fn

collate_fnはそれを操作し、最終的にはtorch.Tensorにする関数です。

train(要検証)

下記の実装では、train_dataloaderに含まれているデータセット分しか動作しません。
for文を追加することでイテレーションを回すことができます。
注)下記の処理には、モデルの保存は含まれていません。

  fsdp_model.train()
  optimizer = torch.optim.Adam(fsdp_model.parameters(), lr=5e-5) # optimizer

  gradient_accumulation_steps = 32  # 例: 32ステップ毎に勾配を更新する (gradient_accumulation_steps * batch_sizeが実際の大きさ)
  dist.barrier() # 各プロセスの同期待ち
  for step, batch in enumerate(train_dataloader):
      outputs = ddp_model(input_ids=batch["input_ids"].to(f"cuda:{local_rank}"),
                      attention_mask=batch["attention_mask"].to(f"cuda:{local_rank}"),
                      labels=labels.to(f"cuda:{local_rank}")
                      )

      loss= outputs.loss ##ex) tensor(26.4758, device='cuda:0', grad_fn=<NllLossBackward0>)
      loss = loss / gradient_accumulation_steps
     
      # 勾配のスケーリングと逆伝播
      loss.backward()
      # パラメータの更新
      if (step + 1) % gradient_accumulation_steps == 0:
        max_norm = 1.0 
        fsdp_model.clip_grad_norm_(max_norm)
        optimizer.step()
        optimizer.zero_grad()

        print(loss)
        print(step)

  dist.destroy_process_group()

optimizer

optimizerです。いいのを選びましょう。

optimizer = torch.optim.Adam(fsdp_model.parameters(), lr=5e-5)

dist.barrier()

同期を待ちます。

gradient_accumulation_steps

dataloaderのバッチサイズを上げるとメモリがOOMになる可能性が高まります。
ですが、性能としてはある程度batchをまとめてから更新をかけた方が精度が上がります。
そこでgradient_accumulation_stepsなるものを定義しておき、プログラムの通り、batchの回数がgradient_accumulation_stepsになったときに、optimizerによる更新をかけます。
gradient_accumulation_steps * 、batch_sizeが、実際に更新するbatch_sizeになるわけです。

gradient_accumulation_stepsのバグに関して
https://zenn.dev/dalab/articles/76c7bf20547c1f

loss = loss / gradient_accumulation_steps

loss.backward()

backwardメソッドでは勾配が累積される。

勾配クリッピング

max_norm = 1.0 
fsdp_model.clip_grad_norm_(max_norm)

勾配爆発を防ぐために導入されている模様
勾配クリッピングは通常、loss.backward() の後、optimizer.step() の前に適用
FSDPの場合はFSDP用のものを使用するようです。

FSDPではない場合は下記

torch.nn.utils.clip_grad_norm_(model.parameters(),gradient_clipping_threshold)

optimizer.step()/optimizer.zero_grad()

zero_grad → 勾配を0に戻す

モデルの保存

dist.barrier()
  ## modelのstate_dictを保存する。
  with FSDP.state_dict_type(
        fsdp_model, StateDictType.FULL_STATE_DICT, FullStateDictConfig(offload_to_cpu=True, rank0_only=True)
    ):
        cpu_state = fsdp_model.state_dict()
  if rank == 0:
    save_full_path = "ここに保存するパス"
    torch.save(cpu_state, save_full_path)
    print("セーブが完了しました。")
  dist.barrier()

付録

torch.distributed backendに関して

gloo

CPUでのトレーニングであればこれを選択する。

mpi

nccl

GPUでのトレーニングであればこれを選択する。
InfiniBand と GPUDirect をサポートしている唯一のバックエンド

参考:https://alband.github.io/doc_view/distributed.html

引用一覧

https://github.com/meta-llama/llama-cookbook/blob/main/src/llama_cookbook/utils/train_utils.py#L70

Discussion