📖

Cloud Tasks のキューにあるタスクがすべて終了したことを通知したい

2024/10/18に公開

はじめに

Cloud Tasks は、非同期タスクをスケーラブルに実行するための Google Cloud のマネージドサービスです。タスクの完了状況を把握することは、システムの健全性を維持するために重要です。本記事では、Cloud Tasks のタスク完了通知を、メトリクスとモニタリングPub/Sub を活用して実現する方法について解説します。

タスク完了通知が必要となる場面は?

  • システムの健全性確認:タスクが正常に完了しているかを確認し、問題が発生している場合は早期に検知することができます
  • 依存関係のある処理のトリガー:タスク完了をトリガーにして、次の処理を開始することができます
  • ログの分析:タスクの実行状況をログとして記録し、後から分析することで、システムの改善に役立てることができます

メトリクス、モニタリング、Pub/Sub を組み合わせるメリット

  • 柔軟性:さまざまな指標を計測し、柔軟な通知設定を行うことができます
  • リアルタイム性:Pub/Sub を利用することで、すぐにタスク完了を通知することができます
  • スケーラビリティ:システムの規模に合わせて、柔軟にスケールすることができます

他のサービスでも実現可能か?

Cloud WorkflowsCloud Composer などでも実現は可能と思いますが、これらはより複雑な処理に向いています。

構成イメージ

Pub/Sub からの通知を配信する先は、Cloud FunctionsCloud Run など様々なサービスが考えられるため、今回は Pub/Sub に完了メッセージが届くまでを対象とします。

手順

今回はコンソール上から全て構築可能なため、コンソールでの手順を記載します。
また、各サービスのAPI利用やIAM権限の設定は省略します。

Cloud Tasks のキューを作成

  1. Cloud Tasks のキュー一覧画面から、「キューを作成」を選択します
  2. 「キュー名」と「リージョン」を設定し「作成」ボタンをクリックします
キューの作成

キューの一覧

Pub/Sub の設定

詳しい手順はこちらにあります。

  1. [トピック] ページに移動し、「トピックを作成」を選択します
  2. トピックの ID を入力します
  3. 「トピックを作成」 をクリックします
  4. [デフォルトのサブスクリプションを追加] オプションが選択されているので、サブスクリプションも自動で作成されます
  5. サブスクリプションの配信タイプは「pull」型で作成されています
サブスクリプション

Cloud Monitoring の設定

詳しい手順はこちらにあります。

  1. [アラート] ページに移動し、「ポリシーを作成」を選択します
  2. 「指標を選択」をクリックし、メニューの操作でリソース「Cloud Tasks Queue」と指標「Queue depth」を選択して、[適用] をクリックします
  3. 時系列のポイントの配置方法を変更します
    a. ローリング ウィンドウ:1分
    b. ローリング ウィンドウ関数:max
  4. さらに、時系列の結合を行います。子の設定により、先ほど作成した2つのキューが別々に集計されます
    a. 時系列集約:max
    b. 時系列の集約の基準:queue_id
アラートポリシーの編集:ポリシー構成モード

  1. 条件のトリガーを構成します
    a. Alert trigger:任意の時系列の違反
    b. しきい値の位置:しきい値より下
    c. しきい値:1
    d. 条件名:任意
アラートポリシーの編集:トリガーの設定

  1. 通知を構成する前に、通知チャンネルを作成します
    a. 「通知チャンネルを管理する」を選択します
    b. 「Pub/Sub」の横にある、「新規追加」をクリックします
    c. 「チャンネル名」と「トピック」を設定します。トピックはprojects/[PROJECT_ID]/topics/[TOPIC]の形式で指定します
    d. 「テスト通知を送信」をクリックして、トピックにテスト通知を送信しておきます(後ほど確認します)
    e. 「保存」をクリックします
通知チャンネルの管理

  1. 通知を構成します
    a. 通知チャンネル:6 で作成した通知チャンネル
    b. アラートポリシー名:任意
アラートポリシーの編集:通知と名前

  1. 「ポリシーを作成」をクリックして、設定を完了します

動作確認

今回の設定では、Cloud Tasks のキューにあるタスクが0になったとき、Cloud Monitoring から通知が PubSub に送られます。
動作確認の方法として Cloud Tasks のキューを一時停止してタスクを1つ滞留させ、再開することでキューにあるタスクが1から0になる状況を作ります。

Cloud Tasks のキューにタスクを作成

  1. Cloud Tasks のキュー一覧画面にて、キューを選択します
  2. 「HTTP タスクを作成」をクリックします
  3. [HTTP タスクの作成] ページにて以下を設定し、「作成」をクリックします
    a. URL:https://www.google.co.jp/
    b. HTTP メソッド:GET

一時停止したキューにタスクを作成

  1. Cloud Tasks のキュー一覧画面にて、キュー名の隣にあるチェックボックスを選択し、「キューの一時停止」をクリックします
  2. 一時停止したキューに先ほどの手順と同様にタスクを作成します
  3. キューは停止中なので、タスクは実行されずに滞留します

Cloud Monitoring の確認

[アラート] ページに移動し、作成したアラートポリシーを確認すると滞留しているタスクが1つあるキューを確認できます。

一時停止したキューを再開し、タスクを実行する

Cloud Tasks のキュー一覧画面にて、一時停止したキュー名の隣にあるチェックボックスを選択し、「キューの再開」をクリックします。これにより、滞留していたタスクが実行されてなくなります。

Pub/Sub から通知を確認

Cloud Tasks のキューにあるタスクが0になったため、Pub/Sub に通知が行われています。
[Pub/Sub]ページから、[サブスクリプション]のページに移動し、メッセージ画面を表示します。「PULL」をクリックすると、通知を確認できました。

メッセージ本文
{
  "incident": {
    "condition": {
      "conditionThreshold": {
        "aggregations": [
          {
            "alignmentPeriod": "60s",
            "crossSeriesReducer": "REDUCE_MAX",
            "groupByFields": [
              "resource.label.queue_id"
            ],
            "perSeriesAligner": "ALIGN_MAX"
          }
        ],
        "comparison": "COMPARISON_LT",
        "duration": "0s",
        "filter": "resource.type = \"cloud_tasks_queue\" AND metric.type = \"cloudtasks.googleapis.com/queue/depth\"",
        "thresholdValue": 1,
        "trigger": {
          "count": 1
        }
      },
      "displayName": "Cloud Tasks Queue - Queue depth",
      "name": "projects/project000/alertPolicies/5453371518531680893/conditions/4175623076424546133"
    },
    "condition_name": "Cloud Tasks Queue - Queue depth",
    "documentation": {
      "content": "",
      "mime_type": "",
      "subject": "[ALERT - No severity] PROJECT"
    },
    "ended_at": null,
    "incident_id": "0.nizc5rbr1gcg",
    "metadata": {
      "system_labels": {},
      "user_labels": {}
    },
    "metric": {
      "displayName": "Queue depth",
      "labels": {},
      "type": "cloudtasks.googleapis.com/queue/depth"
    },
    "observed_value": "0.000",
    "policy_name": "Cloud Tasks Queue - Queue depth",
    "resource": {
      "labels": {
        "project_id": "project000",
        "queue_id": "testQueue2"
      },
      "type": "cloud_tasks_queue"
    },
    "resource_id": "",
    "resource_name": "project000 Cloud Tasks Queue labels {project_id=project000, queue_id=testQueue2}",
    "resource_type_display_name": "Cloud Tasks Queue",
    "scoping_project_id": "project000",
    "scoping_project_number": 1729171616,
    "severity": "No severity",
    "started_at": 1729171616,
    "state": "open",
    "summary": "Queue depth for project000 Cloud Tasks Queue labels {project_id=project000, queue_id=testQueue2} is below the threshold of 1.000 with a value of 0.000.",
    "threshold_value": "1",
    "url": "https://console.cloud.google.com/monitoring/alerting/alerts/0.nizc5rbr1gcg?channelType=cloud-pubsub&project=project000"
  },
  "version": "1.2"
}

気になった点

ローリング ウィンドウの最小が1分

Cloud Monitoring で監視できる最小単位が1分のため、Cloud Tasks のタスクがすべて終了してからPub/Subへ通知されるまでに1~2分位のラグが発生しました。このラグが許容できない場合は別の方法を検討する必要があります。
また、すべてのタスク終了が1分以内の高速で行われた場合は、タスク数が0から0と変化がないことになり、今回の方法では通知が行われません。そのため何か工夫する必要があります。

Pub/Sub のメッセージのカスタムが制限される

今回、Cloud Monitoring の Cloud Tasks リソースにあるデフォルト指標を使用しましたので、Pub/Sub に通知されるメッセージは、「queue_id」くらいです。
[アラートポリシー] 設定の [ドキュメント] セクションで、通知に含めるコンテンツを指定することができますが、決まった変数に限られます。
もっと柔軟にメッセージを作成したい場合は、カスタム指標などを作成して通知を行うと良いかもしれません。

さいごに

今回、 Cloud Monitoring と Pub/Sub を組み合わせて「Cloud Tasks のキューにあるタスクがすべて終了したことを通知したい」という要件をかなえることができました。
実装自体はコンソールから一通りできるので、手軽に試すことができたのは良かったです。ただ、実装していて気になった点がいくつかありました。これらが気にならない、または解決できるのであれば Cloud Workflows よりもシンプルにフローを構築できそうです。

レスキューナウテックブログ

Discussion