Cloud Workflows で途中実行を実現
はじめに
去年サッカーゲームのイレブンを Google Cloud 製品 X 数理最適化 を用いて解く記事を投稿しました。
こちらの記事で書いたとおり、データ取得から数理最適化のアウトプットまでの実行を Cloud Workflows を使って日々実行しています。
上記の記事では 3 つの Cloud Run Jobs を順番に呼び出す役割を担っています。
今回は Cloud Workflows で不便に思っている途中実行について書いていきます。
Cloud Workflows
Cloud Workflows は Google Cloud 製品の API を呼び出す順序関係を解決してくれるワークフロー製品です。
特徴としては yaml で記述するため簡単なロジックを書くことができませんが、処理の依存関係を解決することだけに注力できます。
またサーバレスで Airflow などのサーバありきのワークフローとは違いサーバ管理不要なので柔軟に扱うことができます。
また従量課金でもあるため使用頻度の低いバッチ処理であれば無料枠に収まるため非常に経済的です。
安価だが途中実行が UI からできない
便利な Cloud Workflows ですが Airflow と比べて "途中から実行ができない" ので時々不便に感じます。
例えばデータ取得からデータレイク作成まで完了、データマート作成時にエラーが発生したとしましょう。
データにおける冪等性が保たれている前提で、Airflow の場合は UI からの操作でデータマート作成のステップから再実行ができます。
しかし Cloud Workflows にはその機能がなく、最初から、データ取得からの実行になります。
これでは再実行の完了に時間がかかりますし、ステップ数がかさみ無料枠に収まらなくなってしまうおそれがあります。
今回は途中実行を実現する方法について紹介します。
愚直に書いてみる
Cloud Workflows で処理順序を記述するとこのようになります。
main:
steps:
- step_a:
call: http.get
args:
url: https://example1.com
- step_b:
call: http.get
args:
url: https://example2.com
- step_c:
call: http.get
args:
url: https://example2.com
例えばこのワークフローの実行で step_b
が失敗してしまった場合があったとします。
この場合 Cloud Workflows で再実行すると step_a
からの実行になってしまうが、場合によっては step_b
から実行したい場合もあります。
愚直に書くとすると次のステップを引数で与えて、next
で飛ばす方法が思いつきます。
以下のように書き直し、実行時の引数に { "next": "step_b" }
と記述すると step_b
まで処理を飛ばします。
これで引数によって次に実行したいステップを指定することが出来るようになります。
main:
params: [args]
steps:
- init:
assign:
- next: ${default(map.get(args, "next"), "step_a")}
- switch_next_step:
switch:
- condition: ${next == "step_a"}
next: step_a
- condition: ${next == "step_b"}
next: step_b
- condition: ${next == "step_c"}
next: step_c
- step_a:
call: http.get
args:
url: https://example.com
- step_b:
call: http.get
args:
url: https://examplexxx.com
- step_c:
call: http.get
args:
url: https://example1.com
自動で判別
引数で指定するのも良いですがミスが怖いです。
前述しましたが、再実行するケースは途中で処理が失敗した時の場合が多いので失敗した箇所を自動で特定できれば next
を使いスキップが可能です。
Cloud Workflows の API を用いて直前の実行結果を取得して次に実行すべきステップを判別してみます。
例として先程と同じで step_b
が失敗したとして、再実行時は step_b
から実行開始したいケースを考えます。
以下のように API から実行結果を取得する部分を get_next
というサブワークフローのように記述します。
返り値として前回実行で失敗した場合に失敗したステップを返します。
main:
steps:
- check_next:
call: get_next
result: get_next_result
- assign_next:
assign:
- next: ${get_next_result}
- switch_next_step:
switch:
- condition: ${next == "step_a"}
next: step_a
- condition: ${next == "step_b"}
next: step_b
- condition: ${next == "step_c"}
next: step_c
- step_a:
call: http.get
args:
url: https://example1.com
- step_b:
call: http.get
args:
url: https://examplexxx.com
- step_c:
call: http.get
args:
url: https://exampleyyy.com
get_next:
steps:
- list:
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.list
args:
parent: projects/xxxxxxxxxxxxxxx/locations/us-central1/workflows/workflow-test
pageSize: 2 # 再実行と直前の実行の記録を取得する
view: FULL
result: list_result
- detail:
call: googleapis.workflowexecutions.v1.projects.locations.workflows.executions.get
args:
name: ${list_result.executions[1].name} # 直前の実行である 1 番目を取得
view: FULL
result: detail_result
- check_state:
switch:
- condition: ${detail_result.state == "SUCCESS"} # 直前の実行が成功の場合はスキップ
next: done
next: get_failed
- get_failed:
return: ${detail_result.status.currentSteps[0].step}
- done:
return: "step_a"
まとめ
Cloud Workflows は安価でサーバ管理不要でワークフロー製品としては優秀だと思います。
ですが、現状は途中実行を気軽にできないので手で実装する必要があります。
この記事を書いてる最中に久々に Cloud Workflows の管理画面を覗いたところステップごとの実行ステータスを見る画面が追加になっていました。
ステップ単位での実行結果がわかりにくかったところがこれで解決したので途中実行機能も追加されるかもしれません。
この記事が役に立たなくなることを願ってアップデートを待とうと思います。🙏
Discussion