🌞
Lambda×FastAPI環境でMQからのメッセージを受信し、処理する方法
概要
以下の図のようにAPI GatewayのトリガーでLambdaが起動し、FastAPIを利用したアプリケーションが実行されるシステムがあります。この既存の構成にAmazon SQSをトリガーとして追加し、送信されたメッセージを処理する方法をまとめました。
※Flaskでも大丈夫なはずです。
現状の構成 | 実現したい構成 |
---|---|
現状
環境
- FastAPI(バージョン0.78.0)を利用
- API Gateway×Lambda環境でFastAPIを動かすために、Mangum(バージョン0.15.0)を利用
- Dockerfileを利用し、ECRにDockerイメージをpushしている
- LambdaへのデプロイにはECRにpushされたDockerイメージを指定している
アプリケーションの実装
app
├── Dockerfile.aws.lambda # ECRにプッシュするLambda用のDockerfile
├── __init__.py
├── ...
└── start_app.py # FastAPI、Mangumを定義
start_app.py
from fastapi import FastAPI
from mangum import Mangum
# from example.routers import router
app = FastAPI(title="App")
# routerを定義
# app.include_router(router)
app.include_router(...)
app.include_router(...)
# API Gateway×LambdaでFastAPIを起動させるためにMangumクラスを生成
# 内部でLambdaイベントの情報をもとに該当routerで処理を呼び出している
handler = Mangum(app, lifespan="off")
Dockerfile.aws.lambda
FROM public.ecr.aws/lambda/python:3.9
COPY . .
RUN pip install --upgrade pip && \
pip install -r requirements.txt
CMD ["start_app.handler"]
上記の実装では、API GatewayをトリガーとしてLambdaイベントが発生して、Mangum内部でリクエストされたURIに該当するFastAPIのrouterモジュールの関数を呼び出しています。
しかし、Amazon SQSをLambdaのトリガーに追加した場合、MangumはSQSトリガーには対応していないので上記の実装ではエラーが発生します。
方法 : アプリケーション内部でLambdaのイベントに応じて条件分岐させる
Amazon SQSに対応するためには、次のようにstart_app.py
を変更するだけです。
start_app.py
from fastapi import FastAPI
from mangum import Mangum
# from example.routers import router
app = FastAPI(title="App")
# routerを定義
# app.include_router(router)
app.include_router(...)
app.include_router(...)
+def handler(event, context):
+ # Amazon SQSトリガーの場合
+ if 'Records' in event:
+ message_consumer = SQSMessageConsumer() # SQSからのメッセージを購読するためのクラス
+ for record in event['Records']:
+ message_consumer.dispatch_message(record['body'])
+ return
+
+ # API Gatewayトリガーの場合
+ asgi_handler = Mangum(app, lifespan="off")
+ return asgi_handler(event, context)
-# API Gateway×LambdaでFastAPIを起動させるためにMangumクラスを生成
-# 内部でLambdaイベントの情報をもとに該当routerで処理を呼び出している
-handler = Mangum(app, lifespan="off")
いくつか方法を検討した結果、Adapter - Mangumに記載のあるようにhandler関数を用意して、関数内部で処理を分岐させるのがシンプルかと思いました。
その他検討した方法
API Gateway用のLambda関数とSQS用のLambda関数を別々にする
そもそも別々のトリガー用のLambda関数を作る方法です。
採用しなかった理由
- 2つのLambda関数を作ることによる以下の点がビミョー
- 既存の
start_app.py
以外にSQSトリガー用のstart_lambda.py
なるものを用意する - 2つのモジュールに対応するため2つのDockerfileを用意する、もしくはLambda環境でCMDをオーバーライドする必要がある
- 既存の
- デプロイするのに時間がかかる
- 管理が面倒になる
MangumのCustom HandlerにSQS用のHandlerを追加し、FastAPIで処理させるようにする
MangumにSQSトリガー用のCustom Handerを指定し、SQS経由で送信されたメッセージをMangumからFastAPIに転送し、処理する方法
start_app.py
...
class SQSCustomHander:
def infer(cls, event, context) -> bool:
return 'Records' in event
# その他メソッド...
handler = Mangum(app, lifespan="off", custom_handers=[SQSCustomHander])
採用しなかった理由
- 途中まで調べて断念
- ドキュメントを見たところ、https://mangum.io/adapter/ にも書いてあるように関数内で分岐させるほうがシンプル&わかりやすい
Discussion