💫

Google Cloud で、決まった時刻にjobを実行する、2つの方法

2022/12/19に公開

こんにちは、カスタマーエンジニアの大戸(おおど)です。

この記事は、「Google Cloud Japan Advent Calendar 2022 - 今からはじめる Google Cloud」の17日目の記事になります。

はじめに

ここでは、「今から始める Google Cloud」ということで、これから Google Cloud を使っていく/ 使ってみたい ユーザー を対象にした、入門向けの内容になります。

今回は、「決まった時刻にJobを実行する方法」として、「Cloud Tasks」を利用する方法、「Cloud Scheduler」を利用する方法を、それぞれご紹介します。

ここに書くこと/ 書かないこと

今回は、Cloud Tasks/ Cloud Scheduler を初めて使う人向けの記事とし、プロダクトの概要と簡単な比較、使い方に関してフォーカスして記載しています。

ここに書くこと

  • Cloud Tasks/ Cloud Scheduler の概要
  • 他のGoogle Cloud プロダクトとの簡単な比較
  • 定期時刻にJobを実行する例の紹介

書かないこと

  • Cloud Tasks/ Cloud Scheduler の詳細な内容
  • 他社の類似プロダクトとの比較

Cloud Tasks

Cloud Tasksとは

Cloud Tasks とは、非同期でjobを実行する、分散タスクキューのマネージドサービスになります。Cloud Tasks を使用すると、ユーザー リクエストやサービス間リクエストの外部で作業を非同期実行できます。

Cloud Pub/Subとの比較

Cloud TasksCloud Pub/Sub はどちらも、メッセージ送信と非同期統合を実装する際に利用されます。が、正直似ていて、それぞれの使い所が分かりにくい!と思います。
(少なくとも記事を書く際に、一番最初に私が思ったことになります・・・w)

まず、両者の違いを理解するにあたって、両者のアーキテクチャ/ データモデルの違いを理解する必要があります。詳細は、「Pub/Sub を使用したイベント ドリブン アーキテクチャ」を読んでいただけると、より理解が深まると思いますが、大きな違いとして、下記があると思います。

  • Cloud Pub/Sub(Pub/Sub モデル) では、送信側と受信側に応じた、ターゲット(送信側は、topicで、受信者側はsubscripton)があり、topic と subscription の組み合わせは自由

    • topic と subscription の紐付けで、データの流れを制御することができる

    • subscripitionに関しては、2種類あり、用途に応じて選択する

    • 複数の topic を1つの subscription で受信することも可能、また逆もしかり

    • 設定の変更時は、topic と subscription の紐付けを変えることで、データの流れを変えられる

  • Cloud Tasks(messaging-queue モデル)では、送信側と、受信側は、同一のターゲット(task queue)を設定する必要がある

    • topic と subscription を作成する必要がないので、設定箇所は少ない(アーキテクチャはシンプル)
    • データの流れを変更する際には、送信側/受信側 がターゲットを変更する必要がある
    • 複数に対してメッセージを送信する際(ex. task の発行)、ターゲット(task queue)は複数必要になる

図1. Pub/Sub モデルと Message-queue モデル の違い
図1. Pub/Sub モデルと Message-queue モデル の違い

サービス間のより細かい比較に関しては、「Cloud Tasks か Pub/Sub かの選択」に詳細まとまっているので、読んでいただけると良いと思います。個人的に Cloud Tasks で良いと思ったのは、下記の3点になります。

  • (個人的にアツい!) 計画的な配信(queueに載せる際に、実行する時間を設定できる)
  • タスク/メッセージの保持期間が長い
  • job の最大処理時間が、Pub/Sub よりも長い(job の実行環境/呼び出し方法によって、時間は異なる)

Cloud Scheduler

Cloud Schedulerとは

Cloud Scheduler は、フルマネージドジョブスケジューラです。crontab と聞くと、サーバを構築/運用したことがある方には、馴染み深いと思いますが、定期的にコマンドを実行する手段としてお馴染みだと思います。

Cloud Scheduler の場合は、 crontab のように、コマンドを実行するのではなく、「HTTP requestの実行」、「pub/sub の topic へのメッセージ送信」、の 2つのうちから選ぶことができます。(公式ドキュメントでは AppEngine の HTTP request は別扱いされてますが、ここでは簡潔にまとめるため、 2 つとしています。)
ちなみに、「Cloud Workflows 実行」も例として記載がありますが、これは、HTTP Request 実行において、Google Cloud の API を叩くことで、連携を実現しています。

「決まった時刻にjobを実行する」を実践する

Cloud Tasks/ Cloud Schudeler は、イベントトリガーとして利用できます。ここに関しては、使い分けに迷う!みたいなことは少ないと思いますが、「Cloud Tasks と Cloud Scheduler の比較」に詳細の記載があるので、見ていただくといいかと思います。

アーキテクチャ

今回は、シンプルに Job を実行するコンピュートとして、Cloud Functions を利用し、Cloud Tasks、Cloud Scheduler のそれぞれから、HTTP Request(POST)で呼び出す!という形にしました。


図2. 今回の検証のアーキテクチャ

注意点としては、Cloud Tasks、Cloud Schedulerにおいて、多重実行される可能性があるため、worker は冪等性を担保した形で、開発する必要があります。

Cloud Tasks では、「99.999% 以上のタスクが 1 回だけ実行される」と記載があるものの、「完全に重複排除はできない」という注意書きがあります。
また、Cloud Scheduler では、「まれに、同じジョブの複数のインスタンスがリクエストされる可能性があります。」とのことで、こちらも重複実行される可能性があることを書いています。

もし、重複実行を許容できない場合は、「Cloud Pub/Sub 1回かぎりの配信」を利用すると良いと思います。ただし、利用することで、レイテンシーの低下等のデメリットもあるので、注意が必要です。

Cloud Functions の設定

コンソールのクイックスタート(第 1 世代)」を参考に、下記のように設定しました。


図3. Cloud Functions の設定

主な変更箇所は、下記になります。

  • 関数名:tasks-scheduler-worker
  • Region: asia-northeast1
  • トリガータイプ: HTTP
  • 認証: 未認証の呼び出しを許可する
  • 割り当てられるメモリ: 128 MB
  • タイムアウト: 30 (秒)

コードは、python3.10のランタイムを選択し、下記のように記載しました。エントリポイント(実行したい関数名)は、下記のコードに合わせて「worker」にする必要があるので、注意してください。

# main.py
def worker(request):
    """Responds to any HTTP request.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text or any set of values that can be turned into a
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    # NOTE: Cloud Logging で、Cloud Tasks/ Cloud Scheduler どちらに叩かれたか判別できるように、request body を出力する
    print(request.get_data())
    return request.get_data()

また、今回は簡単のために「未認証の呼び出しを許可する」を設定していますが、セキュリティ面を考えると、未認証呼び出しは許可せず、「認証トークンがある HTTP Target タスクの使用 」と組み合わせて利用することをオススメします。

Cloud Tasks の設定

まずは、Cloud Tasks はキューを作成する必要があります。
事前に必要な準備に関しては、「Cloud Tasks キューにタスクを追加する#始める前に」を参考に、APIの有効化の実行をしてください。
APIの有効化が完了したら、「Cloud Tasks キューを作成する」を参考にし、下記のコマンドを Cloud Shell のターミナルで実行し、キューを作成します。

# Cloud Tasks の queue を作成
$ gcloud tasks queues create tasks-test --location=asia-northeast1
Created queue [asia-northeast1/tasks-test].

# 作成した queue の設定を確認
$ gcloud tasks queues describe tasks-test --location=asia-northeast1
name: projects/kazu0716-sandbox/locations/asia-northeast1/queues/tasks-test
rateLimits:
  maxBurstSize: 100
  maxConcurrentDispatches: 1000
  maxDispatchesPerSecond: 500.0
retryConfig:
  maxAttempts: 100
  maxBackoff: 3600s
  maxDoublings: 16
  minBackoff: 0.100s
state: RUNNING

無事に、queueが作成されたことが確認できたので、Targetタスク を作成するためのScriptを作成します。「HTTP Target タスクの作成」を参考に、下記のような Python Script を、Cloud Shell 環境で実行しました。

"""Create a task for a given queue with an arbitrary payload."""

import datetime
import json
from uuid import uuid4

from google.cloud import tasks_v2
from google.protobuf import duration_pb2, timestamp_pb2

# Create a client.
client = tasks_v2.CloudTasksClient()

# NOTE: 下記を自身の環境に応じて変更
project = 'kazu0716-sandbox'
queue = 'tasks-test'
location = 'asia-northeast1'
url = 'https://asia-northeast1-kazu0716-sandbox.cloudfunctions.net/tasks-scheduler-worker'
payload = {'message': 'from cloud tasks'}
in_seconds = 600
# NOTE: 同一名の task を連続して投げられないので、task 名がユニークな文字列になるようにしている(動作確認時に連続して叩けないと不便なので、このように実装している)
task_name = f'tasks-test-{uuid4()}'
deadline = 900

# Construct the fully qualified queue name.
parent = client.queue_path(project, location, queue)

# Construct the request body.
task = {
    "http_request": {  # Specify the type of request.
        "http_method": tasks_v2.HttpMethod.POST,
        "url": url,  # The full url path that the task will be sent to.
    }
}
if payload is not None:
    if isinstance(payload, dict):
        # Convert dict to JSON string
        payload = json.dumps(payload)
        # specify http content-type to application/json
        task["http_request"]["headers"] = {"Content-type": "application/json"}

    # The API expects a payload of type bytes.
    converted_payload = payload.encode()

    # Add the payload to the request.
    task["http_request"]["body"] = converted_payload

if in_seconds is not None:
    # Convert "seconds from now" into an rfc3339 datetime string.
    d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)

    # Create Timestamp protobuf.
    timestamp = timestamp_pb2.Timestamp()
    timestamp.FromDatetime(d)

    # Add the timestamp to the tasks.
    task["schedule_time"] = timestamp

if task_name is not None:
    # Add the name to tasks.
    task["name"] = client.task_path(project, location, queue, task_name)

if deadline is not None:
    # Add dispatch deadline for requests sent to the worker.
    duration = duration_pb2.Duration()
    duration.FromSeconds(deadline)
    task["dispatch_deadline"] = duration

# Use the client to build and send the task.
response = client.create_task(request={"parent": parent, "task": task})

print("Created task {}".format(response.name))

また、Cloud Shell の python 環境には default で Cloud Tasks の Client Library がインストールされていなかったので、インストール後、下記のように実行しました。

# ライブラリのインストール
$ pip install google-cloud-tasks

# スクリプトの実行(組織管理権限のあるユーザーで実行)
$ python create_tasks.py
Created task projects/kazu0716-sandbox/locations/asia-northeast1/queues/tasks-test/tasks/tasks-test-1e76aed5-c727-4749-a83f-13f19066b884

無事に、task が送信されましたが、タスクが実行されていないように見えます。なぜでしょうか?


図4. Cloud Tasks のコンソール画面

それは、タスク作成時に、schedule_timeを指定しており、今回のコードでは下記のように、現在時刻から600秒後にtaskが実行されるように設定してあるためです。

in_seconds = 600
==省略==
    # Convert "seconds from now" into an rfc3339 datetime string.
    d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)

    # Create Timestamp protobuf.
    timestamp = timestamp_pb2.Timestamp()
    timestamp.FromDatetime(d)

    # Add the timestamp to the tasks.
    task["schedule_time"] = timestamp

このようにして、queueにいれたタスクの実行時間を制御することができるため、決まった時間に、多くの task を実行することができます。
下記にて、Cloud Functions の実行ログを確認し、無事にタスクが完了したことを確認できました。


図5. Cloud Functions の設定

最後に、本番環境で使う際の注意点ですが、Cloud Tasks は、 Cloud Pub/Sub ほど柔軟にスケーリングはしないため、大量の task を実行する際には、一度「キューの過負荷」の項目をよく確認した上で、利用してください。

Cloud Scheduler の設定

Cloud Scheduler では、Cloud Tasks のように設定項目は多くなく、簡単に設定ができます。
こちらは、下記の手順実施前に、「cron ジョブをスケジュールして実行する」にもあるように、「Cloud Scheduler API を有効にする」を実施した上で、実行してください。

下記の画面に遷移し、「ジョブのスケジュール設定」のボタンをクリックします。


図6. Cloud Scheduler の画面(ジョブのスケジュール設定)

次に、名前や、リージョンの指定、実行頻度を設定します。今回は、すぐに実行されてほしいので毎分実行するようにしました。
また、タイムゾーンの設定は日本に設定しています。ここが間違っていると、数時間ずれて実行されてしまうので、注意してください。


図7. Cloud Scheduler の画面(スケジュールを定義する)

次に、実行内容の設定です。今回は Cloud Functions に HTTP POST で実行したいので、URLの指定も含めて、設定します。Cloud Scheduler が 実行されたか分かるように、HTTPボディにメッセージが飛ぶように設定してあります。
今回は、ログで見ることしかしませんが、実環境で利用する際には、HTTPヘッダーやボディの値を利用し、処理を変更することで、1つのendpointで複数のcron jobを処理する!というようなこともあると思います。


図8. Cloud Scheduler の画面(実行内容を構成する)

最後に、オプションの設定でリトライの設定等を行います。job が失敗した場合の処理等に関して記述ができるようになっています。


図9. Cloud Scheduler の画面(オプションの設定を行う)

これで、Cloud Scheduler の設定は完了になります。Cloud Schedulerの画面、Cloud Functions のログにて、無事に処理が実行されたことが確認できました。


図10. Cloud Scheduler の画面(処理の成功)

図11. Cloud Functionsのログ

まとめ

今回は、Cloud Tasks と Cloud Scheduler の紹介をし、それぞれの特徴と、Cloud Pub/Sub との比較、決まった時刻に Cloud Functions を呼び出す検証までを実施しました。

それぞれ、決まった時刻に、外部にHTTP Request を飛ばせるという機能は一緒ですが(むしろ、それ以外は全然違ったサービスで共通点を探す方が難しい・・・w)

使い分け(ユースケース)として

  • Cloud Scheduler に関しては、少数の job を定期時刻に実行するのに向いている(但し、ターゲットは、HTTP/Pubsubのみ)
    • ex. 定期的に古いデータを Cloud Storage から削除する job を実行する
  • 決まった時刻に大量の task を実行するには、Cloud Tasks が向いている
    • ex. 機械学習の作成したモデルで、対象となるユーザーを絞り込み、それらに定期時刻において push 通知を実施する

というのがあるような気がしました。
このように、一見すると同じことが、別のサービスを使ってできるように見えますが、それぞれのサービスの特性や制約によって、対応できるケース/ できないケースがあるので、非機能要件まで整理し、適切なサービスを選択して、要件を実現するというのが大事であるということが、わかると思います。

私の記事は以上で終了となります。ここまで読んでくださった方、ありがとうございました。
次回の「Google Cloud Japan Advent Calendar 2022 - 今からはじめる Google Cloud」 は、この記事でも度々出てきた「Cloud Pub/Sub」に関する記事になります。

Google Cloud Japan

Discussion