FastAPI などで見える機会が増えたasyncioですが、本当に恩恵があるのかベンチマークテストしてみました。



asyncio とは?

asyncio はその名の通り非同期(async) I/O の実装に活用できます。
ネットワーク通信を含む Input/Ouput の際は処理待ちが発生し CPU を持て余してしまいます。


File I/O の間 CPU を別の処理に割り当てることで CPU をフル活用させることができます。


フル活用と言ってもasyncio単体では 1 スレッドの CPU 使用率が 100%ということです。
multiprocessing, joblibなどのマルチプロセスを行う処理では CPU の全ての論理コアで CPU を活用できます。


プログラムの処理速度が CPU 速度に大きく依存する場合(CPU bound と呼んだりします)、マルチプロセスが適しています。使える CPU がたくさんあった方が良いからですね。

逆に I/O 速度に大きく依存する場合(I/O bound と呼んだりします)、非同期 I/O が適しています。CPU がたくさんあっても、CPU が暇を持て余すだけだからですね。

また、マルチプロセスと非同期 I/O は組み合わせることもできますが、本記事ではこちらの検証は行ないません。興味のある方は英語ですが以下の記事を参照してください。


asyncioを用いて 3 つの sleep を並行処理してみましょう。


import time

periods = [1, 2, 3]
for period in periods:

1, 2, 3 秒ずつ sleep するため処理が終了するには合計 6 秒かかります。


import asyncio

# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
    for period in periods:
        await asyncio.sleep(period)

periods = [1.0, 2.0, 3.0]

# コルーチンの実行


  1. async defでコルーチン関数を定義する
  2. asyncioでコルーチンを実行する

ただし、上記を実行しても合計時間は 6 秒かかります。
コルーチンを並行処理するには Task を作成する必要があります。

import asyncio

# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
    tasks = []

    # タスクの作成
    for period in periods:
        task = asyncio.create_task(asyncio.sleep(period))

    ´# 他のタスクに順番を譲る
    for task in tasks:
        await task

periods = [1.0, 2.0, 3.0]

# コルーチンの実行

上記で実装した場合、3 秒で処理が終了します。

async def async_multi_sleep(periods: list[float]) -> None:
    await asyncio.gather(*[asyncio.sleep(period) for period in periods])

periods = [1.0, 2.0, 3.0]

# コルーチンの実行



No 内容 利用パッケージ
1 スリープ asyncio.sleep
2 File I/O aiofiles
3 Network I/O - HTTP aiohttp
4 Network I/O - DB aiosqlite

また、各処理に CPU 処理を組み合わせてテストを行います。

  • pytest
  • pytest-benchmark
  • pytest-asyncio
  • pytest-aiohttp

なお、pytest-benchmarkは本記事の執筆時点でasyncioに対応していません。そのためGithub の Issueを参考に以下のような fixture を作成して対応しました。

def aio_benchmark(benchmark):
    import asyncio
    import threading

    class Sync2Async:
        def __init__(self, coro, *args, **kwargs):
            self.coro = coro
            self.args = args
            self.kwargs = kwargs
            self.custom_loop = None
            self.thread = None

        def start_background_loop(self) -> None:

        def __call__(self):
            evloop = None
            awaitable = self.coro(*self.args, **self.kwargs)
                evloop = asyncio.get_running_loop()
            if evloop is None:
                if not self.custom_loop or not self.thread or not self.thread.is_alive():
                    self.custom_loop = asyncio.new_event_loop()
                    self.thread = threading.Thread(target=self.start_background_loop, daemon=True)

                return asyncio.run_coroutine_threadsafe(awaitable, self.custom_loop).result()

    def _wrapper(func, *args, **kwargs):
        if asyncio.iscoroutinefunction(func):
            benchmark(Sync2Async(func, *args, **kwargs))
            benchmark(func, *args, **kwargs)

    return _wrapper

benchmark: sleep



import asyncio
from time import sleep

# -----------------------------------------------------------
# 指定された時間スリープする
# -----------------------------------------------------------
def multi_sleep(periods: list[float]) -> None:
    for period in periods:

async def async_multi_sleep(periods: list[float]) -> None:
    for period in periods:
        await asyncio.sleep(period)

async def async_multi_sleep_gather(periods: list[float]) -> None:
    await asyncio.gather(*[asyncio.sleep(period) for period in periods])


import pytest
from c001_asyncio import (

SLEEP_PERIODS = [0.2, 0.2, 0.2]

# -----------------------------------------------------------
# 指定された時間スリープするテスト
# -----------------------------------------------------------
def test_multi_sleep(benchmark):
    def _():

async def test_async_multi_sleep(aio_benchmark):
    async def _():
        await async_multi_sleep(SLEEP_PERIODS)

async def test_async_multi_sleep_gather(aio_benchmark):
    async def _():
        await async_multi_sleep_gather(SLEEP_PERIODS)


---------------------------------------------------------------------------------------- benchmark: 3 tests ----------------------------------------------------------------------------------------
Name (time in ms)                      Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
test_async_multi_sleep_gather     201.6565 (1.0)      202.6901 (1.0)      202.3728 (1.0)      0.4125 (1.0)      202.5256 (1.0)      0.3803 (1.0)           1;0  4.9414 (1.0)           5           1
test_async_multi_sleep            603.5638 (2.99)     616.0021 (3.04)     606.5454 (3.00)     5.3204 (12.90)    604.3013 (2.98)     4.1270 (10.85)         1;1  1.6487 (0.33)          5           1
test_multi_sleep                  606.8920 (3.01)     611.6967 (3.02)     608.6299 (3.01)     1.8088 (4.39)     608.0931 (3.00)     1.4615 (3.84)          1;1  1.6430 (0.33)          5           1

Mean 列に注目すると、想定通りtest_async_multi_sleep_gatherのみ並行処理の恩恵を受けて約 200ms で終了しています。ただ、実際のプロダクトで sleep することはあまりないと思うので、別のパターンも見ていきましょう。

benchmark: aiofiles

次は File I/O です。
非同期の File I/O にはaiofilesを用います。
今回は 512MB のデータを 10 個のファイルに書き込む処理でベンチマークをしてみます。


import asyncio
from pathlib import Path

import aiofiles

# -----------------------------------------------------------
# 指定されたデータをファイルに書き込む
# -----------------------------------------------------------
def create_file_with_data(path: Path, data: bytes) -> None:
    with"wb") as f:

async def async_create_file_with_data(path: Path, data: bytes) -> None:
    async with, "wb") as f:
        await f.write(data)

# -----------------------------------------------------------
# 指定されたデータを指定された数だけファイルに書き込む
# -----------------------------------------------------------
def create_multi_files_with_data(paths: list[Path], data: bytes) -> None:
    for path in paths:
        create_file_with_data(path, data)

async def async_create_multi_files_with_data(paths: list[Path], data: bytes) -> None:
    await asyncio.gather(*[async_create_file_with_data(path, data) for path in paths])


from pathlib import Path
from tempfile import NamedTemporaryFile

import pytest
from c002_aiofiles import (


def named_tempfile():
    fw = NamedTemporaryFile("w")
    yield fw

def named_tempfiles():
    files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]
    yield files
    for fw in files:

def huge_bytes():
    return b"\0" * CREATE_FILE_SIZE

# -----------------------------------------------------------
# 指定されたデータをファイルに書き込むテスト
# -----------------------------------------------------------
def test_create_file_with_data(benchmark, named_tempfile, huge_bytes):
    def _():
        create_file_with_data(Path(, huge_bytes)

async def test_async_create_file_with_data(aio_benchmark, named_tempfile, huge_bytes):
    async def _():
        await async_create_file_with_data(Path(, huge_bytes)

# -----------------------------------------------------------
# 指定されたデータを指定された数だけファイルに書き込むテスト
# -----------------------------------------------------------
def test_create_multi_files_with_data(benchmark, named_tempfiles, huge_bytes):
    def _():
            [Path( for named_tempfile in named_tempfiles],

async def test_async_create_multi_files_with_data(aio_benchmark, named_tempfiles, huge_bytes):
    async def _():
        await async_create_multi_files_with_data(
            [Path( for named_tempfile in named_tempfiles],


--------------------------------------------------------------------------------------------------- benchmark: 4 tests --------------------------------------------------------------------------------------------------
Name (time in ms)                                  Min                   Max                  Mean             StdDev                Median                 IQR            Outliers     OPS            Rounds  Iterations
test_async_create_file_with_data               92.4087 (1.0)        198.6375 (1.07)       127.5377 (1.0)      37.2938 (1.10)       108.2839 (1.0)       61.2919 (1.15)          3;0  7.8408 (1.0)          11           1
test_create_file_with_data                     99.5028 (1.08)       185.3546 (1.0)        150.5592 (1.18)     37.0968 (1.09)       158.6906 (1.47)      63.7308 (1.19)          1;0  6.6419 (0.85)          5           1
test_async_create_multi_files_with_data     1,171.8733 (12.68)    1,254.7769 (6.77)     1,219.3601 (9.56)     33.9193 (1.0)      1,231.4451 (11.37)     53.3607 (1.0)           2;0  0.8201 (0.10)          5           1
test_create_multi_files_with_data           1,175.0913 (12.72)    1,327.1361 (7.16)     1,242.5917 (9.74)     67.4861 (1.99)     1,232.8131 (11.39)    122.8071 (2.30)          2;0  0.8048 (0.10)          5           1


File I/O 段階では sleep と同様に待ちが発生するため、並列処理を行なった方が 10 倍近く早くなる想定でしたが結果は異なり、処理速度に大きな違いは見られませんでした。

aiofiles の Issueを見ると「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」との記述があります。腑に落ちないですが、CPU 処理も組み合わせてみましょう。

組み合わせる前に、まずは CPU 処理のベンチマークをとってみましょう。

CPU 処理は以下の通り、単純な数列和(高校数学で習うシグマ)です。
1000 万までのシグマを 3 回計算します。

# -----------------------------------------------------------
# 指定された整数までの総和を計算する
# -----------------------------------------------------------
def sigma(num: int) -> int:
    v = 0
    for i in range(num + 1):
        v += i
    return v

async def async_sigma(num: int) -> int:
    v = 0
    for i in range(num + 1):
        v += i
    return v

# -----------------------------------------------------------
# 指定された各整数までの総和を計算する
# -----------------------------------------------------------
def multi_sigma(nums: list[int]) -> list[int]:
    return [sigma(num) for num in nums]

async def async_multi_sigma(nums: list[int]) -> list[int]:
    return await asyncio.gather(*[async_sigma(num) for num in nums])


from c002_aiofiles import (

N_SIGMA = 10**7
V_SIGMA = 50000005000000

# -----------------------------------------------------------
# 指定された整数までの総和を計算するテスト
# -----------------------------------------------------------
def test_sigma(benchmark):
    def _():
        assert sigma(N_SIGMA) == V_SIGMA

async def test_async_sigma(aio_benchmark):
    async def _():
        assert await async_sigma(N_SIGMA) == V_SIGMA

# -----------------------------------------------------------
# 指定された各整数までの総和を計算するテスト
# -----------------------------------------------------------
def test_multi_sigma(benchmark):
    def _():
        result = multi_sigma([N_SIGMA for _ in range(N_SIGMA_TIMES)])
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

async def test_async_multi_sigma(aio_benchmark):
    async def _():
        result = await async_multi_sigma([N_SIGMA for _ in range(N_SIGMA_TIMES)])
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


------------------------------------------------------------------------------------------ benchmark: 4 tests -----------------------------------------------------------------------------------------
Name (time in ms)                 Min                   Max                  Mean             StdDev                Median                IQR            Outliers     OPS            Rounds  Iterations
test_async_sigma             480.9756 (1.0)        499.0507 (1.0)        492.1004 (1.0)       6.7240 (1.0)        493.6568 (1.0)       6.3340 (1.0)           2;0  2.0321 (1.0)           5           1
test_sigma                   500.9957 (1.04)       519.9479 (1.04)       511.1150 (1.04)      7.5248 (1.12)       512.2812 (1.04)     11.8297 (1.87)          2;0  1.9565 (0.96)          5           1
test_async_multi_sigma     1,466.5203 (3.05)     1,494.5071 (2.99)     1,479.8562 (3.01)     10.9483 (1.63)     1,481.6104 (3.00)     16.4346 (2.59)          2;0  0.6757 (0.33)          5           1
test_multi_sigma           1,492.3126 (3.10)     1,547.9187 (3.10)     1,523.0065 (3.09)     27.9279 (4.15)     1,534.3684 (3.11)     53.7643 (8.49)          2;0  0.6566 (0.32)          5           1

並列処理によって処理速度に違いはありませんが、こちらは想定通りです。今回の処理は CPU bound なため、CPU に待ちが発生せず並列処理の恩恵を受けられません。では I/O 処理と組み合わせるとどうでしょうか?


# -----------------------------------------------------------
# File I/O と CPUタスクを行う(データ指定)
# -----------------------------------------------------------
def mix_io_cpu_with_data(paths: list[Path], data: bytes, nums: list[int]) -> list[int]:
    create_multi_files_with_data(paths, data)
    return multi_sigma(nums)

async def async_mix_io_cpu_with_data(paths: list[Path], data: bytes, nums: list[int]) -> list[int]:
    result = await asyncio.gather(
        async_create_multi_files_with_data(paths, data),
    return result[1]


from c002_aiofiles import (

# -----------------------------------------------------------
# File I/O と CPUタスクを行うテスト(データ指定)
# -----------------------------------------------------------
def test_mix_io_cpu_with_data(benchmark, named_tempfiles, huge_bytes):
    def _():
        result = mix_io_cpu_with_data(
            [Path( for named_tempfile in named_tempfiles],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

async def test_async_mix_io_cpu_with_data(aio_benchmark, named_tempfiles, huge_bytes):
    async def _():
        result = await async_mix_io_cpu_with_data(
            [Path( for named_tempfile in named_tempfiles],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


------------------------------------------------------------------------------------- benchmark: 2 tests -------------------------------------------------------------------------------------
Name (time in s)                       Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
test_async_mix_io_cpu_with_data     2.4868 (1.0)      2.6220 (1.0)      2.5505 (1.0)      0.0528 (1.0)      2.5410 (1.0)      0.0798 (1.0)           2;0  0.3921 (1.0)           5           1
test_mix_io_cpu_with_data           2.6234 (1.05)     2.9007 (1.11)     2.8053 (1.10)     0.1114 (2.11)     2.8472 (1.12)     0.1423 (1.78)          1;0  0.3565 (0.91)          5           1

asyncio を使った方が、やや早くなっていますが劇的な変化は見られませんでした。。。

benchmark: aiohttp

次にaiohttpを用いたネットワーク I/O (HTTP)のベンチマークを行ってみましょう。まずアクセスされたら指定の時間だけスリープする API を作成します。

def create_app() -> web.Application:
    async def handle(request: Any) -> web.Response:
        period = float(request.rel_url.query["period"])
        await asyncio.sleep(period)
        return web.Response()

    app = web.Application()
    app.add_routes([web.get("/sleep", handle)])
    return app

if __name__ == "__main__":
    app = create_app()

これとは別プロセスで HTTP リクエストを行います。

# -----------------------------------------------------------
# /sleepリソースに停止時間を指定してHTTPリクエストを行う
# -----------------------------------------------------------
def sleep_request(url: str, period: float) -> int:
    with urllib.request.urlopen(f"{url}/sleep?period={period}") as response:
        return response.status

async def async_sleep_request(url: str, period: float) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{url}/sleep?period={period}") as response:
            return response.status

# -----------------------------------------------------------
# /sleepリソースに停止時間を指定した数だけHTTPリクエストを行う
# -----------------------------------------------------------
def multi_sleep_requests(url: str, periods: list[float]) -> list[int]:
    return [sleep_request(url, period) for period in periods]

async def async_multi_sleep_requests(url: str, periods: list[float]) -> list[int]:
    return await asyncio.gather(*[async_sleep_request(url, period) for period in periods])



def test_sleep_request(benchmark):
    def _():
        status = sleep_request(SERVER_URL, 0.2)
        assert status == 200

async def test_async_sleep_request(aio_benchmark):
    async def _():
        status = await async_sleep_request(SERVER_URL, 0.2)
        assert status == 200

def test_multi_sleep_requests(benchmark):
    def _():
        status = multi_sleep_requests(SERVER_URL, [0.2, 0.2, 0.2])
        assert status == [200, 200, 200]

async def test_async_multi_sleep_requests(aio_benchmark):
    async def _():
        status = await async_multi_sleep_requests(SERVER_URL, [0.2, 0.2, 0.2])
        assert status == [200, 200, 200]


----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------
Name (time in ms)                        Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
test_sleep_request                  202.4012 (1.0)      203.9504 (1.0)      203.4526 (1.0)      0.6106 (1.38)     203.6606 (1.0)      0.5861 (1.0)           1;0  4.9151 (1.0)           5           1
test_async_sleep_request            205.5099 (1.02)     207.0953 (1.02)     206.3653 (1.01)     0.7193 (1.63)     206.6543 (1.01)     1.2889 (2.20)          2;0  4.8458 (0.99)          5           1
test_async_multi_sleep_requests     208.4528 (1.03)     209.4410 (1.03)     208.8828 (1.03)     0.4413 (1.0)      208.6549 (1.02)     0.7507 (1.28)          1;0  4.7874 (0.97)          5           1
test_multi_sleep_requests           609.3281 (3.01)     623.3630 (3.06)     613.3552 (3.01)     5.7904 (13.12)    610.7264 (3.00)     5.9304 (10.12)         1;0  1.6304 (0.33)          5           1

こちらは想定通りの動きをしています。HTTP リクエストをしている間に並行処理が走っているため、200ms x 3 = 600msではなく、1 回分の 200ms で処理が終わっています。

ではクライアント側で重めの CPU 処理を回している場合はどうでしょうか。

N_SIGMA = 10**5
V_SIGMA = 5000050000

# -----------------------------------------------------------
# Network I/O と CPUタスクを行う
# -----------------------------------------------------------
def mix_network_io_cpu(url: str, periods: list[float], nums: list[int]) -> list[int]:
    multi_sleep_requests(url, periods)
    return multi_sigma(nums)

async def async_mix_network_io_cpu(url: str, periods: list[float], nums: list[int]) -> list[int]:
    result = await asyncio.gather(
        async_multi_sleep_requests(url, periods),
    return result[1]


# -----------------------------------------------------------
# Network I/O と CPUタスクを行うテスト(サイズ指定)
# -----------------------------------------------------------
def test_mix_network_io_cpu(benchmark):
    def _():
        result = mix_network_io_cpu(
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

async def test_async_mix_network_io_cpu(aio_benchmark):
    async def _():
        result = await async_mix_network_io_cpu(
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


------------------------------------------------------------------------------------ benchmark: 2 tests ------------------------------------------------------------------------------------
Name (time in s)                     Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
test_async_mix_network_io_cpu     1.0208 (1.0)      1.0343 (1.0)      1.0253 (1.0)      0.0053 (1.0)      1.0243 (1.0)      0.0053 (1.0)           1;0  0.9753 (1.0)           5           1
test_mix_network_io_cpu           3.0221 (2.96)     3.0393 (2.94)     3.0311 (2.96)     0.0072 (1.37)     3.0341 (2.96)     0.0119 (2.23)          2;0  0.3299 (0.34)          5           1

aiohttp を用いた Network I/O では、並行処理の恩恵で実際に処理が早くなることが確認できました。

ちなみに aiohttp ではaiohttp_clientという fixture を利用することが出来るため、本来であれば別プロセスでアプリを起動せず、pytest のみで完結するはずです。しかし以下の実装で試すとRuntimeError: Task got Future attached to a different loop.というエラーが出たため、今回は別プロセスでアプリを起動する方法で迂回しました。

# ※エラーが発生するコード
async def test_sleep_request(aiohttp_client, aio_benchmark):
    async def _():
        app_client = await aiohttp_client(create_app())
        period = 1
        response = await app_client.get(f"/sleep?period={period}")
        assert response.status == 200

benchmark: aiosqlite

最後は aiosqlite を用いた DB 接続です。

DB にはスリープ関数がないため、自作関数を作成してスリープを実装します

# -----------------------------------------------------------
# DBにアクセスして指定した時間だけスリープする
# -----------------------------------------------------------
def sleep_db_request(period: float) -> None:
    with sqlite3.connect(":memory:") as db:
        db.create_function("sleep", 1, time.sleep)
        cur = db.cursor()
        cur.execute(f"SELECT sleep({period})")

async def async_sleep_db_request(period: float) -> None:
    async with aiosqlite.connect(":memory:") as db:
        await db.create_function("sleep", 1, time.sleep)
        cur = await db.cursor()
        await cur.execute(f"SELECT sleep({period})")

# -----------------------------------------------------------
# DBにアクセスして指定した時間の数だけスリープする
# -----------------------------------------------------------
def multi_sleep_db_requests(periods: list[float]) -> None:
    for period in periods:

async def async_multi_sleep_db_requests(periods: list[float]) -> None:
    await asyncio.gather(*[async_sleep_db_request(period) for period in periods])


def test_sleep_db_request(benchmark):
    def _():

async def test_async_sleep_db_request(aio_benchmark):
    async def _():
        await async_sleep_db_request(0.2)

def test_multi_sleep_requests(benchmark):
    def _():
        multi_sleep_db_requests([0.2, 0.2, 0.2])

async def test_async_multi_sleep_requests(aio_benchmark):
    async def _():
        await async_multi_sleep_db_requests([0.2, 0.2, 0.2])


----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------
Name (time in ms)                        Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
test_sleep_db_request               201.3555 (1.0)      205.2846 (1.0)      203.7979 (1.0)      1.8179 (1.73)     204.9371 (1.0)      3.0303 (1.87)          1;0  4.9068 (1.0)           5           1
test_async_sleep_db_request         204.7722 (1.02)     207.3065 (1.01)     206.2676 (1.01)     1.0537 (1.0)      206.7830 (1.01)     1.6162 (1.0)           1;0  4.8481 (0.99)          5           1
test_async_multi_sleep_requests     209.6312 (1.04)     214.5691 (1.05)     211.9721 (1.04)     1.8173 (1.72)     212.0738 (1.03)     2.2477 (1.39)          2;0  4.7176 (0.96)          5           1
test_multi_sleep_requests           607.1996 (3.02)     611.5772 (2.98)     609.8215 (2.99)     2.2275 (2.11)     611.2378 (2.98)     4.0439 (2.50)          2;0  1.6398 (0.33)          5           1

こちらも aiohttp の時と同様、並列処理の恩恵を受けていますね。

念の為クライアント側で重めの CPU 処理を追加して検証します。

# -----------------------------------------------------------
# DB Network I/O と CPUタスクを行う
# -----------------------------------------------------------
def mix_db_io_cpu(periods: list[float], nums: list[int]) -> list[int]:
    return multi_sigma(nums)

async def async_mix_db_io_cpu(periods: list[float], nums: list[int]) -> list[int]:
    result = await asyncio.gather(
    return result[1]


# -----------------------------------------------------------
# DB Network I/O と CPUタスクを行うテスト(サイズ指定)
# -----------------------------------------------------------
def test_mix_db_io_cpu(benchmark):
    def _():
        result = mix_db_io_cpu(
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

async def test_async_mix_db_io_cpu(aio_benchmark):
    async def _():
        result = await async_mix_db_io_cpu(
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


---------------------------------------------------------------------------------- benchmark: 2 tests ---------------------------------------------------------------------------------
Name (time in s)                Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
test_async_mix_db_io_cpu     1.0220 (1.0)      1.0400 (1.0)      1.0344 (1.0)      0.0072 (1.0)      1.0371 (1.0)      0.0076 (1.0)           1;0  0.9668 (1.0)           5           1
test_mix_db_io_cpu           3.0259 (2.96)     3.0451 (2.93)     3.0336 (2.93)     0.0081 (1.13)     3.0320 (2.92)     0.0135 (1.78)          1;0  0.3296 (0.34)          5           1

やはり想定通り、aiosqlite でも並列処理の恩恵を受けることを確認できました。


今回は 「asyncio で並列処理をすることで本当に処理が早くなるか」をベンチマークテストで確認しました。aiofiles のみ処理が速くなることは確認できませんでしたが、aiohttp, aiosqlite では並列処理の恩恵を受けることが確認できました。

FastAPI をはじめとする ASGI 対応の Web フレームワークでの DB 接続やネットワーク I/O では積極的に asyncio、コルーチンを使っていくことで処理の高速化が図れそうです。

aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。



Python と asyncio を使ったのが数年前なので間違っていたらすみません。

今回のケースでは ThreadPoolExecutor の最大ワーカースレッド数に引っかかているのでないでしょうか?

issue の記述とソースをざっと読んだ感じでは aiofileswrite()run_in_executor() をデフォルトのエクゼキューターで使っていると思われます。

What aiofiles does is delegate the file reading operations to a threadpool.

write() はブロッキング処理になるので、これを多用するとスレッドプールを消費してしまうはずです。


一方で run_in_executor() には下記のような記述があります。

バージョン 3.5.3 で変更: loop.run_in_executor() は内部で生成するスレッドプールエグゼキュータの max_workers を設定せず、代わりにスレッドプールエグゼキュータ (ThreadPoolExecutor) にデフォルト値を設定させるようになりました。

さらに ThreadPoolExecutor の方を見ると、おそらく最新の Python では下記がデフォルトになっていると思われます。

バージョン 3.8 で変更: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

これは、Codespace(CPU 4 コア) で確認すると下記のように最大ワーカースレッド数が 8 ということになります。

$ grep "model name" < /proc/cpuinfo
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz

$ python3
Python 3.8.10 (default, Jun 22 2022, 20:18:18) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> min(32, os.cpu_count() + 4)

$ python -c 'import os; print(min(32, os.cpu_count() + 4))'

こちらの「最大ワーカスレッド数に余裕がないか」の検証方法が分からず検証出来ていないのですが、「I/O の並行数を減らしてみる」という部分は以下の記事を参考にSemaphoreを用いて検証してみました。

async def async_create_file_with_data_sema(path: Path, data: bytes, sema: asyncio.Semaphore) -> None:
    async with, "wb") as f, sema:
        await f.write(data)

# テスト対象コード
async def async_create_multi_files_with_data_sema(paths: list[Path], data: bytes, limit: int) -> None:
    sema = asyncio.Semaphore(limit)
    await asyncio.gather(*[async_create_file_with_data_sema(path, data, sema) for path in paths])

# テストコード
async def test_async_create_multi_files_with_data_sema(aio_benchmark, named_tempfiles, huge_bytes):
    async def _():
        await async_create_multi_files_with_data_sema(
            [Path( for named_tempfile in named_tempfiles], huge_bytes, 12

limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。




想定していた通りですが、もう少し単純に「CREATE_FILE_NUM を最大ワーカースレッド数 -3 くらいにしてみたらいかかでしょうか」という意味で書いていました。
(-3 はとくに意味があるわけではなく、メインスレッドの他に pytest が何かスレッドを生成している可能性を考慮しての値です)


前述の Codespace(最大ワーカースレッド数 8)の環境で CREATE_FILE_NUM = 5 にしてみましたがこちらでも違いは出ませんでした。


pytest -k 'multi' で実行しています。

----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------
Name (time in s)                               Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
test_create_multi_files_with_data           6.5082 (1.0)      6.6423 (1.01)     6.5706 (1.0)      0.0513 (2.17)     6.5615 (1.0)      0.0734 (1.62)          2;0  0.1522 (1.0)           5           1
test_async_create_multi_files_with_data     6.5482 (1.01)     6.5976 (1.0)      6.5707 (1.00)     0.0236 (1.0)      6.5645 (1.00)     0.0453 (1.0)           1;0  0.1522 (1.00)          5           1

I/O 性能の頭打ちかと思い、iotop で様子を眺めていたのですが、同期と非同期どちらも似たような感じになります。

同期処理時の iostat のスクリーンショット

非同期理時の iostat のスクリーンショット
非同期処理時(write にあわせてスレッドが増えています)

スクリーンショットだと非同期の方が良い値ですが、どちらも Current は 200 M/s を前後しています。

(これ以降も CREATE_FILE_NUM = 5 で試しています)

$ df /tmp /home/vscode/tmp/
Filesystem     1K-blocks    Used Available Use% Mounted on
/dev/sda1       32845584     352  31151232   1% /tmp
overlay         32847680 2353564  28800020   8% /

偶数のときは /tmp、奇数のときは /home/vscode/tmp に書き込みます。

def named_tempfiles():
    files = [NamedTemporaryFile(
        "w", dir="/tmp" if i % 2 == 0 else "/home/vscode/tmp") for i in range(CREATE_FILE_NUM)]
    # files = [NamedTemporaryFile("w", dir="/home/vscode/tmp")
    #          for _ in range(CREATE_FILE_NUM)]
    # files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]
    yield files
    for fw in files:
----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------
Name (time in s)                               Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
test_create_multi_files_with_data           3.8486 (1.0)      4.0886 (1.02)     3.9493 (1.0)      0.1022 (2.20)     3.9499 (1.0)      0.1736 (2.50)          1;0  0.2532 (1.0)           5           1
test_async_create_multi_files_with_data     3.8907 (1.01)     4.0031 (1.0)      3.9608 (1.00)     0.0464 (1.0)      3.9607 (1.00)     0.0695 (1.0)           1;0  0.2525 (1.00)          5           1

もっと性能良くないとダメかなということで、続いて tmpfs(メモリー上に作られるファイルシステムと思ってください)をマウントし試してみるとようやく違いがでました。

$ sudo mount -t tmpfs -o size=4000m tmpfs /home/vscode/tmp/

$ df /home/vscode/tmp/
Filesystem     1K-blocks  Used Available Use% Mounted on
tmpfs            4096000     0   4096000   0% /home/vscode/tmp

全部のファイルを tmpfs へ書き込みます。

def named_tempfiles():
    # files = [NamedTemporaryFile(
    #     "w", dir="/tmp" if i % 2 == 0 else "/home/vscode/tmp") for i in range(CREATE_FILE_NUM)]
    files = [NamedTemporaryFile("w", dir="/home/vscode/tmp")
             for _ in range(CREATE_FILE_NUM)]
    # files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]
    yield files
    for fw in files:
--------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------
Name (time in ms)                                Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations
test_async_create_multi_files_with_data     270.3285 (1.0)      283.7868 (1.0)      275.1938 (1.0)       5.4192 (1.0)      275.0887 (1.0)       7.2456 (1.0)           1;0  3.6338 (1.0)           5           1
test_create_multi_files_with_data           647.1230 (2.39)     683.4950 (2.41)     660.6459 (2.40)     14.2596 (2.63)     655.9037 (2.38)     18.3033 (2.53)          1;0  1.5137 (0.42)          5           1

5 倍まではいきませんが、速くはなったと言える値かと思います。


threadPool = ThreadPoolExecutor(max_workers=2)

async def async_create_file_with_data(path: Path, data: bytes) -> None:
    # async with, "wb") as f:
    async with, "wb", executor=threadPool) as f:
        await f.write(data)

---------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------
Name (time in ms)                                Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations
test_async_create_multi_files_with_data     385.8565 (1.0)      433.2469 (1.0)      397.1514 (1.0)      20.2649 (2.14)     387.9202 (1.0)      14.3998 (1.95)          1;1  2.5179 (1.0)           5           1
test_create_multi_files_with_data           649.6608 (1.68)     673.3505 (1.55)     656.7036 (1.65)      9.4867 (1.0)      652.8568 (1.68)      7.3829 (1.0)           1;1  1.5228 (0.60)          5           1



  • Python でファイル I/O を並行化するにはスレッドを使う必要がある
    • aiofiles でも内部的にはスレッドを使っている
  • Python の write をマルチスレッド化しても(おそらくは)I/O の性能に左右される
    • tmpfs などの高速な同時アクセス性能が良いファイルシステムを使うと速度が向上しスレッド数が影響してくる

ただし、I/O 性能が原因という決定的な証拠も見つからなかったというのが正直なところです(「I/O 時にはスレッドのロックが開放されるので速くなるけど、やっぱりファイルの操作は遅いよ」という記述が見つかる程度です)。

最終的に何がネックなっているのか(Python のスレッド制御なのか、I/O性能なのかなど)はイマイチはっきりしていないのが申し訳ないのですが、試した範囲でわかったことをまとめてみました。

下記のコメントに追記しましたが、今回の環境では I/O 性能(同時アクセスの性能)がネックで並行化しても思ったように速くならなかったと考えられます。



本当に I/O が頭打ちだったのか気になったので dd コマンドを使って簡単に試してみました。

実行環境は上記のベンチマークを動かした Codespace を使っています。

(/tmp/test?.tmp\0 を 2**28 バイト書き出しています)

dd if=/dev/zero bs=268435456 count=1 of=/tmp/test1.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test2.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test3.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test4.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test5.tmp

新規でファイル作成すると 900MB/s くらいはでます(後半、なぜ遅くなっているかは不明です)

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.289604 s, 927 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.288978 s, 929 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.291378 s, 921 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.90479 s, 297 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.08273 s, 248 MB/s

上書き(削除 & 作成)すると 200MB/s 前後になります。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.07726 s, 249 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.28004 s, 210 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.27764 s, 210 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.20428 s, 223 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.37003 s, 196 MB/s


dd if=/dev/zero bs=268435456 count=1 of=/tmp/test1.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test2.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test3.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test4.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test5.tmp

こちらも最初の 1 回は速くなりますが、5 で割ったような値になります。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.949643 s, 283 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.44472 s, 186 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.61763 s, 166 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.81 s, 148 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 2.26454 s, 119 MB/s

上書き(削除 & 作成)すると 200MB/s を 5 で割ったよりかは少し良い値ですが、全体の経過時間では 6.3 秒なのでベンチマークとあまり違いはありません。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 5.0185 s, 53.5 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 5.07003 s, 52.9 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 5.9657 s, 45.0 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 6.2768 s, 42.8 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 6.38367 s, 42.1 MB/s

以上のように asyncio のベンチマークのときと比べて大きな違いはないようでした。

このことから、開発マシンの /tmp などに書き出した場合は asyncio に限らずファイル書き出しの並行化のメリットはあまり受けられないことになるかと思われます。

一方で tmpfs の場合は並行化してもパフォーマンスは 5 で割った値にまでは落ち込みませんでした。


1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.223789 s, 1.2 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.226139 s, 1.2 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.235712 s, 1.1 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.233061 s, 1.2 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.237924 s, 1.1 GB/s


1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.338262 s, 794 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.417389 s, 643 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.403607 s, 665 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.498165 s, 539 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.49274 s, 545 MB/s


よって、asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。



asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。

なるほど、asyncio, aiofilesの問題というよりはファイルシステム側の問題の可能性が高そうですね。記事本文内でも紹介したaiofilesのIssueでは「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」という記述があり、「処理は早くならないの...?なぜ?」という疑問があったのですが、腑に落ちる原因がありそうでスッキリしました。


私も漠然と「ファイル I/O も並行化すればある程度は速くなるだろう」と考えていたので、実際に手を動かして検証できたのは良い体験でした。



ベンチマーク取ってませんが、CPU Boundのasync_sigma関数は内部で待ち状態が発生しないので無意味な比較になっていると思います。内部で待たないのでasyncio.gatherしても別のtaskへの移動は発生せず逐次実行になります。


async def async_sigma(num: int) -> int:
    print(f"async sigma({num}) start")
    v = 0
    for i in range(num + 1):
        print(f"async sigma({num})", i)
        v += i
    print(f"async sigma({num}) end")
    return v

async def async_multi_sigma(nums: list[int]) -> list[int]:
    return await asyncio.gather(*[async_sigma(num) for num in nums])

await async_multi_sigma([5, 3, 2])
async sigma(5) start
async sigma(5) 0
async sigma(5) 1
async sigma(5) 2
async sigma(5) 3
async sigma(5) 4
async sigma(5) 5
async sigma(5) end
async sigma(3) start
async sigma(3) 0
async sigma(3) 1
async sigma(3) 2
async sigma(3) 3
async sigma(3) end
async sigma(2) start
async sigma(2) 0
async sigma(2) 1
async sigma(2) 2
async sigma(2) end
[0]: [15, 6, 3]

並行(concurrent)処理になるように中で待たせる簡単な方法としては、await asyncio.sleep(0)をforの中で呼ぶと一時待ち状態になり別のtaskに移動して並行処理になります。

async def async_sigma(num: int) -> int:
    print(f"async sigma({num}) start")
    v = 0
    for i in range(num + 1):
        print(f"async sigma({num})", i)
        v += i
        await asyncio.sleep(0)
    print(f"async sigma({num}) end")
    return v

async def async_multi_sigma(nums: list[int]) -> list[int]:
    return await asyncio.gather(*[async_sigma(num) for num in nums])

await async_multi_sigma([5, 3, 2])
async sigma(5) start
async sigma(5) 0
async sigma(3) start
async sigma(3) 0
async sigma(2) start
async sigma(2) 0
async sigma(5) 1
async sigma(3) 1
async sigma(2) 1
async sigma(5) 2
async sigma(3) 2
async sigma(2) 2
async sigma(5) 3
async sigma(3) 3
async sigma(2) end
async sigma(5) 4
async sigma(3) end
async sigma(5) 5
async sigma(5) end
[1]: [15, 6, 3]


また、上のコメントで速くなっていたのはシングルスレッドの並行処理でなくマルチスレッド(ThreadPool)による並列(parallel)処理になっているからだと思われます。一方でCPU Boundの処理はProcessPoolに入れると速くなるのでおそらく実感できるはずです。(下記のドキュメント参照)

このawaitable loop.run_in_executor(executor, func, *args)はおそらく低水準APIに設計されているようなのですがもう少し雑に使える高水準APIとしてcoroutine asyncio.to_thread(func, /, *args, **kwargs)があります。おそらく我々がgatherに求めてるのはこいつで、与えた 関数(コルーチンじゃダメ) がスレッドになり、gatherにまとめて渡すと並列(parallel)実行になります。これでも速度の実感ができるはずです。
loop.run_in_executor(executor, func, *args)のThreadPoolを使った場合とどういう差があるのかまでは自分もよくわかってませんが)