🔀

複雑なイベント駆動型アーキテクチャに役立つ Eventarc Advanced を紹介

2024/11/20に公開

こんにちは。クラウドエース第一開発部の竹内です。
本記事では2024年10月30日にプレビュー版がリリースされた Eventarc Advanced について紹介します。
Eventarc Advanced を使用すると、さまざまなサービス、アプリ、システム間でメッセージを受信、フィルタリング、変換、ルーティング、配信できます。

はじめに: Eventarc とは

Eventarc はソースからターゲットへイベントを非同期にルーティングする、スケーラブルなフルマネージドサービスで、イベント駆動型アーキテクチャの構築に役立ちます。
alt
https://cloud.google.com/eventarc/docs

現在 Eventarc には2つのエディションがあります。

  • Eventarc Advanced (Preview)
  • Eventarc Standard

Eventarc Advanced

Eventarc Advanced は複雑なイベント駆動型アーキテクチャの構築に役立ちます。
イベントは1つのバスに集約され、イベントのフィルタリングや変換などを行った後、複数の宛先に配信することができます。
多くの Pub/Sub トピックや Kafka キュー、その他のサードパーティのメッセージングシステムを管理するケースに適しています。

alt
出典: Eventarc overview

Eventarc Standard

Eventarc Standard はイベントの受信から配信までを簡単に行えるシンプルな機能を備えています。
特定の条件でトリガーを設定し、それに基づいてイベントをルーティングします。
alt
出典: Eventarc overview

Advanced と Standard の違い

Eventarc Advanced と Eventarc Standard では、次のような違いがあります。

機能 Advanced (Preview) Standard
プロジェクト間のイベント配信 ×
デッドレターキュー ×
最大イベントサイズ 1 MB 512 KB
イベントの変換 ×
サービス上限 ・バスはプロジェクトあたりに1個
・パイプラインはリージョンあたりに100個
・トリガーはプロジェクトあたりに500個
イベントソース ・Google プロバイダ
・直接配信 (API 使用)
・Google プロバイダ
・監査ログを介した Google プロバイダ
・サードパーティ プロバイダ (Preview)
宛先 ・Cloud Run functions
・Cloud Run jobs and services
・Eventarc Advanced buses
・Internal HTTP endpoints in VPC networks
・Pub/Sub topics
・Workflows
・Cloud Run functions
・Cloud Run services
・Internal HTTP endpoints in VPC networks (Preview)
・Public endpoints of private and public GKE services (Preview)
・Workflows

(参考: Choose Eventarc Advanced or Eventarc Standard)

Eventarc Advanced について

Eventarc Advanced を使用すると、異なるサービス、アプリ、システム間でのイベントのルーティングを行うことができます。

主なユースケース

Eventarc Advanced のユースケースとして、公式ドキュメントでは次が挙げられています。

  • 大規模なアプリケーション統合
    • 多くのサービスやアプリケーションを接続するケースに最適。
    • 異なるイベント形式やスキーマを介した非同期コミュニケーションを実現できる。
  • AI や分析のためのイベントストリーミング
    • IoT デバイスや AI ワークロードからのイベントデータを扱うケースに最適。
    • 各種イベントデータを収集し、分析パイプラインに配信する前に、フィルタリングや変換などの処理を行える。
  • ハイブリッドおよびマルチクラウド展開
    • ハイブリッドクラウド構成やマルチクラウド構成に最適。
    • イベント駆動型アーキテクチャを Google Cloud 外に拡張できる。
    • Google ソースやイベントの直接配信など、さまざまなソースからのイベントをルーティングできる。

主な構成要素

Eventarc Advanced の主な構成要素として、次の3つがあります。

  1. Bus
  2. Enrollment
  3. Pipeline
    alt
    出典: Eventarc Advanced overview

1. Bus

バスにはイベント プロバイダによって配信されたすべてのイベントが集約されます。集約されるイベントは、大きく分けて2種類あります。

  • Google ソースのイベント
  • API を使用して直接配信されたイベント
    • API を使用することで、ユーザーのアプリケーションなどから直接イベントを配信できます。Eventarc クライアント ライブラリを使用することも可能です。

2. Enrollment

Enrollment では、特定のバスで受信したイベントを識別し、パイプラインにルーティングされるイベントの条件を定義します。条件式は Common Expression Language (CEL) で記述します。

3. Pipeline

パイプラインは、バスから宛先へのイベント配信を仲介する機能で、Enrollment の条件に一致したイベントを宛先に配信します。
また宛先に配信する前に、イベントのフォーマットや中身のデータを変換することも可能です。

宛先

Eventarc Advanced では、次の5種類の宛先がサポートされています(参考: Supported destinations)。

  • Cloud Run
  • Eventarc Advanced bus
  • HTTP endpoint
  • Pub/Sub topic
  • Workflows

イベントデータのフォーマットと変換

フォーマット

Eventarc のイベントデータは、CloudEvents の仕様に従います。
データフォーマットは、次の3つがサポートされています。

  • Avro
  • JSON
  • Protobuf

また、イベントデータの出力フォーマットは、入力フォーマットとは異なるものにすることが可能です (参考: Format received events)。

CEL での変換

Common Expression Language (CEL) で変換式を記述することで、イベントデータを変換することが可能です (参考: Transform received events)。
例えば、次のような変換があります (参考: Transformation examples)。

  • 正規化
  • マスキング
  • 項目の追加・削除
  • データ型変換
  • 場合分け
  • デフォルト値設定
  • 文字列操作
  • エラーハンドリング

料金

Eventarc Advanced の料金体系は次のようになっています。

対象 料金
任意のソースからバスに配信されるメッセージ 100 万メッセージあたり 1.00 ドル
パイプラインに送信されるメッセージ 100 万メッセージあたり 0.50 ドル
パイプラインでの変換 100 万オペレーションあたり 0.40 ドル

(参考: Eventarc pricing)
※上記の料金は一般提供 (GA) された場合に適用されます。プレビューの場合、料金はかかりません。

検証

実際に Eventarc Advanced を使用して、次のようなデータ連携機能を作ってみました。
alt

  1. ローカル PC から Eventarc に向けてイベントデータを直接配信する。
  2. Eventarc 上でイベントデータのフィルタリングと変換を行い、Pub/Sub トピックにデータを配信する。
  3. Pub/Sub の BigQuery サブスクリプションを使って BigQuery にデータを書き込む。

1. イベント配信用スクリプトの用意

Eventarc API を使用することで、Google Cloud の外部からでも、Eventarc のバスにイベントデータを直接配信することができます。
配信するイベントデータは、CloudEvents の仕様に従います。
今回は Python のクライアントライブラリを使用し、各地域の1日の気温のレポートを配信するシナリオを仮定します。

from google.cloud import eventarc_publishing_v1
import json

PROJECT_ID = "project_id"
REGION = "region"
BUS_NAME = "bus_name"

# ※サンプルデータについての説明
#   events はイベントデータの配列で、各要素が CloudEvents の JSON 形式のデータになっている。
#   (参考: https://cloud.google.com/eventarc/docs/cloudevents-json)
#   要素内の type には地名が含まれ、data には日付 (YYYYMMDD) と気温データ (平均、最高、最低) が含まれている。
events = [
    # 東京の気温 (type が dailyreport.jp.tokyo)
    {"id": "0001", "source": "test-source", "specversion": "1.0", "type": "dailyreport.jp.tokyo", "data": {"date": "20241003", "avg": 23, "max": 25, "min": 21}},
    {"id": "0002", "source": "test-source", "specversion": "1.0", "type": "dailyreport.jp.tokyo", "data": {"date": "20241004", "avg": 25, "max": 30, "min": 22}},
    {"id": "0003", "source": "test-source", "specversion": "1.0", "type": "dailyreport.jp.tokyo", "data": {"date": "20241005", "avg": 21, "max": 24, "min": 18}},
    # 大阪の気温 (type が dailyreport.jp.osaka)
    {"id": "0004", "source": "test-source", "specversion": "1.0", "type": "dailyreport.jp.osaka", "data": {"date": "20241003", "avg": 22, "max": 24, "min": 21}},
    {"id": "0005", "source": "test-source", "specversion": "1.0", "type": "dailyreport.jp.osaka", "data": {"date": "20241004", "avg": 22, "max": 24, "min": 20}},
    {"id": "0006", "source": "test-source", "specversion": "1.0", "type": "dailyreport.jp.osaka", "data": {"date": "20241005", "avg": 23, "max": 27, "min": 20}},
    # アメリカ (アイオワ州) の気温 (type が dailyreport.us.iowa)
    {"id": "0007", "source": "test-source", "specversion": "1.0", "type": "dailyreport.us.iowa", "data": {"date": "20241003", "avg": 18, "max": 28, "min": 11}},
    {"id": "0008", "source": "test-source", "specversion": "1.0", "type": "dailyreport.us.iowa", "data": {"date": "20241004", "avg": 19, "max": 23, "min": 13}},
    {"id": "0009", "source": "test-source", "specversion": "1.0", "type": "dailyreport.us.iowa", "data": {"date": "20241005", "avg": 20, "max": 33, "min": 14}},
]

client = eventarc_publishing_v1.PublisherClient()

for event in events:
    # Eventarc のバスにイベントデータ (JSON 形式) を配信
    client.publish(
        request=eventarc_publishing_v1.PublishRequest(
            json_message=json.dumps(event),
            message_bus=f"projects/{PROJECT_ID}/locations/{REGION}/messageBuses/{BUS_NAME}",
        )
    )

2. データ連携先のリソースの作成

次の3つを作成します。

  • Pub/Sub トピック
    • Eventarc からデータを受け取り、Pub/Sub サブスクリプションにデータを渡す。
  • Pub/Sub BigQuery サブスクリプション
    • Pub/Sub トピックからデータを受け取り、BigQuery テーブルに書き込む。
  • BigQuery テーブル
    • 最終的なデータの書き込み先。

2-1. Pub/Sub トピック

Eventarc の宛先となる PubSub トピックを作成します。

gcloud pubsub topics create TOPIC_NAME

2-2. BigQuery テーブル

最終的な書き込み先となる BigQuery テーブルを作成します。
今回は次のような temperature テーブルを作成しました。
alt

# テーブルスキーマファイル作成
echo '[
    {"name": "id", "type": "STRING", "description": "ID"},
    {"name": "area", "type": "STRING", "description": "地名"},
    {"name": "date", "type": "DATE", "description": "日付"},
    {"name": "avg_temperature", "type": "INTEGER", "description": "平均気温"},
    {"name": "max_temperature", "type": "INTEGER", "description": "最高気温"},
    {"name": "min_temperature", "type": "INTEGER", "description": "最低気温"},
    {"name": "day_type", "type": "STRING", "description": "日の区分 (夏日|真夏日|猛暑日|-)"}
]' > schema.json

# BQ テーブル作成
bq mk --table PROJECT_ID:DATASET_ID.TABLE_ID schema.json

2-3. Pub/Sub BigQuery サブスクリプション

PubSub トピックからデータを受け取って BigQuery テーブルに書き込むための、Pub/Sub BigQuery サブスクリプションを作成します。
topicbigquery-table オプションには、先ほど作成した PubSub トピックと BQ テーブルをそれぞれ設定します。
また今回は、書き込み時に BigQuery テーブルのスキーマを使用するように use-table-schema オプションを指定しました。

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
--topic=projects/PROJECT_ID/topics/TOPIC_ID \
--bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID \
--use-table-schema

(参考: BigQuery サブスクリプションの作成)

3. バスの作成

Eventarc のバスを作成します。バスはプロジェクトあたりに1つまで作成できます。

gcloud beta eventarc message-buses create BUS_NAME --location=LOCATION

※オプション: Google ソースのイベントを収集する場合は、次のコマンドでバスにソースを追加します。

gcloud beta eventarc google-api-sources create GOOGLE_API_SOURCE_NAME \
--location=LOCATION \
--destination-message-bus=BUS_NAME \
--destination-message-bus-project=PROJECT_ID

これでバスの作成が完了しました。
alt

4. パイプラインと Enrollment の作成

続いてパイプラインと Enrollment を作成します (参考: Enroll to receive events)

4-0. 事前準備: ネットワーク アタッチメントの作成

Eventarc パイプラインでは、イベント配信時に、VPC ネットワークでホストされているエンドポイントとの接続を確立するために、ネットワーク アタッチメントを使用します。
そのためパイプラインを作成する前に、ネットワーク アタッチメントを用意しておきます (参考: Create network attachments)。

gcloud compute network-attachments create ATTACHMENT_NAME \
--region=REGION \
--connection-preference=ACCEPT_MANUAL \
--subnets=SUBNET_NAME

4-1. パイプライン名とリージョンの設定

リージョンはバスと同じにする必要があります(今回は us-central1)。
今回は省略しましたが、その他にリトライポリシーや暗号化鍵などを設定することも可能です。
alt

4-2. Enrollment の作成

このパイプラインで使用するバスを選択します。
また CEL を使用して、イベントのフィルタ条件を設定できます。
alt
今回は次のような条件を設定し、sourcetest-source で、typedailyreport.jp. で始まるイベントのみを宛先に連携するようにしました。

// フィルタ条件
message.source == "test-source"
&& message.type.startsWith("dailyreport.jp.")

4-3. Event mediation の設定

イベントデータの入出力の形式や、変換式を設定します。
今回の構成は次のようにしました。

  • Inbound Schema: JSON
  • Event transformation: Apply a transformation
    • 詳細は「4-4. イベントの変換式 (CEL) の作成」で説明
  • Outbound Schema: Same as input
    alt

4-4. イベントの変換式 (CEL) の作成

今回、手順1で用意した Python スクリプトによって配信されるデータの形式は、次のようになります (CloudEvents の JSON 形式) 。

{
    "id": "0001",
    "source": "test-source",
    "specversion": "1.0",
    "type": "dailyreport.jp.tokyo",
    "data": {
        "date": "20241003",
        "avg": 23,
        "max": 25,
        "min": 21
    }
}

この入力データを、後続の Pub/Sub の BigQuery サブスクリプションで BQ テーブルに書き込めるように、次のような形式に変換したいです。

{
    "data": {
        "id": "0001",
        "area": "tokyo",
        "date": "2024-10-03",
        "avg_temperature": 23,
        "max_temperature": 25,
        "min_temperature": 21,
        "day_type": "夏日"
    }
}

この変換を行うため、Eventarc パイプラインに、次のような CEL 式を設定します。

{
    "data":
    {
        "id": message.id,
        // type から地名を抽出
        "area": re.capture(message.type, ".+\\.(\\S+$)"),
        // 日付のフォーマット
        "date": re.extract(message.data.date, "^(\\d{4})(\\d{2})(\\d{2})", "\\1-\\2-\\3"),
        // 項目名を変更
        "avg_temperature": message.data.avg,
        "max_temperature": message.data.max,
        "min_temperature": message.data.min, 
    }.merge(
        // data.max から「夏日」、「真夏日」、「猛暑日」を判定し、day_type という項目を追加
        message.data.max >= 35?
        {
            "day_type": "猛暑日"
        }:
        message.data.max >= 30?
        {
            "day_type": "真夏日"
        }:
        message.data.max >= 25?
        {
            "day_type": "夏日"
        }:
        // default
        {
            "day_type": "-"
        }
    )
}

4-5. 宛先の設定

手順2-1で作成した Pub/Sub トピックを宛先に設定し、ネットワーク アタッチメントも設定します。
さらに認証を有効化し、必要なロールを付与したサービスアカウントを設定します。
今回使用したサービスアカウントには、次のロールを付与しています。

  • Eventarc Event Receiver (roles/eventarc.eventReceiver)
  • Pub/Sub Publisher (roles/pubsub.publisher)
    alt

これで Eventarc パイプラインの作成が完了しました。
alt

5. イベントデータの直接配信

最後に手順1で用意した Python スクリプトを実行し、イベントデータを Eventarc のバスに直接配信します。

python ファイル名.py

6. 結果確認

書き込み先の BigQuery テーブルを確認したところ、想定通りにデータが書き込まれました。
また、次の点も確認できました。

  • 手順4-2で設定したフィルタ条件により、東京と大阪のデータ (イベントタイプが dailyreport.jp で始まるデータ) のみが連携されている。
  • 手順4-4で設定した CEL の変換式により、データが変換されている (地名の抽出、日付のフォーマット、day_type の判定など)。

alt

まとめ

この記事では2024年10月30日にプレビュー版がリリースされた Google Cloud の Eventarc Advanced を紹介しました。
Eventarc Advanced は、複雑なイベント駆動型アーキテクチャの構築に役立つフルマネージドサービスで、さまざまなサービス、アプリ、システム間でのイベントの受信、フィルタリング、変換、ルーティング、配信を可能にします。

また今回は検証として、外部のイベントデータを Eventarc Advanced → Pub/Sub → BigQuery のように連携する機能を作成しました。
Eventarc Advanced のリリース前は、イベントデータの変換には Cloud Run などの他のサービスを使用する必要がありましたが、Eventarc Advanced では簡単な変換が可能になったため、システムの簡略化や、開発の手間の軽減に役立つことが期待されます。

Discussion