FastAPIでコールバックをdef(同期関数)で定義したときの挙動を確かめてみた
こんにちは。
SREホールディングス株式会社で新卒2年目エンジニアをやっております田城です。
業務でFastAPIを使っていて、各エンドポイントのコールバック定義ってasync
をつけて良いんだっけ?良くないんだっけ?みたいな話になりました。
結局、非同期の処理にはasync
をつけて、そうでない処理にはつけないほうが良い、となりました。
しかし、実際つけないとどういう動作をするんだっけ?同時動作数を制限する方法はあるのか?というのが気になったので、実際に色々なライブラリのソースコードを読んでどういう実装になっているのかを追いました。
対象読者
- FastAPIのコールバックを
async
で定義するか、しないかどうやって判断するか知りたい人 - コールバックを
async
で定義しなかった場合に、どのような実装で処理が行われるか知りたい人 - コールバックを
async
で定義しなかった処理の同時実行数の制限の仕方を知りたい人
結論
- 非同期の処理には
async
をつける(というかつけざるをえない)-
async
でない処理でawait
使うとSyntaxError
になる - 非同期処理中(
await
で呼んだ処理中)は、他のリクエストを受け取れる
-
- 同期の処理には
async
をつけない- こうすることで、処理が別スレッドで実行される
- 処理を別スレッドで動かしてる最中も、他のリクエストを受け取れる
- 同期処理の同時実行数の制限が必要なら、サーバー起動前に以下の処理を実行しておく
- 以下の
2
の部分を最大実行数に変える
- 以下の
RunVar('_default_thread_limiter').set(CapacityLimiter(2))
前提知識
FastAPI
FastAPI は、Pythonの標準である型ヒントに基づいてPython 以降でAPI を構築するための、モダンで、高速(高パフォーマンス)な、Web フレームワークです。
FastAPI公式ドキュメント より引用
threading
threadingは、Pythonの標準モジュールの一つで、スレッドベースの並行実行ができます。
PythonはGILがあるため、主にI/Oバウンドな処理を並行実行する場合に使われます。
asyncio
asyncioは、Pythonの標準モジュールの一つで、非同期処理ができます。
async
や await
などのキーワードを用いて非同期処理を行います。
それぞれの挙動について
コールバックはこの関数を通じて呼ばれているようです。
そしてコルーチンであった場合(すなわち async def
でコールバックを定義した場合)は、シンプルにawait
を使ってコルーチンの処理を行います。
そうでなかった場合はここから読み進めた処理となりますが、結論から言うと、Pythonのthreading.Thread
によるスレッドを使ってコールバックの処理を実行しているようです。
defで定義した処理の場合
調べたところ、こんな感じでコールバックが呼ばれているそうです。
FastAPIがstarletteの run_in_threadpool
を呼んで
starletteが anyio.to_thread.run_sync
を呼んで
最終的にanyioのこの行にたどり着きます
ここからは、非同期で使われているbackendによって異なります。
asyncioの場合はこんな感じです。
ここで、最大実行数を制限しています。
今回は特にlimiter引数を指定してないので、cls.current_default_thread_limiter()
の処理が呼ばれます。
指定のない場合、CapacityLimiter(40)
が使われ、それが自動でセットされるようです。
_default_thread_limiter
は以下で定義されています。RunVarクラスを使っていますね。
RunVarクラスは、contextvars.ContextVar
のようにコンテキストローカルな状態を管理するクラスです。違いはスレッドローカルか、イベントループローカルかです。
Runvarの場合は同一イベントループ上であれば、同じ名前でRunVarインスタンスを作ると(たとえ別なインスタンスでも)同じ値が共有されるようになるようです。
そのまま読み進めていくと、内部でWorkerThread
というthreading.Thread
の子クラスを使用しているようです。
元のThreadクラスについては以下のドキュメントに譲ります。
ここでは-
run()
をオーバーライドすることで、スレッド内で動かす処理を定義できること -
start()
メソッドを呼ぶと、run()
で定義した処理がスレッド内で実行されること
という点を抑えて頂ければと思います。
それを踏まえたうえで、このクラスの run()
メソッドを見てみると
中でQueueを使って、呼び出すべき関数やFutureを一つずつ取り出して
ここでやっとコールバックで定義された関数の処理を実行します。
その後でself.loop.call_soon_threadsafe
で _report_result
を実行し、コールバックの結果をFutureに移します。
最後に元のスレッドでFutureをawait
することで、別スレッドで行われている処理を待ちます。
まとめ
実際にコードを読み進めていくことで、中でどのような技術を使っているか理解したり、同時実行数の制限をするにはどうしたら良いかを理解することができました。
元々呼んでいた処理がそもそもスレッドセーフでなくて並列動作はできたものの動作が全然安定しない、なんてこともありましたが、それはまた別のお話です。
Discussion