Cloud Tasks で連鎖ジョブを実装する

2 min読了の目安(約2000字TECH技術記事

やりたいこと

Cloud Tasks を使って連鎖ジョブ/チェーンジョブを実行したい。
ある処理の結果を、次の処理のパラメータとして利用するシチュエーションを想定。

Cloud Tasks とは

そもそも Cloud Tasks は

大量の分散タスクの実行、ディスパッチ、配信を管理できるフルマネージド サービス
https://cloud.google.com/tasks

特徴などはこちら

Cloud Pub/Sub との違いや使い分けはこちらにまとまっている

ざっくりした使い分けはこのイメージ。

メモリ不足時の重複実行の問題

単純に連鎖ジョブを実行するのはそれほど難しくない。
実行環境がApp Engineでメモリ不足に陥った場合にタスクを再試行してくれる作りになっているのだが、再試行時に複数回実行されてしまうことが頻繁にある。
チェーンジョブのタスクの作りが甘いと、再試行時に後続のタスクが2回実行されてしまう。

App Engineインスタンスのアップグレードなどの打つべき対策はいくつもあるが、ここではタスクが再試行したとしても後続のタスクが複数回実行されない方法をまとめておく。

重複排除の方法

Cloud Tasks の重複排除の概要

Cloud Tasks の機能に重複排除がある。これは Cloud Pub/Sub にはない便利な特徴である。
ドキュメントがわかりにくかったが、下記のリンクに具体的な方法が掲載されていた。

ドキュメントによると、タスク作成時に明示的にタスクIDを指定すると、そのタスクが実行または削除されてから1時間は同じタスクIDでタスクが作成できないとのこと。
同じタスクIDでタスクを作成しようとすると、 409 AlreadyExists エラーが発生する。

今回はこの重複排除の機能を活用して、連鎖ジョブの重複実行を防ぐことにする。

パラメータとしてタスクIDを持たせる

数値を渡すと 10 を足した結果を返す add_ten というタスクを定義することにする。
結果が 100 未満であれば、結果を用いた新しいタスクを作成していき、数値を 10 ずつインクリメントする想定。

add_ten タスクの擬似コードを書いてみた。

擬似コード
def add_ten(task_id, number):
    result = number + 10
    if result < 100:
        next_task_id = generate_hash(task_id)
        next_number = result
        create_task(next_task_id, next_number)
    return result

ポイントはタスクIDをタスクのパラメータとして持たせている点にある。
実行中のタスクIDを値として持つことで、次のタスクのIDを一意な値で生成できる。
次のタスクのIDを生成する方法は何でもよいが、ハッシュが手っ取り早いと思う。

add_ten を実行中にエラーが発生して、複数回再試行されたとしても task_id が同じである限り生成される next_task_id も同じものになる。
next_task_id が同じなので、2回目以降に実行された場合は 409 AlreadyExists エラーとなり、重複したタスクは生成されなくて済む。

ちなみにタスクID自体は App Engine に来るリクエストのヘッダーに含まれるので、タスクのペイロードとして渡す必要はない。

まとめ

タスクIDから次のタスクIDを生成することで、連鎖ジョブでの重複排除を実現することができた。

当初意識していなかったが、ブロックチェーンのような作りになっていて自分で書きながらも興味深かった。