【Cloud Workflows】コールバックパターンで長時間処理を実現

2024/07/05に公開

はじめに

ストリーツ株式会社の@hanamaです。
生成AIの進歩により、複雑なワークフローを利用する機会が急増しています。大規模言語モデルを使った文章生成や画像生成など、多段階の処理を要するタスクが日常的になってきました。そこで使えるCloud Workflowsですが、実は気付きにくいところにタイムアウトの罠があります。今回はCloud Workflowsのコールバックパターンを活用して長時間の処理を行う方法を紹介します。

Cloud Workflowsの利点と制約

Cloud Workflowsは、複数のGoogleクラウドサービスやAPIを連携させるのに非常に便利なツールです。YAMLファイルを使って直感的にワークフローを定義でき、複雑な処理フローも簡単に実装できます。さらに、Cloud Workflows自体の実行可能時間は1年と非常に長く、長期にわたるプロセスの管理も可能です。

ただし、ここで注意が必要なのが、Workflows内で使用するhttp.gethttp.postなどのHTTPリクエスト関数のタイムアウト制限です。これらの関数には最大30分というタイムアウト制限があり、長時間の処理を直接Workflows内で実行する際には気を付ける必要があります。ドキュメント上でもこの制約は少し探すのが難しいため、引っかかってしまった方もいらっしゃるのではないでしょうか。

サーバーレスな長時間処理実行基盤

一方で、生成AIの処理や大規模データの分析など、30分以上の実行時間を要するタスクも増えています。GCP上では、このような長時間の処理に対応するためのサービスとして、Cloud FunctionsやCloud Run Jobsがあります。

  • Cloud Functions(第2世代): 最大60分の実行時間
  • Cloud Run Jobs: 最大24時間の実行時間
    これらのサービスを使えば、長時間の処理をサーバレスで実行できます。
    しかし、Cloud Workflows内でこれらの処理を単純に呼び出してしまうと、Cloud Workflows側のタイムアウト制約に引っかかってしまいます。

コールバックパターンによる解決

この問題を解決する効果的な方法が、「コールバックパターン」です。具体的な実装例を紹介します。

  1. Workflowsの定義(workflow.yaml):
workflow.yaml
main:
  params: [event]
  steps:
    - extract_args:
        assign:
          - input_data: ${event.data.input_data}
    - create_callback:
        call: events.create_callback_endpoint
        args:
          http_callback_method: "POST"
        result: callbackDetails
    - call_long_job:
        call: http.post
        args:
          url: https://your-function-url/long_job
          body:
            input_data: ${input_data}
            callback_url: ${callbackDetails.url}
          timeout: 30
        result: callLongJobResult
    - await_callback:
        call: events.await_callback
        args:
          callback: ${callbackDetails}
          timeout: 3600
    - final:
        return:
          call_long_job_result: ${callLongJobResult}
  1. Cloud Functionsの実装(main.py):
    ここではデータ処理などによく使われる、pythonを使った実装例を紹介します。
main.py
import functions_framework
import requests
import time

@functions_framework.http
def long_job(request):
    request_json = request.get_json()
    input_data = request_json['input_data']
    callback_url = request_json['callback_url']
    # ここで長い時間がかかる処理を行う
    # 例: 大規模言語モデルを用いた文章生成
    time.sleep(2700)  # 45分の処理をシミュレート
    result = "生成された長文..."
    # 処理完了後、コールバックURLにPOSTリクエストを送信
    requests.post(callback_url, json={"result": result})
    return {"status": "Processing completed and callback sent"}

このアプローチのポイントは以下の通りです:

  1. Workflowsでevents.create_callback_endpointを使用してコールバックエンドポイントを作成
  2. 作成したコールバックURLをCloud Functionsに渡す
  3. Cloud Functionsで長時間の処理を実行
  4. 処理完了後、Cloud FunctionsからWorkflowsのコールバックURLにPOSTリクエストを送信
  5. Workflowsはevents.await_callbackを使用してコールバックを最大1時間待機

この方法により、Workflowsのタイムアウト制限を回避しつつ、Cloud Functionsの長い実行時間を活用できます。

まとめ

生成AI時代において、複雑で長時間を要するワークフローの需要は今後さらに高まっていきます。Cloud WorkflowsとCloud Functionsを適切に連携させることで、長い時間の処理もきちんと扱うことができます。
今回紹介したコールバックパターンのメリットは以下です

  1. Workflowsの管理性と可視性を維持しつつ、長時間の処理を実現
  2. エラーハンドリングや再試行ロジックをWorkflows側で一元管理可能
  3. 処理の進捗状況をリアルタイムで監視可能

この手法は、機械学習モデルのトレーニング、大規模なデータ分析、複雑な生成AIタスクなど、様々な長時間処理に適用できます。ぜひプロジェクトに取り入れて、生成AI時代のワークフロー管理を効率化してみてください。

宣伝

弊社では、メディアのための生成AIサービス「apnea」を提供しています。
取材情報をもとにして記事の執筆支援を行うため、ハルシネーションリスクを軽減しながら、短時間で高品質な記事を作成することが可能です。この記事も、apneaの助けを借りながら執筆しています。

一緒に働く仲間も募集しています。興味のある方はぜひ弊社採用情報をご確認ください。Next.jsのApp RouterVercel AI SDKなどの最新技術を使いながら、情報流通を活性化させ、ユニークな文化を育むことを目指しています。

Discussion