🧠

Dataflowを使った大規模言語モデル(LLM)のストリーミング処理

2023/11/29に公開

はじめに

こんにちは、クラウドエース データMLディビジョン所属の伊藤です。
今回は Apache Beam の RunInference という機能を使って、Google Cloud の Dataflow で大規模言語モデルを使ったストリーミング処理(リアルタイム処理)を行う方法を紹介したいと思います。

LLM とは

大規模言語モデル(Large Language Model、LLM)とは、GPT-3 や PaLM2 などのように、大量のテキストデータを使って学習された自然言語処理モデルのことを指します。

Dataflowの概要

Dataflow は、Apache Beam という分散処理プログラミングモデルの実行環境(Apache Beam ではこの実行基盤のことをランナーと呼びます)として Google Cloud が提供しているプロダクトになります。
Apache Beam で書かれた処理は、その他のランナーである Spark や Flink などでも実行することができますが、なかでも Dataflow はサーバーレスである点や、スケーラビリティが高い点などが特徴です。
ですので、Dataflow を使いたいとなった場合は、Apache Beam で処理を書き、Dataflow で実行するという流れになります。

また、Dataflow ではストリーミング処理も行うことができ、今回はこのストリーミング処理中に LLM を使った翻訳を行ってみたいと思います。
ストリーミング処理中に LLM を使った推論を行うことで、推論結果を保存先が BigQuery なら分析に対して、Spanner などデータベースならアプリケーションに対して即時に活用することができます。ETL 処理で完結することのメリットといえるでしょう。

今回使うLLMモデル

Hugging Face で公開されている T5 モデルを使います。
T5 とは Text-To-Text Transfer Transformer の略で、分類、翻訳、要約といった自然言語のマルチタスクを行うモデルになります。
T5 のモデルにはいくつかサイズがありますが、今回はローカルでも無理なく動かせるであろうt5-baseを使います。

※T5 モデルは英語、フランス語、ドイツ語、ルーマニア語が対応しています。

実装概要

今回行うタスクとしては、英語の文章をフランス語に翻訳するというものです。
入力データである英語のテキストに対して、T5モデルを使ってフランス語に翻訳するという処理を行います。
Apache Beam の公式サイトで紹介されているサンプルコードを改造して、Dataflow 上で Pub/Sub → Dataflow → BigQuery という処理の流れで実装してみます。

architecture

以下で紹介するコードは、GitHub上で公開していますので、こちらを参照ください。
以下のコマンドでGitHubからコードをクローンできます。

git clone https://github.com/cloud-ace/zenn-dataflow-llm-streaming.git

試してみる

ローカルでの実行

まずはローカルで実行してみたいと思います。
ローカルで実行する場合は、DirectRunner というランナーを使います。
ローカル実行での入力データはコード内でハードコードしている英語の文章を使います。

実装のコア部分となるパイプライン部分は以下部分です。

    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "CreateInputs" >> beam.Create(task_sentences)
            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
            | "RunInference" >> RunInference(model_handler=model_handler)
            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer)))
  1. 入力となる英語の文章を作成する
  2. モデル入力に適した形に前処理を行う
  3. 翻訳を行う
  4. 後処理を行う&ログに出力する

という処理内容です。

1番の入力部分は以下のようなデータを使っており、5つの英語の文章を入力としています。

    eng_sentences = [
        "The house is wonderful.",
        "I like to work in NYC.",
        "My name is Shubham.",
        "I want to work for Google.",
        "I am from India."
    ]
    task_prefix = "translate English to French: "
    task_sentences = [task_prefix + sentence for sentence in eng_sentences]
    tokenizer = AutoTokenizer.from_pretrained(known_args.model_name)

入力の各文字列たちに対して、「translate English to French: 」という文字列を先頭に付与しています。これで翻訳タスクであることをモデルに教えています。

ポイントとなるのは3番目の「RunInference」の部分です。
以下の部分で PyTorch モデルに対するハンドラを生成して使っています。

    gen_fn = make_tensor_model_fn('generate')
    model_handler = PytorchModelHandlerTensor(
        state_dict_path=known_args.model_state_dict_path,
        model_class=T5ForConditionalGeneration,
        model_params={
            "config": AutoConfig.from_pretrained(known_args.model_name)
        },
        device="cpu",
        inference_fn=gen_fn)

これを Apache Beam の RunInferernce に渡しています。

4番目の後処理(Postprocess クラス内)では翻訳されたことを確認するために、翻訳結果をログ出力しています。

    print(f"{decoded_inputs} \t Output: {decoded_outputs}")

さて、上記で解説したコードを実行してみます。
実行前の環境設定や実行方法の詳細はGitHubのREADMEに書いてありますので、そちらを参照ください。

python local_main.py --runner DirectRunner \
            --model_state_dict_path t5-base-model/state_dict.pth \
            --model_name t5-base

実行結果は以下になります。

(略)
translate English to French: The house is wonderful. 	 Output: La maison est merveilleuse.
translate English to French: I like to work in NYC. 	 Output: J'aime travailler à New York.
translate English to French: My name is Shubham. 	 Output: Je m'appelle Shubham.
translate English to French: I want to work for Google. 	 Output: Je veux travailler pour Google.
translate English to French: I am from India. 	 Output: Je suis originaire de l'Inde.
(略)

前後の Apache Beam のログは省略しています。
無事に英語がフランス語に翻訳されたことを確認できました。(筆者はフランス語がわからないので、確からしさの確認は省略します。)

Dataflowでの実行

次に Dataflow で実行してみます。
入力のデータは Create で作成するのではなく、Pub/Sub から取得するようにします。
今回サンプルとして用意しているのは、Pub/Sub にメッセージを送信するスクリプトと、Pub/Sub からメッセージを受け取って翻訳を行う Apache Beam のコードです。

さて、Dataflow での実行の処理内容は以下です。

  1. (new) Pub/Subから入力メッセージを受け取る
  2. (new) 翻訳タスク用の文字列を先頭に付与する
  3. モデル入力に適した形に前処理を行う
  4. 翻訳を行う
  5. 後処理を行う&ログに出力する
  6. (new) BigQueryに結果を保存する
    with beam.Pipeline(options=pipeline_options) as pipeline:
        _ = (
            pipeline
            | "PubSub Inputs" >> beam.io.ReadFromPubSub(topic=known_args.pubsub_topic)
            | "Add Text Prefix" >> beam.ParDo(AddTextPrefix(text_prefix="translate English to French"))
            | "Preprocess" >> beam.ParDo(Preprocess(tokenizer=tokenizer))
            | "RunInference" >> RunInference(model_handler=model_handler)
            | "PostProcess" >> beam.ParDo(Postprocess(tokenizer=tokenizer))
            | "Write BigQuery" >> beam.io.WriteToBigQuery(
                table=known_args.table_path,
                schema=table_schema,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
        )

ローカルからの変更点は、1番の入力部分と2番のテキスト追加部分、それと6番目の出力部分です。
入力は指定した Pub/Sub トピックからメッセージを受け取るようにして、翻訳タスクのために文字列の prefix(translate English to French)を付与するようにしています。

最後の出力部分は BigQuery に結果(入力の文字列と翻訳後の文字列)を保存するようにしています。

こちらのコードもGitHubで公開していますので、こちらを参照ください。

実行コマンドは以下です。

python main.py --runner DataflowRunner \
        --pubsub_topic projects/$PROJECT_ID/topics/$TOPIC_NAME \
        --model_state_dict_path gs://$BUCKET/t5-base-model/state_dict.pth \
        --model_name t5-base \
        --table_path $PROJECT_ID.$DATASET_NAME.dataflow_llm \
        --project $PROJECT_ID \
        --region us-central1 \
        --requirements_file requirements.txt \
        --staging_location gs://$BUCKET/staging \
        --temp_location gs://$BUCKET/tmp \
        --machine_type n1-highmem-16 \ \
        --disk_size_gb=200 \
        --streaming

起動前には Cloud Storage バケットの作成、Pub/Sub トピックの作成、BigQuery テーブルの作成が必要になります。GitHub の README に書いてありますので、そちらを参照ください。

Dataflow の実行を確認したら翻訳対象の Pub/Sub メッセージを送信してみます。(これは自分で独自のテキストを送信してもOKです)

python publish_texts.py --project_id $PROJECT_ID --topic $TOPIC_NAME

PROJECT_ID と TOPIC_NAME は適宜置き換えてください。(REDAME の手順だと環境変数として扱っています。)

Dataflowの実行結果確認

実行された Dataflow のジョブは以下のようになります。

dataflow_launch

Pub/Sub にメッセージを送信した後、しばらくすると Dataflow で処理が行われ始めていることを確認できます。
例えば RunInference のステップでは以下のようになっており、処理されている要素数を確認できたり、最も処理が重いステップであることがわかります。
dataflow_processing

LLM のモデルを使った推論なので、ついでに Dataflow のメトリクスを確認してみます。

dataflow_metrics

メモリ使用量がおよそ 50GB となっており、RunInference でのメモリ使用量が大きいこと推測できます。
これが今回 Dataflow 起動時にマシンタイプを指定している理由です。モデルに応じて適切なマシンタイプを選択する必要があります。

最後に実行結果を確認します。
BigQuery を確認すると入力のテキストと翻訳後のテキストが保存されていることがわかります。

bigquery

まとめ

今回、大規模言語モデル(LLM)を使ったリアルタイム処理を Dataflow で実装してみました。
Pub/Sub から流れてくるメッセージを翻訳するという処理を行い、BigQuery に結果を保存するところまで確認することができました。

実際の利用では今回のような翻訳だけでなく多種多様な LLM タスクに応用できると思います。また、Dataflow という処理基盤により、リアルタイム性の高い推論基盤の構築を実現できると思います。

今回は筆者の趣味で RunInference を使って実装してみましたが、これは独自でチューニングしたモデルを使えるところがメリットとなります。
一方で、汎用的なモデルで十分な場合は、OpenAI の API や Google Cloud の PaLM2 API などを直接叩いても良いでしょう。
他にもVertex AI とコラボレーションしてより強力な推論基盤を構築することも可能です。

ぜひお試しください。

参考文献

[1] https://huggingface.co/t5-base
[2] https://beam.apache.org/documentation/ml/large-language-modeling/

Discussion