FirestoreをトリガーとしてCloudRunを実行する方法(FastAPI)
経緯
FirebaseのFirestoreにdocumentを追加、更新、削除することをトリガーとしてEventArc経由でCloudRun上にデプロイした関数を実行することを行いました。FastAPIで実装している記事がWeb上で見当たらず苦労したのでその備忘録も兼ねて記録しています。
FastAPIでの実装方法
firestoreからのリクエストボディは単なるJson形式ではなく、Cloudeventという型だったので苦労しました。
リクエストボディをfastAPI.Request型で受け取り、これをCloudeventに変換してからfirestore上で編集されたDocumentのIDを取得するといった方法を採用しました。
ソースコードは以下の通りです。
from fastapi import FastAPI, Request
from cloudevents.http import from_http
import google.cloud.logging
import logging
# ログ用
client = google.cloud.logging.Client()
client.setup_logging()
app = FastAPI()
@app.post("/doProcess")
async def process(request:Request):
try:
body = await request.body()
event = from_http(request.headers, body)
document = event.get("document")
inputId = document.split("/")[-1]
logging.info(f"✅inputID : {inputId}")
#処理を行う
except Exception as e:
logging.error(f"Error processing Firestore event: {e}")
return {"status": "error", "message": str(e)}
(追記2025年1月27日)
エンコードされたデータの処理方法が分かりました。
以下参考文献です。
from google.events.cloud import firestore
from google.protobuf.json_format import MessageToDict
encoded_data = event.data
firestore_payload = firestore.DocumentEventData()
firestore_payload._pb.ParseFromString(encoded_data)
old_value_dict = MessageToDict(firestore_payload.old_value._pb) if firestore_payload.old_value else {}
new_value_dict = MessageToDict(firestore_payload.value._pb) if firestore_payload.value else {}
print(new_value_dict)
print(old_value_dict)
これを追記することでfirestoreに対して行った更新前/更新後のデータを取得できます。
これらのデータは以下の辞書形式になっています。
{
"name": string,
"fields": {
string: {
object (Value)
},
...
},
"createTime": string,
"updateTime": string
}
- name: データの所在
- (ex)'projects/{project名}/databases/(default)/documents/{コレクション名}/{uid}'
- fields:firestoreのデータ
- (ex) userIdという名前でString型のデータを保存している場合
- 'userId' : {'stringValue': 'kajdkjfen3knd'}
- (ex) createdAtという名前でfirestore.SERVER_TIMESTAMP型のデータを保存している場合
- 'createdAt': {'timestampValue': '2025-01-26T14:26:03.917975Z'}
例)
{
"name": 'projects/{project名}/databases/(default)/documents/{コレクション名}/{uid}'
"fields": {
'userId' : {'stringValue': 'kajdkjfen3knd'},
'createdAt': {'timestampValue': '2025-01-26T14:26:03.917975Z'},
...
},
'createTime': '2025-01-26T14:26:04.953103Z',
'updateTime': '2025-01-26T14:26:04.953103Z'
}
FastAPIのサーバーをCloud Runにデプロイする方法
こちらについては以下の記事が参考になります。
Firestoreのイベントにトリガーを設定
尚ここで定めたトリガー名は後でデッドレタートピックの作成で使うので注意してください。
デッドレタートピックの作成
デフォルトではEventArc非同期連携で以下の問題があります。
今の非同期連携では以下の 2 つの問題があります。
- 各非同期処理が 10 秒以内に終わらないと、エラー扱いになりリトライしてしまう
- リトライ回数に制限がなく、アプリケーションのバグなどで処理が失敗するとリトライされ続けてしまう=リソースコストが上がり続けてしまう
参考: https://github.com/google-cloud-japan/next-tokyo-assets/tree/main
そのため、gemini処理など時間がかかる場合は複数回トリガーが実行されてしまうというエラーがありました。その解決策として、以下のデッドレタートピックを作成しています。
- 各非同期処理の処理待ち時間を 300 秒 (5 分) に修正
- 最小リトライの間隔を 300 秒 (5 分) に修正
- 合計 5 回非同期の処理に失敗したら、リトライをやめる (デッドレタートピックに入れる)
参考: https://github.com/google-cloud-japan/next-tokyo-assets/tree/main
作成手順は以下の通りです。ターミナルで以下を実行
gcloud pubsub topics create [デッドレタートピック名]
gcloud pubsub subscriptions describe [サブスクリプション名]
以下をコピペして保存
#!/bin/bash
EVENTARC_NAME=$1
DEAD_LETTER_TOPIC=[デッドレタートピック名]
PROJECT_NUMBER=[GCPプロジェクト番号]
SUBSCRIPTION=[サブスクリプション名]
gcloud pubsub subscriptions add-iam-policy-binding $SUBSCRIPTION \
--member="serviceAccount:service-$PROJECT_NUMBER@gcp-sa-pubsub.iam.gserviceaccount.com" \
--role="roles/pubsub.subscriber"
gcloud pubsub subscriptions update $SUBSCRIPTION \
--ack-deadline 300 \
--dead-letter-topic $DEAD_LETTER_TOPIC \
--min-retry-delay 300 \
--max-retry-delay 600 \
--max-delivery-attempts 5
これをターミナル上で以下のように実行
deadLetterTopic.sh [トリガー名]
これで完了です🍺🍺
Discussion