💎

FirestoreをトリガーとしてCloudRunを実行する方法(FastAPI)

2025/01/18に公開

経緯

FirebaseのFirestoreにdocumentを追加、更新、削除することをトリガーとしてEventArc経由でCloudRun上にデプロイした関数を実行することを行いました。FastAPIで実装している記事がWeb上で見当たらず苦労したのでその備忘録も兼ねて記録しています。

FastAPIでの実装方法

firestoreからのリクエストボディは単なるJson形式ではなく、Cloudeventという型だったので苦労しました。

リクエストボディをfastAPI.Request型で受け取り、これをCloudeventに変換してからfirestore上で編集されたDocumentのIDを取得するといった方法を採用しました。

ソースコードは以下の通りです。

from fastapi import FastAPI, Request
from cloudevents.http import from_http
import google.cloud.logging
import logging

# ログ用
client = google.cloud.logging.Client()
client.setup_logging()

app = FastAPI()

@app.post("/doProcess")
async def process(request:Request):
    try:
        body = await request.body()
        event = from_http(request.headers, body)
        document = event.get("document")
        inputId = document.split("/")[-1]
        logging.info(f"✅inputID : {inputId}")
        #処理を行う

    except Exception as e:
        logging.error(f"Error processing Firestore event: {e}")
        return {"status": "error", "message": str(e)}    

(追記2025年1月27日)
エンコードされたデータの処理方法が分かりました。
以下参考文献です。
https://stackoverflow.com/questions/79144768/how-to-decode-a-protobuf-firestore-event-payload

from google.events.cloud import firestore
from google.protobuf.json_format import MessageToDict

encoded_data = event.data
firestore_payload = firestore.DocumentEventData()
firestore_payload._pb.ParseFromString(encoded_data)
old_value_dict = MessageToDict(firestore_payload.old_value._pb) if firestore_payload.old_value else {}
new_value_dict = MessageToDict(firestore_payload.value._pb) if firestore_payload.value else {}
print(new_value_dict)
print(old_value_dict)

これを追記することでfirestoreに対して行った更新前/更新後のデータを取得できます。
これらのデータは以下の辞書形式になっています。

{
  "name": string,
  "fields": {
    string: {
      object (Value)
    },
    ...
  },
  "createTime": string,
  "updateTime": string
}
  • name: データの所在
    • (ex)'projects/{project名}/databases/(default)/documents/{コレクション名}/{uid}'
  • fields:firestoreのデータ
    • (ex) userIdという名前でString型のデータを保存している場合
    • 'userId' : {'stringValue': 'kajdkjfen3knd'}
    • (ex) createdAtという名前でfirestore.SERVER_TIMESTAMP型のデータを保存している場合
    • 'createdAt': {'timestampValue': '2025-01-26T14:26:03.917975Z'}

例)

{
  "name": 'projects/{project名}/databases/(default)/documents/{コレクション名}/{uid}'
  "fields": {
    'userId' : {'stringValue': 'kajdkjfen3knd'},
    'createdAt': {'timestampValue': '2025-01-26T14:26:03.917975Z'},
    ...

  },
  'createTime': '2025-01-26T14:26:04.953103Z',
  'updateTime': '2025-01-26T14:26:04.953103Z'
}

FastAPIのサーバーをCloud Runにデプロイする方法

こちらについては以下の記事が参考になります。
https://zenn.dev/tockey/articles/1d4328b3b477a1

Firestoreのイベントにトリガーを設定

https://cloud.google.com/eventarc/docs/run/route-trigger-cloud-firestore?hl=ja
これに従えば問題なく実装できるはずです。
尚ここで定めたトリガー名は後でデッドレタートピックの作成で使うので注意してください。

デッドレタートピックの作成

デフォルトではEventArc非同期連携で以下の問題があります。

今の非同期連携では以下の 2 つの問題があります。

  • 各非同期処理が 10 秒以内に終わらないと、エラー扱いになりリトライしてしまう
  • リトライ回数に制限がなく、アプリケーションのバグなどで処理が失敗するとリトライされ続けてしまう=リソースコストが上がり続けてしまう

参考: https://github.com/google-cloud-japan/next-tokyo-assets/tree/main

そのため、gemini処理など時間がかかる場合は複数回トリガーが実行されてしまうというエラーがありました。その解決策として、以下のデッドレタートピックを作成しています。

  • 各非同期処理の処理待ち時間を 300 秒 (5 分) に修正
  • 最小リトライの間隔を 300 秒 (5 分) に修正
  • 合計 5 回非同期の処理に失敗したら、リトライをやめる (デッドレタートピックに入れる)

参考: https://github.com/google-cloud-japan/next-tokyo-assets/tree/main

作成手順は以下の通りです。ターミナルで以下を実行

gcloud pubsub topics create [デッドレタートピック名]
gcloud pubsub subscriptions describe [サブスクリプション名]

以下をコピペして保存

deadLetterTopic.sh
#!/bin/bash

EVENTARC_NAME=$1

DEAD_LETTER_TOPIC=[デッドレタートピック名] 

PROJECT_NUMBER=[GCPプロジェクト番号]

SUBSCRIPTION=[サブスクリプション名]

gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION \
  --member="serviceAccount:service-$PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
  --role="roles/pubsub.subscriber"

gcloud pubsub subscriptions update $SUBSCRIPTION \
  --ack-deadline 300 \
  --dead-letter-topic $DEAD_LETTER_TOPIC \
  --min-retry-delay 300 \
  --max-retry-delay 600 \
  --max-delivery-attempts 5

これをターミナル上で以下のように実行

deadLetterTopic.sh [トリガー名]

これで完了です🍺🍺

Discussion