🌊

FastAPIでコールバックをdef(同期関数)で定義したときの挙動を確かめてみた

2024/05/31に公開

こんにちは。
SREホールディングス株式会社で新卒2年目エンジニアをやっております田城です。

業務でFastAPIを使っていて、各エンドポイントのコールバック定義ってasyncをつけて良いんだっけ?良くないんだっけ?みたいな話になりました。
結局、非同期の処理にはasyncをつけて、そうでない処理にはつけないほうが良い、となりました。
しかし、実際つけないとどういう動作をするんだっけ?同時動作数を制限する方法はあるのか?というのが気になったので、実際に色々なライブラリのソースコードを読んでどういう実装になっているのかを追いました。

対象読者

  • FastAPIのコールバックを async で定義するか、しないかどうやって判断するか知りたい人
  • コールバックをasync で定義しなかった場合に、どのような実装で処理が行われるか知りたい人
  • コールバックをasync で定義しなかった処理の同時実行数の制限の仕方を知りたい人

結論

  • 非同期の処理には async をつける(というかつけざるをえない)
    • async でない処理で await 使うと SyntaxError になる
    • 非同期処理中(awaitで呼んだ処理中)は、他のリクエストを受け取れる
  • 同期の処理には async をつけない
    • こうすることで、処理が別スレッドで実行される
    • 処理を別スレッドで動かしてる最中も、他のリクエストを受け取れる
  • 同期処理の同時実行数の制限が必要なら、サーバー起動前に以下の処理を実行しておく
    • 以下の2の部分を最大実行数に変える
RunVar('_default_thread_limiter').set(CapacityLimiter(2))

前提知識

FastAPI

FastAPI は、Pythonの標準である型ヒントに基づいてPython 以降でAPI を構築するための、モダンで、高速(高パフォーマンス)な、Web フレームワークです。
FastAPI公式ドキュメント より引用

threading

threadingは、Pythonの標準モジュールの一つで、スレッドベースの並行実行ができます。
PythonはGILがあるため、主にI/Oバウンドな処理を並行実行する場合に使われます。
https://docs.python.org/ja/3/library/threading.html

asyncio

asyncioは、Pythonの標準モジュールの一つで、非同期処理ができます。
asyncawait などのキーワードを用いて非同期処理を行います。

https://docs.python.org/ja/3/library/asyncio.html

それぞれの挙動について

コールバックはこの関数を通じて呼ばれているようです。
https://github.com/tiangolo/fastapi/blob/master/fastapi/routing.py#L278

そしてコルーチンであった場合(すなわち async defでコールバックを定義した場合)は、シンプルにawaitを使ってコルーチンの処理を行います。

そうでなかった場合はここから読み進めた処理となりますが、結論から言うと、Pythonのthreading.Threadによるスレッドを使ってコールバックの処理を実行しているようです。

defで定義した処理の場合

調べたところ、こんな感じでコールバックが呼ばれているそうです。

FastAPIがstarletteの run_in_threadpool を呼んで
https://github.com/tiangolo/fastapi/blob/master/fastapi/routing.py#L193

starletteが anyio.to_thread.run_sync を呼んで
https://github.com/encode/starlette/blob/master/starlette/concurrency.py#L42

最終的にanyioのこの行にたどり着きます
https://github.com/agronholm/anyio/blob/master/src/anyio/to_thread.py#L56-L58

ここからは、非同期で使われているbackendによって異なります。

asyncioの場合はこんな感じです。

https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2121

ここで、最大実行数を制限しています。
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2141

今回は特にlimiter引数を指定してないので、cls.current_default_thread_limiter() の処理が呼ばれます。
指定のない場合、CapacityLimiter(40)が使われ、それが自動でセットされるようです。
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2469-L2474

_default_thread_limiterは以下で定義されています。RunVarクラスを使っていますね。
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L1782

RunVarクラスは、contextvars.ContextVarのようにコンテキストローカルな状態を管理するクラスです。違いはスレッドローカルか、イベントループローカルかです。
https://docs.python.org/ja/3/library/contextvars.html
https://github.com/agronholm/anyio/blob/master/src/anyio/lowlevel.py#L92

Runvarの場合は同一イベントループ上であれば、同じ名前でRunVarインスタンスを作ると(たとえ別なインスタンスでも)同じ値が共有されるようになるようです。

そのまま読み進めていくと、内部でWorkerThreadというthreading.Threadの子クラスを使用しているようです。

元のThreadクラスについては以下のドキュメントに譲ります。
https://docs.python.org/ja/3/library/threading.html#threading.Thread
ここでは

  • run()をオーバーライドすることで、スレッド内で動かす処理を定義できること
  • start()メソッドを呼ぶと、run()で定義した処理がスレッド内で実行されること
    という点を抑えて頂ければと思います。

それを踏まえたうえで、このクラスの run() メソッドを見てみると

中でQueueを使って、呼び出すべき関数やFutureを一つずつ取り出して
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L853

ここでやっとコールバックで定義された関数の処理を実行します。
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L859

その後でself.loop.call_soon_threadsafe_report_result を実行し、コールバックの結果をFutureに移します。
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L866-L868

https://docs.python.org/ja/3/library/asyncio-eventloop.html

最後に元のスレッドでFutureをawaitすることで、別スレッドで行われている処理を待ちます。
https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2177

まとめ

実際にコードを読み進めていくことで、中でどのような技術を使っているか理解したり、同時実行数の制限をするにはどうしたら良いかを理解することができました。
元々呼んでいた処理がそもそもスレッドセーフでなくて並列動作はできたものの動作が全然安定しない、なんてこともありましたが、それはまた別のお話です。

SRE Holdings 株式会社

Discussion