Open6

Cloud Tasks 調査

Tomonori Hayashi / @pHaya72Tomonori Hayashi / @pHaya72

サンプルコード

import json
import os

from typing import Optional
import functions_framework

from google.cloud import tasks_v2


@functions_framework.cloud_event
def create_http_task_with_token(cloud_event):
    print("Functions Started")

    client = tasks_v2.CloudTasksClient()

    # 環境変数を取得
    URL = os.getenv("URL")
    SA = os.getenv("SA")
    GCP_PROJECT = os.getenv("GCP_PROJECT")

    # 1から10までのタスクを作成する
    for i in range(1, 11):
        json_payload = {"text": f"これは{i}番です"}

        # タスクを構成する
        task = tasks_v2.Task(
            http_request = tasks_v2.HttpRequest(
                http_method = tasks_v2.HttpMethod.POST,
                url = URL,
                headers = {"Content-type": "application/json"},
                oidc_token = tasks_v2.OidcToken(
                    service_account_email = SA
                ),
                body = json.dumps(json_payload).encode()
            ),
        )

        # タスクをキューに送信する
        client.create_task(
            tasks_v2.CreateTaskRequest(
                parent = client.queue_path(
                    project = GCP_PROJECT,
                    location = 'asia-northeast1',
                    queue = 'exec-cloud-functions'
                ),
                task = task,
            )
        )

        print(f"{i}番目のタスクを送信しました")

    print("Functions Finished")

    return "OK"
Tomonori Hayashi / @pHaya72Tomonori Hayashi / @pHaya72

tasks_v2.Task

  • Google Cloud Tasks API の v2 バージョンで使用される Task というデータ構造を Python で定義している
  • プロトコルバッファで定義された Task メッセージ型を Python クラスとして表現している

https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-tasks/google/cloud/tasks_v2/types/task.py

Tomonori Hayashi / @pHaya72Tomonori Hayashi / @pHaya72

tasks_v2.HttpRequest

  • Google Cloud Tasks API の v2 バージョンで使用されるデータ構造を Python で定義している
  • HTTP リクエスト関連のデータ構造に焦点を当てている
  • manifest には HttpMethod, HttpRequest, AppEngineHttpRequest, AppEngineRouting, OAuthToken, OidcToken といった、HTTP リクエスト関連のメッセージ型が追加されている
    • これらのメッセージ型は、Cloud Tasks がタスクを実行する際に使用されるリクエストの詳細を定義している

https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-tasks/google/cloud/tasks_v2/types/target.py#L22-L219

Tomonori Hayashi / @pHaya72Tomonori Hayashi / @pHaya72

tasks_v2.CloudTasksClient.create_task

  • Cloud Tasks のキューに新しいタスクを作成する処理を抽象化している
  • proto パッケージは直接的には使われていませんが、google.cloud.tasks_v2.types で定義されたメッセージ型を引数や戻り値の型として利用している
  • リトライ、タイムアウト、メタデータの指定など、API リクエストの細かい設定を柔軟に行うことが可能
  • 内部では、API の呼び出しに必要な CreateTaskRequest オブジェクトを生成し、RPC メソッドを通じて API リクエストを送信する

主要な引数

  • request: google.cloud.tasks_v2.types.CreateTaskRequest 型のオブジェクト、または dict 型のオブジェクトを受け付けます。これは、タスク作成に必要な情報をまとめたリクエストオブジェクトです。省略可能です。
  • parent: str 型で、タスクを作成するキューの名前(例: "projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID")を指定する
  • task: google.cloud.tasks_v2.types.Task 型で、作成するタスクの詳細情報を指定する
  • retry: google.api_core.retry.Retry 型で、API リクエストのリトライ設定を指定する
    • デフォルト値は gapic_v1.method.DEFAULT
  • timeout: float 型で、API リクエストのタイムアウト値を秒単位で指定する
    • デフォルト値は gapic_v1.method.DEFAULT
  • metadata: Sequence[Tuple[str, Union[str, bytes]]] 型で、リクエストとともに送信するメタデータを指定する

戻り値

  • 値の型は google.cloud.tasks_v2.types.Task です。これは作成されたタスクを表すオブジェクト

https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-tasks/google/cloud/tasks_v2/services/cloud_tasks/client.py#L2336-L2493

Tomonori Hayashi / @pHaya72Tomonori Hayashi / @pHaya72

tasks_v2.CreateTaskRequest

  • Google Cloud Tasks API の CreateTask メソッドのリクエストメッセージである CreateTaskRequest を定義している
  • メッセージは create_task メソッド(前の解説を参照)に渡されるリクエストデータの中核となる部分
  • タスク作成に必要な情報 (parent, task, response_view) を保持するためのコンテナ

フィールド一覧

  • parent: str = proto.Field(proto.STRING, number=1)
    • parent という名前の属性で、型は str (文字列)
    • タスクを作成するキューの名前 (例: "projects/PROJECT_ID/locations/LOCATION_ID/queues/QUEUE_ID") を指定するために使う
  • task: gct_task.Task = proto.Field(proto.MESSAGE, number=2, message=gct_task.Task)
    • task という名前の属性で、型は gct_task.Task (以前の解説で定義した Task メッセージ型)
    • 作成するタスクの詳細情報を含むオブジェクトを指定するために使う
  • response_view: gct_task.Task.View = proto.Field(proto.ENUM, number=3, enum=gct_task.Task.View):
    • response_view という名前の属性で、型は gct_task.Task.View (以前の解説で定義した Task メッセージの View Enum)
    • API レスポンスで返す Task オブジェクトの表示形式 (BASIC または FULL) を指定するために使う

https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-tasks/google/cloud/tasks_v2/types/cloudtasks.py#L424-L505

Tomonori Hayashi / @pHaya72Tomonori Hayashi / @pHaya72

プロトコル周辺のメモ

  • gRPC
    • リモートプロシージャコールプロトコル
    • HTTP/2 ベース(80/443 が利用される)
    • マイクロサービス間通信などが主な利用用途
    • 遠隔の手続きをローカルのように呼び出す
      • REST API はリソース(/users/123 といった URI)がどんな操作をするか(GET, POST といった HTTP メソッド)を指定して分離されているサーバーに問い合わせる(リクエスト)
  • プロトコルバッファ
    • gRPC で通信する際のデータフォーマット
    • .proto ファイルに記述された内容を protoc コンパイラ(もしくは protoc プラグイン)で各言語で扱えるコードが生成される
    • バイナリ形式でデータ効率がいい
  • Websocket
    • 双方向通信プロトコル
    • クライアントとサーバー間で双方向のリアルタイム通信を実現
    • TCPの上に直接構築され、HTTPハンドシェイク後に、データを双方向に送受信

Cloud Tasks に関するメモ

  • バックエンドサーバーからワーカーへメッセージを投げて非同期で処理させたい場合に利用される
  • そのため基本的には内部通信になるため、Cloud Tasks API を叩く際(サービス自体を操作する)にはプロトコルは gRPC が利用されている
  • Task や CreateTaskRequest はプロトコルバッファ形式のコンテナ(データ構造)となっている
  • HttpRequest はワーカーに対してどのようなプロトコルでリクエストするかというコンテナとなっている