戦いは数だよ兄貴!Google Cloud Batch による並列化入門 〜LLMの前処理を例に〜
はじめに
こんにちは、AI Lab NLP チームの張です。
ビッグデータ時代と叫ばれて幾星霜、近年の大規模言語モデル (LLM) の発展もあって {データ,リサーチ}{サイエンティスト,エンジニア} たちが扱うデータの規模がどんどん大きくなってきています。大規模データを扱うための処理基盤技術も進歩してきており、例えば BigQuery などのデータレイクサービスや Dask などの並列コンピューティングライブラリは、複雑な並列処理を裏側のエンジンに隠してくれているおかげでユーザはシンプルな構文で簡単に処理を書くことができます。
テーブルデータのような構造化データのみを扱うのであれば上記の技術で十分ですが、例えばテキストや画像のような非構造化データを前処理する際には多くのヒューリスティックス(e.g. 有害文章や NG ワードの除去、 Unicode 正規化、リサイズ、クロップ、etc.)を必要とします。NLP では前処理が 8 割[1]なんて言われるほど全体で占める割合が高く重要なプロセスであるため、このプロセスを高速化することはプロジェクト全体の高速化につながります。
高速化の手段にも色々あります(他の手段は本記事では省略します)が、今回は Google Cloud Batch による並列化という手法を、LLM 用データの前処理をケーススタディとして取り入れながらハンズオン形式で解説してみたいと思います。
なお、本記事で使われたコードはすべて以下の GitHub リポジトリに置いてあります。ぜひご自身で動かしてみてください。
Google Cloud Batch とは
詳しくは公式の紹介や中の人の記事を読んでもらうとして、超ざっくり要約すると「特定のバッチ処理をめちゃくちゃスケールさせられる Google Cloud のフルマネージドサービス」です。個人的によく使う or 使いそうな機能は以下になります。
- Python はもちろん Docker にも対応しているので実質なんでも実行できる
- 処理を大規模にスケールさせられる
- ただしプロジェクトの割り当て上限に注意
- デフォルトの状態だと最大で 2000 インスタンスを同時実行が可能
- GCP の他のサービスとの連携がスムーズ
- Cloud Scheduler や Workflows などで定期実行や特定のトリガーで実行させられる
- Secret Manager でシークレットを簡単かつ安全に扱える
- Cloud Storage でオブジェクトの読み書きが容易
- GPU が使えるため ML モデルを扱うことも可能
他に似た GCP サービスとして Dataflow や Cloud Run Job がありますが、著者はどちらも使ったことがないので厳密な比較は難しいですが、Batch と比較した時のデメリット(=不採用に至った理由)を以下に挙げておきます。全部使ったことがある方がいたらコメントをいただけると幸いです。
- Dataflow
- 内部で Apache Beam を使用しているため、独特な記法を覚える必要がある(学習コストが高め)
- ストリーミング処理向けなサービスでバッチ処理にはコストが高くなりがち
- Cloud Run Job
- 実行時間に制限(最大 24 時間)があるため、長時間実行や大規模データ処理に不向き
- GPU が使えない
また、Batch を触って感じたデメリットも挙げておきます。
- ググラビリティが低すぎる: Batch というありがちな単語がサービス名になってるせいで検索した時のノイズがひどい
- そもそも日本語の文献が少ない: トラブルシューティングを含め公式ドキュメントが一番充実してる
- (当たり前だが)短時間・小規模の処理には向かないため、見極めが重要: ジョブのキューイングやスケジューリングなどオーバーヘッドが大きいため、小規模処理だと逆に時間がかかる
LLM 用データの前処理
LLM で訓練するためのデータに対する前処理、という設定で高速化の効果を実験してみたいと思います。
具体的には、LLM の事前学習でよく使われている OSCAR データセットの日本語データに対して、分割された各ファイル(=パートファイル)をそれぞれ 1 つの Batch インスタンスで並行に前処理させます(図1)。前処理のベースラインはこちらの記事に書かれている手法になります。なお、重複削除処理 (dedup) は並列化できないので本記事の対象外とします。
事前準備
まず最初に今回利用予定の GCP サービスの API を有効化していきます。それぞれの用途はスニペットにコメントしています。
$ gcloud services enable \
artifactregistry.googleapis.com \ # Docker イメージのリポジトリ
batch.googleapis.com \ # Batch
compute.googleapis.com \ # Batch で使用する Compute Engine
logging.googleapis.com \ # ロギング用
storage.googleapis.com \ # 今回データのやり取りで使用
secretmanager.googleapis.com # シークレット管理用
次に Batch を実行するアカウントに対してロール権限を付与していきます。
$ for role in roles/batch.agentReporter roles/compute.instanceAdmin.v1 roles/logging.logWriter roles/secretmanager.secretAccessor roles/storage.objectAdmin roles/storage.objectCreator do;
gcloud projects add-iam-policy-binding <PROJECT_NAME> --role $role --member serviceAccount:<SERVICE_ACCOUNT_NAME>@developer.gserviceaccount.com;
done
以下のようなロール権限が付与されていれば OK です。
$ gcloud projects get-iam-policy <PROJECT_NAME> \
--flatten="bindings[].members" \
--format='table(bindings.role)' \
--filter="bindings.members:<SERVICE_ACCOUNT_NAME>@developer.gserviceaccount.com"
ROLE
roles/batch.agentReporter
roles/compute.instanceAdmin.v1
roles/logging.logWriter
roles/secretmanager.secretAccessor
roles/storage.objectAdmin
roles/storage.objectCreator
OSCAR データセットは利用条件に同意する必要があるので、同意してから HuggingFace の token を発行しましょう。その token を Secret Manager に格納します。
$ echo $HF_TOKEN | gcloud secrets create hf_token --data-file=-
Created version [1] of the secret [hf_token].
$ gcloud secrets list
NAME CREATED REPLICATION_POLICY LOCATIONS
hf_token 2024-12-08T11:46:00 automatic -
これで大まかな準備は完了しました。次に前処理用の Python スクリプトを用意します。
前処理用の Python スクリプト
ここは本記事のメインではないので基本的に先行手法をそのまま使用します。ただしいくつか使用感向上のための変更と再現性向上のための uv 化を施しました。フルの Python コードについては GitHub を、前処理の詳細については元記事を参照してください。
diff
185,186c185,186
< model_path='./models/ja.arpa.bin',
< sp_model_path='./models/ja.sp.model',
---
> model_path=kenlm_model_path,
> sp_model_path=sp_model_path,
220,229d219
< def get_args():
< parser = argparse.ArgumentParser()
< parser.add_argument('--start', type=int, default=1)
< parser.add_argument('--end', type=int, default=119)
< parser.add_argument('--output', type=str)
< parser.add_argument('--workers', type=int, default=10)
< args = parser.parse_args()
< return args
<
<
231,243c221,223
< args = get_args()
< input_dir = './data'
< # output_dir = './output'
< output_dir = args.output
< print('output_dir...', output_dir)
< token = os.environ['HF_TOKEN']
< start = args.start
< end = args.end
<
< num_jobs = args.workers
< print('start...')
< print(f'start: {start}')
< print(f'end: {end}')
---
> print('start...')
> print(f'start: {start_part}')
> print(f'end: {end_part}')
246c226
< for i in range(start, end + 1):
---
> for i in range(start_part, end_part + 1):
255c235
< token=token
---
> token=args.hf_token
259c239
< show_diff_mem(0, start)
---
> show_diff_mem(0, start_part)
261c241
< show_diff_mem(0.1, start)
---
> show_diff_mem(0.1, start_part)
268c248
< show_diff_mem(8, start)
---
> show_diff_mem(8, start_part)
272c252,253
< clean('./sample2.jsonl', 'sample_output.jsonl')
---
> clean('./data/sample_input.jsonl', './output/sample_output.jsonl')
>
273a255,268
> def get_args():
> parser = argparse.ArgumentParser()
> parser.add_argument('--start', type=int, default=1)
> parser.add_argument('--end', type=int, default=119)
> parser.add_argument('--hf-token', type=str, default='')
> parser.add_argument('--input', type=str, default='./data')
> parser.add_argument('--output', type=str, default='./output')
> parser.add_argument('--ng-word', type=str, default='./ng_word.txt')
> parser.add_argument('--kenlm-model', type=str, default='./models/ja.arpa.bin')
> parser.add_argument('--sentencepiece-model', type=str, default='./models/ja.sp.model')
> parser.add_argument('--workers', type=int, default=10)
> parser.add_argument('--sample-run', action='store_true')
> args = parser.parse_args()
> return args
274a270
>
276,277c272,293
< main()
< # test()
---
> args = get_args()
> print(args)
>
> start_part = args.start
> end_part = args.end
> hf_token = args.hf_token
> input_dir = args.input
> output_dir = args.output
> os.makedirs(output_dir, exist_ok=True)
> ng_word_filepath = args.ng_word
> kenlm_model_path = args.kenlm_model
> sp_model_path = args.sentencepiece_model
> num_jobs = args.workers
>
> if args.sample_run:
> test()
> else:
> if hf_token == '':
> print('Please set the Hugging Face token.')
> sys.exit(1)
>
> main()
Docker 化
Python スクリプトをそのまま実行させることもできますが、汎用性や再現性の観点からひと手間増えますが Docker による実行方法を取りたいと思います。
まず Dockerfile を書きます。
ここでは必要なパッケージを apt でインストールして、kenlm のインストール、モデルダウンロード、Python パッケージインストール、必要なファイルとスクリプトのコピーをします。
ビルドされた Docker イメージを Artifact Registry (AR) に格納させておく必要があり、そのための AR リポジトリを作成します。今回は ailab-advent-calendar-2024-batch
という名前にしました。
$ gcloud artifacts repositories create ailab-advent-calendar-2024-batch \
--location=asia-northeast1 \
--repository-format=docker
次に Docker イメージをビルドします。ビルドを Cloud Build に任せてもいいんですが、マシンスペックの関係でローカルの方が早そうだったのでローカルでやっちゃいます。また、注意すべき点として、プラットフォームを Batch の実行環境である linux/amd64
に合わせておく必要があるのと、Docker のイメージ名は先ほど作成した AR リポジトリパスと同じにしておく必要があります。
$ docker build \
--platform linux/amd64 \
-t asia-northeast1-docker.pkg.dev/<PROJECT_NAME>/ailab-advent-calendar-2024-batch/batch:latest .
ビルドが成功したら AR リポジトリへイメージを Push します。
$ gcloud auth configure-docker asia-northeast1-docker.pkg.dev
$ docker push asia-northeast1-docker.pkg.dev/<PROJECT_NAME>/ailab-advent-calendar-2024-batch/batch
Batch ジョブの設定
次にいよいよ本記事のメインパートである Batch ジョブの設定をします。
Batch は yaml
/json
で書かれた設定ファイルに則って処理を行います。この設定ファイルでは以下のようなことを定義します:
- タスクの実行方法(Docker イメージのパス、実行コマンドなど)
- 様々な変数(環境変数、シークレットなど)
- ストレージ(GCS のマウント先など)
- 並列数
- インスタンスのスペック
- ロギング方法
より詳細なドキュメントについては公式のクイックガイドや API リファレンスを参照してください。
今回使用する設定ファイルは以下になります。
このファイルをもう少し詳しくみていきます。
ここでは Secret Manager で作成した HuggingFace token を環境変数 HF_TOKEN
に結びつけています。詳しくはこちらのドキュメントを参照してください。
ここでは GCS のバケット・オブジェクトを Batch で作成されたインスタンス内のディレクトリにマウントしています。マウント先は /mnt/disks
配下しか選択できないので注意してください。詳しくはこちらのドキュメントを参照してください。
ここでは対象の Docker イメージと実行コマンドを定義しています。
入力ディレクトリと出力ディレクトリそれぞれがマウントされたディレクトリで、Secret Manager から受け取った環境変数 HF_TOKEN
を使っているのがわかると思います。また、BATCH_TASK_INDEX
はタスクグループ内のタスクのインデックス番号を示す事前定義された環境変数の一つで、タスク(=インスタンス)ごとに一つのパートファイルを前処理させるようにしています。なお、BATCH_TASK_INDEX
は 0
スタートなのに対して、OSCAR データセットのパートファイルは 1
スタートになっています。例外処理を書くのがめんどくさいしどのみちエラーで止まるので、あえてそのまま実行して BATCH_TASK_INDEX=0
のインスタンスは無視することにします。
ここではタスクを実行するインスタンスのスペックを定義しています。
ここではタスクグループ全体の並列数と並列方法について定義しています。taskCount
は、ジョブ内で実行するタスクの総数を指定します。今回は「一つのタスクにつき一つのパートファイルを前処理する」という処理にしたいので、パートファイル数 + 1 個のタスク、つまり 119 + 1 = 120 に設定しています。そして parallelism
は、一度にどれだけのタスクを同時に実行するかを指定します。これは、リソースの最適な利用を目的として設定され、今回はタスク数と同じ数の 120 に設定しています。これによって 120 個のタスクに対して、120 個のインスタンスが並行して立ち上がって処理を行います。例えば parallelism
を 60
に設定すると 60 個のインスタンス処理が並行して立ち上がるため、かかる時間は parallelism=120
の場合のおおよそ 2 倍になります。
ジョブの実行
これで諸々が整ったのでいよいよジョブを登録・実行していきます。
$ gcloud batch jobs submit preproc \
--location asia-northeast1 \
--config batchjob.yaml
ちなみに同じ名前のジョブを登録できないので、実行が失敗した際には前のジョブを削除して再登録するか、別の名前にして登録してください。
以下のコマンドかコンソール画面からジョブの状態が確認できます。
$ gcloud batch jobs list # ジョブ一覧の確認
NAME LOCATION STATE
projects/<PROJECT_NAME>/locations/asia-northeast1/jobs/advent-calendar-2024 asia-northeast1 SCHEDULED
$ gcloud batch jobs describe <JOB_NAME> --location=<LOCATION_NAME> # ジョブ詳細の確認
ジョブのステータスは QUEUED
→ SCHEDULED
→ RUNNING
→ SUCCEEDED
/ FAILED
の順に変化します。
-
QUEUED
: ジョブがキューに入れられ、実行の準備が整うのを待っている状態です。リソースの割り当てやスケジューリングがまだ行われていません -
SCHEDULED
: ジョブが実行するためのリソースが確保され、スケジュールが設定された状態です。実行が間もなく始まることを示しています -
RUNNING
: ジョブが実行中の状態 -
SUCCEEDED
/FAILED
: ジョブが完了 (成功/失敗) した状態
実行結果
ジョブ一覧を見てみます。
$ gcloud batch jobs list
NAME LOCATION STATE
projects/<PROJECT_NAME>/locations/asia-northeast1/jobs/advent-calendar-2024 asia-northeast1 FAILED
注意点として Batch の仕様で標準エラー出力があると Failed 判定になってしまうが、tqdm ライブラリを使用した場合、標準エラー出力の方にプログレスバーを出力してしまうので、結果的に Batch ジョブ全体が Failed 判定になってしまうことがあります。なのでちゃんとログを見てみましょう。
上のサマリー欄ではちゃんと成功: 119, 失敗: 1 になっていて、ログの方のエラーも tqdm の出力になっていますね。
また、今回の場合だと GCS にファイルが出力されているはずなのでそれを見に行きます。
$ gcloud storage ls gs://peinan-resource/advent-calendar-2024/output | wc -l
120
ちゃんと 119 ファイルありますね。念のため内容についてもベースライン手法と差異がないか確認します。
$ diff <(gcloud storage cat gs://peinan-resource/advent-calendar-2024/output/2.jsonl | head -n 100) <(head -n 100 ../HojiChar_OSCAR_sample/2.jsonl)
何も出力されなかったため一致しているようです。
最後に実行時間ですが、トータルで 8 分 42 秒 でした。
ベースラインでは、
一旦、debupなしで10000件に対して計測
実行時間は、30s
1ファイルでは、約20mになる計算
199個の全ファイルに対して実行すると、20m * 119で約40hくらいかかる計算
で、今回の環境で並列しない設定だと (42+8*60)*119/60/60=17.3
時間の計算になり、単純計算で実行時間が 1/100 以下になりました。さらに、今回は 119 分割で検証しましたが、より細かく分割することでさらなる速度向上が見込めます。
おわりに
本記事では Google Cloud Batch の概要と用途、そして実際の使い方を LLM データの前処理をケーススタディとしてハンズオン形式で紹介し、その効果を検証しました。冒頭でも触れたように、Batch は並列化できるタスクに対して高い効果が見込めます。さらにその柔軟性の高さから前処理に限らず様々な使い道があります。また、初心者にとってもとっつきやすい部類のサービスだと思うので、ぜひみなさん自身で触って活用してください。
参考
-
小町 守, 自然言語処理の教科書. 技術評論社. 2024. ↩︎
Discussion