Cloud Tasks で連鎖ジョブを実装する
やりたいこと
Cloud Tasks を使って連鎖ジョブ/チェーンジョブを実行したい。
ある処理の結果を、次の処理のパラメータとして利用するシチュエーションを想定。
Cloud Tasks とは
そもそも Cloud Tasks は
大量の分散タスクの実行、ディスパッチ、配信を管理できるフルマネージド サービス
https://cloud.google.com/tasks
特徴などはこちら
Cloud Pub/Sub との違いや使い分けはこちらにまとまっている
ざっくりした使い分けはこのイメージ。
メモリ不足時の重複実行の問題
単純に連鎖ジョブを実行するのはそれほど難しくない。
実行環境がApp Engineでメモリ不足に陥った場合にタスクを再試行してくれる作りになっているのだが、再試行時に複数回実行されてしまうことが頻繁にある。
チェーンジョブのタスクの作りが甘いと、再試行時に後続のタスクが2回実行されてしまう。
App Engineインスタンスのアップグレードなどの打つべき対策はいくつもあるが、ここではタスクが再試行したとしても後続のタスクが複数回実行されない方法をまとめておく。
重複排除の方法
Cloud Tasks の重複排除の概要
Cloud Tasks の機能に重複排除がある。これは Cloud Pub/Sub にはない便利な特徴である。
ドキュメントがわかりにくかったが、下記のリンクに具体的な方法が掲載されていた。
ドキュメントによると、タスク作成時に明示的にタスクIDを指定すると、そのタスクが実行または削除されてから1時間は同じタスクIDでタスクが作成できないとのこと。
同じタスクIDでタスクを作成しようとすると、 409 AlreadyExists
エラーが発生する。
今回はこの重複排除の機能を活用して、連鎖ジョブの重複実行を防ぐことにする。
パラメータとしてタスクIDを持たせる
数値を渡すと 10 を足した結果を返す add_ten
というタスクを定義することにする。
結果が 100 未満であれば、結果を用いた新しいタスクを作成していき、数値を 10 ずつインクリメントする想定。
add_ten
タスクの擬似コードを書いてみた。
def add_ten(task_id, number):
result = number + 10
if result < 100:
next_task_id = generate_hash(task_id)
next_number = result
create_task(next_task_id, next_number)
return result
ポイントはタスクIDをタスクのパラメータとして持たせている点にある。
実行中のタスクIDを値として持つことで、次のタスクのIDを一意な値で生成できる。
次のタスクのIDを生成する方法は何でもよいが、ハッシュが手っ取り早いと思う。
add_ten
を実行中にエラーが発生して、複数回再試行されたとしても task_id
が同じである限り生成される next_task_id
も同じものになる。
next_task_id
が同じなので、2回目以降に実行された場合は 409 AlreadyExists
エラーとなり、重複したタスクは生成されなくて済む。
ちなみにタスクID自体は App Engine に来るリクエストのヘッダーに含まれるので、タスクのペイロードとして渡す必要はない。
まとめ
タスクIDから次のタスクIDを生成することで、連鎖ジョブでの重複排除を実現することができた。
当初意識していなかったが、ブロックチェーンのような作りになっていて自分で書きながらも興味深かった。
Discussion