🌞

Lambda×FastAPI環境でMQからのメッセージを受信し、処理する方法

2022/08/20に公開

概要

以下の図のようにAPI GatewayのトリガーでLambdaが起動し、FastAPIを利用したアプリケーションが実行されるシステムがあります。この既存の構成にAmazon SQSをトリガーとして追加し、送信されたメッセージを処理する方法をまとめました。

※Flaskでも大丈夫なはずです。

現状の構成 実現したい構成

現状

環境

  • FastAPI(バージョン0.78.0)を利用
  • API Gateway×Lambda環境でFastAPIを動かすために、Mangum(バージョン0.15.0)を利用
  • Dockerfileを利用し、ECRにDockerイメージをpushしている
  • LambdaへのデプロイにはECRにpushされたDockerイメージを指定している

アプリケーションの実装

app
├── Dockerfile.aws.lambda  # ECRにプッシュするLambda用のDockerfile
├── __init__.py
├── ...
└── start_app.py  # FastAPI、Mangumを定義
start_app.py
from fastapi import FastAPI
from mangum import Mangum

# from example.routers import router

app = FastAPI(title="App")

# routerを定義
# app.include_router(router)
app.include_router(...)
app.include_router(...)

# API Gateway×LambdaでFastAPIを起動させるためにMangumクラスを生成
# 内部でLambdaイベントの情報をもとに該当routerで処理を呼び出している
handler = Mangum(app, lifespan="off")
Dockerfile.aws.lambda
FROM public.ecr.aws/lambda/python:3.9

COPY . .

RUN pip install --upgrade pip && \
    pip install -r requirements.txt

CMD ["start_app.handler"]

上記の実装では、API GatewayをトリガーとしてLambdaイベントが発生して、Mangum内部でリクエストされたURIに該当するFastAPIのrouterモジュールの関数を呼び出しています。

しかし、Amazon SQSをLambdaのトリガーに追加した場合、MangumはSQSトリガーには対応していないので上記の実装ではエラーが発生します。

方法 : アプリケーション内部でLambdaのイベントに応じて条件分岐させる

Amazon SQSに対応するためには、次のようにstart_app.pyを変更するだけです。

start_app.py
from fastapi import FastAPI
from mangum import Mangum

# from example.routers import router

app = FastAPI(title="App")

# routerを定義
# app.include_router(router)
app.include_router(...)
app.include_router(...)


+def handler(event, context):
+    # Amazon SQSトリガーの場合
+    if 'Records' in event:
+        message_consumer = SQSMessageConsumer()  # SQSからのメッセージを購読するためのクラス
+        for record in event['Records']:
+            message_consumer.dispatch_message(record['body'])
+        return
+
+    # API Gatewayトリガーの場合
+    asgi_handler = Mangum(app, lifespan="off")
+    return asgi_handler(event, context)

-# API Gateway×LambdaでFastAPIを起動させるためにMangumクラスを生成
-# 内部でLambdaイベントの情報をもとに該当routerで処理を呼び出している
-handler = Mangum(app, lifespan="off")

いくつか方法を検討した結果、Adapter - Mangumに記載のあるようにhandler関数を用意して、関数内部で処理を分岐させるのがシンプルかと思いました。

その他検討した方法

API Gateway用のLambda関数とSQS用のLambda関数を別々にする


そもそも別々のトリガー用のLambda関数を作る方法です。

採用しなかった理由

  • 2つのLambda関数を作ることによる以下の点がビミョー
    • 既存のstart_app.py以外にSQSトリガー用のstart_lambda.pyなるものを用意する
    • 2つのモジュールに対応するため2つのDockerfileを用意する、もしくはLambda環境でCMDをオーバーライドする必要がある
  • デプロイするのに時間がかかる
  • 管理が面倒になる

MangumのCustom HandlerにSQS用のHandlerを追加し、FastAPIで処理させるようにする

MangumにSQSトリガー用のCustom Handerを指定し、SQS経由で送信されたメッセージをMangumからFastAPIに転送し、処理する方法

start_app.py
...

class SQSCustomHander:
    def infer(cls, event, context) -> bool:
        return 'Records' in event
     # その他メソッド...

handler = Mangum(app, lifespan="off", custom_handers=[SQSCustomHander])

採用しなかった理由

  • 途中まで調べて断念
  • ドキュメントを見たところ、https://mangum.io/adapter/ にも書いてあるように関数内で分岐させるほうがシンプル&わかりやすい

参考文献

Discussion