👌

Google Cloud Tasksのあれこれ

2023/12/18に公開

このエントリーは一休.com Advent Calendar 2023の18日目の記事になります。


一休のいくつかのプロダクトで便利に活用されているGoogle Cloud TasksのHttpタスクあれこれについてまとめます。Httpタスクを使えば、パブリック IP アドレスを持つ任意のHTTP エンドポイントに、キューを経由して、タスクを処理させることができます。

再試行関係のパラメータを動かしながら確認してみる

公式ドキュメントは、https://cloud.google.com/tasks/docs/configuring-queues?hl=ja#retry

Google Cloud Consoleから設定できるパラメータは、以下の通り。

  • 最大試行回数 ... 公式ドキュメントのMAX_ATTEMPTS
  • 最小バックオフ ... 公式ドキュメントのMIN_INTERVAL
  • 最大バックオフ ... 公式ドキュメントのMAX_INTERVAL
  • 最大倍増回数 ... 公式ドキュメントのMAX_DOUBLINGS
  • 最大試行時間 ... 公式ドキュメントのMAX_RETRY_DURATION

では、実際に試してみます。以下のようにしてみます。

  • 最大試行回数 ... 20
  • 最小バックオフ ... 10
  • 最大バックオフ ... 600
  • 最大倍増回数 ... 4

実際に以下のGoのコードで試してみます。

package main

import (
	"context"
	"log"

	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
	taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
)

func main() {
	ctx := context.Background()
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		log.Fatal(err.Error())
	}
	defer client.Close()

	queuePath := "projects/hoge/locations/asia-northeast1/queues/test-queue"

	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url:        "https://httpbin.org/status/500",
				},
			},
		},
	}
	req.Task.GetHttpRequest().Body = []byte("test")

	_, err = client.CreateTask(ctx, req)
	if err != nil {
		log.Fatal(err.Error())
	}
}

タスクの向き先を https://httpbin.org/status/500 へして、強制的にタスクを失敗させます。また、事前にCloud Tasksのログを有効にしておき、実行後、ログエクスプローラーを見てみます。

さて、Cloud Taskのログは、ひとつのタスクにつき、taskCreationLog/attemptDispatchLog/attemptResponseLogの3つのログが出力されます。
これらについては、ドキュメントが乏しく、フィールド名からその役割を類推するしかないのですが、おそらく、以下だと思われます。

  • taskCreationLog ... タスクがキューにpushされたことを示すログ
  • attemptDispatchLog ... タスクが実行された(ルーティング先のエンドポイントにリクエストされた)を示すログ
  • attemptResponseLog ... リクエストに対するレスポンスのログ

実際に、↑のパラメータで、リクエストしたところ、最初taskCreationLogが、ひとつ出力され、次から、attemptDispatchLogattemptResponseLogが組みで出力されました。

↑の画像は実際のログエクスプローラーです。attemptResponseLogのログがエラーになっているのがわかります。ハイライトされているタイムスタンプは、attemptResponseLog.scheduleTime というフィールドです。この時間の間隔を上から見ていくと、以下のように解釈できるでしょう。

  • 最初の試行と次のリトライ試行の差は約10.60秒です。
    • 最小バックオフの10秒が最初のリトライ試行までの間隔になる。
  • 2番目の試行と3番目の試行は約20.12秒です。
    • 10秒 * 2で20秒間隔
  • 次の差は約40.16秒です。
    • 20秒 * 2で40秒間隔
  • 次の差は約80.13秒です。
    • 40秒 * 2で80秒間隔
  • 次の差は約160.14秒です。
    • 80秒 * 2で160秒間隔。ここで、最大倍増回数の4回に到達。
  • 次の差は約320.13秒です。
    • 最大倍増回数に達しているので、倍ではなく、160秒 + 160秒で320秒。
  • 次の差は約480.13秒です。
    • 最大倍増回数に達しているので、倍ではなく、320秒 + 160秒で480秒。
  • 次の差は約600.13秒です。
    • 最大倍増回数に達しているので、倍ではなく、480秒 + 160秒で640秒のはずだが、最大バックオフの600秒を超えるので、600秒。
  • 次の差は約600.13秒です。
    • 以後、600秒のまま。
  • 次の差は約600.15秒です。
    • 以後、600秒のまま。
  • 次の差は約600.13秒です。
  • ... 以下、20回の試行が終わるまで、各試行の差は、約600秒。

という挙動になりました。
ただし、試行間隔は、正確に倍にならない場合もあるようです。たとえば、最小バックオフが5秒の場合、2回目のリトライは、筆者が試した限りでは、15秒になります。

まとめ

  • Cloud Tasksのログの見方と、https://httpbin.org/status/500 を使えば、簡単にリトライの挙動を確認できることがわかりました。
  • 最大試行回数に達してもまだ、成功しなかったタスクは、自動的に削除されます。そのようなタスクについて、どう扱うかは、Cloud Tasksの外の世界で考える必要がありそうです。
    • 削除のタイミングで特別なログが吐かれるということもないようです。

ディスパッチレート関係のパラメータを動かしながら確認してみる

ディスパッチという言葉は多義的ですが、Cloud Tasksにおいては、タスクの送信 ... 指定したhttpのエンドポイントにタスクという名のHttp Requestを(多くの場合POSTで)投げる ... のことと理解すればよさそうです。

公式ドキュメントは、https://cloud.google.com/tasks/docs/configuring-queues?hl=ja#rate

画面から設定できるパラメータは以下のふたつ。

  • 最大ディスパッチ数 ... 公式ドキュメントのDISPATCH_RATE
  • 最大同時ディスパッチ数 ... 公式ドキュメントのMAX_RUNNING

... と、ここで留意しなければならない点があります。実は、ディスパッチレートには、(コンソールで設定した場合は)暗黙に決まってしまう第3のパラメータがあるのです。公式ドキュメントのディスパッチレート制御のアルゴリズムの説明によれば、トークンバケットという、キューイングが急増しても、常に一定のペースで配送できるようにするアルゴリズムを利用して、配送制御を行っています。そして、このアルゴリズムには、バケットのサイズという重要なパラメータがありますが、これは、コンソールから設定できないようになっています。

トークンバケットについていくつかのサイトを参考にして、筆者の理解したレベルでまとめると、

  • バケットには一定の速度でトークンが補充される。
    • この一定の速度が、最大ディスパッチ数(DISPATCH_RATE)で設定される。つまり、最大ディスパッチ数 = 秒間に並列で実行される最大のタスク数ではない。
  • バケットにはサイズ(保持できるトークンの最大数)がある。
    • これによって、タスクがキューに追加されたら速やかに処理を開始することと、タスクが短時間にキューに追加されるた場合のリソース利用を抑制することの両方を実現する。
  • バケットにトークンがあれば、キューのタスクを実行し、トークンが1消費される。
  • バケットにトークンがなくなったら、タスクの処理はストップする(キューは長くなる)。
    • トークンは一定の速度で補充されることで、一時的にバケット内のトークンがなくなっても一定の処理レートは担保される。
  • 例えば、実行に時間がかかるタスクが散発し始めると、バケット内のトークンが増え(トークンの補充はされるが消費がされないので)、実行タスクが急増し、最大ディスパッチ数(DISPATCH_RATE)を超えた実行が行われる可能性がある。最大同時ディスパッチ数(MAX_RUNNING)は、このような状況を防ぐためのパラメータである。

最大ディスパッチ数が、蛇口から流れる水の流量、バケットサイズは洗面台の大きさ、最大同時ディスパッチ数が排水口の太さ、といような理解でよさそうです。

いずれにしろ、最大ディスパッチ数ととバケットのサイズを調整することで、流用の調整ができそうですが、前述の通り、バケットサイズは、コンソールからは設定できません。コンソールでは、バケットサイズは、最大ディスパッチ数 / 5 で、自動的に決まります。

'gcloud' コマンドで、バケットサイズを確認することができます。以下は、最大ディスパッチ数を500に設定した場合です。

$> gcloud tasks queues describe test-queue --location=asia-northeast1
name: projects/hogehoge/locations/asia-northeast1/queues/test-queue
rateLimits:
  maxBurstSize: 100 ## これがバケットサイズ 500 / 5 になっている
  maxConcurrentDispatches: 1000
  maxDispatchesPerSecond: 500.0
retryConfig:
  maxAttempts: 10 
  maxBackoff: 100s
  maxDoublings: 2
  maxRetryDuration: 100s
  minBackoff: 20s
stackdriverLoggingConfig:
  samplingRatio: 1.0
state: RUNNING

では、実際に試してみます。秒間に10タスクくらい処理できるキューにしたいと思いますので、最大ディスパッチ数を50(バケットサイズは10)にしてみます。

以下のGoのコードで試してみます。

package main

import (
	"context"
	"log"

	cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
	"golang.org/x/sync/errgroup"
	taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
)

func main() {
	ctx := context.Background()
	client, err := cloudtasks.NewClient(ctx)
	if err != nil {
		log.Fatal(err.Error())
	}
	defer client.Close()
	eg := new(errgroup.Group)
	i := 0
	for i <= 100 {
		eg.Go(func() error {
			_, err = client.CreateTask(ctx, create())
			return err
		})
		i++
	}

	if err := eg.Wait(); err != nil {
		log.Fatal(err.Error())
	}
}

func create() *taskspb.CreateTaskRequest {

	queuePath := "projects/hogehoge/locations/asia-northeast1/queues/test-queue"

	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url:        "https://abcdefghijk.a.run.app/200",
				},
			},
		},
	}
	req.Task.GetHttpRequest().Body = []byte("test")
	return req
}

100のタスクをゴルーチンで、いっきに、エンキューしてみます。宛先は、Cloud Runにしてみました。Cloud Run側で、リクエストの着弾時間を見てみるためです。

以下が、リクエストを受け付けたCloud Run側のログです。

秒間10タスクほど着弾している想定でしたが、実際は秒間5リクエスト程度でした。

以下は、Cloud Task側のログのヒストグラムです。

左端に山があり、その後、一気に低減しています。しらべたところ、左端の山は、taskCreationLogのログが、キューイングしたタスクの分だけでていました。つまり、キューイング自体は、1秒以内に一気に行われている ... 筆者のgoの書き方やマシンスペックがダメで、キューの性能を使切れていない、というわけではなさそうです。
Cloud Task側のログをさらに細かく見ると、attemptDispatchLogのログのタイムスタンプが、およそ、Cloud Run側のタイムスタンプと同じ時間間隔でしたので、やはり、ディスパッチするタイミングですでに、秒間5タスク程度になっていたようです。おそらく、実際のディスパッチレートを決める要因はまだ、ほかにもあるはず(Http Requestにかかる時間など)なので、パラメータから算出した理論値は参考程度で考えておく必要がありそうです。

まとめ

  • ディスパッチレートには、バケットサイズという隠れたパラメータがある。
    • これはコンソールからは明示的に設定できない。
      • Terraformでも、Outputとなっており、設定はできない模様。Cloud Task APIでも設定できない。
    • ドキュメントによれば、queue.yamlを使ってレートを定義する場合に、バケットサイズを指定できるらしい。筆者未検証。https://cloud.google.com/tasks/docs/configuring-queues?hl=ja#rate
      • ただし、多くの場合において、暗黙に設定される値で最適らしい。
  • 実際のディスパッチレートは、理論値から導き出せないので、転送先のhttpサーバの性能や、キューイングの増減の仕方などを考慮しつつ、ワークロードに適合したパラメータを探る必要がありそう。

タスクの順序を考慮する

Cloud Tasksは、エンキューされた順でタスクが実行されることは保証されていないです。
したがって、あるタスクAが複数回リトライ中のときに、別タスクBが1回で成功した結果、本来、タスクA => タスクBの順で処理したかったのに、逆の順になってしまう、ということが発生するかもしれません。

X-CloudTasks-TaskETAを使ってみる

公式ドキュメント(https://cloud.google.com/tasks/docs/creating-http-target-tasks?hl=ja#handler)によれば、 Httpタスクを処理するハンドラには、X-CloudTasks-TaskETAというヘッダが、付与されておりここに、UNIX時間が設定されているようです。

これを使えば、ハンドラ側で、実行順序を考慮した処理ができそうです。
ただし、注意点があります。デフォルトでは、このX-CloudTasks-TaskETAの値は、タスクが実行された時間が設定されてしまう ... エンキューされたときのタイムスタンプではない ... という点です。
つまり、タスクA => タスクBの順でエンキューをしたとしても、タスク実行が、タスクB => タスクAになった場合、タイムスタンプは、タスクB < タスクAとなるので、エンキューの順になりません。

これは、エンキューするときに明示的にタイムスタンプを指定することで回避できます。

...()...
import (
	timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)
...()...
req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			ScheduleTime: timestamppb.Now(), // これを追加
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url:        "https://abcdefghijk.a.run.app/200",
				},
			},
		},
	}

goなら、↑のように、ScheduleTime: timestamppb.Now()を追加します。

Cloud Runにリクエストヘッダをすべて標準出力にダンプするハンドラを用意して、そこにタスクを投げてみた結果が↓です。

X-Cloudtasks-Tasketa が付与されていることがわかります。UNIX時間は、マイクロ秒までの精度ですね。
この値を、DB等に保存しておけば、実行順を考慮した処理が書けそうです。

X-CloudTasks-TaskNameを使ってみる

↑の画像に、X-Cloudtasks-Taskname: 63523181031091362651 というのが含まれています。これは、タスク名を表すヘッダであり、デフォルトでは、Google Cloud側で生成される一意なIDが設定されます。
これも、明示的に指定することができるので、エンキューするときに順序がわかる情報 ... 例えばDBで連番を生成して付与する ... を設定すれば、ハンドラ側で処理順を考慮できそうです。
ただし、設定する上で注意する点があります。

  • タスク名は、projects/<PROJECT_ID>/locations/<LOCATION_ID>/queues/<QUEUE_ID>/tasks/<TASK_ID>のフォーマットで指定しなければならない。
  • タスク名は、ユニークでなければならない。違反すると、rpc error: code = AlreadyExists desc = Requested entity already existsというエラーになります。

これらを考慮して、以下のようなコードでテストしてみます。

...()...
import (
	"github.com/google/uuid"
)
...()...
        queuePath := "projects/hoge/locations/asia-northeast1/queues/test-queue"
	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
                        Name: fmt.Sprintf("%s/tasks/%s_%d", queuePath, uuid.New().String(), time.Now().UnixNano()),
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url:        "https://abcdefghijk.a.run.app/200",
				},
			},
		},
	}

Name: fmt.Sprintf("%s/tasks/%s_%d", queuePath, uuid.New().String(), time.Now().UnixNano())というふうに名前を指定します。
UnixNanoだけだと重複する可能性もあるので、先頭にuuidを付与しました。
結果は以下の通り。

想定通り、uuidとUNIX時間を_で区切った文字列が設定されているのがわかります。

まとめ

  • ハンドラ側で、処理順したい場合、タスクの実行順ではなく、タスクのエンキュー順が必要になる場合が多い。
  • この場合、X-CloudTasks-TaskETAヘッダや、X-CloudTasks-TaskNameヘッダが活用できそう。
    • X-CloudTasks-TaskNameヘッダよりも、X-CloudTasks-TaskETAヘッダのほうが簡便か。

キューレベルのルーティングを構成する

この記事を執筆している時点では、pre-GAですが、キューにタスク配送先のURLを指定することで、タスクに設定されているURLがオーバーライドできるようです。

公式ドキュメント(https://cloud.google.com/tasks/docs/configuring-queues?hl=ja#gcloud)を見ると、一度キューを停止した上で、Google Cloud APIをcurlでコールして設定変更する手順が示されています。
しかし、gcloudコマンドのリファレンスを見ると、betaのコマンドを使えば、一発で設定できます。
以下の通り。

gcloud beta tasks queues update test-queue --http-uri-override=scheme:https,host:abcdefg.a.run.app,path:/200

パラメータの指定方法が独特ですね。ここでは、schemehttpshostabcdefg.a.run.app、pathを/200 に指定することで、Cloud Runのエンドポイントhttps://abcdefg.a.run.app/200 へ、タスクを配送するようにオーバーライドします。

実際の挙動を例によって、Goで確認してみます。

	req := &taskspb.CreateTaskRequest{
		Parent: queuePath,
		Task: &taskspb.Task{
			MessageType: &taskspb.Task_HttpRequest{
				HttpRequest: &taskspb.HttpRequest{
					HttpMethod: taskspb.HttpMethod_POST,
					Url: "https://httpbin.org/status/200",
				},
			},
		},
	}

タスクでは配送先をhttps://httpbin.org/status/200 にし、前述と同じように、goroutineで一気に流してみます。

そして、Cloud Runのログを見ると、リクエストを受信していることが確認できました。
注意する必要があるのは、現時点では、Cloud Task側のログのtargetAddressフィールドには、オーバーライドしたURLではなくタスクで指定したURLが出力される点です。また、同様に現時点では、オーバーライドの設定をコンソールで確認できません。

まとめ

  • pre-GAの機能として、キューで配送先のURLを指定できる。
  • これを指定すると、タスクで指定したURLがオーバーライドされる。
  • なんらかの事情で、早急に配送先のURLを一律変更したいが、エンキューしているクライアントが複数あってきっきに切り替えるのが大変、、、といったときに期待できる機能。

まとめ

Google Cloud Tasksの基礎的な挙動を実際に動かしながら確認しました。
本格運用していくには、認証やCloud Monitoringでの監視やメトリクス収集なども把握しておきたいところです。
※漏れたトピックを別の記事にまとめました。

※トークンバケットアルゴリズムについては以下のサイト/記事を参考にさせていただきました。

https://ja.wikipedia.org/wiki/トークンバケット

https://zenn.dev/nananaoto/articles/c62d63223e669af63c80

https://anveloper.com/2023/04/05/トークンバケットアルゴリズムとは/

Discussion