🐍

Pythonのasyncio.Queueを使ってみる(非同期処理)

2022/09/03に公開

やること

Pythonで非同期処理を行うためのモジュールであるasyncioに含まれるasyncio.Queueを利用して、簡単なキューのput/getプログラムを作成します。

今回は非同期関数としてキューにデータを投入する役割をする関数をproducer、キューからデータを取得して処理する役割をする関数をconsumerとします。

「とりあえずasyncio.Queueを使う」ことを目的として、producer/consumerの仕様はそれぞれ下記のようにします。

producer
計15個のタスクを順番に投入する。

consumer
その時点でキューに存在するタスクを取得し、foo関数に値を渡し並列に処理を実行する。取得したタスクが全て完了したら再度キューを確認しにいく。
※foo関数では処理に数秒の時間がかかることとします。

全体プログラム

import asyncio

async def producer(q):
    for i in range(15):
        item = "task{0}".format(i)
        await q.put(item)
        print("producer: put", item)
        await asyncio.sleep(1)

async def consumer(q):
    while True:
        while q.empty():
            await asyncio.sleep(3)

        items = []
        for _ in range(q.qsize()):
            item = await q.get()
            items.append(item)
        
        results = await asyncio.gather(
            *[foo(item) for item in items]
        )

        print("consumer: done -> {0}".format(results))

        for _ in range(len(items)):
            q.task_done()

async def foo(item):
    await asyncio.sleep(5)
    print("foo: finished -> {0}".format(item))
    return item

async def main_wrapper():
    q = asyncio.Queue()
    
    # キューの監視
    asyncio.create_task(consumer(q))

    # キューへデータを投入
    await asyncio.create_task(producer(q))

    # キューの中にあるタスクが全て完了したら終了
    await q.join()
    print("Finish!!")

if __name__ == '__main__':
    asyncio.run(main_wrapper())

実行結果

producer: put task0
producer: put task1
producer: put task2
producer: put task3
producer: put task4
producer: put task5
producer: put task6
producer: put task7
foo: finished -> task2
foo: finished -> task0
foo: finished -> task1
consumer: done -> ['task0', 'task1', 'task2']
producer: put task8
producer: put task9
producer: put task10
producer: put task11
producer: put task12
foo: finished -> task4
foo: finished -> task7
foo: finished -> task3
foo: finished -> task5
foo: finished -> task6
consumer: done -> ['task3', 'task4', 'task5', 'task6', 'task7']
producer: put task13
producer: put task14
foo: finished -> task9
foo: finished -> task12
foo: finished -> task11
foo: finished -> task8
foo: finished -> task10
consumer: done -> ['task8', 'task9', 'task10', 'task11', 'task12']
foo: finished -> task13
foo: finished -> task14
consumer: done -> ['task13', 'task14']
Finish!!

詳細説明

まずproducerです。

async def producer(q):
    for i in range(15):
        item = "task{0}".format(i)
        await q.put(item)
        print("producer: put", item)
        await asyncio.sleep(1)

特に難しいことはしていません。
await q.put(item)でキューにタスクをputしています。
await asyncio.sleep(1)で1秒間のスリープを実行しています。


次にconsumerです。
※consumerから呼び出しているfoo関数は5秒待って渡された値をそのまま返しているだけなので省略します。

async def consumer(q):
    while True:
        while q.empty():
            await asyncio.sleep(3)

        items = []
        for _ in range(q.qsize()):
            item = await q.get()
            items.append(item)
        
        results = await asyncio.gather(
            *[foo(item) for item in items]
        )

        print("consumer: done -> {0}".format(results))

        for _ in range(len(items)):
            q.task_done()

q.empty()でキューの中身を確認し、空なら3秒待機するようにしています。

q.qsize()で実行時点のキューの要素数を確認し、確認した要素数分await q.get()にてキューから要素を取り出しています。

次にfoo関数を並列に実行している部分ですが、ここではasyncio.gather()を利用しています。

results = await asyncio.gather(
    *[foo(item) for item in items]
)

asyncio.gather()を利用することで、並列処理を実行し、返り値の順序を引数に渡した順序で得ることができます。
先の実行結果の以下の部分に着目すると分かりやすいかも知れません。

foo: finished -> task2
foo: finished -> task0
foo: finished -> task1
consumer: done -> ['task0', 'task1', 'task2']

終了順序はtask2 -> task0 -> task1となっていますが、foo関数からの返り値resultsには呼び出した順序通りで格納されています。

このように、並列処理を実行し、その終了した順番がバラバラでも、呼び出した順序通りに結果を並べたい際にasyncio.gather()を利用できます。

最後に、完了したタスク分だけq.task_done()を実行しています。
こちらはタスクの完了をキューに通知しているのですが、なぜこのような処理をしているかは後述します。


最後にメイン関数です。

async def main_wrapper():
    q = asyncio.Queue()
    
    # キューの監視
    asyncio.create_task(consumer(q))

    # キューへデータを投入
    await asyncio.create_task(producer(q))

    # キューの中にあるタスクが全て完了したら終了
    await q.join()
    print("Finish!!")

今更ですが、 asyncio.Queue()が今回メインとなっているキューです。

次にconsumerやproducerをasyncio.create_task()に渡していますが、これが何をしているかと言うとcoroutine(コルーチン)をTaskでラップしてバックグラウンドで実行できるようにしています。
こうすることで、処理の完了を待たずに次の処理に進むことができます。
※coroutine=async defで宣言されるオブジェクト

ただ、producerに関してはcreate_task()でバックグラウンド側に処理を投げているものの、即awaitしているので、Taskでラップするのに本質的な意味はありません。

awaitはcoroutineとTaskのどちらのオブジェクトでも利用できますので、今回の例だと別にcoroutineのままawaitしても良いのですが(await producer(q)とする)、今回はあくまでawaitがcoroutineでもTaskでも良いという例示のためこのようにしています。

最後に今回一番重要なのがawait q.join()です。
これはキューの中身のデータ処理が全て完了するまで待機しています。

何のこっちゃ?と思われる方も中にはいるかも知れませんが、我々が知りたいのは「キューの中身が全て取得されたか?」よりも「取り出されたタスクが全て完了したか?」です。

ではキュー側は持ち出されたデータに対するタスクが完了したのをどこで判断しているのか?という話になりますが、それが先程出てきたq.task_done()になります。

asyncio.Queue.task_doneを利用すると、タスクを処理した側が明示的に処理を完了したことを伝えることができます。
asyncio.Queue.joinは、キューの中身が空になるときではなく、putした分のtask_doneの呼び出しが行われるまで待つ、という処理になります。

内部処理的には、キューは未完了のタスクのカウント値を保持しており、キューにアイテムが追加されるときは加算、task_done()を呼び出すと減算、未完了のタスクのカウント値が0になるとjoin()の制御が解除されるイメージです。

最後に

公式ドキュメントを掲載しておきます。
https://docs.python.org/ja/3/library/asyncio-queue.html

Discussion