asyncioで並行処理すると本当に速くなるのか、実際にベンチマークしてみた

2022/10/04に公開7

こんにちわ alivelimb です。
FastAPI などで見える機会が増えたasyncioですが、本当に恩恵があるのかベンチマークテストしてみました。

はじめに

「そもそもasyncioって何?」という方のために簡単に紹介してみます。
詳細は公式ドキュメントまたは@JunyaFffさんのスライドが非常にわかりやすいです。

asyncio とは?

asyncio はその名の通り非同期(async) I/O の実装に活用できます。
ネットワーク通信を含む Input/Ouput の際は処理待ちが発生し CPU を持て余してしまいます。

blocking

File I/O の間 CPU を別の処理に割り当てることで CPU をフル活用させることができます。

non-blocking

フル活用と言ってもasyncio単体では 1 スレッドの CPU 使用率が 100%ということです。
multiprocessing, joblibなどのマルチプロセスを行う処理では CPU の全ての論理コアで CPU を活用できます。

ではマルチプロセスの方が良いのか?というとそういう訳ではありません。

プログラムの処理速度が CPU 速度に大きく依存する場合(CPU bound と呼んだりします)、マルチプロセスが適しています。使える CPU がたくさんあった方が良いからですね。

逆に I/O 速度に大きく依存する場合(I/O bound と呼んだりします)、非同期 I/O が適しています。CPU がたくさんあっても、CPU が暇を持て余すだけだからですね。

また、マルチプロセスと非同期 I/O は組み合わせることもできますが、本記事ではこちらの検証は行ないません。興味のある方は英語ですが以下の記事を参照してください。

どうやって実装するの?

asyncioを用いて 3 つの sleep を並行処理してみましょう。

まずは普通に書くと以下のようになります。

import time

periods = [1, 2, 3]
for period in periods:
    time.sleep(period)

1, 2, 3 秒ずつ sleep するため処理が終了するには合計 6 秒かかります。

次にasyncioを用いると以下のようになります。

import asyncio

# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
    for period in periods:
        await asyncio.sleep(period)

periods = [1.0, 2.0, 3.0]

# コルーチンの実行
asyncio.run(async_multi_sleep(periods))

上記の通り、大まかの手順としては以下の通りです。

  1. async defでコルーチン関数を定義する
  2. asyncioでコルーチンを実行する

ただし、上記を実行しても合計時間は 6 秒かかります。
コルーチンを並行処理するには Task を作成する必要があります。

import asyncio

# コルーチン関数の定義
async def async_multi_sleep(periods: list[float]) -> None:
    tasks = []

    # タスクの作成
    for period in periods:
        task = asyncio.create_task(asyncio.sleep(period))
        tasks.append(task)

    ´# 他のタスクに順番を譲る
    for task in tasks:
        await task

periods = [1.0, 2.0, 3.0]

# コルーチンの実行
asyncio.run(async_multi_sleep(periods))

上記で実装した場合、3 秒で処理が終了します。
なお、上記の内容はasyncio.gatherを用いて以下のように実装することもできます。

async def async_multi_sleep(periods: list[float]) -> None:
    await asyncio.gather(*[asyncio.sleep(period) for period in periods])

periods = [1.0, 2.0, 3.0]

# コルーチンの実行
asyncio.run(async_multi_sleep(periods))

benchmark

ベンチマークテストとして処理時間を計測します。
テスト対象としては以下の通りです。

No 内容 利用パッケージ
1 スリープ asyncio.sleep
2 File I/O aiofiles
3 Network I/O - HTTP aiohttp
4 Network I/O - DB aiosqlite

また、各処理に CPU 処理を組み合わせてテストを行います。
テストツールとしては以下を利用します。

  • pytest
  • pytest-benchmark
  • pytest-asyncio
  • pytest-aiohttp

なお、pytest-benchmarkは本記事の執筆時点でasyncioに対応していません。そのためGithub の Issueを参考に以下のような fixture を作成して対応しました。

@pytest.fixture(scope="function")
def aio_benchmark(benchmark):
    import asyncio
    import threading

    class Sync2Async:
        def __init__(self, coro, *args, **kwargs):
            self.coro = coro
            self.args = args
            self.kwargs = kwargs
            self.custom_loop = None
            self.thread = None

        def start_background_loop(self) -> None:
            asyncio.set_event_loop(self.custom_loop)
            self.custom_loop.run_forever()

        def __call__(self):
            evloop = None
            awaitable = self.coro(*self.args, **self.kwargs)
            try:
                evloop = asyncio.get_running_loop()
            except:
                pass
            if evloop is None:
                return asyncio.run(awaitable)
            else:
                if not self.custom_loop or not self.thread or not self.thread.is_alive():
                    self.custom_loop = asyncio.new_event_loop()
                    self.thread = threading.Thread(target=self.start_background_loop, daemon=True)
                    self.thread.start()

                return asyncio.run_coroutine_threadsafe(awaitable, self.custom_loop).result()

    def _wrapper(func, *args, **kwargs):
        if asyncio.iscoroutinefunction(func):
            benchmark(Sync2Async(func, *args, **kwargs))
        else:
            benchmark(func, *args, **kwargs)

    return _wrapper

benchmark: sleep

まずは既に紹介したスリープについてです。

テスト対象コード

import asyncio
from time import sleep

# -----------------------------------------------------------
# 指定された時間スリープする
# -----------------------------------------------------------
def multi_sleep(periods: list[float]) -> None:
    for period in periods:
        sleep(period)


async def async_multi_sleep(periods: list[float]) -> None:
    for period in periods:
        await asyncio.sleep(period)


async def async_multi_sleep_gather(periods: list[float]) -> None:
    await asyncio.gather(*[asyncio.sleep(period) for period in periods])

テストコード

import pytest
from c001_asyncio import (
    async_multi_sleep,
    async_multi_sleep_gather,
    async_multi_sleep_gather_from_blocking,
    multi_sleep,
)

SLEEP_PERIODS = [0.2, 0.2, 0.2]

# -----------------------------------------------------------
# 指定された時間スリープするテスト
# -----------------------------------------------------------
def test_multi_sleep(benchmark):
    @benchmark
    def _():
        multi_sleep(SLEEP_PERIODS)


@pytest.mark.asyncio
async def test_async_multi_sleep(aio_benchmark):
    @aio_benchmark
    async def _():
        await async_multi_sleep(SLEEP_PERIODS)


@pytest.mark.asyncio
async def test_async_multi_sleep_gather(aio_benchmark):
    @aio_benchmark
    async def _():
        await async_multi_sleep_gather(SLEEP_PERIODS)

結果は以下の通りです。

---------------------------------------------------------------------------------------- benchmark: 3 tests ----------------------------------------------------------------------------------------
Name (time in ms)                      Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_multi_sleep_gather     201.6565 (1.0)      202.6901 (1.0)      202.3728 (1.0)      0.4125 (1.0)      202.5256 (1.0)      0.3803 (1.0)           1;0  4.9414 (1.0)           5           1
test_async_multi_sleep            603.5638 (2.99)     616.0021 (3.04)     606.5454 (3.00)     5.3204 (12.90)    604.3013 (2.98)     4.1270 (10.85)         1;1  1.6487 (0.33)          5           1
test_multi_sleep                  606.8920 (3.01)     611.6967 (3.02)     608.6299 (3.01)     1.8088 (4.39)     608.0931 (3.00)     1.4615 (3.84)          1;1  1.6430 (0.33)          5           1
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Mean 列に注目すると、想定通りtest_async_multi_sleep_gatherのみ並行処理の恩恵を受けて約 200ms で終了しています。ただ、実際のプロダクトで sleep することはあまりないと思うので、別のパターンも見ていきましょう。

benchmark: aiofiles

次は File I/O です。
非同期の File I/O にはaiofilesを用います。
今回は 512MB のデータを 10 個のファイルに書き込む処理でベンチマークをしてみます。

テスト対象コード

import asyncio
from pathlib import Path

import aiofiles

# -----------------------------------------------------------
# 指定されたデータをファイルに書き込む
# -----------------------------------------------------------
def create_file_with_data(path: Path, data: bytes) -> None:
    with path.open("wb") as f:
        f.write(data)


async def async_create_file_with_data(path: Path, data: bytes) -> None:
    async with aiofiles.open(path.as_posix(), "wb") as f:
        await f.write(data)

# -----------------------------------------------------------
# 指定されたデータを指定された数だけファイルに書き込む
# -----------------------------------------------------------
def create_multi_files_with_data(paths: list[Path], data: bytes) -> None:
    for path in paths:
        create_file_with_data(path, data)


async def async_create_multi_files_with_data(paths: list[Path], data: bytes) -> None:
    await asyncio.gather(*[async_create_file_with_data(path, data) for path in paths])

テストコード

from pathlib import Path
from tempfile import NamedTemporaryFile

import pytest
from c002_aiofiles import (
    async_create_file_with_data,
    async_create_multi_files_with_data,
    create_file_with_data,
    create_multi_files_with_data,
)

CREATE_FILE_SIZE = 2**28
CREATE_FILE_NUM = 10


@pytest.fixture(scope="function")
def named_tempfile():
    fw = NamedTemporaryFile("w")
    yield fw
    fw.close()


@pytest.fixture(scope="function")
def named_tempfiles():
    files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]
    yield files
    for fw in files:
        fw.close()


@pytest.fixture(scope="module")
def huge_bytes():
    return b"\0" * CREATE_FILE_SIZE


# -----------------------------------------------------------
# 指定されたデータをファイルに書き込むテスト
# -----------------------------------------------------------
def test_create_file_with_data(benchmark, named_tempfile, huge_bytes):
    @benchmark
    def _():
        create_file_with_data(Path(named_tempfile.name), huge_bytes)


@pytest.mark.asyncio
async def test_async_create_file_with_data(aio_benchmark, named_tempfile, huge_bytes):
    @aio_benchmark
    async def _():
        await async_create_file_with_data(Path(named_tempfile.name), huge_bytes)


# -----------------------------------------------------------
# 指定されたデータを指定された数だけファイルに書き込むテスト
# -----------------------------------------------------------
def test_create_multi_files_with_data(benchmark, named_tempfiles, huge_bytes):
    @benchmark
    def _():
        create_multi_files_with_data(
            [Path(named_tempfile.name) for named_tempfile in named_tempfiles],
            huge_bytes,
        )


@pytest.mark.asyncio
async def test_async_create_multi_files_with_data(aio_benchmark, named_tempfiles, huge_bytes):
    @aio_benchmark
    async def _():
        await async_create_multi_files_with_data(
            [Path(named_tempfile.name) for named_tempfile in named_tempfiles],
            huge_bytes,
        )

結果は以下の通りです

--------------------------------------------------------------------------------------------------- benchmark: 4 tests --------------------------------------------------------------------------------------------------
Name (time in ms)                                  Min                   Max                  Mean             StdDev                Median                 IQR            Outliers     OPS            Rounds  Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_create_file_with_data               92.4087 (1.0)        198.6375 (1.07)       127.5377 (1.0)      37.2938 (1.10)       108.2839 (1.0)       61.2919 (1.15)          3;0  7.8408 (1.0)          11           1
test_create_file_with_data                     99.5028 (1.08)       185.3546 (1.0)        150.5592 (1.18)     37.0968 (1.09)       158.6906 (1.47)      63.7308 (1.19)          1;0  6.6419 (0.85)          5           1
test_async_create_multi_files_with_data     1,171.8733 (12.68)    1,254.7769 (6.77)     1,219.3601 (9.56)     33.9193 (1.0)      1,231.4451 (11.37)     53.3607 (1.0)           2;0  0.8201 (0.10)          5           1
test_create_multi_files_with_data           1,175.0913 (12.72)    1,327.1361 (7.16)     1,242.5917 (9.74)     67.4861 (1.99)     1,232.8131 (11.39)    122.8071 (2.30)          2;0  0.8048 (0.10)          5           1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

!?

これは想定とは異なります。
File I/O 段階では sleep と同様に待ちが発生するため、並列処理を行なった方が 10 倍近く早くなる想定でしたが結果は異なり、処理速度に大きな違いは見られませんでした。

aiofiles の Issueを見ると「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」との記述があります。腑に落ちないですが、CPU 処理も組み合わせてみましょう。

組み合わせる前に、まずは CPU 処理のベンチマークをとってみましょう。

CPU 処理は以下の通り、単純な数列和(高校数学で習うシグマ)です。
1000 万までのシグマを 3 回計算します。

# -----------------------------------------------------------
# 指定された整数までの総和を計算する
# -----------------------------------------------------------
def sigma(num: int) -> int:
    v = 0
    for i in range(num + 1):
        v += i
    return v


async def async_sigma(num: int) -> int:
    v = 0
    for i in range(num + 1):
        v += i
    return v


# -----------------------------------------------------------
# 指定された各整数までの総和を計算する
# -----------------------------------------------------------
def multi_sigma(nums: list[int]) -> list[int]:
    return [sigma(num) for num in nums]


async def async_multi_sigma(nums: list[int]) -> list[int]:
    return await asyncio.gather(*[async_sigma(num) for num in nums])

テストコード

from c002_aiofiles import (
    async_multi_sigma,
    async_sigma,
    multi_sigma,
    sigma,
)

N_SIGMA = 10**7
V_SIGMA = 50000005000000
N_SIGMA_TIMES = 3

# -----------------------------------------------------------
# 指定された整数までの総和を計算するテスト
# -----------------------------------------------------------
def test_sigma(benchmark):
    @benchmark
    def _():
        assert sigma(N_SIGMA) == V_SIGMA


@pytest.mark.asyncio
async def test_async_sigma(aio_benchmark):
    @aio_benchmark
    async def _():
        assert await async_sigma(N_SIGMA) == V_SIGMA


# -----------------------------------------------------------
# 指定された各整数までの総和を計算するテスト
# -----------------------------------------------------------
def test_multi_sigma(benchmark):
    @benchmark
    def _():
        result = multi_sigma([N_SIGMA for _ in range(N_SIGMA_TIMES)])
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


@pytest.mark.asyncio
async def test_async_multi_sigma(aio_benchmark):
    @aio_benchmark
    async def _():
        result = await async_multi_sigma([N_SIGMA for _ in range(N_SIGMA_TIMES)])
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

結果は以下の通りです。

------------------------------------------------------------------------------------------ benchmark: 4 tests -----------------------------------------------------------------------------------------
Name (time in ms)                 Min                   Max                  Mean             StdDev                Median                IQR            Outliers     OPS            Rounds  Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_sigma             480.9756 (1.0)        499.0507 (1.0)        492.1004 (1.0)       6.7240 (1.0)        493.6568 (1.0)       6.3340 (1.0)           2;0  2.0321 (1.0)           5           1
test_sigma                   500.9957 (1.04)       519.9479 (1.04)       511.1150 (1.04)      7.5248 (1.12)       512.2812 (1.04)     11.8297 (1.87)          2;0  1.9565 (0.96)          5           1
test_async_multi_sigma     1,466.5203 (3.05)     1,494.5071 (2.99)     1,479.8562 (3.01)     10.9483 (1.63)     1,481.6104 (3.00)     16.4346 (2.59)          2;0  0.6757 (0.33)          5           1
test_multi_sigma           1,492.3126 (3.10)     1,547.9187 (3.10)     1,523.0065 (3.09)     27.9279 (4.15)     1,534.3684 (3.11)     53.7643 (8.49)          2;0  0.6566 (0.32)          5           1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

並列処理によって処理速度に違いはありませんが、こちらは想定通りです。今回の処理は CPU bound なため、CPU に待ちが発生せず並列処理の恩恵を受けられません。では I/O 処理と組み合わせるとどうでしょうか?

テスト対象コード

# -----------------------------------------------------------
# File I/O と CPUタスクを行う(データ指定)
# -----------------------------------------------------------
def mix_io_cpu_with_data(paths: list[Path], data: bytes, nums: list[int]) -> list[int]:
    create_multi_files_with_data(paths, data)
    return multi_sigma(nums)


async def async_mix_io_cpu_with_data(paths: list[Path], data: bytes, nums: list[int]) -> list[int]:
    result = await asyncio.gather(
        async_create_multi_files_with_data(paths, data),
        async_multi_sigma(nums),
    )
    return result[1]

テストコード

from c002_aiofiles import (
    async_mix_io_cpu_with_data,
    mix_io_cpu_with_data,
)

# -----------------------------------------------------------
# File I/O と CPUタスクを行うテスト(データ指定)
# -----------------------------------------------------------
def test_mix_io_cpu_with_data(benchmark, named_tempfiles, huge_bytes):
    @benchmark
    def _():
        result = mix_io_cpu_with_data(
            [Path(named_tempfile.name) for named_tempfile in named_tempfiles],
            huge_bytes,
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        )
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


@pytest.mark.asyncio
async def test_async_mix_io_cpu_with_data(aio_benchmark, named_tempfiles, huge_bytes):
    @aio_benchmark
    async def _():
        result = await async_mix_io_cpu_with_data(
            [Path(named_tempfile.name) for named_tempfile in named_tempfiles],
            huge_bytes,
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        )
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

結果は以下の通りです。

------------------------------------------------------------------------------------- benchmark: 2 tests -------------------------------------------------------------------------------------
Name (time in s)                       Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_mix_io_cpu_with_data     2.4868 (1.0)      2.6220 (1.0)      2.5505 (1.0)      0.0528 (1.0)      2.5410 (1.0)      0.0798 (1.0)           2;0  0.3921 (1.0)           5           1
test_mix_io_cpu_with_data           2.6234 (1.05)     2.9007 (1.11)     2.8053 (1.10)     0.1114 (2.11)     2.8472 (1.12)     0.1423 (1.78)          1;0  0.3565 (0.91)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

asyncio を使った方が、やや早くなっていますが劇的な変化は見られませんでした。。。
私自身、腑に落ちる理由が分かっていないので、心当たりのある方はコメントで教えていただけると幸いです。

benchmark: aiohttp

次にaiohttpを用いたネットワーク I/O (HTTP)のベンチマークを行ってみましょう。まずアクセスされたら指定の時間だけスリープする API を作成します。

def create_app() -> web.Application:
    async def handle(request: Any) -> web.Response:
        period = float(request.rel_url.query["period"])
        await asyncio.sleep(period)
        return web.Response()

    app = web.Application()
    app.add_routes([web.get("/sleep", handle)])
    return app


if __name__ == "__main__":
    app = create_app()
    web.run_app(app)

これとは別プロセスで HTTP リクエストを行います。

# -----------------------------------------------------------
# /sleepリソースに停止時間を指定してHTTPリクエストを行う
# -----------------------------------------------------------
def sleep_request(url: str, period: float) -> int:
    with urllib.request.urlopen(f"{url}/sleep?period={period}") as response:
        return response.status


async def async_sleep_request(url: str, period: float) -> int:
    async with aiohttp.ClientSession() as session:
        async with session.get(f"{url}/sleep?period={period}") as response:
            return response.status


# -----------------------------------------------------------
# /sleepリソースに停止時間を指定した数だけHTTPリクエストを行う
# -----------------------------------------------------------
def multi_sleep_requests(url: str, periods: list[float]) -> list[int]:
    return [sleep_request(url, period) for period in periods]


async def async_multi_sleep_requests(url: str, periods: list[float]) -> list[int]:
    return await asyncio.gather(*[async_sleep_request(url, period) for period in periods])

これまでのようにベンチマークを実施してみましょう。

SERVER_URL = "http://0.0.0.0:8080"

def test_sleep_request(benchmark):
    @benchmark
    def _():
        status = sleep_request(SERVER_URL, 0.2)
        assert status == 200


@pytest.mark.asyncio
async def test_async_sleep_request(aio_benchmark):
    @aio_benchmark
    async def _():
        status = await async_sleep_request(SERVER_URL, 0.2)
        assert status == 200


def test_multi_sleep_requests(benchmark):
    @benchmark
    def _():
        status = multi_sleep_requests(SERVER_URL, [0.2, 0.2, 0.2])
        assert status == [200, 200, 200]


@pytest.mark.asyncio
async def test_async_multi_sleep_requests(aio_benchmark):
    @aio_benchmark
    async def _():
        status = await async_multi_sleep_requests(SERVER_URL, [0.2, 0.2, 0.2])
        assert status == [200, 200, 200]

結果は以下の通りです。

----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------
Name (time in ms)                        Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_sleep_request                  202.4012 (1.0)      203.9504 (1.0)      203.4526 (1.0)      0.6106 (1.38)     203.6606 (1.0)      0.5861 (1.0)           1;0  4.9151 (1.0)           5           1
test_async_sleep_request            205.5099 (1.02)     207.0953 (1.02)     206.3653 (1.01)     0.7193 (1.63)     206.6543 (1.01)     1.2889 (2.20)          2;0  4.8458 (0.99)          5           1
test_async_multi_sleep_requests     208.4528 (1.03)     209.4410 (1.03)     208.8828 (1.03)     0.4413 (1.0)      208.6549 (1.02)     0.7507 (1.28)          1;0  4.7874 (0.97)          5           1
test_multi_sleep_requests           609.3281 (3.01)     623.3630 (3.06)     613.3552 (3.01)     5.7904 (13.12)    610.7264 (3.00)     5.9304 (10.12)         1;0  1.6304 (0.33)          5           1
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

こちらは想定通りの動きをしています。HTTP リクエストをしている間に並行処理が走っているため、200ms x 3 = 600msではなく、1 回分の 200ms で処理が終わっています。

ではクライアント側で重めの CPU 処理を回している場合はどうでしょうか。

N_SIGMA = 10**5
V_SIGMA = 5000050000
N_SIGMA_TIMES = 3


# -----------------------------------------------------------
# Network I/O と CPUタスクを行う
# -----------------------------------------------------------
def mix_network_io_cpu(url: str, periods: list[float], nums: list[int]) -> list[int]:
    multi_sleep_requests(url, periods)
    return multi_sigma(nums)


async def async_mix_network_io_cpu(url: str, periods: list[float], nums: list[int]) -> list[int]:
    result = await asyncio.gather(
        async_multi_sleep_requests(url, periods),
        async_multi_sigma(nums),
    )
    return result[1]

こちらもベンチマークを回してみましょう。

# -----------------------------------------------------------
# Network I/O と CPUタスクを行うテスト(サイズ指定)
# -----------------------------------------------------------
def test_mix_network_io_cpu(benchmark):
    @benchmark
    def _():
        result = mix_network_io_cpu(
            SERVER_URL,
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        )
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


@pytest.mark.asyncio
async def test_async_mix_network_io_cpu(aio_benchmark):
    @aio_benchmark
    async def _():
        result = await async_mix_network_io_cpu(
            SERVER_URL,
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        )
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

結果は以下の通りです。

------------------------------------------------------------------------------------ benchmark: 2 tests ------------------------------------------------------------------------------------
Name (time in s)                     Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_mix_network_io_cpu     1.0208 (1.0)      1.0343 (1.0)      1.0253 (1.0)      0.0053 (1.0)      1.0243 (1.0)      0.0053 (1.0)           1;0  0.9753 (1.0)           5           1
test_mix_network_io_cpu           3.0221 (2.96)     3.0393 (2.94)     3.0311 (2.96)     0.0072 (1.37)     3.0341 (2.96)     0.0119 (2.23)          2;0  0.3299 (0.34)          5           1
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

こちらも想定通り、並行処理の効果が効いていますね。
aiohttp を用いた Network I/O では、並行処理の恩恵で実際に処理が早くなることが確認できました。

ちなみに aiohttp ではaiohttp_clientという fixture を利用することが出来るため、本来であれば別プロセスでアプリを起動せず、pytest のみで完結するはずです。しかし以下の実装で試すとRuntimeError: Task got Future attached to a different loop.というエラーが出たため、今回は別プロセスでアプリを起動する方法で迂回しました。

# ※エラーが発生するコード
@pytest.mark.asyncio
async def test_sleep_request(aiohttp_client, aio_benchmark):
    @aio_benchmark
    async def _():
        app_client = await aiohttp_client(create_app())
        period = 1
        response = await app_client.get(f"/sleep?period={period}")
        assert response.status == 200

benchmark: aiosqlite

最後は aiosqlite を用いた DB 接続です。

これまで通り、まずはスリープから検証していきましょう。
DB にはスリープ関数がないため、自作関数を作成してスリープを実装します

# -----------------------------------------------------------
# DBにアクセスして指定した時間だけスリープする
# -----------------------------------------------------------
def sleep_db_request(period: float) -> None:
    with sqlite3.connect(":memory:") as db:
        db.create_function("sleep", 1, time.sleep)
        cur = db.cursor()
        cur.execute(f"SELECT sleep({period})")


async def async_sleep_db_request(period: float) -> None:
    async with aiosqlite.connect(":memory:") as db:
        await db.create_function("sleep", 1, time.sleep)
        cur = await db.cursor()
        await cur.execute(f"SELECT sleep({period})")


# -----------------------------------------------------------
# DBにアクセスして指定した時間の数だけスリープする
# -----------------------------------------------------------
def multi_sleep_db_requests(periods: list[float]) -> None:
    for period in periods:
        sleep_db_request(period)


async def async_multi_sleep_db_requests(periods: list[float]) -> None:
    await asyncio.gather(*[async_sleep_db_request(period) for period in periods])

テストケースも今まで通りです

def test_sleep_db_request(benchmark):
    @benchmark
    def _():
        sleep_db_request(0.2)


@pytest.mark.asyncio
async def test_async_sleep_db_request(aio_benchmark):
    @aio_benchmark
    async def _():
        await async_sleep_db_request(0.2)


def test_multi_sleep_requests(benchmark):
    @benchmark
    def _():
        multi_sleep_db_requests([0.2, 0.2, 0.2])


@pytest.mark.asyncio
async def test_async_multi_sleep_requests(aio_benchmark):
    @aio_benchmark
    async def _():
        await async_multi_sleep_db_requests([0.2, 0.2, 0.2])

結果は以下の通りです。

----------------------------------------------------------------------------------------- benchmark: 4 tests -----------------------------------------------------------------------------------------
Name (time in ms)                        Min                 Max                Mean            StdDev              Median               IQR            Outliers     OPS            Rounds  Iterations
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_sleep_db_request               201.3555 (1.0)      205.2846 (1.0)      203.7979 (1.0)      1.8179 (1.73)     204.9371 (1.0)      3.0303 (1.87)          1;0  4.9068 (1.0)           5           1
test_async_sleep_db_request         204.7722 (1.02)     207.3065 (1.01)     206.2676 (1.01)     1.0537 (1.0)      206.7830 (1.01)     1.6162 (1.0)           1;0  4.8481 (0.99)          5           1
test_async_multi_sleep_requests     209.6312 (1.04)     214.5691 (1.05)     211.9721 (1.04)     1.8173 (1.72)     212.0738 (1.03)     2.2477 (1.39)          2;0  4.7176 (0.96)          5           1
test_multi_sleep_requests           607.1996 (3.02)     611.5772 (2.98)     609.8215 (2.99)     2.2275 (2.11)     611.2378 (2.98)     4.0439 (2.50)          2;0  1.6398 (0.33)          5           1
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

こちらも aiohttp の時と同様、並列処理の恩恵を受けていますね。

念の為クライアント側で重めの CPU 処理を追加して検証します。

# -----------------------------------------------------------
# DB Network I/O と CPUタスクを行う
# -----------------------------------------------------------
def mix_db_io_cpu(periods: list[float], nums: list[int]) -> list[int]:
    multi_sleep_db_requests(periods)
    return multi_sigma(nums)


async def async_mix_db_io_cpu(periods: list[float], nums: list[int]) -> list[int]:
    result = await asyncio.gather(
        async_multi_sleep_db_requests(periods),
        async_multi_sigma(nums),
    )
    return result[1]

テストコードもほとんど変更はありません

# -----------------------------------------------------------
# DB Network I/O と CPUタスクを行うテスト(サイズ指定)
# -----------------------------------------------------------
def test_mix_db_io_cpu(benchmark):
    @benchmark
    def _():
        result = mix_db_io_cpu(
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        )
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]


@pytest.mark.asyncio
async def test_async_mix_db_io_cpu(aio_benchmark):
    @aio_benchmark
    async def _():
        result = await async_mix_db_io_cpu(
            [1, 1, 1],
            [N_SIGMA for _ in range(N_SIGMA_TIMES)],
        )
        assert result == [V_SIGMA for _ in range(N_SIGMA_TIMES)]

結果は以下の通りです。

---------------------------------------------------------------------------------- benchmark: 2 tests ---------------------------------------------------------------------------------
Name (time in s)                Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_mix_db_io_cpu     1.0220 (1.0)      1.0400 (1.0)      1.0344 (1.0)      0.0072 (1.0)      1.0371 (1.0)      0.0076 (1.0)           1;0  0.9668 (1.0)           5           1
test_mix_db_io_cpu           3.0259 (2.96)     3.0451 (2.93)     3.0336 (2.93)     0.0081 (1.13)     3.0320 (2.92)     0.0135 (1.78)          1;0  0.3296 (0.34)          5           1
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

やはり想定通り、aiosqlite でも並列処理の恩恵を受けることを確認できました。

まとめ

今回は 「asyncio で並列処理をすることで本当に処理が早くなるか」をベンチマークテストで確認しました。aiofiles のみ処理が速くなることは確認できませんでしたが、aiohttp, aiosqlite では並列処理の恩恵を受けることが確認できました。

FastAPI をはじめとする ASGI 対応の Web フレームワークでの DB 接続やネットワーク I/O では積極的に asyncio、コルーチンを使っていくことで処理の高速化が図れそうです。

aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。

Discussion

hankei6kmhankei6km

aiofiles を用いたベンチマークで結果が想定通りにならなかった件については、そもそも aiofiles では処理が速くならないのか、それとも実装に問題があるのか、もしわかる方がいればコメント頂けると幸いです。

Python と asyncio を使ったのが数年前なので間違っていたらすみません。

今回のケースでは ThreadPoolExecutor の最大ワーカースレッド数に引っかかているのでないでしょうか?

issue の記述とソースをざっと読んだ感じでは aiofileswrite()run_in_executor() をデフォルトのエクゼキューターで使っていると思われます。

What aiofiles does is delegate the file reading operations to a threadpool.

https://github.com/Tinche/aiofiles/blob/6c29e3d98d7e25ae9f58f052086ac9bbd7633ddd/src/aiofiles/threadpool/utils.py#L41-L47

write() はブロッキング処理になるので、これを多用するとスレッドプールを消費してしまうはずです。

ちょっと趣旨は違いますが下記の記事が参考になるかと思います。

https://pod.hatenablog.com/entry/2019/03/21/162511

一方で run_in_executor() には下記のような記述があります。

https://docs.python.org/ja/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor

バージョン 3.5.3 で変更: loop.run_in_executor() は内部で生成するスレッドプールエグゼキュータの max_workers を設定せず、代わりにスレッドプールエグゼキュータ (ThreadPoolExecutor) にデフォルト値を設定させるようになりました。

さらに ThreadPoolExecutor の方を見ると、おそらく最新の Python では下記がデフォルトになっていると思われます。

https://docs.python.org/ja/3/library/concurrent.futures.html#threadpoolexecutor

バージョン 3.8 で変更: Default value of max_workers is changed to min(32, os.cpu_count() + 4). This default value preserves at least 5 workers for I/O bound tasks. It utilizes at most 32 CPU cores for CPU bound tasks which release the GIL. And it avoids using very large resources implicitly on many-core machines.

これは、Codespace(CPU 4 コア) で確認すると下記のように最大ワーカースレッド数が 8 ということになります。

$ grep "model name" < /proc/cpuinfo
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz
model name      : Intel(R) Xeon(R) Platinum 8168 CPU @ 2.70GHz

$ python3
Python 3.8.10 (default, Jun 22 2022, 20:18:18) 
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> min(32, os.cpu_count() + 4)
8

もしも測定されている環境での最大ワーカースレッド数に余裕がなさそうでしたら、I/O の並行数をら減らしてみるのはいかがでしょうか?

alivelimbalivelimb

コメント頂きありがとうございます!
(Pythonのthreadに関する理解が足りておらず、共有いただいた資料などを読ませて頂いていた都合で回答が遅れてしまいました...)

まず私の環境での最大ワーカスレッド数は以下で確認した通り12となります

$ python -c 'import os; print(min(32, os.cpu_count() + 4))'
12

もしも測定されている環境での最大ワーカースレッド数に余裕がなさそうでしたら、I/O の並行数を減らしてみるのはいかがでしょうか?

こちらの「最大ワーカスレッド数に余裕がないか」の検証方法が分からず検証出来ていないのですが、「I/O の並行数を減らしてみる」という部分は以下の記事を参考にSemaphoreを用いて検証してみました。

async def async_create_file_with_data_sema(path: Path, data: bytes, sema: asyncio.Semaphore) -> None:
    async with aiofiles.open(path.as_posix(), "wb") as f, sema:
        await f.write(data)

# テスト対象コード
async def async_create_multi_files_with_data_sema(paths: list[Path], data: bytes, limit: int) -> None:
    sema = asyncio.Semaphore(limit)
    await asyncio.gather(*[async_create_file_with_data_sema(path, data, sema) for path in paths])

# テストコード
@pytest.mark.asyncio
async def test_async_create_multi_files_with_data_sema(aio_benchmark, named_tempfiles, huge_bytes):
    @aio_benchmark
    async def _():
        await async_create_multi_files_with_data_sema(
            [Path(named_tempfile.name) for named_tempfile in named_tempfiles], huge_bytes, 12
        )

limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。
「hankei6kmさんが想定していた検証方法ではないかも...?」とも思っているので、お時間ある際に再びコメント頂けると幸いです。

hankei6kmhankei6km

ご返答ありがとうございます。

こちらの書き方が曖昧だったばかりにお手間をとらせてしまってすみません。

limit数は12以外にも4, 8でも検証しましたが、結果としては速度は改善せずでした。。。
「hankei6kmさんが想定していた検証方法ではないかも...?」とも思っているので、お時間ある際に再びコメント頂けると幸いです。

想定していた通りですが、もう少し単純に「CREATE_FILE_NUM を最大ワーカースレッド数 -3 くらいにしてみたらいかかでしょうか」という意味で書いていました。
(-3 はとくに意味があるわけではなく、メインスレッドの他に pytest が何かスレッドを生成している可能性を考慮しての値です)

今回は予想だけで書いてしまうのも申し訳ないので実際に環境を作って試してみました。

https://github.com/hankei6km/test-python-asyncio

前述の Codespace(最大ワーカースレッド数 8)の環境で CREATE_FILE_NUM = 5 にしてみましたがこちらでも違いは出ませんでした。

# CREATE_FILE_NUM = 10
CREATE_FILE_NUM = 5

pytest -k 'multi' で実行しています。

----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------
Name (time in s)                               Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_create_multi_files_with_data           6.5082 (1.0)      6.6423 (1.01)     6.5706 (1.0)      0.0513 (2.17)     6.5615 (1.0)      0.0734 (1.62)          2;0  0.1522 (1.0)           5           1
test_async_create_multi_files_with_data     6.5482 (1.01)     6.5976 (1.0)      6.5707 (1.00)     0.0236 (1.0)      6.5645 (1.00)     0.0453 (1.0)           1;0  0.1522 (1.00)          5           1
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

I/O 性能の頭打ちかと思い、iotop で様子を眺めていたのですが、同期と非同期どちらも似たような感じになります。

同期処理時の iostat のスクリーンショット
同期処理時

非同期理時の iostat のスクリーンショット
非同期処理時(write にあわせてスレッドが増えています)

スクリーンショットだと非同期の方が良い値ですが、どちらも Current は 200 M/s を前後しています。

それならばと、ためしに書き出すファイルを異なるボリュームに分散させてみましたが、同期と非同期ともに速くなってしまったのでこちらも違いはでませんでした。
(これ以降も CREATE_FILE_NUM = 5 で試しています)

$ df /tmp /home/vscode/tmp/
Filesystem     1K-blocks    Used Available Use% Mounted on
/dev/sda1       32845584     352  31151232   1% /tmp
overlay         32847680 2353564  28800020   8% /

偶数のときは /tmp、奇数のときは /home/vscode/tmp に書き込みます。

@pytest.fixture(scope="function")
def named_tempfiles():
    files = [NamedTemporaryFile(
        "w", dir="/tmp" if i % 2 == 0 else "/home/vscode/tmp") for i in range(CREATE_FILE_NUM)]
    # files = [NamedTemporaryFile("w", dir="/home/vscode/tmp")
    #          for _ in range(CREATE_FILE_NUM)]
    # files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]
    yield files
    for fw in files:
        fw.close()
----------------------------------------------------------------------------------------- benchmark: 2 tests -----------------------------------------------------------------------------------------
Name (time in s)                               Min               Max              Mean            StdDev            Median               IQR            Outliers     OPS            Rounds  Iterations
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_create_multi_files_with_data           3.8486 (1.0)      4.0886 (1.02)     3.9493 (1.0)      0.1022 (2.20)     3.9499 (1.0)      0.1736 (2.50)          1;0  0.2532 (1.0)           5           1
test_async_create_multi_files_with_data     3.8907 (1.01)     4.0031 (1.0)      3.9608 (1.00)     0.0464 (1.0)      3.9607 (1.00)     0.0695 (1.0)           1;0  0.2525 (1.00)          5           1
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

もっと性能良くないとダメかなということで、続いて tmpfs(メモリー上に作られるファイルシステムと思ってください)をマウントし試してみるとようやく違いがでました。

$ sudo mount -t tmpfs -o size=4000m tmpfs /home/vscode/tmp/

$ df /home/vscode/tmp/
Filesystem     1K-blocks  Used Available Use% Mounted on
tmpfs            4096000     0   4096000   0% /home/vscode/tmp

全部のファイルを tmpfs へ書き込みます。

@pytest.fixture(scope="function")
def named_tempfiles():
    # files = [NamedTemporaryFile(
    #     "w", dir="/tmp" if i % 2 == 0 else "/home/vscode/tmp") for i in range(CREATE_FILE_NUM)]
    files = [NamedTemporaryFile("w", dir="/home/vscode/tmp")
             for _ in range(CREATE_FILE_NUM)]
    # files = [NamedTemporaryFile("w") for _ in range(CREATE_FILE_NUM)]
    yield files
    for fw in files:
        fw.close()
--------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------
Name (time in ms)                                Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_create_multi_files_with_data     270.3285 (1.0)      283.7868 (1.0)      275.1938 (1.0)       5.4192 (1.0)      275.0887 (1.0)       7.2456 (1.0)           1;0  3.6338 (1.0)           5           1
test_create_multi_files_with_data           647.1230 (2.39)     683.4950 (2.41)     660.6459 (2.40)     14.2596 (2.63)     655.9037 (2.38)     18.3033 (2.53)          1;0  1.5137 (0.42)          5           1
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

5 倍まではいきませんが、速くはなったと言える値かと思います。

この状態だと最大ワーカースレッド数を変更すると若干ですが速度に影響がありました。

threadPool = ThreadPoolExecutor(max_workers=2)


async def async_create_file_with_data(path: Path, data: bytes) -> None:
    # async with aiofiles.open(path.as_posix(), "wb") as f:
    async with aiofiles.open(path.as_posix(), "wb", executor=threadPool) as f:
        await f.write(data)

---------------------------------------------------------------------------------------------- benchmark: 2 tests ----------------------------------------------------------------------------------------------
Name (time in ms)                                Min                 Max                Mean             StdDev              Median                IQR            Outliers     OPS            Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_async_create_multi_files_with_data     385.8565 (1.0)      433.2469 (1.0)      397.1514 (1.0)      20.2649 (2.14)     387.9202 (1.0)      14.3998 (1.95)          1;1  2.5179 (1.0)           5           1
test_create_multi_files_with_data           649.6608 (1.68)     673.3505 (1.55)     656.7036 (1.65)      9.4867 (1.0)      652.8568 (1.68)      7.3829 (1.0)           1;1  1.5228 (0.60)          5           1
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

そのようなわけで当初の「最大ワーカースレッド数に引っかかっているのでは?」はあまり正しくありませんでした。

まとめると以下のようになるかと思います。

  • Python でファイル I/O を並行化するにはスレッドを使う必要がある
    • aiofiles でも内部的にはスレッドを使っている
  • Python の write をマルチスレッド化しても(おそらくは)I/O の性能に左右される
    • tmpfs などの高速な同時アクセス性能が良いファイルシステムを使うと速度が向上しスレッド数が影響してくる

ただし、I/O 性能が原因という決定的な証拠も見つからなかったというのが正直なところです(「I/O 時にはスレッドのロックが開放されるので速くなるけど、やっぱりファイルの操作は遅いよ」という記述が見つかる程度です)。

最終的に何がネックなっているのか(Python のスレッド制御なのか、I/O性能なのかなど)はイマイチはっきりしていないのが申し訳ないのですが、試した範囲でわかったことをまとめてみました。

下記のコメントに追記しましたが、今回の環境では I/O 性能(同時アクセスの性能)がネックで並行化しても思ったように速くならなかったと考えられます。

hankei6kmhankei6km

小出しになってしまってすみません、追記です。

本当に I/O が頭打ちだったのか気になったので dd コマンドを使って簡単に試してみました。

実行環境は上記のベンチマークを動かした Codespace を使っています。

まず、各ファイルへ順次書き出す処理です。
(/tmp/test?.tmp\0 を 2**28 バイト書き出しています)

dd if=/dev/zero bs=268435456 count=1 of=/tmp/test1.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test2.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test3.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test4.tmp
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test5.tmp

新規でファイル作成すると 900MB/s くらいはでます(後半、なぜ遅くなっているかは不明です)

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.289604 s, 927 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.288978 s, 929 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.291378 s, 921 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.90479 s, 297 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.08273 s, 248 MB/s

上書き(削除 & 作成)すると 200MB/s 前後になります。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.07726 s, 249 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.28004 s, 210 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.27764 s, 210 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.20428 s, 223 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.37003 s, 196 MB/s

続いて並行化した場合です。

dd if=/dev/zero bs=268435456 count=1 of=/tmp/test1.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test2.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test3.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test4.tmp &
dd if=/dev/zero bs=268435456 count=1 of=/tmp/test5.tmp

こちらも最初の 1 回は速くなりますが、5 で割ったような値になります。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.949643 s, 283 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.44472 s, 186 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.61763 s, 166 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 1.81 s, 148 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 2.26454 s, 119 MB/s

上書き(削除 & 作成)すると 200MB/s を 5 で割ったよりかは少し良い値ですが、全体の経過時間では 6.3 秒なのでベンチマークとあまり違いはありません。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 5.0185 s, 53.5 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 5.07003 s, 52.9 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 5.9657 s, 45.0 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 6.2768 s, 42.8 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 6.38367 s, 42.1 MB/s

以上のように asyncio のベンチマークのときと比べて大きな違いはないようでした。

このことから、開発マシンの /tmp などに書き出した場合は asyncio に限らずファイル書き出しの並行化のメリットはあまり受けられないことになるかと思われます。

一方で tmpfs の場合は並行化してもパフォーマンスは 5 で割った値にまでは落ち込みませんでした。

順次書き出した場合。

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.223789 s, 1.2 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.226139 s, 1.2 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.235712 s, 1.1 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.233061 s, 1.2 GB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.237924 s, 1.1 GB/s

並行して書き出した場合

1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.338262 s, 794 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.417389 s, 643 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.403607 s, 665 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.498165 s, 539 MB/s
1+0 records in
1+0 records out
268435456 bytes (268 MB, 256 MiB) copied, 0.49274 s, 545 MB/s

これは速度的な性能というよりも、ファイルシステムの構成上(メモリー上に作成されるので)「同時アクセスの性能が良い」という感じかと思われます。

よって、asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。
(例、冗長化されたディスクなど)

alivelimbalivelimb

様々な検証を試して頂きありがとうございます!
読ませて頂いていて「なるほど、こうやって検証する方法があるのか」と非常に勉強になりました。

asycio などによる並行処理でファイル処理を並行化した場合、このような同時アクセスで性能が落ちないストレージならばある程度の速度向上が見込めると考えられます。

なるほど、asyncio, aiofilesの問題というよりはファイルシステム側の問題の可能性が高そうですね。記事本文内でも紹介したaiofilesのIssueでは「処理が早くはならないかも知れないが、メインスレッドで別の処理を行えることが出来るのが利点だ」という記述があり、「処理は早くならないの...?なぜ?」という疑問があったのですが、腑に落ちる原因がありそうでスッキリしました。

hankei6kmhankei6km

私も漠然と「ファイル I/O も並行化すればある程度は速くなるだろう」と考えていたので、実際に手を動かして検証できたのは良い体験でした。

ありがとうございました。

cartcart

ベンチマーク取ってませんが、CPU Boundのasync_sigma関数は内部で待ち状態が発生しないので無意味な比較になっていると思います。内部で待たないのでasyncio.gatherしても別のtaskへの移動は発生せず逐次実行になります。

例えば、async_sigma関数の中にprint関数を追加してgatherしても次のような逐次実行になります。

async def async_sigma(num: int) -> int:
    print(f"async sigma({num}) start")
    v = 0
    for i in range(num + 1):
        print(f"async sigma({num})", i)
        v += i
    print(f"async sigma({num}) end")
    return v

async def async_multi_sigma(nums: list[int]) -> list[int]:
    return await asyncio.gather(*[async_sigma(num) for num in nums])

await async_multi_sigma([5, 3, 2])
async sigma(5) start
async sigma(5) 0
async sigma(5) 1
async sigma(5) 2
async sigma(5) 3
async sigma(5) 4
async sigma(5) 5
async sigma(5) end
async sigma(3) start
async sigma(3) 0
async sigma(3) 1
async sigma(3) 2
async sigma(3) 3
async sigma(3) end
async sigma(2) start
async sigma(2) 0
async sigma(2) 1
async sigma(2) 2
async sigma(2) end
[0]: [15, 6, 3]

並行(concurrent)処理になるように中で待たせる簡単な方法としては、await asyncio.sleep(0)をforの中で呼ぶと一時待ち状態になり別のtaskに移動して並行処理になります。

async def async_sigma(num: int) -> int:
    print(f"async sigma({num}) start")
    v = 0
    for i in range(num + 1):
        print(f"async sigma({num})", i)
        v += i
        await asyncio.sleep(0)
    print(f"async sigma({num}) end")
    return v

async def async_multi_sigma(nums: list[int]) -> list[int]:
    return await asyncio.gather(*[async_sigma(num) for num in nums])

await async_multi_sigma([5, 3, 2])
async sigma(5) start
async sigma(5) 0
async sigma(3) start
async sigma(3) 0
async sigma(2) start
async sigma(2) 0
async sigma(5) 1
async sigma(3) 1
async sigma(2) 1
async sigma(5) 2
async sigma(3) 2
async sigma(2) 2
async sigma(5) 3
async sigma(3) 3
async sigma(2) end
async sigma(5) 4
async sigma(3) end
async sigma(5) 5
async sigma(5) end
[1]: [15, 6, 3]

これで速くなるかは不明ですが、並行処理として比較するなら必要かと思います。ややこしいんですがgatherだけでは並行にも並列(parallel)にもならないようです。

また、上のコメントで速くなっていたのはシングルスレッドの並行処理でなくマルチスレッド(ThreadPool)による並列(parallel)処理になっているからだと思われます。一方でCPU Boundの処理はProcessPoolに入れると速くなるのでおそらく実感できるはずです。(下記のドキュメント参照)

https://docs.python.org/ja/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

このawaitable loop.run_in_executor(executor, func, *args)はおそらく低水準APIに設計されているようなのですがもう少し雑に使える高水準APIとしてcoroutine asyncio.to_thread(func, /, *args, **kwargs)があります。おそらく我々がgatherに求めてるのはこいつで、与えた 関数(コルーチンじゃダメ) がスレッドになり、gatherにまとめて渡すと並列(parallel)実行になります。これでも速度の実感ができるはずです。
loop.run_in_executor(executor, func, *args)のThreadPoolを使った場合とどういう差があるのかまでは自分もよくわかってませんが)

https://docs.python.org/ja/3.11/library/asyncio-task.html#running-in-threads