Cloud Tasks の BufferTask API を試してみる
はじめに
Cloud Tasks でのタスク作成方法に BufferTask というものが知らないうちに追加されていました。これを利用すると、クライアントライブラリを使わずにタスクの追加ができる[1]ようです 😲
今回は、Google Cloud 公式ブログ で紹介されている「Pub/Sub と Cloud Run との間のバッファとしての Cloud Tasks[2]」を実際にやってみます。
公式サンプルを試してみる
このサンプルでは Cloud Run のイベントトリガーとして Cloud Pub/Sub が設定されています。Cloud Pub/Sub に対してメッセージが多すぎると、Cloud Run が耐えられない可能性があることを想定してます。この際に Cloud Tasks キューの URI オーバーライドと BufferTask API を使えば、メッセージの流入をコントールし解決できるといった具合です。
イメージ図
1. Pub/Sub トリガーの Cloud Run をデプロイする
イベントハンドラとして Cloud Run サービスをデプロイして、Eventarc Pub/Sub トリガーを作成します。
SERVICE=hello
REGION=us-central1
gcloud run deploy $SERVICE \
--allow-unauthenticated \
--image=gcr.io/cloudrun/hello \
--region=$REGION
TRIGGER=trigger-pubsub
gcloud eventarc triggers create $TRIGGER \
--destination-run-service=$SERVICE \
--destination-run-region=$REGION \
--event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
--location=$REGION
トリガーを作成すると新しい Pub/Sub トピックが作成されます。
作成された Eventarc
確認のため、Pub/Sub にメッセージを送信します。
#!/bin/bash
REGION=us-central1
TRIGGER=trigger-pubsub
TOPIC=$(gcloud eventarc triggers describe $TRIGGER --location $REGION --format='value(transport.pubsub.topic)')
for i in {1..5}; do gcloud pubsub topics publish $TOPIC --message="Hello World$i" & done
サンプルアプリケーションは、メッセージを受け取るとそれをログへ出力するように実装されています。
ログを確認するとメッセージの到着時間がほぼ同時刻なので、このままメッセージの数が増えると Cloud Run に負荷がかかりそうな予感がします。
実行ログ
2. Cloud Tasks と BufferTask API を構成する
2.1. Cloud Tasks を作成する
エラーメッセージ
Tasks キューを作成します。キューレベルのルーティングを設定する必要があるので、対象となる Cloud Run の URL をホストとして設定しています。コンソール画面を確認すると、「HTTP Routing」という設定が増えています。
SERVICE=hello
REGION=us-central1
SERVICE_URL=$(gcloud run services describe $SERVICE --region $REGION --format 'value(status.url)')
SERVICE_HOST=$(echo $SERVICE_URL | sed 's,http[s]*://,,g')
QUEUE=pubsub-http-queue-1
LOCATION=us-central1
gcloud tasks queues create $QUEUE \
--http-uri-override="host:$SERVICE_HOST" \
--max-concurrent-dispatches=1 \
--max-dispatches-per-second=1 \
--location=$LOCATION
キューの設定画面
2.2. Pub/Sub のサブスクリプションを BufferTask API に変更する
サンプル通りの結果
ということで、Tasks キューの キューレベルのルーティング設定にクエリパラメータを設定します。設定は上書きされるので、あらためてホストも設定します。設定のついでに、キューのログも有効化しておきます。
TOPIC=$(gcloud eventarc triggers describe $TRIGGER --location=$LOCATION --format='value(transport.pubsub.topic)' | sed 's#.*/##')
PROJECT_ID="YOUR PROJECT ID"
QUERY="__GCP_CloudEventsMode=CUSTOM_PUBSUB_projects%2F${PROJECT_ID}%2Ftopics%2F${TOPIC}"
gcloud tasks queues update $QUEUE \
--http-uri-override="host:$SERVICE_HOST,query:$QUERY" \
--log-sampling-ratio=1.0 \
--location=$LOCATION
キューの設定画面
キューが Cloud Run を叩いてくれるので、Pub/Sub がタスクを登録するようにします。そのため、Pub/Sub の Subscription の push エンドポイントを BufferTask API に変更します。
SUBSCRIPTION=$(gcloud eventarc triggers describe $TRIGGER --location=$LOCATION --format='value(transport.pubsub.subscription)' | sed 's#.*/##')
PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(project_number)")
gcloud pubsub subscriptions update $SUBSCRIPTION \
--push-endpoint="https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE}/tasks:buffer" \
--push-auth-service-account="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
サブスクリプションの設定画面
3. テストする
5 個のメッセージだと違いがわからなかったので、30 個ほどのメッセージをパブリッシュします。ほぼ同時に 30 個のタスクが登録されており、キューがない場合はこれらがそのまま Cloud Run へと送信されてしまいます。
resource.type="cloud_tasks_queue"
jsonPayload.taskCreationLog.status="OK"
タスク登録ログ
タスクのディスパッチログを確認すると、はじめのうちは同時に実行されていることがわかります。その後、想定していた通り 1 秒ごとにタスクがディスパッチされています。おそらく、Max burst size
[3]が 10 になっていることからこのような挙動になっていると推測してます。
またまた、サンプル通りの結果とは異なりますが「流入を制限できるようになること」は確認できました。
resource.type="cloud_tasks_queue"
jsonPayload.attemptDispatchLog.dispatchReason="PUSH_QUEUE"
タスクディスパッチログ
メッセージ処理のログ
【プラス α】最小権限の原則に沿う構成にする
サンプルでは、デフォルトの Compute Engine サービス アカウントを使っているので本番利用を想定して適切にサービスアカウントを構成してみたいと思います。
そのため、デフォルトの Compute Engine サービス アカウントから「Editor」ロールを削除します。これで適切にサービスアカウントを作成していない場合は権限エラーがでるようになります。以降では、各リソースで必要な権限をみていきます。
gcloud projects remove-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
--role="roles/editor"
1. Cloud Run を実行できるサービスアカウトを絞る
Cloud Run を--allow-unauthenticated
でデプロイしたので、このままでは未認証でアクセスできてしまいます。これを特定のサービスアカウントのみアクセスできるように構成し直します。
# 1. Cloud Run の実行できるユーザを「すべて」から外す
gcloud run services remove-iam-policy-binding $SERVICE \
--region=$REGION \
--member="allUsers" \
--role="roles/run.invoker"
# 2. Cloud Run を実行できるサービスアカウントを作成する
gcloud iam service-accounts create run-invoker
# 3. メールアドレスの取得
RUN_INVOKER=$(gcloud iam service-accounts list --filter="email:run-invoker*" --format="value(email)")
# 4. Cloud Run の実行権限を付与する
gcloud run services add-iam-policy-binding $SERVICE \
--region=$REGION \
--member="serviceAccount:$RUN_INVOKER" \
--role="roles/run.invoker"
Cloud Run の Permissions 設定
Cloud Run へのアクセスを許可されたサービスアカウントを使って、タスクをディスパッチします。そのため、Cloud Tasks キューの設定を変更します。Cloud Run へのリクエストは OpenID Connect の ID トークンが必要なため、以下のオプションを使って構成します。この設定は、コンソール画面から確認することができませんでした。
# 1. Service Account を設定する
gcloud tasks queues update $QUEUE \
--location=$LOCATION \
--http-oidc-service-account-email-override="$RUN_INVOKER"
# 2. 確認する
gcloud tasks queues describe $QUEUE \
--location=$LOCATION --format="value(httpTarget.oidcToken)"
同様に Eventarc が Cloud Run へイベントを転送できるようにする必要があります。そのため、上記のサービスアカウントを利用するよう以下のオプションを使って構成します。
gcloud eventarc triggers update $TRIGGER \
--location=$LOCATION \
--service-account="$RUN_INVOKER"
Eventarc の設定画面
2. Pub/Sub からタスクを登録できるようにする
Pub/Sub には何も権限のないサービスアカウトが付与されているので、キューへタスクを登録できるようにする必要があります。「Cloud Tasks Enqueuer」「Service Account User」が付与されたサービスアカウントを用意します。
# 1. Cloud Tasks キューにタスクを作成できるサービスアカウトを作成する
gcloud iam service-accounts create task-creator
# 2. メールアドレスの取得
TASK_CREATOR=$(gcloud iam service-accounts list --filter="email:task-creator*" --format="value(email)")
# 3. タスクを作成するためのロールを割り当てる
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${TASK_CREATOR}" \
--role="roles/cloudtasks.enqueuer"
# 4. タスクをディパッチする際に、Cloud Run を実行するサービスアカウントを割り当てられるようにロールを付与する
gcloud iam service-accounts add-iam-policy-binding $RUN_INVOKER \
--member="serviceAccount:$TASK_CREATOR" \
--role='roles/iam.serviceAccountUser'
task-creator の IAM Permissions 設定
run-invoker の Service Accounts Permissions 設定
Pub/Sub サブスクリプションで、上記のサービスアカウトを利用するよう以下のオプションを使って構成します。このとき、push エンドポイントも設定が必須になっているので、注意が必要です。
gcloud pubsub subscriptions update $SUBSCRIPTION \
--push-endpoint="https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE}/tasks:buffer" \
--push-auth-service-account="$TASK_CREATOR"
Pub/Sub サブスクリプション の Service Accounts 設定
3. もう一度テストします
また 30 個のメッセージを登録して最後にきちんと動くかどうかを確認します。無事 Cloud Run でメッセージが処理されることが確認できました。
サービスアカウント変更後の実行ログ
【おまけ】curl で Buffer Task API をたたいてみる
BufferTask によるタスク登録は、クライアントライブラリのサポートがないとドキュメントに記載があります[4]。gcloud tasks buffer
でもタスクの登録をすることができます[5]が、body に値をつめることができないので、今回のようなものを実行することはできませんでした。したがって、curl をつかってタスクの登録をコマンドから試してみます。
# ボディが設定できない
gcloud tasks buffer --queue=pubsub-http-queue-1 --location=us-central1
リクエスト body の形式は Pub/Sub と同じにします。文字列を base64 でエンコードして、body の data に使い、それ以外は任意の値を設定します。認証には、ユーザーの認証情報から生成されたアクセストークンを使います[6]。
base64 < <(echo -n "Hello World 9999" )
## 結果:SGVsbG8gV29ybGQgOTk5OQ==
curl -X POST \
-H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
-H 'Content-Type: application/json' \
"https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE}/tasks:buffer" \
-d '{"message": {"data": "SGVsbG8gV29ybGQgOTk5OQ==","messageId": "1","message_id": "1","publishTime": "2024-05-03T00:00:00.000Z","publish_time": "2024-05-03T00:00:00.000Z" }}'
実行結果
おわりに
サンプルを試しながら、キューレベルのルーティング設定と BufferTask API について学べました。サンプルがそのまま動かなかったり、Max burst size
の仕様が実はあったりと思ったよりも学びが多かったです 😁 まだ試したことがないひとの参考になれば幸いです。
参考にしたサイト
- 時間のかかっているバッチ処理を Cloud Pub/Sub で改善する話 - Speaker Deck
- Google Cloud Tasks のあれこれ
- 【GCP】Cloud Tasks のトークンバケットアルゴリズムについてまとめた
Discussion