📡

Cloud Tasks の BufferTask API を試してみる

2024/05/11に公開

はじめに

Cloud Tasks でのタスク作成方法に BufferTask というものが知らないうちに追加されていました。これを利用すると、クライアントライブラリを使わずにタスクの追加ができる[1]ようです 😲
今回は、Google Cloud 公式ブログ で紹介されている「Pub/Sub と Cloud Run との間のバッファとしての Cloud Tasks[2]」を実際にやってみます。

公式サンプルを試してみる

このサンプルでは Cloud Run のイベントトリガーとして Cloud Pub/Sub が設定されています。Cloud Pub/Sub に対してメッセージが多すぎると、Cloud Run が耐えられない可能性があることを想定してます。この際に Cloud Tasks キューの URI オーバーライドと BufferTask API を使えば、メッセージの流入をコントールし解決できるといった具合です。


イメージ図

1. Pub/Sub トリガーの Cloud Run をデプロイする

イベントハンドラとして Cloud Run サービスをデプロイして、Eventarc Pub/Sub トリガーを作成します。

SERVICE=hello
REGION=us-central1
gcloud run deploy $SERVICE \
  --allow-unauthenticated \
  --image=gcr.io/cloudrun/hello \
  --region=$REGION

TRIGGER=trigger-pubsub
gcloud eventarc triggers create $TRIGGER \
  --destination-run-service=$SERVICE \
  --destination-run-region=$REGION \
  --event-filters="type=google.cloud.pubsub.topic.v1.messagePublished" \
  --location=$REGION

トリガーを作成すると新しい Pub/Sub トピックが作成されます。

作成された Eventarc
確認のため、Pub/Sub にメッセージを送信します。

#!/bin/bash
REGION=us-central1
TRIGGER=trigger-pubsub
TOPIC=$(gcloud eventarc triggers describe $TRIGGER --location $REGION --format='value(transport.pubsub.topic)')
for i in {1..5}; do gcloud pubsub topics publish $TOPIC --message="Hello World$i" & done

サンプルアプリケーションは、メッセージを受け取るとそれをログへ出力するように実装されています。

https://github.com/GoogleCloudPlatform/cloud-run-hello/blob/2447ee13ee0aaa984f02aab7e4a684313d04f47f/hello.go#L72-L77

ログを確認するとメッセージの到着時間がほぼ同時刻なので、このままメッセージの数が増えると Cloud Run に負荷がかかりそうな予感がします。


実行ログ

2. Cloud Tasks と BufferTask API を構成する

2.1. Cloud Tasks を作成する


エラーメッセージ

Tasks キューを作成します。キューレベルのルーティングを設定する必要があるので、対象となる Cloud Run の URL をホストとして設定しています。コンソール画面を確認すると、「HTTP Routing」という設定が増えています。

SERVICE=hello
REGION=us-central1
SERVICE_URL=$(gcloud run services describe $SERVICE --region $REGION --format 'value(status.url)')
SERVICE_HOST=$(echo $SERVICE_URL | sed 's,http[s]*://,,g')
QUEUE=pubsub-http-queue-1
LOCATION=us-central1

gcloud tasks queues create $QUEUE \
  --http-uri-override="host:$SERVICE_HOST" \
  --max-concurrent-dispatches=1 \
  --max-dispatches-per-second=1 \
  --location=$LOCATION


キューの設定画面

2.2. Pub/Sub のサブスクリプションを BufferTask API に変更する


サンプル通りの結果

ということで、Tasks キューの キューレベルのルーティング設定にクエリパラメータを設定します。設定は上書きされるので、あらためてホストも設定します。設定のついでに、キューのログも有効化しておきます。

TOPIC=$(gcloud eventarc triggers describe $TRIGGER --location=$LOCATION --format='value(transport.pubsub.topic)' | sed 's#.*/##')

PROJECT_ID="YOUR PROJECT ID"
QUERY="__GCP_CloudEventsMode=CUSTOM_PUBSUB_projects%2F${PROJECT_ID}%2Ftopics%2F${TOPIC}"

gcloud tasks queues update $QUEUE \
  --http-uri-override="host:$SERVICE_HOST,query:$QUERY" \
  --log-sampling-ratio=1.0 \
  --location=$LOCATION


キューの設定画面

キューが Cloud Run を叩いてくれるので、Pub/Sub がタスクを登録するようにします。そのため、Pub/Sub の Subscription の push エンドポイントを BufferTask API に変更します。

SUBSCRIPTION=$(gcloud eventarc triggers describe $TRIGGER --location=$LOCATION --format='value(transport.pubsub.subscription)' | sed 's#.*/##')

PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(project_number)")
gcloud pubsub subscriptions update $SUBSCRIPTION \
  --push-endpoint="https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE}/tasks:buffer" \
  --push-auth-service-account="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"


サブスクリプションの設定画面

3. テストする

5 個のメッセージだと違いがわからなかったので、30 個ほどのメッセージをパブリッシュします。ほぼ同時に 30 個のタスクが登録されており、キューがない場合はこれらがそのまま Cloud Run へと送信されてしまいます。

resource.type="cloud_tasks_queue"
jsonPayload.taskCreationLog.status="OK"


タスク登録ログ

タスクのディスパッチログを確認すると、はじめのうちは同時に実行されていることがわかります。その後、想定していた通り 1 秒ごとにタスクがディスパッチされています。おそらく、Max burst size[3]が 10 になっていることからこのような挙動になっていると推測してます。
またまた、サンプル通りの結果とは異なりますが「流入を制限できるようになること」は確認できました。

resource.type="cloud_tasks_queue"
jsonPayload.attemptDispatchLog.dispatchReason="PUSH_QUEUE"


タスクディスパッチログ

メッセージ処理のログ

【プラス α】最小権限の原則に沿う構成にする

サンプルでは、デフォルトの Compute Engine サービス アカウントを使っているので本番利用を想定して適切にサービスアカウントを構成してみたいと思います。
そのため、デフォルトの Compute Engine サービス アカウントから「Editor」ロールを削除します。これで適切にサービスアカウントを作成していない場合は権限エラーがでるようになります。以降では、各リソースで必要な権限をみていきます。

gcloud projects remove-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
  --role="roles/editor"

1. Cloud Run を実行できるサービスアカウトを絞る

Cloud Run を--allow-unauthenticated でデプロイしたので、このままでは未認証でアクセスできてしまいます。これを特定のサービスアカウントのみアクセスできるように構成し直します。

# 1. Cloud Run の実行できるユーザを「すべて」から外す
gcloud run services remove-iam-policy-binding $SERVICE \
  --region=$REGION \
  --member="allUsers" \
  --role="roles/run.invoker"
# 2. Cloud Run を実行できるサービスアカウントを作成する
gcloud iam service-accounts create run-invoker
# 3. メールアドレスの取得
RUN_INVOKER=$(gcloud iam service-accounts list --filter="email:run-invoker*" --format="value(email)")
# 4. Cloud Run の実行権限を付与する
gcloud run services add-iam-policy-binding $SERVICE \
  --region=$REGION \
  --member="serviceAccount:$RUN_INVOKER" \
  --role="roles/run.invoker"


Cloud Run の Permissions 設定

Cloud Run へのアクセスを許可されたサービスアカウントを使って、タスクをディスパッチします。そのため、Cloud Tasks キューの設定を変更します。Cloud Run へのリクエストは OpenID Connect の ID トークンが必要なため、以下のオプションを使って構成します。この設定は、コンソール画面から確認することができませんでした。

# 1. Service Account を設定する
gcloud tasks queues update $QUEUE \
  --location=$LOCATION \
  --http-oidc-service-account-email-override="$RUN_INVOKER"
# 2. 確認する
gcloud tasks queues describe $QUEUE \
  --location=$LOCATION --format="value(httpTarget.oidcToken)"

同様に Eventarc が Cloud Run へイベントを転送できるようにする必要があります。そのため、上記のサービスアカウントを利用するよう以下のオプションを使って構成します。

gcloud eventarc triggers update $TRIGGER \
  --location=$LOCATION \
  --service-account="$RUN_INVOKER"


Eventarc の設定画面

2. Pub/Sub からタスクを登録できるようにする

Pub/Sub には何も権限のないサービスアカウトが付与されているので、キューへタスクを登録できるようにする必要があります。「Cloud Tasks Enqueuer」「Service Account User」が付与されたサービスアカウントを用意します。

# 1. Cloud Tasks キューにタスクを作成できるサービスアカウトを作成する
gcloud iam service-accounts create task-creator
# 2. メールアドレスの取得
TASK_CREATOR=$(gcloud iam service-accounts list --filter="email:task-creator*" --format="value(email)")
# 3. タスクを作成するためのロールを割り当てる
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${TASK_CREATOR}" \
  --role="roles/cloudtasks.enqueuer"
# 4. タスクをディパッチする際に、Cloud Run を実行するサービスアカウントを割り当てられるようにロールを付与する
gcloud iam service-accounts add-iam-policy-binding $RUN_INVOKER \
  --member="serviceAccount:$TASK_CREATOR" \
  --role='roles/iam.serviceAccountUser'


task-creator の IAM Permissions 設定

run-invoker の Service Accounts Permissions 設定

Pub/Sub サブスクリプションで、上記のサービスアカウトを利用するよう以下のオプションを使って構成します。このとき、push エンドポイントも設定が必須になっているので、注意が必要です。

gcloud pubsub subscriptions update $SUBSCRIPTION \
  --push-endpoint="https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE}/tasks:buffer" \
  --push-auth-service-account="$TASK_CREATOR"


Pub/Sub サブスクリプション の Service Accounts 設定

3. もう一度テストします

また 30 個のメッセージを登録して最後にきちんと動くかどうかを確認します。無事 Cloud Run でメッセージが処理されることが確認できました。


サービスアカウント変更後の実行ログ

【おまけ】curl で Buffer Task API をたたいてみる

BufferTask によるタスク登録は、クライアントライブラリのサポートがないとドキュメントに記載があります[4]gcloud tasks buffer でもタスクの登録をすることができます[5]が、body に値をつめることができないので、今回のようなものを実行することはできませんでした。したがって、curl をつかってタスクの登録をコマンドから試してみます。

# ボディが設定できない
gcloud tasks buffer --queue=pubsub-http-queue-1 --location=us-central1

リクエスト body の形式は Pub/Sub と同じにします。文字列を base64 でエンコードして、body の data に使い、それ以外は任意の値を設定します。認証には、ユーザーの認証情報から生成されたアクセストークンを使います[6]

base64 < <(echo -n "Hello World 9999" )
## 結果:SGVsbG8gV29ybGQgOTk5OQ==
curl -X POST \
  -H "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  -H 'Content-Type: application/json' \
  "https://cloudtasks.googleapis.com/v2beta3/projects/${PROJECT_ID}/locations/${LOCATION}/queues/${QUEUE}/tasks:buffer" \
  -d '{"message": {"data": "SGVsbG8gV29ybGQgOTk5OQ==","messageId": "1","message_id": "1","publishTime": "2024-05-03T00:00:00.000Z","publish_time": "2024-05-03T00:00:00.000Z" }}'


実行結果

おわりに

サンプルを試しながら、キューレベルのルーティング設定BufferTask API について学べました。サンプルがそのまま動かなかったり、Max burst sizeの仕様が実はあったりと思ったよりも学びが多かったです 😁 まだ試したことがないひとの参考になれば幸いです。

参考にしたサイト

脚注
  1. キューレベルのルーティング設定と BufferTask API による HTTP リクエストのバッファリングの簡素化 - InfoQ ↩︎

  2. Cloud Tasks で HTTP リクエストをバッファリングする Google Cloud 公式ブログ ↩︎

  3. Package google.cloud.tasks.v2 > ratelimits   Cloud Tasks Documentation    Google Cloud ↩︎

  4. HTTP Target タスクを作成する    Cloud Tasks のドキュメント    Google Cloud ↩︎

  5. gcloud tasks buffer    Google Cloud CLI Documentation ↩︎

  6. REST を使用して認証する    Google Cloud ↩︎

コミューン株式会社

Discussion