Google Cloud の Cloud Tasks と Workflows は併用すると直列でのキュー処理ができないという話
ハマってしまったけど冷静に考えたらそりゃそうだよねという話
Google Cloud ではフルマネージドのタスクキュー・非同期処理のサービスとして Cloud Task が提供されている。かなりシンプルなキューでpull型もpush型も対応している
そしてデータパイプライン処理・ワークフロー処理のマネージドサービスとして Workflows というサービスも提供されている(名前ややこしいな
これを利用して特定の Workflows をキューで順番に処理しよう、というのを作ろうとしたのだがうまくいかなかった
うまくいかない状態
キューに入れるタスク作成側も workflows だったのだがこんな感じだった
- create_task_queue:
call: googleapis.cloudtasks.v2beta3.projects.locations.queues.tasks.create
args:
parent: ${task_queue_id}
body:
task:
httpRequest:
body: ${base64.encode(json.encode(body_parameter))
url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_name + "/executions"}
oauthToken:
serviceAccountEmail: ${service_acount}
headers:
Content-Type: application/json
こうすると確かにキューにはワークフロー実行のタスクがキュートして格納される。Cloud Tasksではレートリミットの同時実行数を1に設定すればワークフローは一つずつ実行される直列な体制にできる、と思っていた
ちなみにこの辺の設定は以下のドキュメントが参考になる
なぜうまくいかなかったか?
シンプルに Workflows のAPIの都合である
Workflows Execution のAPIは実行した際には新規の実行情報が返ってくる、つまり「実行した時点でレスポンスが返り完了を待たない」仕組みになっている
Cloud Tasks はレスポンスが帰り次第「実行完了」とみなして次のタスクに進む
直列に一つずつキューを処理するためにはWorkflowsを挟んではいけなかった
ちなみに下記のように Workflows をバッファリングする方法のドキュメントもあるが、これは実行最大数に引っかかったときに使うことを想定している。実行最大数はユーザー側で決められることではなく、1プロジェクトあたり10000同時実行の割り当てがあるだけで、Cloud Functions のインスタンス数みたいなものではない
つまり解決するには「実行後完了までレスポンスを返さない処理」をキューに入れる必要があった
今回の解決法
今回のキューに入れたフローは単一の Cloud Functions を実行するだけだったので、呼び出しを Workflows 経由にせず直接 Cloud Functions を呼び出す形にした
- create_task_queue:
call: googleapis.cloudtasks.v2beta3.projects.locations.queues.tasks.create
args:
parent: ${task_queue_id}
body:
task:
httpRequest:
body: ${base64.encode(json.encode(body))}
url: ${cloud_function_url}
oidcToken:
serviceAccountEmail: ${service_acount}
headers:
Content-Type: application/json
Cloud Functions 実行タスクの作成をWorkflowsからやる方法がドキュメントで見つからなかったので少し手探りだったが実のところそんなに変わっていない。一応変更点をまとめると
- httpRequestの認証情報は
oathToken
からoidcToken
に変更する必要がある - bodyに入れるJSONは
{"argument": "body_json_string"}
の形をもとにJSONエンコーディングしてbase64エンコードする必要があったのが、そのままbodyの情報をJSONエンコーディングしてbase64エンコーディングするだけで良くなる
というぐらいだろうか。一応下記にキューに入れるタスクの作成方法があるのでこれと合わせて推測した
これで Cloud Functions が完了するまでレスポンスを返さない形になりキューを順次実行するという処理ができるようになった
余談
Cloud Functions は第二世代を利用しているので厳密には Cloud Run Functions または Cloud Run 関数なのだが、このポストでは Cloud Functions で統一している
Discussion