🔖

ログをトリガーにして Cloud Run を重複しないように指定時刻に実行する

2023/11/24に公開

はじめに

過去に Cloud Storage をトリガーにして、 Cloud Run を起動する流れを記事にしました。
今回は Cloud Logging にログが書き込まれたことをトリガーにして Cloud Run を起動してみたいと思います。
ただ、これだと過去に記事にした流れとほとんど変わらないため、ログに書かれた時刻に Cloud Run を起動するようにし、かつ指定された時刻では一度だけ実行するようにしたいと思います。

この記事では、各Google Cloud のサービスについての詳細な説明は行いません。詳細は、参考や本文中のリンク先をご覧ください。

概要

処理の流れは、
Cloud Logging → Cloud Pub/Sub(Topic) → Cloud Functins → Cloud Tasks → Cloud Run
となります。各処理の役割は以下の通りです。

  1. Cloud Loggingログルーター
    • ログのフィルタリングを行う
    • Pub/Sub へ通知する
  2. Cloud Pub/Sub(Topic)
    • ログの配信を行う
  3. Cloud Functins
    • ログの内容から時刻を拾う
    • 時刻指定して Cloud Tasks にタスクを登録する
  4. Cloud Tasks
    • Cloud Run を指定時刻に実行する
    • タスクの重複を排除する
  5. Cloud Run
    • 指定時刻に実行されるジョブ

構築

1. Cloud Logging(ログルーター)

Cloud Logging では、ログルーター が シンク を使用して Pub/Sub へログエントリを転送するように構成します。

ログルーターのシンクを作成

シンクでは、包含フィルタを設定して、特定のログを選択するように構成します。

Cloud Logging - ログルーター の画面から、「シンクの作成」をクリックします

  • 「①シンクの詳細」にて、シンクの名称を入力します
「②シンクの宛先」にて、「シンクサービスの選択」で「Cloud Pub/Sub トピック」を選択し、トピックを新規作成します

「トピックID」を入力して、作成をクリックします

「シンクに含めるログの選択」にて、包括フィルタを作成します


* 今回は、以下の内容をフィルタとします
jsonPayload.message="My_test_entry"

  • また「シンクに含めないログの選択」にて、除外フィルタ を作成することができますが、今回は省略します
  • 入力後、「シンクを作成」をクリックします

2. Cloud Pub/Sub(Topic)

先ほどの手順において、「トピックID」を入力して作成したものです。作成したトピックは、Pub/Sub - トピック 画面から確認できます。

Cloud Fuctionsをトリガー

作成したトピックIDをクリックすると、トピックの詳細画面へ移動します。そこに「Cloud Fuctions の関数をトリガー」という項目があります。次項にて解説する Cloud Fuctions は、ここから作成します。

3. Cloud Functins

Cloud Functins をデプロイ

「Cloud Fuctions の関数をトリガー」をクリックすると以下の画面が表示されます。ここに関数名を入力し、「DEPLOY FUNCTION」をクリックします。環境について、今回は第1世代でも良いのですが最新の第2世代を選択します。ソースコードは後ほど修正します。

Cloud Functins を確認

Cloud Functins の画面にて、先ほど作成した関数を確認します。トリガー列がトピックとなっていることを確認できます。

Cloud Functins のソースコードを修正

続いて Cloud Functins のソースコードを修正します。
ソースコードは、GitHub にあるサンプルコードなどを参考に作成します。言語は Python です。作成物は以下の通りです。

google-cloud-tasks==2.13.1
create_http_task_with_token.py
# [START cloud_tasks_create_http_task_with_token]
from typing import Optional
from datetime import datetime
from google.cloud import tasks_v2
from google.protobuf import duration_pb2, timestamp_pb2

def create_http_task_with_token(
    project: str,
    location: str,
    queue: str,
    url: str,
    payload: bytes,
    service_account_email: str,
    audience: Optional[str] = None,
    scheduled_datetime: Optional[str] = None,
    task_id: Optional[str] = None,
    deadline_in_seconds: Optional[int] = None,
) -> tasks_v2.Task:
    """Create an HTTP POST task with an OIDC token and an arbitrary payload.
    Args:
        project: The project ID where the queue is located.
        location: The location where the queue is located.
        queue: The ID of the queue to add the task to.
        url: The target URL of the task.
        payload: The payload to send.
        service_account_email: The service account to use for generating the OIDC token.
        audience: Audience to use when generating the OIDC token.
        scheduled_seconds_from_now: Seconds from now to schedule the task for.
        task_id: ID to use for the newly created task.
        deadline_in_seconds: The deadline in seconds for task.
    Returns:
        The newly created task.
    """
    timestamp = None
    # Convert scheduled_datetime
    if scheduled_datetime is not None:
        time_format = '%Y-%m-%dT%H:%M:%S+09:00'
        dt = datetime.strptime(scheduled_datetime, time_format)
        print(dt.strftime(time_format))

        # Create Timestamp protobuf.
        timestamp = timestamp_pb2.Timestamp()
        timestamp.FromDatetime(dt)

    # Create a client.
    client = tasks_v2.CloudTasksClient()

    # Construct the request body.
    task = tasks_v2.Task(
        http_request=tasks_v2.HttpRequest(
            http_method=tasks_v2.HttpMethod.POST,
            url=url,
            headers={"Content-type": "application/json"},
            oidc_token=tasks_v2.OidcToken(
                service_account_email=service_account_email,
                audience=audience,
            ),
            body=payload,
        ),
        name=(
            client.task_path(project, location, queue, dt.strftime('%Y%m%dT%H%M%S'))
        ),
    )

    if timestamp is not None:
        # Add the timestamp to the tasks.
        task.schedule_time = timestamp

    # Convert "deadline in seconds" to a Protobuf Duration
    if deadline_in_seconds is not None:
        duration = duration_pb2.Duration()
        duration.FromSeconds(deadline_in_seconds)
        task.dispatch_deadline = duration

    # Use the client to build and send the task.
    return client.create_task(
        tasks_v2.CreateTaskRequest(
            # The queue to add the task to
            parent=client.queue_path(project, location, queue),
            task=task,
        )
    )
# [END cloud_tasks_create_http_task_with_token]
main.py
import os
import sys
import base64
import json

from typing import Optional
from google.cloud import tasks_v2
from create_http_task_with_token import create_http_task_with_token


# set-env-vars
PROJECT = os.getenv("PROJECT", None)
SERVICE_ACCOUNT = os.getenv("SERVICE_ACCOUNT", None)
RUN_URL = os.getenv("RUN_URL", None)
REGION = os.getenv("REGION", None)
QUEUE_NAME = os.getenv("QUEUE_NAME", None)


def check_params():
    if PROJECT is None:
        return False
    if SERVICE_ACCOUNT is None:
        return False
    if RUN_URL is None:
        return False
    if REGION is None:
        return False
    if QUEUE_NAME is None:
        return False
    return True


class ObjectLike(dict):
    __getattr__ = dict.get


def main(event, context):
    """Triggered from a message on a Cloud Pub/Sub topic.
    Args:
         event (dict): Event payload.
         context (google.cloud.functions.Context): Metadata for the event.
    """
    try:
        print('STARTED')

        if check_params():
            print('PARAMS OK')
        else:
            print('PARAMS NOT OK')
            return 'PARAMS NOT OK', 500

        # convert pubsub message
        pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        print(pubsub_message)
        obj = json.loads(pubsub_message, object_hook=ObjectLike)
        print(obj.jsonPayload.message)

        # create Tasks
        task = create_http_task_with_token(
            PROJECT,
            REGION,
            QUEUE_NAME,
            RUN_URL,
            json.dumps(obj.jsonPayload).encode(),
            SERVICE_ACCOUNT,
            None,
            obj.jsonPayload.resv_dte
        )

    except:
        print(sys.exc_info())
        return 'EXCEPTION', 500

    finally:
        print('FINISHED')

    return 'OK', 200

以下、ポイントを解説します。

ログを取得する

pubsub_message = base64.b64decode(event['data']).decode('utf-8')
        print(pubsub_message)
        obj = json.loads(pubsub_message, object_hook=ObjectLike)

ログを取得する部分です。イベントペイロードから data 部をデコードします。デコードしたあとは、json 形式に変換し、obj 変数に格納します。

ログから指定時刻を拾う

        obj.jsonPayload.resv_dte

先ほど取得した json 形式のペイロードから当該の時刻を取得します。

時刻指定して Cloud Tasks にタスクを登録する

        # Add the timestamp to the tasks.
        task.schedule_time = timestamp

task に 時刻を指定しています。最大で720時間(30日)後まで指定ができます。

タスクの重複を排除する

        name=(
            client.task_path(project, location, queue, dt.strftime('%Y%m%dT%H%M%S'))
        ),

Cloud Tasks は、同じタスクIDでタスクを作成しようとすると 409 AlreadyExists エラーとなります。タスクは実行または削除されてから1時間は同じIDで作成できません。この仕組みを利用し、タスク作成時に時刻をIDとして指定しています。次項の Cloud Tasks にて少し解説します。

サービスアカウント

"<プロジェクト番号>-compute@developer.gserviceaccount.com" がデフォルトのサービスアカウントとなりますが、指定可能とするために環境変数としています。

呼び出す Cloud Run について

環境変数:RUN_URL は、最終的に呼び出したい Cloud Run の URL です。Cloud Run はまだ作成していないので、環境変数は空白としておきます。

4. Cloud Tasks

Cloud Tasks では、「完全に重複排除はできない」という注意書きがあるものの、「99.999% 以上のタスクが 1 回だけ実行される」と記載されています。重複が一切許容できないということでなければ、 登録タスクの重複排除には Cloud Tasks で十分かと思います。

Cloud Tasks のキューを作成

前項の Cloud Functins でタスクを登録する部分を先に作成しましたが、ここではタスクの格納先である Cloud Tasks キューを作成します。

  • Cloud Tasks の画面から「PUSH キューの作成」をクリックします
  • push キューの作成画面にて、キュー名 と リージョン を指定し、作成ボタンをクリックして作成します
    • キュー名 と リージョンは、前項の Cloud Functins でタスクを登録する部分で指定した値となります
Cloud Run を指定時刻に実行する

前項の Cloud Functins で プログラミングしたように、指定時刻にタスクを実行します。Cloud Tasks に登録されたキューは以下のようになります。「ETA」が指定日時となります。

5. Cloud Run

指定時刻に実行されるジョブは、呼び出しが確認できれば良いのでサンプルの【Hello World】を使用します。デプロイはリンク先の手順に沿って実行します。詳細はリンク先をご覧ください。

作成出来たら、URL を Cloud Functins に設定して再デプロイします。

Cloud Functions の環境変数

動作確認

ここまで構築したものを動作確認します。確認にあたって対象のログを登録する gcloud コマンドを実行します。

  • 対象のログを登録する gcloud コマンド
gcloud logging write --payload-type=json my-log '{ "message": "My_test_entry", "resv_dte": "2023-11-24T00:34:41+09:00"}'
gcloud コマンド が対象のログを出力したことを確認

対象のログを登録する gcloud コマンド を実行してログを出力します。ログが出力されたことを確認します。

続いて、
Cloud Logging → Cloud Pub/Sub(Topic) → Cloud Functins
が正常に動作していることを確認するため、Cloud Tasks にタスクを登録する Cloud Functions のログを見てみます。

Cloud Tasks にタスクを登録する Cloud Functions のログ

正常に動作していれば、以下のように Cloud Functions が動作しているはずです。

続いて、Cloud Functions → Cloud Tasks の部分が動作しているか確認します。

Cloud Tasks に時刻指定して登録されたタスク

Cloud Run を指定時刻に実行する にて紹介したように Cloud Tasks にタスクが登録されています。

最後に、Cloud Run が指定時刻に実行されたことを確認します。

Cloud Run のログ

指定時刻に正常に動作していれば、以下のように Cloud Run が動作しているはずです。

時刻が重複した場合

指定した時刻が重複していた場合、ログに 409 AlreadyExists エラーが出力されて登録しません。

さいごに

Cloud Run の実行には様々な方法がありますが、今回は色々とサービスを組み合わせつつ、 Cloud Tasks で時刻を指定して呼び出してみました。参考になれば幸いです。

参考

レスキューナウテックブログ

Discussion