databrickでマルチノード/GPUを扱ってみる
要検証と記載されている部分は動作確認中の部分です。参考程度に
nodeのタイプ
driver ノード (1つ)
→ worker ノードに指示するところ
worker ノード (複数)
→ 今回で言うとGPUを使ってtrainを回すところ
注意事項
- CPUとGPU、それぞれがどのノードにあるのかを意識してプログラムする
Apache Spark変数
アタッチされたノートブックには、Apache Spark変数が定義されています。
たとえば、SparkContextというクラスが変数名scで定義されています。
参考:
SparkContextを使って、workerのGPUをチェックする。
import os
# sc.parallelizeでRDDを作成する。そのRDDに対して命令する感じ
# "nvidia-smi --query-gpu=index,uuid --format=csv"
gpu_info = sc.parallelize([0,1]).map(lambda _: os.popen("nvidia-smi").read()).collect()
# 結果を表示
for info in gpu_info:
print(info)
RDD(耐障害性分散データセット)
RDD は、クラスタ内の複数のノードに配置されたデータ要素の不変の集合体であり、変換その他の操作のための基礎的な API と並行して使用することが可能です。
https://www.databricks.com/jp/glossary/what-is-rdd
rdd =spark.sparkContext.parallelize(ここにコレクション)
上記のようにして作成することができます。
torchのバージョンの確認
%pip show torch
databricksのモデルのロードの速度に関して
unitycatalogなどにモデルがあり、そこからロードすると遅いです。
なのでモデルのパスをlocal_disk0等に移動させてから、実行するのが良いかなと思っていたりします。
1nodeの場合は、気楽に試せるのでこちらをオススメします。
HFモデルの分散時の問題点
device_map="auto" はGPUを自動で分散して利用してくれるが、1nodeの対応であることに留意する。
TorchDistributorによるDDP/FSDP
起動に関して
def 自作関数(引数):
...
from pyspark.ml.torch.distributor import TorchDistributor
distributor = TorchDistributor(num_processes=2, local_mode=False, use_gpu=True)
distributor.run(<自作関数>, <ここに自作関数の引数>)
- local_mode
トレーニングにドライバーノードを使用するかどうかを決定するブール値です。デフォルトは false
- num_processes
異なる並列タスクが許可される数
引用:
それぞれのノードでモデルをロードさせる。
前提: 1 modelが1nodeに乗る前提の場合
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
from torch.distributed.fsdp import ShardingStrategy
from transformers import AutoTokenizer,AutoModelForCausalLM,AutoConfig
def 自作関数():
import torch.distributed as dist
dist.init_process_group("nccl",rank=rank, world_size=world_size)
model = AutoModelForCausalLM.from_pretrained(LOCAL_PATH,
local_files_only=True,
low_cpu_mem_usage=True,
torch_dtype=torch.bfloat16)
model.to(f"cuda:{local_rank}")
# ddpの場合
ddp_model = DDP(
model,
device_ids=[local_rank],
output_device=local_rank
)
# fsdpの場合
fsdp_model = FSDP(
model,
sharding_strategy=ShardingStrategy.FULL_SHARD, # フルシャーディング
device_id=local_rank,
)
この関数はそれぞれのワーカーを通ります。
この時cuda:{local_rank}にモデルがあることに留意する。
DDPはモデルの各パラメータに対してオートグラッドフックを登録します。バックワードパスが実行されると、このフックが発火し、すべてのプロセス間で勾配の同期が行われます。これにより、各プロセスが同じ勾配を持ちます。
この処理はそれぞれのワーカーを通るため、モデルを読み込むためのLOCAL_PATHはワーカーから参照できる場所であることに注意する必要があります。
FSDPでそれでもOOMが出る場合
cpu_offload=CPUOffload(offload_params=True)を追加する。
FSDP(
model,
sharding_strategy=ShardingStrategy.FULL_SHARD, # フルシャーディング
device_id=local_rank,
cpu_offload=CPUOffload(offload_params=True)
)
tokenizerの設定に関して
元のデータは基本文章であったりするのでtokenizerを用います。
paddingの設定は?
if not tokenizer.pad_token_id:
tokenizer.pad_token_id = tokenizer.eos_token_id
https://github.com/meta-llama/llama-cookbook/blob/main/src/llama_cookbook/finetuning.py
paddingの方向は?
tokenizer.padding_side="right"
deecoder only model
(現状推論時に、leftを選択して実行するとnanになってしまうので、とりあえずright)
データセットの読み込み
torch.utils.data.distributed.DistributedSampler
DistributedSampler
バッチの固め方を分散環境で定めるために利用
自作関数内で定義しています。
from datasets import load_dataset
# データセットの準備
dataset = load_dataset("llm-jp/databricks-dolly-15k-ja", split="train")
tokenized_datasets = dataset.map(tokenize_function, batched=False)
# バッチの固め方を決める
train_sampler = DistributedSampler(tokenized_datasets, shuffle=False)
load_datasetはネットからデータセットをロードしてきています。
datasetはトークン化するする必要があるのでtokenize_functionという関数を別に自作で定義しています。batched=Falseになっているのは、ラムダ式で、tokenize_functionが1データセットにしか対応していない作りになっているためで、batched=Trueにして作っておいても問題ありません。
DistributedSampleは分散環境で自動で割りあてを設定してくれるので必ずかませます。
データセットのマスク化()
先ほどtokenize_functionという関数を別に自作で定義
- データセットの種類によって実装を変える。
- データセットの作り方によっても実装は変わる
上記のことに留意です。
データセットをトークナイザーでトークン化するクラスです。
アテンションマスクをもセットしておきます。
実装に関しては下記↓
max_lengthを大きくすれば、より大きなデータセットを作れます。
DataLoader
- バッチサイズを決定する
- collate_fn→torch.Tensorの形にする
def collate_fn(batch):
input_ids = torch.stack([torch.tensor(item["input_ids"], dtype=torch.long) for item in batch])
labels = torch.stack([torch.tensor(item["labels"], dtype=torch.long) for item in batch])
attention_mask = torch.stack([torch.tensor(item["attention_mask"], dtype=torch.long) for item in batch])
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"labels": labels
}
train_dataloader = DataLoader(
tokenized_datasets, batch_size=batch_size, shuffle=None, sampler=train_sampler,collate_fn=collate_fn
)
batch_size
各バッチに含まれるサンプル数。バッチサイズが大きいほど、勾配が安定する傾向がありますが、メモリを多く消費します。(ここに関しては後述)
DataLoaderの戻り値
PyTorch の DataLoader は、デフォルトでバッチを辞書の形 { 'input_ids': ..., 'attention_mask': ... } で返します。
また、ddp_modelはGPU側にあるのでtensorとして入れてdeviceに送信するためcollate_fnを利用します。
collate_fn
collate_fnはそれを操作し、最終的にはtorch.Tensorにする関数です。
train(要検証)
下記の実装では、train_dataloaderに含まれているデータセット分しか動作しません。
for文を追加することでイテレーションを回すことができます。
注)下記の処理には、モデルの保存は含まれていません。
fsdp_model.train()
optimizer = torch.optim.Adam(fsdp_model.parameters(), lr=5e-5) # optimizer
gradient_accumulation_steps = 32 # 例: 32ステップ毎に勾配を更新する (gradient_accumulation_steps * batch_sizeが実際の大きさ)
dist.barrier() # 各プロセスの同期待ち
for step, batch in enumerate(train_dataloader):
outputs = ddp_model(input_ids=batch["input_ids"].to(f"cuda:{local_rank}"),
attention_mask=batch["attention_mask"].to(f"cuda:{local_rank}"),
labels=labels.to(f"cuda:{local_rank}")
)
loss= outputs.loss ##ex) tensor(26.4758, device='cuda:0', grad_fn=<NllLossBackward0>)
loss = loss / gradient_accumulation_steps
# 勾配のスケーリングと逆伝播
loss.backward()
# パラメータの更新
if (step + 1) % gradient_accumulation_steps == 0:
max_norm = 1.0
fsdp_model.clip_grad_norm_(max_norm)
optimizer.step()
optimizer.zero_grad()
print(loss)
print(step)
dist.destroy_process_group()
optimizer
optimizerです。いいのを選びましょう。
optimizer = torch.optim.Adam(fsdp_model.parameters(), lr=5e-5)
dist.barrier()
同期を待ちます。
gradient_accumulation_steps
dataloaderのバッチサイズを上げるとメモリがOOMになる可能性が高まります。
ですが、性能としてはある程度batchをまとめてから更新をかけた方が精度が上がります。
そこでgradient_accumulation_stepsなるものを定義しておき、プログラムの通り、batchの回数がgradient_accumulation_stepsになったときに、optimizerによる更新をかけます。
gradient_accumulation_steps * 、batch_sizeが、実際に更新するbatch_sizeになるわけです。
gradient_accumulation_stepsのバグに関して
loss = loss / gradient_accumulation_steps
loss.backward()
backwardメソッドでは勾配が累積される。
勾配クリッピング
max_norm = 1.0
fsdp_model.clip_grad_norm_(max_norm)
勾配爆発を防ぐために導入されている模様
勾配クリッピングは通常、loss.backward() の後、optimizer.step() の前に適用
FSDPの場合はFSDP用のものを使用するようです。
FSDPではない場合は下記
torch.nn.utils.clip_grad_norm_(model.parameters(),gradient_clipping_threshold)
optimizer.step()/optimizer.zero_grad()
zero_grad → 勾配を0に戻す
モデルの保存
dist.barrier()
## modelのstate_dictを保存する。
with FSDP.state_dict_type(
fsdp_model, StateDictType.FULL_STATE_DICT, FullStateDictConfig(offload_to_cpu=True, rank0_only=True)
):
cpu_state = fsdp_model.state_dict()
if rank == 0:
save_full_path = "ここに保存するパス"
torch.save(cpu_state, save_full_path)
print("セーブが完了しました。")
dist.barrier()
付録
torch.distributed backendに関して
gloo
CPUでのトレーニングであればこれを選択する。
mpi
nccl
GPUでのトレーニングであればこれを選択する。
InfiniBand と GPUDirect をサポートしている唯一のバックエンド
参考:https://alband.github.io/doc_view/distributed.html
引用一覧
Discussion