GCS × Eventarc を活用したイベントドリブンアーキテクチャ
はじめに
こんにちは、HRBrainバックエンドエンジニアをしているなかじです!
今回は、Cloud Storage + Eventarc + Cloud Run を使ったイベントドリブンなアーキテクチャをプロダクトで実装したので、その構成やうまくいかなかったポイントを記事にします!
イベント ドリブン アーキテクチャ
イベント ドリブン アーキテクチャは、マイクロサービスが状態変化(イベント)に対応するためのソフトウェア設計パターンです。イベントは、状態(アイテムの価格や配送先住所など)を伝える場合もあり、識別子(注文の受信または配送の通知など)となる場合もあります。イベントは、共通の目標を達成するために連携するマイクロサービスをトリガーしますが、イベント形式を除き互いに認識する必要はありません。各マイクロサービスは連携して動作していますが、異なるビジネス ロジックを適用して、独自の出力イベントを発生させることができます。
以下の記事もうまく説明してくださっています
自分が感じた選定するメリット
- 疎結合な設計ができる
サービス間はイベントを介して通信するため、互いの実装に依存しない、柔軟で変更に強い構成を実現できる
- 非同期処理
後続の重い処理をバックグラウンドで非同期実行できる
- コストがほぼかからない
Eventarcのトリガーは月に 50,000 件のイベントまで無料枠があるが、Google ソースからのイベントであれば、100 万件イベントが無料である
よくあるパターン
イベントソース | イベントルーター | コンシューマー | 処理例 |
---|---|---|---|
Cloud Storage | Eventarc | Cloud Run | 動画エンコード |
Pub/Sub | Eventarc | Cloud Run | バッチ処理 |
Cloud Audit Logs | Eventarc | Cloud Functions | VM作成通知 |
Eventarc Advanced と Eventarc Standard
どちらのエディションも、スケーラブルでサーバーレスのフルマネージド イベント ソリューションを提供します。このソリューションでは、状態変化(イベント)によってトリガーされ、状態変化に応答する疎結合サービスを使用して、ソースからターゲットへのメッセージを非同期で転送できます。どちらのエディションも、 Google Cloud サービス、カスタム アプリケーション、SaaS アプリケーション、サードパーティ サービスなど、さまざまなイベント プロバイダと宛先をサポートし、配信、セキュリティ、認可、オブザーバビリティ、エラー処理を管理します。
なお、両方のエディションの Eventarc の基盤となるデータモデルは同じです。ユースケースの複雑さが増すにつれて、Eventarc Standard から Eventarc Advanced へのシームレスな移行が可能になります。
今回の記事で紹介するもの
Eventarc を使って Cloud Storage バケットへのファイルアップロードをトリガーに、Cloud Run サービスを呼び出す構成です。
全体構成
- Cloud Storage:アップロード先バケット
- Eventarc:イベントルーター
- Cloud Run:イベント発火先
- Pub/Sub:EventarcとCloud Runの中継(暗黙的に利用)
システム構成(全体図)
[ユーザーがファイルをアップロード]
↓
[GCS バケット]
↓(finalizeイベント)
[Eventarc]
↓
[Cloud Run]
↓(HTTPリクエスト)
[何かしらのジョブ]
Terraformのコード
provider "google" {
project = var.project_id
region = var.region
}
# Cloud Run サービス(先に deploy されている前提)
data "google_cloud_run_service" "cloudrun_service" {
name = var.cloud_run_service_name
location = var.region
}
GCSとEventarc トリガーの設定
# バケット
resource "google_storage_bucket" "upload_bucket" {
name = "${var.project_id}-upload-bucket"
location = var.region
storage_class = "STANDARD"
uniform_bucket_level_access = true
versioning {
enabled = true
}
}
# Eventarc トリガー
resource "google_eventarc_trigger" "storage_trigger" {
project = module.project.project_id
name = "storage-upload-trigger"
location = var.region
matching_criteria {
attribute = "type"
value = "google.cloud.storage.object.v1.finalized"
}
matching_criteria {
attribute = "bucket"
# bucket nameを設定
value = google_storage_bucket.upload_bucket.name
}
destination {
cloud_run_service {
service = data.google_cloud_run_service.cloudrun_service.name
region = var.region
}
}
service_account = var.eventarc_sa_email
}
IAM権限
# run.invoker 権限
resource "google_project_iam_member" "eventarc_sa_run_invoker" {
project = var.project_id
role = "roles/run.invoker"
member = "serviceAccount:${var.eventarc_sa_email}"
}
# Eventarc の権限
resource "google_project_iam_member" "eventarc_event_receiver" {
project = var.project_id
role = "roles/eventarc.eventReceiver"
member = "serviceAccount:service-${data.google_project.project.number}@gcp-sa-eventarc.iam.gserviceaccount.com"
}
Cloud Runのコード
Goでの実装(Cloud Run HTTPハンドラ
package main
import (
"log"
"net/http"
"os"
cloudEventSdk "github.com/cloudevents/sdk-go/v2"
"github.com/googleapis/google-cloudevents-go/cloud/storagedata"
"google.golang.org/protobuf/encoding/protojson"
)
// Eventarc トリガーが Cloud Run を呼び出し、GCS のイベントを検知して動作する
func main() {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
http.HandleFunc("/", eventHandler)
log.Fatal(http.ListenAndServe(":"+port, nil))
}
func eventHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
defer func() {
if rec := recover(); rec != nil {
log.Printf("recovered from panic: %v", rec)
}
w.WriteHeader(http.StatusOK)
}()
event, err := cloudEventSdk.NewEventFromHTTPRequest(r)
if err != nil {
log.Printf("failed to create CloudEvent from request: %v", err)
return
}
if event.Type() != "google.cloud.storage.object.v1.finalized" {
log.Printf("ignored event type: %s", event.Type())
return
}
var storageObjectData storagedata.StorageObjectData
if err := protojson.Unmarshal(event.Data(), &storageObjectData); err != nil {
log.Printf("failed to unmarshal StorageObjectData: %v", err)
return
}
log.Printf("received file: gs://%s/%s", storageObjectData.Bucket, storageObjectData.Name)
// TODO: 処理を書く
}
実装が綺麗にいかなかったポイント
問題
- データ削除の時を考慮して、GCS(Cloud Storage)の バケットを1つのみ使用している。
- 画像ファイルやPDFファイル、動画ファイルなど、異なるファイル種別に応じて処理を分けたい。
- Eventarc の
google.cloud.storage.object.v1.finalized
イベントを使っている。
うまくいかなかったポイント
-
GCS の finalized イベントは「バケット単位」でしか設定できない
→ イベントの発火条件は「このバケットに何かファイルがアップロードされたかどうか」だけ。
-
つまり、ディレクトリ(プレフィックス)やファイル種別(.jpg, .pdfなど)で Eventarc のルーティングを分けることはできない。
-
Eventarc の
matching_criteria
は"bucket"
までは指定できるが、"name"
(ファイル名)や"prefix"
などはサポートされていない。
解決した方法
- Eventarc のルーティングを諦め、正規表現で、バケットパスから、
"name"
(ファイル名)や"prefix"
を抽出した
package main
import (
"fmt"
"log"
"net/http"
"os"
"regexp"
cloudEventSdk "github.com/cloudevents/sdk-go/v2"
"github.com/googleapis/google-cloudevents-go/cloud/storagedata"
"google.golang.org/protobuf/encoding/protojson"
)
func main() {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
# 本当はここでルーティングしたい!!!!!
http.HandleFunc("/", eventHandler)
log.Printf("Listening on port %s...", port)
log.Fatal(http.ListenAndServe(":"+port, nil))
}
func eventHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Eventarcの再試行防止
defer func() {
if rec := recover(); rec != nil {
log.Printf("recovered from panic: %v", rec)
}
w.WriteHeader(http.StatusOK)
}()
event, err := cloudEventSdk.NewEventFromHTTPRequest(r)
if err != nil {
log.Printf("failed to create CloudEvent from request: %v", err)
return
}
if event.Type() != "google.cloud.storage.object.v1.finalized" {
log.Printf("ignored event type: %s", event.Type())
return
}
var storageObjectData storagedata.StorageObjectData
if err := protojson.Unmarshal(event.Data(), &storageObjectData); err != nil {
log.Printf("failed to unmarshal StorageObjectData: %v", err)
log.Printf("raw event data: %s", string(event.Data()))
return
}
log.Printf("GCS event: bucket=%s, name=%s", storageObjectData.Bucket, storageObjectData.Name)
handleSimpleFile(storageObjectData.Name)
}
// ファイル名から処理を振り分け
func handleSimpleFile(objectName string) {
fileType, fileName, err := parseSimplePath(objectName)
if err != nil {
log.Printf("invalid object path: %v", err)
return
}
switch fileType {
case "img":
// TODO: 画像処理を実装
case "pdf":
// TODO: PDF処理を実装
default:
log.Printf("Unsupported file type: %s", fileType)
}
}
var simplePathRegexp = regexp.MustCompile(`^(img|pdf)/([^/]+)$`)
// 例: バケット内ファイルパス構造
// 画像:gs://your-bucket/img/sample.jpg
// PDF:gs://your-bucket/pdf/sample.pdf
func parseSimplePath(objectName string) (fileType, fileName string, err error) {
matches := simplePathRegexp.FindStringSubmatch(objectName)
if len(matches) != 3 {
return "", "", fmt.Errorf("object path does not match pattern: %s", objectName)
}
return matches[1], matches[2], nil
}
まとめ
- Eventarc × Cloud Run を使えば、シンプルにイベントドリブンな構成が作れる
- GCS finalized イベントは バケット単位でしかトリガーできない ため、Cloud Run 側でパスを解釈してルーティングをした
- 他にいい方法があれば、ぜひ教えてください!🙇
参考
Eventarc を使用して Cloud Storage からイベントを受信する
Eventarc でトリガーを作成する
google_storage_bucket
google_eventarc_trigger
Discussion