株式会社HRBrain
🧚

GCS × Eventarc を活用したイベントドリブンアーキテクチャ

に公開

はじめに

こんにちは、HRBrainバックエンドエンジニアをしているなかじです!

今回は、Cloud Storage + Eventarc + Cloud Run を使ったイベントドリブンなアーキテクチャをプロダクトで実装したので、その構成やうまくいかなかったポイントを記事にします!

イベント ドリブン アーキテクチャ

イベント ドリブン アーキテクチャは、マイクロサービスが状態変化(イベント)に対応するためのソフトウェア設計パターンです。イベントは、状態(アイテムの価格や配送先住所など)を伝える場合もあり、識別子(注文の受信または配送の通知など)となる場合もあります。イベントは、共通の目標を達成するために連携するマイクロサービスをトリガーしますが、イベント形式を除き互いに認識する必要はありません。各マイクロサービスは連携して動作していますが、異なるビジネス ロジックを適用して、独自の出力イベントを発生させることができます。

https://cloud.google.com/eventarc/docs/event-driven-architectures?hl=ja

以下の記事もうまく説明してくださっています

https://qiita.com/Suzuki_Cecil/items/a51d353c73e9277f46d8

自分が感じた選定するメリット

  • 疎結合な設計ができる

サービス間はイベントを介して通信するため、互いの実装に依存しない、柔軟で変更に強い構成を実現できる

  • 非同期処理

後続の重い処理をバックグラウンドで非同期実行できる

  • コストがほぼかからない

Eventarcのトリガーは月に 50,000 件のイベントまで無料枠があるが、Google ソースからのイベントであれば、100 万件イベントが無料である

https://cloud.google.com/eventarc/pricing?hl=ja#eventarc-pricing

よくあるパターン

イベントソース イベントルーター コンシューマー 処理例
Cloud Storage Eventarc Cloud Run 動画エンコード
Pub/Sub Eventarc Cloud Run バッチ処理
Cloud Audit Logs Eventarc Cloud Functions VM作成通知

Eventarc Advanced と Eventarc Standard

どちらのエディションも、スケーラブルでサーバーレスのフルマネージド イベント ソリューションを提供します。このソリューションでは、状態変化(イベント)によってトリガーされ、状態変化に応答する疎結合サービスを使用して、ソースからターゲットへのメッセージを非同期で転送できます。どちらのエディションも、 Google Cloud サービス、カスタム アプリケーション、SaaS アプリケーション、サードパーティ サービスなど、さまざまなイベント プロバイダと宛先をサポートし、配信、セキュリティ、認可、オブザーバビリティ、エラー処理を管理します。

なお、両方のエディションの Eventarc の基盤となるデータモデルは同じです。ユースケースの複雑さが増すにつれて、Eventarc Standard から Eventarc Advanced へのシームレスな移行が可能になります。

https://cloud.google.com/eventarc/advanced/docs/choose-product-edition?utm_source=chatgpt.com&hl=ja

今回の記事で紹介するもの

Eventarc を使って Cloud Storage バケットへのファイルアップロードをトリガーに、Cloud Run サービスを呼び出す構成です。

全体構成

  • Cloud Storage:アップロード先バケット
  • Eventarc:イベントルーター
  • Cloud Run:イベント発火先
  • Pub/Sub:EventarcとCloud Runの中継(暗黙的に利用)

システム構成(全体図)

[ユーザーがファイルをアップロード]
  ↓
[GCS バケット]
  ↓(finalizeイベント)
[Eventarc]
  ↓
[Cloud Run]
  ↓(HTTPリクエスト)
[何かしらのジョブ]

Terraformのコード

provider "google" {
  project = var.project_id
  region  = var.region
}

# Cloud Run サービス(先に deploy されている前提)
data "google_cloud_run_service" "cloudrun_service" {
  name     = var.cloud_run_service_name
  location = var.region
}

GCSとEventarc トリガーの設定

# バケット
resource "google_storage_bucket" "upload_bucket" {
  name                        = "${var.project_id}-upload-bucket"
  location                    = var.region
  storage_class               = "STANDARD"
  uniform_bucket_level_access = true
  versioning {
    enabled = true
  }
}

# Eventarc トリガー
resource "google_eventarc_trigger" "storage_trigger" {
 project  = module.project.project_id
  name     = "storage-upload-trigger"
  location = var.region

  matching_criteria {
    attribute = "type"
    value     = "google.cloud.storage.object.v1.finalized"
  }

  matching_criteria {
    attribute = "bucket"
    # bucket nameを設定
    value     = google_storage_bucket.upload_bucket.name
  }
 
  destination {
    cloud_run_service {
      service = data.google_cloud_run_service.cloudrun_service.name
      region  = var.region
    }
  }

  service_account = var.eventarc_sa_email
}

IAM権限

# run.invoker 権限
resource "google_project_iam_member" "eventarc_sa_run_invoker" {
  project = var.project_id
  role    = "roles/run.invoker"
  member  = "serviceAccount:${var.eventarc_sa_email}"
}

# Eventarc の権限
resource "google_project_iam_member" "eventarc_event_receiver" {
  project = var.project_id
  role    = "roles/eventarc.eventReceiver"
  member  = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-eventarc.iam.gserviceaccount.com"
}

Cloud Runのコード

Goでの実装(Cloud Run HTTPハンドラ

package main

import (
	"log"
	"net/http"
	"os"

	cloudEventSdk "github.com/cloudevents/sdk-go/v2"
	"github.com/googleapis/google-cloudevents-go/cloud/storagedata"
	"google.golang.org/protobuf/encoding/protojson"
)

//  Eventarc トリガーが Cloud Run を呼び出し、GCS のイベントを検知して動作する

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	http.HandleFunc("/", eventHandler)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}

func eventHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("recovered from panic: %v", rec)
		}
		w.WriteHeader(http.StatusOK)
	}()

	event, err := cloudEventSdk.NewEventFromHTTPRequest(r)
	if err != nil {
		log.Printf("failed to create CloudEvent from request: %v", err)
		return
	}

	if event.Type() != "google.cloud.storage.object.v1.finalized" {
		log.Printf("ignored event type: %s", event.Type())
		return
	}

	var storageObjectData storagedata.StorageObjectData
	if err := protojson.Unmarshal(event.Data(), &storageObjectData); err != nil {
		log.Printf("failed to unmarshal StorageObjectData: %v", err)
		return
	}

	log.Printf("received file: gs://%s/%s", storageObjectData.Bucket, storageObjectData.Name)

	// TODO: 処理を書く
}

実装が綺麗にいかなかったポイント

問題

  • データ削除の時を考慮して、GCS(Cloud Storage)の バケットを1つのみ使用している。
  • 画像ファイルやPDFファイル、動画ファイルなど、異なるファイル種別に応じて処理を分けたい。
  • Eventarc の google.cloud.storage.object.v1.finalized イベントを使っている。

うまくいかなかったポイント

  • GCS の finalized イベントは「バケット単位」でしか設定できない

    → イベントの発火条件は「このバケットに何かファイルがアップロードされたかどうか」だけ。

  • つまり、ディレクトリ(プレフィックス)やファイル種別(.jpg, .pdfなど)で Eventarc のルーティングを分けることはできない。

  • Eventarc の matching_criteria"bucket" までは指定できるが、"name"(ファイル名)や "prefix" などはサポートされていない。

解決した方法

  • Eventarc のルーティングを諦め、正規表現で、バケットパスから、"name"(ファイル名)や "prefix" を抽出した
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
	"regexp"

	cloudEventSdk "github.com/cloudevents/sdk-go/v2"
	"github.com/googleapis/google-cloudevents-go/cloud/storagedata"
	"google.golang.org/protobuf/encoding/protojson"
)

func main() {
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
	}

	# 本当はここでルーティングしたい!!!!!
	http.HandleFunc("/", eventHandler)
	log.Printf("Listening on port %s...", port)
	log.Fatal(http.ListenAndServe(":"+port, nil))
}

func eventHandler(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	// Eventarcの再試行防止
	defer func() {
		if rec := recover(); rec != nil {
			log.Printf("recovered from panic: %v", rec)
		}
		w.WriteHeader(http.StatusOK)
	}()

	event, err := cloudEventSdk.NewEventFromHTTPRequest(r)
	if err != nil {
		log.Printf("failed to create CloudEvent from request: %v", err)
		return
	}

	if event.Type() != "google.cloud.storage.object.v1.finalized" {
		log.Printf("ignored event type: %s", event.Type())
		return
	}

	var storageObjectData storagedata.StorageObjectData
	if err := protojson.Unmarshal(event.Data(), &storageObjectData); err != nil {
		log.Printf("failed to unmarshal StorageObjectData: %v", err)
		log.Printf("raw event data: %s", string(event.Data()))
		return
	}

	log.Printf("GCS event: bucket=%s, name=%s", storageObjectData.Bucket, storageObjectData.Name)

	handleSimpleFile(storageObjectData.Name)
}

// ファイル名から処理を振り分け
func handleSimpleFile(objectName string) {
	fileType, fileName, err := parseSimplePath(objectName)
	if err != nil {
		log.Printf("invalid object path: %v", err)
		return
	}

	switch fileType {
	case "img":
		// TODO: 画像処理を実装
	case "pdf":
		// TODO: PDF処理を実装
	default:
		log.Printf("Unsupported file type: %s", fileType)
	}
}

var simplePathRegexp = regexp.MustCompile(`^(img|pdf)/([^/]+)$`)

// 例: バケット内ファイルパス構造
// 画像:gs://your-bucket/img/sample.jpg
// PDF:gs://your-bucket/pdf/sample.pdf
func parseSimplePath(objectName string) (fileType, fileName string, err error) {
	matches := simplePathRegexp.FindStringSubmatch(objectName)
	if len(matches) != 3 {
		return "", "", fmt.Errorf("object path does not match pattern: %s", objectName)
	}
	return matches[1], matches[2], nil
}

まとめ

  • Eventarc × Cloud Run を使えば、シンプルにイベントドリブンな構成が作れる
  • GCS finalized イベントは バケット単位でしかトリガーできない ため、Cloud Run 側でパスを解釈してルーティングをした
  • 他にいい方法があれば、ぜひ教えてください!🙇

参考

Eventarc を使用して Cloud Storage からイベントを受信する

https://cloud.google.com/run/docs/tutorials/eventarc?hl=ja

Eventarc でトリガーを作成する

https://cloud.google.com/run/docs/triggering/trigger-with-events?hl=ja

https://cloud.google.com/eventarc/docs/creating-triggers-terraform?utm_source=chatgpt.com&hl=ja

google_storage_bucket

https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/storage_bucket.html?utm_source=chatgpt.com

google_eventarc_trigger

https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/eventarc_trigger?utm_source=chatgpt.com

株式会社HRBrain
株式会社HRBrain

Discussion