🐈‍⬛

FastAPI のミドルウェアでリクエストボディをログに記録する方法とその内部動作

2024/12/22に公開

本記事は SimpleForm Advent Calendar 2024 の 22 日目の記事です。

はじめに

シンプルフォーム株式会社 の山田と申します。
今回は、FastAPI を使用した Web アプリケーションで、FastAPI のミドルウェアを使用してログを取得する方法をご紹介します。

主に以下のようなユースケースを想定しています。

  • エラー等の解析用にエンドポイントに渡るリクエストを一律同じ形式でログに残したい
  • POST 等で使用するリクエストボディの情報まで含めてログに残したい

本記事の概要は以下の通りです。

  • FastAPI のミドルウェアを使用して、リクエストボディまで含めたログを記録する方法
  • リクエストボディを取得するタイミングに制約がある理由の深掘り

お時間の無い方はログを記録する方法の部分までご参照ください。

  • 環境
    • FastAPI: 0.115.6
    • Starlette: 0.41.3

FastAPI のミドルウェアとは

公式ドキュメントの説明が簡潔で分かりやすいです。

ミドルウェア - FastAPI

「ミドルウェア」は、すべてのリクエストに対して、それがあらゆる特定のpath operationによって処理される前に機能する関数です。また、すべてのレスポンスに対して、それを返す前に機能します。

詳細は上記ドキュメントにありますが、記載の通り、エンドポイント処理の前後に任意の処理を差し込める機能です。
実装方法としてはドキュメント内の @app.middleware("http") の他にも、Advanced Middleware - FastAPI にある app.add_middleware を使用する方法があります。
今回は app.add_middleware を使用した方法について紹介します。

ログの記録方法

ミドルウェアを使用してログを記録する具体的な方法を説明します。
以下の方針で実装します。

  • BaseHTTPMiddleware を継承したカスタムミドルウェアを作成
    • dispatch メソッドを実装
      • リクエストのルーティング前に実行される
      • リクエストボディを読み取る
      • ログを記録する
      • エンドポイント処理を実行する (call_next 関数)
      • エンドポイント処理のレスポンスを返す
  • FastAPI のインスタンスに上記カスタムミドルウェアを追加

上記実装のサンプルコードが以下になります。
※説明に必要な箇所のみの抜粋になります。

  • BaseHTTPMiddleware を継承したカスタムミドルウェアを作成
# カスタムミドルウェアの実装
class CustomLoggingMiddleware(BaseHTTPMiddleware):
  async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    # リクエストボディを読み取る
    log_data = {}
    if request.method == "POST":
      log_data["Body"] = await request.json()

    # ログを記録する
    logger.info(log_data)

    # エンドポイント処理を実行する
    response = await call_next(request)

    # エンドポイント処理のレスポンスを返す
    return response
  • FastAPI のインスタンスに上記カスタムミドルウェアを追加
app = FastAPI()

# カスタムミドルウェアを追加
app.add_middleware(CustomLoggingMiddleware)

このように、カスタムミドルウェアの dispatch メソッドの中にカスタム処理を実装します。
その中でエンドポイント処理を呼ぶ操作が call_next になりますので、その前後に差し込みたい処理を実装します。

こちらのサンプルコードで以下のようなログが得られます。

{'Body': {'name': 'keyboard', 'description': 'yoimono', 'price': 32600, 'tax': 3260}}

ログを取る方法は以上になります。
なお、リクエストボディの読み取りはエンドポイント処理 (call_next) を実行する前に行う必要があります。
以降はその理由の詳細を見ていこうと思いますので、お時間のある方はお付き合いください。

リクエストボディは消費される

リクエストボディの読み取りをエンドポイント処理前に行う理由を確認するため、リクエストボディを読み取る時の動作を見ていきます。
以下の流れで読み取られます。

  • Request クラスの json や body やメソッドを実行
    • json メソッドは body メソッドを呼び出し結果を取得
    • body メソッドは _body にキャッシュが保存されている場合、_body を返す
  • _body が無い場合、stream メソッドによりストリームからボディ情報を読み取り、結果を _body に保存して _body を返す

詳細は以下の json や body メソッドから辿れますので、ご興味ありましたらご参照ください。

  • Request クラスの json / body メソッド
    ※FastAPI でリクエストボディを読み取る時には、FastAPI がベースとしている ASGI フレームワーク Starlette の Request クラスの json や body メソッドが実行されます。

リクエストボディの情報が格納されているストリームは、一度読み取られると消費済みの状態となり、再度利用することができなくなってしまいます。
そのため、複数回リクエストボディの情報を参照するには上記のようなキャッシュ動作が必要となります。
詳細は後述しますが、ミドルウェアでリクエストボディを読み取りストリームを消費済みであるにも関わらず、エンドポイント処理でリクエストボディの情報を参照可能となっているのは、このキャッシュ動作に依るものとなります。

ミドルウェアでエンドポイント処理の前にリクエストボディを読み取った際の動作

Request クラスのキャッシュ動作によりストリーム情報がキャッシュされ、再利用される動作の流れを見ていきます。
サンプルコードの箇所としては以下の部分になります。

# リクエストボディを読み取る
...
  log_data["Body"] = await request.json()
...
# エンドポイント処理を実行する
response = await call_next(request)

call_next 関数がエンドポイント処理を呼び出す操作となっており、この前に request.json() を実行してリクエストボディの情報を読み取っています。
これにより、ミドルウェア中で使用されている Request インスタンスにリクエストボディの情報がキャッシュされます。

call_next ではこの Request インスタンスを渡していますが、エンドポイント処理用の Request インスタンスとして異なるインスタンスが別途 call_next 以降の処理で生成されます。
以降の説明でこの 2 つのインスタンスを明確に区別するため、それぞれ以下の名称で呼び分けます。

  • ミドルウェアで使用する Request インスタンス: R1
  • エンドポイント処理用の Request インスタンス: R2

全体の処理の流れと、R1 インスタンス、R2 インスタンス、ストリーム状態の様子を図示すると以下のようになります。

リクエストボディ読み取りの流れ

上図のポイントは以下になります。

  • ミドルウェア内処理で request.json() によりリクエストボディの情報を読み取った際、R1 インスタンスにボディ情報がキャッシュされる
    • ストリームはこの時点で再利用不可となる
  • call_next 以降の処理で R2 インスタンスがボディ情報を読み取る際には R1 インスタンスのキャッシュ情報が参照される
    • R2 インスタンスとしては自身が _body キャッシュを持たないためストリームから読み取ろうとするが、読み取り動作の一部が内部動作の過程で上書きされており (wrapped_receive)、R1 インスタンスのキャッシュ情報を読み取る動作となっている

これらの動作により、ストリームが消費済みの状態であってもエンドポイント処理に影響を及ぼさずに済みます。

以下に参考情報として、図中の処理の該当コードのリンクを載せておきます。

ミドルウェアでエンドポイント処理の後にリクエストボディを読み取った際の動作

ここまでは、エンドポイント処理 (call_next) 前にリクエストボディを読み取った際の動作を見てきましたが、call_next 後に読み取るとどうなるのでしょうか。
例えば以下のようなコードです。

# エンドポイント処理を実行する
response = await call_next(request)

# リクエストボディを読み取る
log_data = {}
if request.method == "POST":
  log_data["Body"] = await request.json()

実際に試してみると、以下のエラーが発生します。

RuntimeError: Stream consumed

こちらについても何が起こっているのか、動作の流れを図示しました。

Stream consumed エラー発生までの流れ

こちらのポイントは以下になります。

  • こちらの処理の場合、call_next 以降の処理で R2 インスタンスがボディ情報を読み取る際には、ストリームが読み込まれ R2 インスタンスにボディ情報がキャッシュされる
    • ボディ情報読み取りの処理としては、こちらの場合も読み取り動作の一部が wrapped_receive に上書きされているため R1 インスタンスの処理が実行されるが、キャッシュは R2 インスタンスに保存される
  • call_next の処理後、ミドルウェア内処理で request.json() によりリクエストボディの情報を読み取ろうとすると、R1 インスタンスにはキャッシュが保存されていないため、ストリームからボディ情報を読み取ろうとする
  • 読み取ろうとしたストリームは既に call_next 処理中に読み取られ消費済みの状態となっているため、RuntimeError: Stream consumed のエラーが発生する

このように、キャッシュが R2 インスタンスにしか保存されず、R1 インスタンスからは R2 インスタンスのキャッシュ情報を読み取れないため、当該エラーが発生します。

こちらも参考情報として、ストリームからリクエストボディを読み取る処理の箇所の該当コードのリンクを載せておきます。

まとめ

以下、本記事の内容まとめになります。

  • FastAPI のミドルウェアでエンドポイント処理の前後にログ記録処理を入れられる
    • 今回は紹介しなかったが、ログに限らずエラーハンドリング等の他の処理も可能
  • エンドポイント処理前 (call_next 前) にリクエストボディを取得することでボディ情報をログに記録でき、エンドポイント処理にも影響を及ぼさない
    • ミドルウェアで使用する Request インスタンスがボディ情報をキャッシュし、エンドポイント処理用の Request インスタンスはそのキャッシュ情報を参照するため
  • エンドポイント処理後 (call_next 後) にリクエストボディを取得しようとすると RuntimeError: Stream consumed エラーとなる
    • エンドポイント処理用の Request インスタンスにしかキャッシュが保存されず、ミドルウェアで使用する Request インスタンスは消費済みのストリームを読み取ろうとするため

おわりに

今回、ストリーム情報を読み取った時にボディ情報がキャッシュされるのに、何故 call_next の後だとキャッシュが参照されずエラーになるんだろう?という軽い疑問から調べてみましたが、思いのほか複雑な処理が行われていました。
その際の調べ方として、検索して調べた範囲では分からなかったため、デバッガーでステップインしてコツコツと処理を追っていきました。
結果として、この FastAPI / Starlette の動作も勉強になりましたが、フレームワークの内部動作を追うことをあまりやったことがなかったため、この調査自体が良い学びになったと思います。

最後までご覧いただきありがとうございました。この記事がどなたかのご参考になれば幸いです。

SimpleForm Tech Blog

Discussion