fastapiのスレッド周り動作確認
環境
❯❯🐨❯❯ sw_vers
ProductName: macOS
ProductVersion: 12.2.1
BuildVersion: 21D62
❯❯🐨❯❯ poetry -V
Poetry version 1.1.12
❯❯🐨❯❯ poetry run python -V
Python 3.9.4
~/Codes/fastapi_experiment
❯❯🐨❯❯ poetry show
anyio 3.6.2 High level compatibility layer for multiple asynchronous event loop ...
attrs 22.1.0 Classes Without Boilerplate
click 8.1.3 Composable command line interface toolkit
fastapi 0.87.0 FastAPI framework, high performance, easy to learn, fast to code, re...
gunicorn 20.1.0 WSGI HTTP Server for UNIX
h11 0.14.0 A pure-Python, bring-your-own-I/O implementation of HTTP/1.1
idna 3.4 Internationalized Domain Names in Applications (IDNA)
more-itertools 9.0.0 More routines for operating on iterables, beyond itertools
packaging 21.3 Core utilities for Python packages
pluggy 0.13.1 plugin and hook calling mechanisms for python
py 1.11.0 library with cross-python path, ini-parsing, io, code, log facilities
pydantic 1.10.2 Data validation and settings management using python type hints
pyparsing 3.0.9 pyparsing module - Classes and methods to define and execute parsing...
pytest 5.4.3 pytest: simple powerful testing with Python
sniffio 1.3.0 Sniff out which async library your code is running under
starlette 0.21.0 The little ASGI library that shines.
typing-extensions 4.4.0 Backported and Experimental Type Hints for Python 3.7+
uvicorn 0.20.0 The lightning-fast ASGI server.
wcwidth 0.2.5 Measures the displayed width of unicode strings in a terminal
Path Operationがacync defの場合外部スレッドプールは本当に使わないのか
こちらの動作確認
sleepをするエンドポイントとしないエンドポイントを作成。
レスポンスはPID, スレッドIDとした。
import os
import threading
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/1")
async def one():
time.sleep(10) # 10秒寝る
return os.getpid(),threading.get_ident()
@app.get("/2")
async def two():
return os.getpid(),threading.get_ident()
API起動
poetry run gunicorn main:app -k uvicorn.workers.UvicornWorker
以下のコマンドを異なるターミナルから順番に叩いた。
curl http://127.0.0.1:8000/1
> [12938,4310812032]
curl http://127.0.0.1:8000/2
> [12938,4310812032]
2は必ず1が終了したあとにレスポンスを返した。
そして同じスレッドIDが返ってきた。
どうやら外部スレッドプールを使わないようだ。
プロセスを増やして詰まりが解消されるか確認
コードは先程と同じ。acync def
関数が2つで外部スレッドプールは使われない。
ワーカーを2つでAPIを起動する。
poetry run gunicorn main:app -w 2 -k uvicorn.workers.UvicornWorker
curl http://127.0.0.1:8000/1
> [12832,4369712512]
curl http://127.0.0.1:8000/2
> [12831,4369712512]
PIDが異なるので増やしたワーカーが処理をしてくれたということだ。
もちろん1を二回叩くと2つのプロセスが占有されてしまいその後のリクエストが詰まる。
非同期処理を使うことで本当に空き時間に処理をしてくれるか確認
await asyncio
を使った非同期なsleepを挟む。
- import time
+ import asyncio
- time.sleep(10)
+ await asyncio.sleep(10) # 非同期
API起動
❯❯🐨❯❯ poetry run gunicorn main:app --workers 1 -k uvicorn.workers.UvicornWorker --reload
以下を別ターミナルから順番に叩いた。
curl http://127.0.0.1:8000/1
> [13066,4312745344]
curl http://127.0.0.1:8000/2
> [13066,4312745344] # PID, thread id共に↑と同じ
2のレスポンスがすぐに返ってきたためちゃんとawait中に他の処理をしてくれたようだ。
非同期処理と同期処理を持つエンドポイントを作ると本当に非同期中だけ他のリクエストを捌いてくれるのかの確認
import asyncio
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/1")
async def one():
await asyncio.sleep(10) # 非同期
time.sleep(10) # 同期
return 1
@app.get("/2")
async def two():
time.sleep(10) # 同期
return 2
1をリクエストした5秒後に2をリクエストしたら2は20秒後に返ってくるのか?
そうはならなかった。
1の非同期sleep実行中に同期処理である2のリクエストが入ってきたため、2の終了を待ったあとに1が再開されたようだ。
1の実行時間が25秒になっている。
❯❯🐨❯❯ curl http://127.0.0.1:8000/1
1
~ 25s
❯❯🐨❯❯ curl http://127.0.0.1:8000/2
2
~ 10s
async def
はなかなか厳しいものがある。
Path Operationをdefにすると外部スレッドプールを使ってくれるのか確認
import os
import threading
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/1")
def one(): # def
time.sleep(10) # 10秒寝る
return os.getpid(),threading.get_ident()
@app.get("/2")
def two(): # def
return os.getpid(),threading.get_ident()
スレッドを2個にしてAPI起動
❯❯🐨❯❯ poetry run gunicorn main:app -w 1 -k uvicorn.workers.UvicornWorker
ほぼ同時に1->2の順序で以下を実行した。
❯❯🐨❯❯ curl http://127.0.0.1:8000/1
> [13513,6191050752]
❯❯🐨❯❯ curl http://127.0.0.1:8000/2
> [13513,6207877120]
スレッドIDが異なっているので外部スレッドを用いているようです。
Path Operationをdefにしてワーカーを2つにすると外部スレッドを使う?別ワーカーを使う?
ワーカーを2つにした。
コードは一つ前と同じ。
❯❯🐨❯❯ poetry run gunicorn main:app --workers 2 -k uvicorn.workers.UvicornWorker --reload
❯❯🐨❯❯ curl http://127.0.0.1:8000/1
> [13627,6188953600]
❯❯🐨❯❯ curl http://127.0.0.1:8000/2
> [13628,6188953600]
何度か毎秒叩いてみた。
❯❯🐨❯❯ curl http://127.0.0.1:8000/2
> [13627,6188953600]
❯❯🐨❯❯ curl http://127.0.0.1:8000/2
> [13628,6205779968]
❯❯🐨❯❯ curl http://127.0.0.1:8000/2
> [13628,6205779968]
gunicornがよしなにworkerを割り振ってくれるようだ。ルールはよくわからない。
ワーカー2つで片方がasync defによって利用されているときに新しいリクエストはどうなる?
こんな感じのプログラムを用意。
ワーカーを2つの状態で起動して1->2の順にリクエストを飛ばす。
1と同じワーカーに2のリクエストも行ってしまうと、随分待たされることになるがどうなるだろうか。
私の予想だとそんなに賢く割り振られない気がするのでたまに1の方に割り振られて待たされることになりそう。
import os
import threading
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/1")
async def one(): # async
time.sleep(10) # 10秒寝る
return os.getpid(),threading.get_ident()
@app.get("/2")
def two(): # not async
return os.getpid(),threading.get_ident()
1を叩いた後に、2を何度も叩いてみたが一度も1と同じワーカーで処理されることはなかった。
賢い。
おそらく空いている方にリクエストを投げてくれるようだ。
先の実験ではどちらのワーカーにも割り振られていたが、外部スレッドプールが使えるかつCPUが相手そうであるというのを見ているのだろうか?
ワーカーを2つで片方がCPU使用率が高く処理時間が長いときに空いている方にリクエストが飛ぶのか?
こんなプログラムを用意。
1の方はタイムアウトになるまでCPUを使い続ける。
import os
import threading
import time
from fastapi import FastAPI
app = FastAPI()
@app.get("/1")
async def one():
a = 0
while 1:
a += 1 # ひたすらCPUを使う
return os.getpid(),threading.get_ident()
@app.get("/2")
def two():
return os.getpid(),threading.get_ident()
ワーカーは2つ。
❯❯🐨❯❯ poetry run gunicorn main:app --workers 2 -k uvicorn.workers.UvicornWorker --reload
1をリクエストし片方のワーカーのCPU利用率を高めた後に2を何度もリクエストしたが、空いている方のワーカーにしかリクエストが飛ばなかった。
賢い。
プロセス内では外部スレッドプールを使っていい感じにCPU使ってくれるし、ワーカー間もなるだけ空いている方にリクエストを割り振ってくれていそうな印象を受けたのでひとまずLGTMとなったのだった。
asyncは必要に迫られなければ使うのはやめておこうとなった。
理解がもっと深まったらつける場面も出てくるかもしれない。