非同期処理の中に同期処理を入れてない?

に公開

いきなりですが、次のコードの実行結果はどうなると思いますか?

import asyncio
import time

async def process_save_queue():
    while True:
        print("Task1: Starting blocking operation")
        time.sleep(2)
        print("Task1: Finished blocking operation")

async def startup_process_event_queue():
    while True:
        print("Task2: I'm running!")
        await asyncio.sleep(1)

async def main():
    task1 = asyncio.create_task(process_save_queue())
    task2 = asyncio.create_task(startup_process_event_queue())

    await asyncio.gather(task1, task2)

if __name__ == "__main__":
    asyncio.run(main())

実行結果は以下のようになります:

Task1: Starting blocking operation
Task1: Finished blocking operation
Task1: Starting blocking operation
Task1: Finished blocking operation
...

Task2の出力がまったく表示されません。

なぜTask2が実行されないのか?

原因は、process_save_queue() 内で 同期処理の time.sleep(2) を使っていることにあります。

async def で定義された非同期関数では、本来 await を使って「他の処理に譲る」ことを期待されています。非同期処理はマルチスレッドやマルチプロセスとは異なり、ひとつのスレッド内でタスク同士が await を使って協調しながら動作するのが基本です。

ところが time.sleep(2) は処理をブロックする同期関数のため、実行中は他のタスクに制御が渡りません。その結果、Task1が動いている間、Task2はずっと待たされたまま実行されないという状況になります。

よくある落とし穴

今回の例のようにすべて同期的に止まってしまえば問題にすぐ気づけますが、もしタスク1の中に非同期処理と同期処理が混ざっていたらどうでしょうか?

この場合、Task2も一応動いてはいるように見えますが、処理効率は非常に悪くなります。
気づかないうちにアプリケーションのパフォーマンスを大きく損なってしまう危険があります。

正しい書き方

非同期関数内で同期処理を行いたい場合は、次のように書き換える必要があります:

方法1:非同期版のsleepを使う

await asyncio.sleep(2)

方法2:同期処理を別スレッドで実行する

await asyncio.to_thread(time.sleep, 2)

どちらも、他のタスクに制御を渡すための手段です。

結論

非同期関数の中で 同期処理を直接呼び出すのはNG です。
必ず await を使って、他のタスクにも処理を譲るように設計しましょう。

Discussion