🐍

Pythonの非同期IOを学ぶ

に公開

概要

Pythonプログラムの性能を向上させる手段のひとつとして非同期IOに対応させることがあります。
非同期IOはノンブロッキングIOとイベントループでIOバウンドな処理の性能を向上させられるものです。

私がもっとも慣れ親しんだ言語はJavaでして、Pythonは習熟しているとは言い難いです。
今回は非同期IOのためのAPIの中からよく使いそうなものを選び、学んでみようと思います。

非同期IOの基本

コルーチン

asyncを使うことでコルーチン関数を定義できます。

async def foo() -> None:
    ...

コルーチン関数は単に呼び出しただけでは実行されません。
awaitキーワードを付けて実行されるようにします。

# 単に呼び出しただけでは実行されない(警告が出力される)
foo()
# awaitキーワードを付けることで実行される
await foo()

イベントループ

asyncio.run()へコルーチンを渡すことで、イベントループでコルーチンを実行して結果を返します。

async def main() -> None:
    ...

asyncio.run(main())

プログラム全体を非同期IOへ対応させる場合、main関数をコルーチン関数にしてasyncio.run()へ渡すのが一般的です。

タスクの作成

前述の通りコルーチン関数は単に呼び出しただけでは実行されません。

次のコードはコルーチン関数foobarを並列に実行させようとしていますが、fbはどちらもawaitで実行が開始され完了まで待機されるため、直列に実行されてしまいます。

f = foo()
b = bar()
await f
await b

asyncio.create_task()を使えばawaitキーワードを用いなくても実行が開始されます。
先ほどのコード例は次のように修正することで期待通りに動作します。

f = asyncio.create_task(foo())
b = asyncio.create_task(bar())
await f
await b

キュー

asyncio.Queueはコルーチン間でデータを受け渡しできるキューです。

ここではProducer-Consumerパターンでの利用を前提にしています。

queue = asyncio.Queue[str]()

put()でエンキューします。

await queue.put("foobar")

get()でデキューします。
受け取ったアイテムを処理したらtask_done()を呼び出します。

item = await queue.get()

...

queue.task_done()

asyncio.Queueは内部に未完了タスクのカウンターを持っています。
カウンターはアイテムをエンキューするたびにインクリメントされ、task_done()を呼び出されるたびにデクリメントされます。

未完了タスクのカウンターはjoin()で参照されます。
join()はキュー内のすべてのアイテムが取得され、処理されるまで待機します。

await queue.join()

前述のコード例ではasyncio.Queueの型変数にstrを指定していました。
Poison PillパターンでConsumerへ完了を通知したい場合、Noneとのユニオン型を指定するのが簡単です。

queue = asyncio.Queue[str | None]()

# 通常のアイテム
await queue.put("foobar")

# すべてのアイテムを渡しきってConsumerを終了させたい場合
await queue.put(None)

Consumer側ではNoneを受け取ったら自身を終了させます。

async def consumer(queue: asyncio.Queue[int | None]) -> None:
    while True:
        item = await queue.get()
        try:
            if item is None:
                return

            ...

        finally:
            queue.task_done()

同期プリミティブ

複数個のコルーチンを協調させるためのものです。

用意されているものはいずれも基本的でシンプルなものなので、公式ドキュメントを読めば容易に理解できそうです。

その他

あと覚えておくべきAPIとしてasyncio.gather()があります。
これは複数個のコルーチン/タスクを並列実行して結果のリストを返します。

# 前提: 次のようなコルーチン関数が定義されている
#
# async def coro(id: str) -> int:
#     ...

foo = coro("foo")
bar = coro("bar")
baz = coro("baz")

results = await asyncio.gather(foo, bar, baz)

このコード例は次のコード例と概ね同様に動作します。

# 前提: 次のようなコルーチン関数が定義されている
#
# async def coro(id: str) -> int:
#     ...

foo = coro("foo")
bar = coro("bar")
baz = coro("baz")

foo_task = asyncio.create_task(foo)
bar_task = asyncio.create_task(bar)
baz_task = asyncio.create_task(baz)

results = [
    await foo_task,
    await bar_task,
    await baz_task,
]

ユースケース

ここからはいくつかのユースケースを挙げて、非同期IOに対応させるコード例を示します。

Producer-Consumerパターン

n個のConsumerがキューからアイテムを取り出して並列に処理します。
並列数はConsumerの数で制御します。

次のコード例はデキューしたアイテムをログ出力したあと1秒間待機するConsumerを3並列で動かしています。

import asyncio
import logging

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s [%(name)s] (%(taskName)s) %(message)s"
)

item_size = 50
parallels = 3
wait_seconds = 1


async def consumer(name: str, queue: asyncio.Queue[int | None]) -> None:
    logger = logging.getLogger(name)
    while True:
        item = await queue.get()
        try:
            if item is None:
                return
            logger.info("item = %s", item)
            await asyncio.sleep(wait_seconds)
        finally:
            queue.task_done()


async def main():
    queue = asyncio.Queue[int | None]()

    # 複数個のConsumerを作る
    consumers = [
        asyncio.create_task(consumer(f"consumer-{i + 1}", queue))
        for i in range(parallels)
    ]

    logger = logging.getLogger("producer")
    for i in range(item_size):
        item = i + 1
        logger.info("item = %s", item)
        await queue.put(item)

    # Poison PillパターンのためConsumerの数だけNoneをエンキューする
    for _ in range(parallels):
        logger.info("item = None")
        await queue.put(None)

    await asyncio.gather(*consumers)


if __name__ == "__main__":
    asyncio.run(main())

Producerがすべてのアイテムを一息でエンキューしていますが、キューにサイズを設定することで少しずつエンキューするように制御できます。

queue = asyncio.Queue[int | None](maxsize=10)

大量のタスクを少しずつ処理する

あらかじめアイテムの数が分かっている場合はProducer-Consumerパターンを使わなくても処理できます。

次のコード例は50個のアイテムを処理するタスクを作成して最大3並列で実行します。
同期プリミティブのasyncio.Semaphoreで並列数を制御しています。

import asyncio
import logging

logging.basicConfig(level=logging.INFO, format="%(asctime)s (%(taskName)s) %(message)s")

task_size = 50
parallels = 3
wait_seconds = 1


async def create_task(semaphore: asyncio.Semaphore, item: int) -> None:
    async with semaphore:
        logging.info("%d", item)
        await asyncio.sleep(wait_seconds)


async def main():
    semaphore = asyncio.Semaphore(parallels)
    tasks = [create_task(semaphore, i) for i in range(task_size)]
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

まとめ

Pythonの非同期IOを少しは知れました。

注意深くコードを書かないとコルーチンをawaitし忘れて実行されなかったり、並列で実行しているつもりが直列になってしまいそうで、それなりに高難度であるという感想を持ちました。

また、コードを書いてしまって後から非同期IOへ対応しようとすると、プログラム全体にasync/awaitを書いて回らないといけない状況になり得るので、最初から問答無用で非同期IOへ対応しておくのが無難だと思いました。

Discussion