🛠️

Python非同期処理の道具箱:httpxやSQLAlchemyからユニットテストまで

に公開

目的

このドキュメントではPythonの非同期処理の実験を行います。
Pythonの非同期自体初めての場合は以下のページを参照してください。

もし、そもそもコルーチンとは何か、Pythonでの非同期処理の導入の経緯を知りたい場合は以下を参照してください。
コルーチンとジェネレータを追って泥沼へ:半世紀前のプログラミング言語は動くのか!?

ここで実験するコードは以下のリポジトリに存在します。
https://github.com/mima3/test_asyncio

実験環境

  • 必要ツール
    • docker
    • pipenv + python3.13
  • 動作環境
    • macOS 15.4.1
    • 2 GHz クアッドコアIntel Core i5
    • 16 GB 3733 MHz LPDDR4X

標準ライブラリの実験

TaskGroup

TaskGroupは3.11から導入されました。タスクをまとめて実行可能です。
多くの場面で gather の代替になりますが、後述のコード例のように挙動に差があります。

gather

    results = await asyncio.gather(ok("A", 0.30), ok("B", 0.10), ok("C", 0.20))
    print("results:", results)

TaskGroup

    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(ok("A", 0.30))
        t2 = tg.create_task(ok("B", 0.10))
        t3 = tg.create_task(ok("C", 0.20))
    # ここまで来たら全タスク完了済み。各 Task から result() で回収する
    print("results:", t1.result(), t2.result(), t3.result())

TaskGroupとgatherの挙動差の検証は以下のコードで行えます。
https://github.com/mima3/test_asyncio/blob/main/py313/basic/test_task_group.py

以下のケースではTaskGroupの使用が適しているといえます。

  • タスクの寿命をスコープで閉じたい/失敗時に兄弟タスクを確実に畳みたい
  • 複数同時失敗を正しく扱いたい(集約して上げたい)
  • グループをネストさせたい

以下のケースではgatherの使用が適しているでしょう。

  • 3.10 以前でも動かしたい
  • 単に N 個の awaitable を並行実行して結果リストが欲しい
  • 例外も“値”として扱いたい(失敗しても進めたい) → gather(return_exceptions=True)

サブプロセスの非同期実行

asyncio.subprocessを使用して非同期実行が行えます。

以下のサンプルコードではプロセスを同時に2起動して、標準出力を取得するサンプルです。
https://github.com/mima3/test_asyncio/blob/main/py313/basic/test_subprocess.py

create_subprocess_execProcessを起動しています。
子プロセスの終了待ちやパイプ監視の実装はOS・イベントループ実装に依存します(環境によってはワーカスレッドが用いられます)

asyncio.timeoutを使用することで、タイムアウトを指定することも可能です。

    try:
        async with asyncio.timeout(timeout):
            async with asyncio.TaskGroup() as tg:
                tg.create_task(pipe_reader(stdout, f"{name} [OUT]"))
                tg.create_task(pipe_reader(stderr, f"{name} [ERR]"))
                # 子プロセス終了を待つ(正常終了時はここを抜ける)
                await proc.wait()
        # TaskGroup は正常終了時、reader が EOF を読み切るまで待ってくれる
        return proc.returncode or 0

    except asyncio.TimeoutError:
        if proc.returncode is None:
            proc.terminate()
            # proc.kill() ... 強制的に止めるケース
        return proc.returncode if proc.returncode is not None else 1

全体のコード

I/OではなくてCPUがボトルネックとなっている処理を同時実行する場合は、マルチプロセスで非同期処理を行うことを検討することになります。

同期処理を行う関数を非同期実行

たとえば、requestsライブラリは同期実行で、非同期処理をサポートしていません。
このようなライブラリを非同期に使用する例を考えます。

https://github.com/mima3/test_asyncio/blob/main/py313/rest/test_requests_sync.py

asyncio.to_threadを使用することで同期的な処理を別スレッドで非同期的に関数を実行させることができます。

async def fetch(url: str, timeout: float):
    print(f"start fetch.... thread_id:{threading.get_ident()} {url}")
    return await asyncio.to_thread(fetch_sync, url, timeout)

全体のコード

to_threadでは内部でconcurrent.futures.ThreadPoolExecutorを使用しており、指定した関数は、スレッドプール上で空いてるスレッドで動作します。

今回の実験環境では裏で12個のワーカスレッドが動作していますが、状況によってはサーバーの負荷などを考慮して並列数を制御したいケースがあります。その場合はasyncio.Semaphoreを活用します。

async def fetch(url: str, sem: asyncio.Semaphore, timeout: float):
    async with sem:
        return await asyncio.to_thread(fetch_sync, url, timeout)

async def main():
    sem = asyncio.Semaphore(5)  # 同時実行上限(必要に応じて調整)

    try:
        tasks = []
        result = []
        async with asyncio.TaskGroup() as tg:
            for u in get_url_list():
                # セマフォを渡す
                tasks.append(tg.create_task(fetch(u, sem, timeout)))

全体のコード

今回は同期処理を非同期で実行する例を示しました。
実際に使用する場合はまず、非同期対応のライブラリを探すべきでしょう。今回のケースではhttpxaiohttpの採用を検討すべきです。

コンテキスト変数

contextvars は タスク生成時点のコンテキストをコピーして保持し、以後の set() はそのタスクにのみ影響します。

以下の例では各タスク内で request_id を設定しているため、タスクごとに独立した値になります

import asyncio
from contextvars import ContextVar


request_id: ContextVar[str] = ContextVar("request_id", default="-")


async def proc(n: int):
    request_id.set(f"request_id:{n}")
    await asyncio.sleep(1)
    return request_id.get()


async def main():
    tasks = []
    async with asyncio.TaskGroup() as g:
        for i in range(10):
            tasks.append(g.create_task(proc(i)))
    for task in tasks:
        print(task.result())


if __name__ == "__main__":
    with asyncio.Runner(debug=True) as runner:
        runner.run(main())

外部ライブラリの使用

asyncioは低レベルの通信をサポートしていますが、それだけでシステム開発を行うには不足しています。
ここでは非同期処理をサポートしている外部ライブラリの実験を行います。

httpxによるREST APIの実行

REST API クライアントとしては従来 requests が広く使われてきましたが、httpx同期/非同期の両方をサポートします。
今回はhttpxを実験します。

同期的にhttpxを使用する場合、Clientを使用して、以下のようになります。

import httpx
with httpx.Client(http2=False) as client:
    r = client.get('http://127.0.0.1:8080/hello/1')
    r.raise_for_status()
    print(r.json())
# 以下と同等
httpx.get('http://127.0.0.1:8080/hello/1').json()

これを非同期で動かすにはAsyncClientを用い以下のようになります。

import httpx
import asyncio

async def main():
    async with httpx.AsyncClient(http2=False) as client:
        r = await client.get('http://127.0.0.1:8080/hello/1')
        r.raise_for_status()
        print(r.json())

with asyncio.Runner(debug=True) as runner:
    runner.run(main())

複数のリクエストを同時実行している例については以下を参照してください。
https://github.com/mima3/test_asyncio/blob/main/py313/rest/test_httpx_async.py

上記では100件のURLを同時に実行して非同期に動作していますが、AsyncClientにわたす、limits引数で接続数自体の上限を管理できます。

    limits = httpx.Limits(max_connections=20, max_keepalive_connections=20)
    # 略
    async with httpx.AsyncClient(http2=False, timeout=timeout, limits=limits) as client:

また、このプログラムでは前述のrequestsを無理やりスレッドプールで動かしたケースとちがって、メインスレッドのみでI/Oの待ち時間を効率よく使用していることが観測できます。

さて同時接続数についてはlimits引数、同時実行数は前述のasyncio.Semaphoreを使用することで制限できます。
しかし、REST APIの中では秒間の実行数を制限しているケースがあります。
その場合の対応については次のライブラリで行います。

aiometerによるREST APIの秒間実行数の制限

aiometerを使用することで、同時実行について制限を調整できます。
aiometerで調整できる項目は2つです。

  • max_at_once: 同時に実行されるタスクの最大数
  • max_per_second: 一秒あたりで生成されるタスク数の制限

使用方法は以下のようになります。

import aiometer
# 略
async def main():
    print(f"start thread_id:{threading.get_ident()}")
    limits = httpx.Limits(max_connections=20, max_keepalive_connections=20)
    timeout = httpx.Timeout(3.0)

    result = []
    async with httpx.AsyncClient(http2=False, timeout=timeout, limits=limits) as client:
        async with aiometer.amap(
            functools.partial(fetch, client),
            get_url_list(),
            max_at_once=5,  # これは、特定の時点で同時に実行されるタスクの最大数を制限するために使用されます。(100 個のタスクがあり、 を設定するとmax_at_once=10、aiometer同時に実行されるタスクが 10 個以下になります。)
            max_per_second=2,  # このオプションは、1秒あたりに生成されるタスクの数を制限します。これは、レート制限ポリシーが適用されているサーバーなど、I/Oリソースの過負荷を防ぐのに役立ちます。
        ) as items:
            async for item in items:
                result.append(item)
    for item in result:
        print(item)
    print("result...", len(result))

全体のコード

この例ではfetch処理を最大5並行で、1秒あたり最大2個しかタスクを実行できません。
aiometerでも余計なワーカースレッドは作成しておらず上記の例では、スレッドはメインスレッドしか作成されていません。

aiofilesによるファイル処理の非同期化

aiofilesはローカルのディスクファイルを非同期に処理するためのライブラリです。
以下のようにaiofiles.openを使用してファイルを処理します。

async def read_file(input_path: str) -> list[str]:
    lines = []
    async with aiofiles.open(input_path, mode="r") as f:
        async for line in f:
            lines.append(line)
    return lines

全体のコード

aiofilesはスレッドプールを使用して、ファイルI/Oを実行してます。
実際、このサンプルコードを実験環境で実行するとワーカースレッドが6つ起動していることが確認できます。この結果は実行環境によって変化する可能性があります。

また、aiofiles.tempfile.TemporaryFileを使用することで一時ファイルの利用も可能です。

AsyncSSHによるSSHサーバーとの送受信

AsyncSSHを使用することで非同期でSSHサーバーとのファイルの送受信が可能になります。

import asyncio
import asyncssh

async def main():
    async with asyncssh.connect(
        "127.0.0.1",
        port=2222,
        username="test_user",
        password="pass",
        client_keys=[],
        known_hosts=None,
    ) as conn:
        async with conn.start_sftp_client() as sftp:
            await sftp.get("/home/test_user/downloads/0001.md", "./tmp/0001.md")

with asyncio.Runner(debug=True) as runner:

    runner.run(main())

AsyncSSHを使用した大量ファイルの受信・送信の例は以下になります。
https://github.com/mima3/test_asyncio/blob/main/py313/ssh/test_ssh.py

このサンプルだとSFTPチャネルが1つだけなので効果が薄く見えますが、実際には1つのファイル転送でもデータをブロックに分割し、同一チャネル上で複数リクエストを並行(パイプライン)処理しています。

https://github.com/ronf/asyncssh/issues/369

データベースの非同期操作

以下のライブラリを使用してPostgreSQLとMySQLをORMで操作が可能です。

サンプルコード
https://github.com/mima3/test_asyncio/tree/main/py313/db

このコードではPostgreSQLとMySQLに同時にSQLを発行するサンプルになっています。

# dbを構築
pipenv run python -m db.init_db
# db操作
pipenv run python -m db.query_db
# データ削除
pipenv run python -m db.truncate_db

もっともシンプルな非同期でORMを操作する例は以下のようになります。

import asyncio
from datetime import datetime
from contextlib import asynccontextmanager
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy import String, Integer, DateTime, func, select


from sqlalchemy.ext.asyncio import (
    AsyncEngine,
    async_sessionmaker,
    create_async_engine,
)

# PostgreSQLの場合
# DB_URL = "postgresql+asyncpg://app:app@localhost:5432/appdb"
# MySQL + asyncmyの場合
DB_URL = "mysql+asyncmy://app:app@localhost:3306/appdb"
# MySQL + aiomysql: RuntimeError: Event loop is closedでエラー
# DB_URL = "mysql+aiomysql://app:app@localhost:3306/appdb"


def make_engine(url: str) -> AsyncEngine:
    # echo=True to see SQL; set False to quiet
    return create_async_engine(url, echo=False, pool_pre_ping=True, future=True)


engine_db: AsyncEngine = make_engine(DB_URL)
SessionDB = async_sessionmaker(engine_db, expire_on_commit=False)


@asynccontextmanager
async def get_session():
    async with SessionDB() as session:
        yield session


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "users"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(100), nullable=False, index=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=False), server_default=func.now(), nullable=False)

    def __repr__(self) -> str:  # pragma: no cover
        return f"User(id={self.id!r}, name={self.name!r})"


async def main():
    async with get_session() as session:
        async with session.begin():
            stmt = select(User).order_by(User.id)
            users = await session.stream_scalars(stmt)
            async for u in users:
                print(f"[User] #{u.id} {u.name}")


with asyncio.Runner(debug=True) as runner:
    runner.run(main())

DB_URLを変更するだけで接続先が変わります。

  • "postgresql+asyncpg://app:app@localhost:5432/appdb"
    • PostgreSQLをasyncpgで接続します
  • "mysql+asyncmy://app:app@localhost:3306/appdb"
    • mysqlをasyncmyで接続します
  • "mysql+aiomysql://app:app@localhost:3306/appdb"
    • mysqlをaiomysqlで接続します。終了時にエラーとなります。

SQLAlchemy側では非同期用に以下の関数を利用してセッションを作成します。

作成したセッションを利用してORMを「async with」や「async for」、「await」とともに使用すると非同期になります。

Redisの非同期操作

通常の同期処理でも使用しているredis-pyを使用して非同期処理が可能です。

以下のようにredis.asyncio.clientを使用して非同期処理になります。

import redis.asyncio as redis

async def main():
    async with redis.Redis(host="localhost", port=6379, db=0) as client:
        pong = await client.ping()
        print("PING ->", pong)
        res = await client.set("demo:key", "hello")
        print("SET ->", res)
        res = await client.get("demo:key")
        print("GET ->", res)

全コード

Redisのコネクションプールを使用することで同じサーバーに対して同時の読み書きが可能になります。
redis.asyncio.BlockingConnectionPoolを使用することでコネクションプールが使用できます。コネクションプールが枯渇した場合、使用可能になるまで待機します。

import redis.asyncio as redis
...
    pool = redis.BlockingConnectionPool.from_url("redis://localhost:6379/0", max_connections=5, timeout=10)
    async with redis.Redis(connection_pool=pool) as r:
        # rに対してr.setやr.getが行える

全コード

上記の例でコネクションプールから接続が払い出されるのはr.setやr.getを実施する直前になります。

AWS S3の非同期操作

boto3は残念ながら非同期処理をサポートしていません。
今回はaioboto3を使用してS3へのファイルアップロードを行います。
aioboto3はboto3とaiobotocoreのラッパーになっており、ローカルファイルの操作はaiofilesを使用して非同期化しています。

以下はS3中のオブジェクトの一覧を非同期で取得するサンプルになっています。

async def main():
    session = aioboto3.Session()
    async with session.resource("s3") as s3:
        bucket = await s3.Bucket("my-bucket")
        async for s3_object in bucket.objects.all():
            print(s3_object)

全ソース

もし、localstackを使用していてエンドポイントを変更したい場合は次のような環境変数を指定します。

os.environ["AWS_ENDPOINT_URL_S3"] = "http://localhost:4566/"

アップロードを同時に実行するケースでは以下のようになります。

   sem = asyncio.Semaphore(max_concurrency_file)

    async with session.client("s3") as s3:

        async def put_one(path: Path):
            print("  start put_one", path)
            async with sem:
                print("  start put_one after sem", path)
                # S3 の Key を決める(prefix + ファイル名)
                rel = path.name
                key = f"{prefix.rstrip('/')}/{rel}" if prefix else rel
                # 高水準API: マルチパートを自動処理
                await s3.upload_file(
                    Filename=str(path),
                    Bucket=bucket,
                    Key=key,
                )
                results.append({"file": str(path), "key": key, "status": "uploaded"})
            print("  end put_one", path)

        async with asyncio.TaskGroup() as tg:
            for p in file_list:
                tg.create_task(put_one(p))

全ソース

このコードで型チェックを行う場合、types-aioboto3が必要になります。

aioboto3は全てのboto3の動作が保証されているわけではないです。現時点でテストがされている機能は以下の通りです。

Services Status
DynamoDB Service Resource Tested and working
DynamoDB Table Tested and working
S3 Working
Kinesis Working
SSM Parameter Store Working
Athena Working

イベントループの変更

uvloopはasyncioの標準イベントループに置き換えることができます。
これによりパフォーマンスが向上します。

以下のコードは同一プロセス内でローカル TCP サーバを立て、大量の同時接続で送受信の往復して通常のイベントループとuvloopのパフォーマンスを計測したものです。

https://github.com/mima3/test_asyncio/blob/main/py313/basic/bench_uvloop_echo.py

今回の実験結果では以下のようになりました

[stdlib] 0.806s  | 96.94 MiB/s | 24816 msg/s
[uvloop] 0.542s  | 144.04 MiB/s | 36874 msg/s
speedup (stdlib → uvloop): x1.49

なお、Windowsは非対応です。

ユニットテストの方法

非同期処理をユニットテストするケースではpytest-asyncioを使用します。

非同期関数を含むケースでは「@pytest.mark.asyncio」をデコレータを付与します。

import asyncio
import pytest

class TestAsync:
    @pytest.mark.asyncio
    async def test_async(self):
        await asyncio.sleep(0)
        assert True

あるいは、pytest.iniをプロジェクト直下に用意することでデコレータなしでも自動で判断してくれます。

pytest.iniの例

[pytest]
asyncio_mode = auto

たとえば、非同期関数をモックアップするケースを考えます。
https://github.com/mima3/test_asyncio/blob/main/py313/file/test_file.py#L7をテストする場合は以下のようになります。

import asyncio
import pytest
from unittest.mock import patch, AsyncMock
from file.test_file import read_file

class TestAsync:
    @pytest.mark.asyncio
    async def test_async_read_file_mock(self):
        file_obj = AsyncMock()
        # file_obj.read.return_value = ["hello"]
        file_obj.__aiter__.return_value = iter(["hello\n", "world\n"])
        cm = AsyncMock()
        cm.__aenter__.return_value = file_obj
        cm.__aexit__.return_value = None
        with patch("file.test_file.aiofiles.open", return_value=cm) as m_open:
            print(m_open)
            contents = await read_file("../docker/ssh/downloads/0001.md")
            print(contents)
            assert contents[0] == "hello\n"

AsyncMockを使用して、モックオブジェクトを作成し、patchで関数を置き換えます。

まとめ

今回はいくつかの非同期処理のライブラリについて実験をしました。
DBやredisなどのネットワークを介したシステムを操作するライブラリは大抵揃っているようです。
ただし、BigQueryやAWSのクラウドを利用するライブラリについては公式が提供しているわけでなく、有志のライブラリであるため、自己責任で使用する必要があることに注意する必要があります。

Discussion