📖

Pythonにasyncioってあるけどよく知らなかったので調べた

2023/05/24に公開

1.はじめに

Pythonのasyncioは、asyncawait構文を利用して並行処理を行うため公式ライブラリです。

筆者はいくつかのライブラリでasyncawaitまたはasyncioなるキーワードを目にしつつも、理解を後回しにしてきました。この度一念発起して情報を整理し、同じような人たちのために情報をまとめました。

  • まず、asyncioの基本的な使い方を改めて整理しました。
  • 特に、並行処理のタスク遷移と例外周りについて、パッと検索した範囲ではあまり情報が見つからなかったため、深掘りしてまとめました。
  • 執筆当時の検索結果では、asyncioの古い書き方や機能を利用しているものも多くありました。この記事ではなるべく執筆現在の最新バージョン(Python 3.11)で推奨されている書き方でサンプルを作ってあります。

ただし、asyncioについて色々と書いたものの、asyncawaitはPythonのキーワード、asyncioはそれを利用したライブラリであり、asyncawaitを利用するサードパーティの非同期処理ライブラリは他にも開発されていたりします。例えばuvicornはasyncioではなくuvloopというライブラリを使用しているとのことです。

対象読者

  • Pythonの基本的な使い方は理解している方
  • asyncioって聞いたことはあるけどよく知らないなあという方
  • Python製Webフレームワークなどで、非同期処理の中身(の一部の一例)がどうなっているのか知りたい方

2.asyncioの基本

並行処理と並列処理

まず、asyncioは、シングルスレッドで動作する並行処理のためのライブラリとのことです。
Python 3.6までのドキュメントにははっきりとそう書いてあります。3.7以降のドキュメントでは改訂によって文言が消えてしまっていますが、おそらく現在でもその仕組みは変わっていないのではないでしょうか。

並行処理と、よく比較される概念の並列処理の定義は検索すればたくさん出てきます。例えばこちらのページでは以下のように説明されています。

  • 並行処理: ある時間の範囲において、複数のタスクを扱うこと
  • 並列処理: ある時間の点において、複数のタスクを扱うこと

他にもこちらのページでも似たような定義が説明されています。

マルチコアで並列処理を行う場合は、複数のタスクが実際に同時に処理されています。ある時点で複数のタスクが処理されているならば、当然ある時間の範囲でも複数のタスクが処理されていますので、この定義では並列処理は並行処理の一種です。一方、シングルスレッドで並行に処理を行うとはどういうことかというと、具体的には複数の処理をタイムスライスしながら少しずつ進めるということです。例えば

  1. 処理Aを20%進めて
  2. 処理対象をAからBに切り替えて、処理Bを15%進めて
  3. 処理対象をBからAに切り替えて、処理Aを25%進めて...

というようなイメージです。ただし実際のasyncioでは、大まかに言えばファイルアクセスやネットワークアクセスなど、CPUが空くIOの待ち時間のタイミングで切り替えが行われます。なのでasync IOなのかと。更に厳密にどのタイミングで切り替えが生じるのかは後でコードを交えて説明します。

マルチスレッドとの比較

マルチスレッドでも似たようなことはできるのですが、例えばこちらのページでは、ロックやスレッドセーフを気にせずに、簡単に実装できることを利点として挙げています。GPT4に質問してみても、以下のような利点を答えてくれました。(執筆現在のGPT4は時々適当なことを言うので真偽はさておき)

  1. 非同期I/O: asyncioは非同期I/Oをサポートしています。これは、ネットワークやディスクなどのリソースにアクセスするときに特に役立ちます。プログラムがI/Oを待っている間、他のコードを実行することが可能になります。
  2. 高い並行性: マルチスレッドと比較して、asyncioは一つのスレッドで複数の操作を同時に行う能力があります。これにより、大量の並行接続やタスクを効率的に処理することが可能になります。
  3. スレッドのオーバーヘッドが少ない: 各スレッドは独自のスタックとメモリを持つため、スレッドの数が多いとそれらの管理にコストがかかります。一方、asyncioのコルーチンは非常に軽量で、大量のコルーチンを同時に管理することが可能です。
  4. デッドロックや競合状態が少ない: asyncioでは、1つのタスクが一度に実行されるので、デッドロックやレースコンディションを避けるためのロックなどの機構が必要ありません。これにより、マルチスレッドプログラミングよりもコードの複雑さが軽減されます。

コルーチンの定義と実行

この記事では、asyncioの中でも、最新のPythonで推奨されている高レベルAPIを用いた方法を紹介します。その他の方法については、記事の最後の方で少しだけ触れています。

まずは、並行処理される処理の単位である、コルーチンを定義する方法を紹介します。ただし、このサンプルでは定義されたコルーチンは通常のプログラムのように逐次実行されます。

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, "hello")
    await say_after(2, "world")

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())
  • defの前にasyncを付けた関数はコルーチンとして定義されます。
  • コルーチンを実行する場合には呼び出しの前にawaitを付けます。
  • 大元のコルーチンを実行する場合はasyncio.runを使用します。

このコードは、関数をコルーチンとして定義した以外は普通に逐次的に1秒・2秒ずつスリープするだけのコードです。このコードを実行すると以下のような結果になります。

実行結果:

started at 14:35:27
hello at 14:35:28
world at 14:35:30
finished at 14:35:30

JavaScriptなどでasyncawaitを使いこなしている方にはお馴染みの書き方ではないかと思います。ただし、Pythonではawait無しでコルーチンを実行することはできません。

async def main():
    print(f"started at {time.strftime('%X')}")

    say_after(1, "hello")
    say_after(2, "world")

    print(f"finished at {time.strftime('%X')}")

実行結果:

started at 14:38:32
code2_2.py:11: RuntimeWarning: coroutine 'say_after' was never awaited
  say_after(1, 'hello')
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
code2_2.py:12: RuntimeWarning: coroutine 'say_after' was never awaited
  say_after(2, 'world')
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
finished at 14:38:32

タスクの定義と並行実行

asyncioで並行処理を実現するにはasyncio.create_taskによってコルーチンをタスクにします。

import asyncio
import time

async def say_after(delay, what):
    print(f"prepare {what} at {time.strftime('%X')}")
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    await task1

    print(f"returned from await task1 at {time.strftime('%X')}")

    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

実行結果:

started at 15:48:05
prepare hello at 15:48:05
prepare world at 15:48:05
hello at 15:48:06
returned from await task1 at 15:48:06
world at 15:48:07
finished at 15:48:07

一つ前のサンプルでは全ての処理が完了するまで合計3秒かかりましたが、sleepの待ち時間を利用して処理が並行実行されたために、startedから2秒後にfinishedの出力が実行されています。

awaitの戻り値

なお、awaitの評価結果はコルーチンの戻り値となります。

import asyncio

async def return42():
    return 42

async def main():
    result = await return42()
    print(f"{result=}")

asyncio.run(main())
result=42

3.タスクの並行実行の仕組み

仕組みの解説

タスクがどのような原理でシングルスレッド上で並行実行されているのかは中々分かりにくいのですが、結論から言うと以下の点を抑えれば十分です。

  • タスクはcreate_taskの時点で実行がスケジュールされる。
  • しかし制御が移るまでスケジュールされたタスクは実行されない。
  • await (Futureオブジェクト)を実行した時に限り、制御が他のタスクに移動し、awaitを実行したタスクはFutureオブジェクトの終了を待つ。
  • Futureが突然出てきたが、TaskFutureの一種である。
  • 全てのタスクがFutureを待っている場合は、最初にFutureが完了したタスクに制御が移る。

すぐ次の節で例を交えて検証を行います。

この仕組みの通りであれば、変に中途半端なタイミングでタスクが切り替わるということがないため、確かにロックやスレッドセーフを気にする必要はあまりなさそうです。

それぞれの根拠を捕捉すると、まず、タスクのスケジューリングについては公式ページに次のように記載されています。

asyncio.create_task() のような関数で、コルーチンが Task にラップされているとき、自動的にコルーチンは即時実行されるようにスケジュールされます

制御の移動については、こちらの記事が参考になりました。

python3 の async/awaitを理解する

どうやらpythonのawaitはfutureとcoroutineの2つが取れるらしいが、挙動が微妙に違う。
await futureの方はsuspends、つまり制御を手放すのに対してawait coroutineの方は単にcoroutineの終了を待つだけに見える。

cloudfit-public-docs Python Asyncio Part 2 – Awaitables, Tasks, and Futures

One of the most important points to get across is that the currently executing Task cannot be paused by any means other than awaiting a future

TaskFutureの一種であることは公式ページに記載があります。

Python コルーチン を実行する Future 類 オブジェクトです。

asyncio.Task は、Future.set_result() と Future.set_exception() を除いて、Future の API をすべて継承しています。

仕組みの確認のための具体例

一つ前のコードのsay_after

async def say_after(delay, what):
    print(f"prepare {what} at {time.strftime('%X')}")
    time.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

のように、asyncio.sleeptime.sleepに置き換えると結果は以下のようになります。

started at 16:49:38
prepare hello at 16:49:38
hello at 16:49:39
prepare world at 16:49:39
world at 16:49:41
return from task1 at 16:49:41
finished at 16:49:41

await task1の時点でmainからtask1に制御が移り、await (Futureオブジェクト)が無くなったため、task1は終了までまとめて実行され、mainに制御が戻ってきてからawait task2でようやくtask2が実行されます。

sleepやIO処理なら何でもかんでも自動的に制御が切り替わるわけではない、という根拠の一つになっています。

また、say_afterは戻して、今度はmainを次のコード

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'world'))

    await asyncio.sleep(0.5)

    print(f"started at {time.strftime('%X')}")

    await task1

    print(f"returned from await task1 at {time.strftime('%X')}")

    await task2

    print(f"finished at {time.strftime('%X')}")

のように書き換えると、結果は以下のようになります。

prepare hello at 16:54:49
prepare world at 16:54:49
started at 16:54:50
hello at 16:54:50
returned from await task1 at 16:54:50
world at 16:54:51
finished at 16:54:51

main内のawait asyncio.sleep(0.5)で制御がtask1に移り、task1sleepで制御がtask2に移り、task2sleep以降は最も早く目覚めるmainに制御が戻ります。なお、mainsleepの時間を長くすると出力の順序も変わります。

await task1のように明示的にタスクをawaitしなくともタスクは既にスケジューリングされており、asyncio.sleepなどのタイミングで制御が切り替わる例となっています。

4. 例外発生時の挙動

例外発生時には通常通り上に向かって例外がスローされます。

import asyncio

async def hello_after(delay, name):
    await asyncio.sleep(delay)
    if name == "Jiro":
        raise
    print(f"Hello, {name}")

async def main():
    task1 = asyncio.create_task(hello_after(1, "Taro"))
    task2 = asyncio.create_task(hello_after(2, "Jiro"))
    task3 = asyncio.create_task(hello_after(3, "Saburo"))

    await task1
    await task2
    await task3

asyncio.run(main())
Hello, Taro
Traceback (most recent call last):
  File "code3_4.py", line 18, in <module>
    asyncio.run(main())
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "code3_4.py", line 15, in main
    await task2
  File "code3_4.py", line 6, in hello_after
    raise
RuntimeError: No active exception to reraise

コルーチンとその実行を管理するスケジューラの処理が挟まっているため多少複雑にはなっていますが、通常通りコールスタックが形成されており、例外発生時にはそれが出力されています。

また、よって、タスクの呼び出し側でキャッチもできます。

async def main():
    task1 = asyncio.create_task(hello_after(1, "Taro"))
    task2 = asyncio.create_task(hello_after(2, "Jiro"))
    task3 = asyncio.create_task(hello_after(3, "Saburo"))

    try:
        await task1
        await task2
        await task3
    except Exception as e:
        print(f"catched \"{e}\"")
Hello, Taro
catched "No active exception to reraise"

呼び出し側のコルーチンが寝ている場合でも(起きてから)キャッチできますが、並行処理の切り替えによって実行順序は変化する場合があります。

async def main():
    task1 = asyncio.create_task(hello_after(1, "Taro"))
    task2 = asyncio.create_task(hello_after(2, "Jiro"))
    task3 = asyncio.create_task(hello_after(3, "Saburo"))

    try:
        await asyncio.sleep(4)

        await task1
        await task2
        await task3
    except Exception as e:
        print(f"catched \"{e}\"")
Hello, Taro
Hello, Saburo
catched "No active exception to reraise"

5. 複数のタスクの終了を待つ方法と例外発生時の挙動

asyncio.gather

(Python 3.11からは後で紹介するTaskGroupという機能の利用を推奨されていますが)asyncio.gatherは複数のタスクを並行実行して、戻り値をリストに集約する関数です。

import asyncio

from datetime import datetime

async def say_after(delay, what):
    start = datetime.now()
    print(f"prepare {what} at {start.strftime('%X')}")
    await asyncio.sleep(delay)
    end = datetime.now()
    print(f"{what} at {end.strftime('%X')}")
    return end - start

async def main():
    print(f"started at {datetime.now().strftime('%X')}")

    elapsed = await asyncio.gather(
        asyncio.create_task(say_after(1, 'hello')),
        asyncio.create_task(say_after(2, 'world')),
    )

    print(f"finished at {datetime.now().strftime('%X')}")
    print(elapsed)

asyncio.run(main())

実行結果

started at 17:06:52
prepare hello at 17:06:52
prepare world at 17:06:52
hello at 17:06:53
world at 17:06:54
finished at 17:06:54
[datetime.timedelta(seconds=1, microseconds=1577), datetime.timedelta(seconds=2, microseconds=1440)]

例外発生時の挙動

gather使用時にコルーチンの中で例外が発生した場合、実行結果は次のようになります。

import asyncio

async def hello_after(delay, name):
    await asyncio.sleep(delay)
    if name == "Jiro":
        raise
    print(f"Hello, {name}")

async def main():
    results = await asyncio.gather(
        asyncio.create_task(hello_after(1, "Taro")),
        asyncio.create_task(hello_after(2, "Jiro")),
        asyncio.create_task(hello_after(3, "Saburo")),
    )

asyncio.run(main())
Traceback (most recent call last):
  File "code4_2.py", line 16, in <module>
    asyncio.run(main())
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "code4_2.py", line 10, in main
    results = await asyncio.gather(
  File "code4_2.py", line 6, in hello_after
    raise
RuntimeError: No active exception to reraise

ただし、次のコード

async def main():
    results = await asyncio.gather(
        asyncio.create_task(hello_after(1, "Taro")),
        asyncio.create_task(hello_after(2, "Jiro")),
        asyncio.create_task(hello_after(3, "Saburo")),
        return_exceptions=True,
    )
    print(results)

のようにgatherの引数にreturn_exceptions=Trueを指定すると、実行結果は次のようになります。

Hello, Taro
Hello, Saburo
[None, RuntimeError('No active exception to reraise'), None]

例外がスローされたタスクでは、戻り値がその例外オブジェクトになります。

asyncio.TaskGroup

前述の通り、Python 3.11からはTaskGroupの利用が推奨されています。

import asyncio

async def hello_after(delay, name):
    await asyncio.sleep(delay)
    print(f"Hello, {name}")
    return name

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(hello_after(1, "Taro"))
        task2 = tg.create_task(hello_after(2, "Jiro"))
        task3 = tg.create_task(hello_after(3, "Saburo"))
    print(task1)
    print(task1.result())

asyncio.run(main())
Hello, Taro
Hello, Jiro
Hello, Saburo
<Task finished name='Task-2' coro=<hello_after() done, defined at code5_4.py:3> result='Taro'>
Taro

async with asyncio.TaskGroup()ブロック内で作成されたタスクがそれぞれ実行され、全てが完了すると次の行に処理が移動します。タスク内のコルーチンの戻り値は、Taskオブジェクトのresultメソッドで取得できます。

TaskGroupを利用する場合、あるタスクで例外が発生すると、他の未完了状態のタスクはキャンセルされます。以下の例では、2秒の時点で例外が送出され、終了まで3秒かかるタスクがキャンセルされています。

import asyncio

async def hello_after(delay, name):
    await asyncio.sleep(delay)
    if name == "Jiro":
        raise ValueError()
    print(f"Hello, {name}")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(hello_after(1, "Taro"))
            task2 = tg.create_task(hello_after(2, "Jiro"))
            task3 = tg.create_task(hello_after(3, "Saburo"))
    except* Exception as err:
        print(f"{err.exceptions=}")

    print(f"{task1._state=}")
    print(f"{task2._state=}")
    print(f"{task3._state=}")

asyncio.run(main())
Hello, Taro
err.exceptions=(ValueError(),)
task1._state='FINISHED'
task2._state='FINISHED'
task3._state='CANCELLED'

なお、exceptにアスタリスクがついていますが、これはPython3.11から導入されたExceptionGroupという機能です。同時に複数の例外がスローされた場合には、それらをまとめて捕捉することができます。

例えば、TypeErrorValueErrorを同時期に発生させた場合は以下の通りです。

import asyncio

async def hello_after(delay, name):
    await asyncio.sleep(delay)
    if type(name) is not str:
        raise TypeError()
    if name == "Jiro":
        raise ValueError()
    print(f"Hello, {name}")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(hello_after(1, "Taro"))
            task2 = tg.create_task(hello_after(2, "Jiro"))
            task3 = tg.create_task(hello_after(2, 1))
    except* Exception as err:
        print(f"{err.exceptions=}")

    print(f"{task1._state=}")
    print(f"{task2._state=}")
    print(f"{task3._state=}")

asyncio.run(main())
Hello, Taro
err.exceptions=(ValueError(), TypeError())
task1._state='FINISHED'
task2._state='FINISHED'
task3._state='FINISHED'

また、例外の発生したタスクより後に処理が完了するタスクでも、awaitが無く制御の切り替えが発生しない場合は最後まで実行される場合もあります。

import asyncio
from time import sleep

async def hello_after(delay, name):
    if name == "Saburo":
        sleep(delay)
    else:
        await asyncio.sleep(delay)
    if name == "Jiro":
        raise ValueError()
    print(f"Hello, {name}")

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            task1 = tg.create_task(hello_after(1, "Taro"))
            task2 = tg.create_task(hello_after(2, "Jiro"))
            task3 = tg.create_task(hello_after(3, "Saburo"))
    except* Exception as err:
        print(f"{err.exceptions=}")

    print(f"{task1._state=}")
    print(f"{task2._state=}")
    print(f"{task3._state=}")

asyncio.run(main())
Hello, Saburo
Hello, Taro
err.exceptions=(ValueError(),)
task1._state='FINISHED'
task2._state='FINISHED'
task3._state='FINISHED'

time.sleepでタスクが切り替わらないため、出力の順番も変わっています。

この他、まだ開始されていないタスクは'PENDING'という_stateになります。

6. 低レベルAPI

asyncioについて調べていると、get_running_loopget_event_loopなどを利用したサンプルコードが頻繁に登場します。

しかし、公式ドキュメントによれば、これらは前述の各種高レベルAPIにラップされた低レベルAPIであり、通常のアプリケーション開発者は使用する必要がなさそうとのことです。

アプリケーション開発者は通常 asyncio.run() のような高水準の ayncio 関数だけを利用し、ループオブジェクトを参照したり、ループオブジェクトのメソッドを呼び出したりすることはほとんどありません。この節は、イベントループの振る舞いに対して細かい調整が必要な、低水準のコード、ライブラリ、フレームワークの開発者向けです。

筆者はasyncioの変更の経緯までは詳しく追えていませんが、昔は高レベルAPIが今よりも未発達で、一部低レベルAPIの利用が必須だったのかもしれません。例えばcreate_taskはPython 3.7で追加されたとか。

また、Futureも低レベルAPIの一つであり、公式のこちらのページによれば、現在は明示的に使用する必要はないとのことです。

Future は、非同期処理の 最終結果 を表現する特別な 低レベルの awaitable オブジェクトです。

通常、アプリケーション水準のコードで Future オブジェクトを作る 必要はありません 。

Discussion