【Cloud Workflows】コールバックパターンで長時間処理を実現
はじめに
ストリーツ株式会社の@hanamaです。
生成AIの進歩により、複雑なワークフローを利用する機会が急増しています。大規模言語モデルを使った文章生成や画像生成など、多段階の処理を要するタスクが日常的になってきました。そこで使えるCloud Workflowsですが、実は気付きにくいところにタイムアウトの罠があります。今回はCloud Workflowsのコールバックパターンを活用して長時間の処理を行う方法を紹介します。
Cloud Workflowsの利点と制約
Cloud Workflowsは、複数のGoogleクラウドサービスやAPIを連携させるのに非常に便利なツールです。YAMLファイルを使って直感的にワークフローを定義でき、複雑な処理フローも簡単に実装できます。さらに、Cloud Workflows自体の実行可能時間は1年と非常に長く、長期にわたるプロセスの管理も可能です。
ただし、ここで注意が必要なのが、Workflows内で使用するhttp.getやhttp.postなどのHTTPリクエスト関数のタイムアウト制限です。これらの関数には最大30分というタイムアウト制限があり、長時間の処理を直接Workflows内で実行する際には気を付ける必要があります。ドキュメント上でもこの制約は少し探すのが難しいため、引っかかってしまった方もいらっしゃるのではないでしょうか。
サーバーレスな長時間処理実行基盤
一方で、生成AIの処理や大規模データの分析など、30分以上の実行時間を要するタスクも増えています。GCP上では、このような長時間の処理に対応するためのサービスとして、Cloud FunctionsやCloud Run Jobsがあります。
- Cloud Functions(第2世代): 最大60分の実行時間
- Cloud Run Jobs: 最大24時間の実行時間
これらのサービスを使えば、長時間の処理をサーバレスで実行できます。
しかし、Cloud Workflows内でこれらの処理を単純に呼び出してしまうと、Cloud Workflows側のタイムアウト制約に引っかかってしまいます。
コールバックパターンによる解決
この問題を解決する効果的な方法が、「コールバックパターン」です。具体的な実装例を紹介します。
- Workflowsの定義(
workflow.yaml
):
main:
params: [event]
steps:
- extract_args:
assign:
- input_data: ${event.data.input_data}
- create_callback:
call: events.create_callback_endpoint
args:
http_callback_method: "POST"
result: callbackDetails
- call_long_job:
call: http.post
args:
url: https://your-function-url/long_job
body:
input_data: ${input_data}
callback_url: ${callbackDetails.url}
timeout: 30
result: callLongJobResult
- await_callback:
call: events.await_callback
args:
callback: ${callbackDetails}
timeout: 3600
- final:
return:
call_long_job_result: ${callLongJobResult}
- Cloud Functionsの実装(
main.py
):
ここではデータ処理などによく使われる、pythonを使った実装例を紹介します。
import functions_framework
import requests
import time
@functions_framework.http
def long_job(request):
request_json = request.get_json()
input_data = request_json['input_data']
callback_url = request_json['callback_url']
# ここで長い時間がかかる処理を行う
# 例: 大規模言語モデルを用いた文章生成
time.sleep(2700) # 45分の処理をシミュレート
result = "生成された長文..."
# 処理完了後、コールバックURLにPOSTリクエストを送信
requests.post(callback_url, json={"result": result})
return {"status": "Processing completed and callback sent"}
このアプローチのポイントは以下の通りです:
- Workflowsで
events.create_callback_endpoint
を使用してコールバックエンドポイントを作成 - 作成したコールバックURLをCloud Functionsに渡す
- Cloud Functionsで長時間の処理を実行
- 処理完了後、Cloud FunctionsからWorkflowsのコールバックURLにPOSTリクエストを送信
- Workflowsは
events.await_callback
を使用してコールバックを最大1時間待機
この方法により、Workflowsのタイムアウト制限を回避しつつ、Cloud Functionsの長い実行時間を活用できます。
まとめ
生成AI時代において、複雑で長時間を要するワークフローの需要は今後さらに高まっていきます。Cloud WorkflowsとCloud Functionsを適切に連携させることで、長い時間の処理もきちんと扱うことができます。
今回紹介したコールバックパターンのメリットは以下です
- Workflowsの管理性と可視性を維持しつつ、長時間の処理を実現
- エラーハンドリングや再試行ロジックをWorkflows側で一元管理可能
- 処理の進捗状況をリアルタイムで監視可能
この手法は、機械学習モデルのトレーニング、大規模なデータ分析、複雑な生成AIタスクなど、様々な長時間処理に適用できます。ぜひプロジェクトに取り入れて、生成AI時代のワークフロー管理を効率化してみてください。
Discussion