Zenn
📽️

Google Cloud の Cloud Tasks と Workflows は併用すると直列でのキュー処理ができないという話

2025/03/05に公開

ハマってしまったけど冷静に考えたらそりゃそうだよねという話

Google Cloud ではフルマネージドのタスクキュー・非同期処理のサービスとして Cloud Task が提供されている。かなりシンプルなキューでpull型もpush型も対応している

そしてデータパイプライン処理・ワークフロー処理のマネージドサービスとして Workflows というサービスも提供されている(名前ややこしいな

これを利用して特定の Workflows をキューで順番に処理しよう、というのを作ろうとしたのだがうまくいかなかった

うまくいかない状態

キューに入れるタスク作成側も workflows だったのだがこんな感じだった

sample_workflows.yaml
- create_task_queue:
    call: googleapis.cloudtasks.v2beta3.projects.locations.queues.tasks.create
    args:
      parent: ${task_queue_id}
      body:
        task:
          httpRequest:
            body: ${base64.encode(json.encode(body_parameter))
            url: ${"https://workflowexecutions.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/workflows/" + workflow_name + "/executions"}
            oauthToken:
              serviceAccountEmail: ${service_acount}
            headers:
              Content-Type: application/json 

こうすると確かにキューにはワークフロー実行のタスクがキュートして格納される。Cloud Tasksではレートリミットの同時実行数を1に設定すればワークフローは一つずつ実行される直列な体制にできる、と思っていた

ちなみにこの辺の設定は以下のドキュメントが参考になる

https://cloud.google.com/workflows/docs/create-http-task?hl=ja

なぜうまくいかなかったか?

シンプルに Workflows のAPIの都合である

Workflows Execution のAPIは実行した際には新規の実行情報が返ってくる、つまり「実行した時点でレスポンスが返り完了を待たない」仕組みになっている

Cloud Tasks はレスポンスが帰り次第「実行完了」とみなして次のタスクに進む

直列に一つずつキューを処理するためにはWorkflowsを挟んではいけなかった

https://cloud.google.com/workflows/docs/reference/executions/rest/v1/projects.locations.workflows.executions/create

ちなみに下記のように Workflows をバッファリングする方法のドキュメントもあるが、これは実行最大数に引っかかったときに使うことを想定している。実行最大数はユーザー側で決められることではなく、1プロジェクトあたり10000同時実行の割り当てがあるだけで、Cloud Functions のインスタンス数みたいなものではない

https://cloud.google.com/workflows/docs/tutorials/buffer-workflows-executions?hl=ja

つまり解決するには「実行後完了までレスポンスを返さない処理」をキューに入れる必要があった

今回の解決法

今回のキューに入れたフローは単一の Cloud Functions を実行するだけだったので、呼び出しを Workflows 経由にせず直接 Cloud Functions を呼び出す形にした

sample_workflows.yaml
- create_task_queue:
    call: googleapis.cloudtasks.v2beta3.projects.locations.queues.tasks.create
    args:
      parent: ${task_queue_id}
      body:
        task:
          httpRequest:
            body: ${base64.encode(json.encode(body))}
            url: ${cloud_function_url}
            oidcToken:
              serviceAccountEmail: ${service_acount}
            headers:
              Content-Type: application/json

Cloud Functions 実行タスクの作成をWorkflowsからやる方法がドキュメントで見つからなかったので少し手探りだったが実のところそんなに変わっていない。一応変更点をまとめると

  • httpRequestの認証情報は oathToken から oidcToken に変更する必要がある
  • bodyに入れるJSONは {"argument": "body_json_string"} の形をもとにJSONエンコーディングしてbase64エンコードする必要があったのが、そのままbodyの情報をJSONエンコーディングしてbase64エンコーディングするだけで良くなる

というぐらいだろうか。一応下記にキューに入れるタスクの作成方法があるのでこれと合わせて推測した

https://cloud.google.com/tasks/docs/tutorial-gcf?hl=ja#creating_the_task

これで Cloud Functions が完了するまでレスポンスを返さない形になりキューを順次実行するという処理ができるようになった

余談

Cloud Functions は第二世代を利用しているので厳密には Cloud Run Functions または Cloud Run 関数なのだが、このポストでは Cloud Functions で統一している

Discussion

ログインするとコメントできます