Pythonにasyncioってあるけどよく知らなかったので調べた
1.はじめに
Pythonのasyncioは、async
・await
構文を利用して並行処理を行うため公式ライブラリです。
筆者はいくつかのライブラリでasync
・await
またはasyncioなるキーワードを目にしつつも、理解を後回しにしてきました。この度一念発起して情報を整理し、同じような人たちのために情報をまとめました。
- まず、asyncioの基本的な使い方を改めて整理しました。
- 特に、並行処理のタスク遷移と例外周りについて、パッと検索した範囲ではあまり情報が見つからなかったため、深掘りしてまとめました。
- 執筆当時の検索結果では、asyncioの古い書き方や機能を利用しているものも多くありました。この記事ではなるべく執筆現在の最新バージョン(Python 3.11)で推奨されている書き方でサンプルを作ってあります。
ただし、asyncioについて色々と書いたものの、async
・await
はPythonのキーワード、asyncioはそれを利用したライブラリであり、async
・await
を利用するサードパーティの非同期処理ライブラリは他にも開発されていたりします。例えばuvicornはasyncioではなくuvloopというライブラリを使用しているとのことです。
対象読者
- Pythonの基本的な使い方は理解している方
- asyncioって聞いたことはあるけどよく知らないなあという方
- Python製Webフレームワークなどで、非同期処理の中身(の一部の一例)がどうなっているのか知りたい方
2.asyncioの基本
並行処理と並列処理
まず、asyncioは、シングルスレッドで動作する並行処理のためのライブラリとのことです。
Python 3.6までのドキュメントにははっきりとそう書いてあります。3.7以降のドキュメントでは改訂によって文言が消えてしまっていますが、おそらく現在でもその仕組みは変わっていないのではないでしょうか。
並行処理と、よく比較される概念の並列処理の定義は検索すればたくさん出てきます。例えばこちらのページでは以下のように説明されています。
- 並行処理: ある時間の範囲において、複数のタスクを扱うこと
- 並列処理: ある時間の点において、複数のタスクを扱うこと
他にもこちらのページでも似たような定義が説明されています。
マルチコアで並列処理を行う場合は、複数のタスクが実際に同時に処理されています。ある時点で複数のタスクが処理されているならば、当然ある時間の範囲でも複数のタスクが処理されていますので、この定義では並列処理は並行処理の一種です。一方、シングルスレッドで並行に処理を行うとはどういうことかというと、具体的には複数の処理をタイムスライスしながら少しずつ進めるということです。例えば
- 処理Aを20%進めて
- 処理対象をAからBに切り替えて、処理Bを15%進めて
- 処理対象をBからAに切り替えて、処理Aを25%進めて...
というようなイメージです。ただし実際のasyncioでは、大まかに言えばファイルアクセスやネットワークアクセスなど、CPUが空くIOの待ち時間のタイミングで切り替えが行われます。なのでasync IOなのかと。更に厳密にどのタイミングで切り替えが生じるのかは後でコードを交えて説明します。
マルチスレッドとの比較
マルチスレッドでも似たようなことはできるのですが、例えばこちらのページでは、ロックやスレッドセーフを気にせずに、簡単に実装できることを利点として挙げています。GPT4に質問してみても、以下のような利点を答えてくれました。(執筆現在のGPT4は時々適当なことを言うので真偽はさておき)
- 非同期I/O: asyncioは非同期I/Oをサポートしています。これは、ネットワークやディスクなどのリソースにアクセスするときに特に役立ちます。プログラムがI/Oを待っている間、他のコードを実行することが可能になります。
- 高い並行性: マルチスレッドと比較して、asyncioは一つのスレッドで複数の操作を同時に行う能力があります。これにより、大量の並行接続やタスクを効率的に処理することが可能になります。
- スレッドのオーバーヘッドが少ない: 各スレッドは独自のスタックとメモリを持つため、スレッドの数が多いとそれらの管理にコストがかかります。一方、asyncioのコルーチンは非常に軽量で、大量のコルーチンを同時に管理することが可能です。
- デッドロックや競合状態が少ない: asyncioでは、1つのタスクが一度に実行されるので、デッドロックやレースコンディションを避けるためのロックなどの機構が必要ありません。これにより、マルチスレッドプログラミングよりもコードの複雑さが軽減されます。
コルーチンの定義と実行
この記事では、asyncioの中でも、最新のPythonで推奨されている高レベルAPIを用いた方法を紹介します。その他の方法については、記事の最後の方で少しだけ触れています。
まずは、並行処理される処理の単位である、コルーチンを定義する方法を紹介します。ただし、このサンプルでは定義されたコルーチンは通常のプログラムのように逐次実行されます。
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%X')}")
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, "hello")
await say_after(2, "world")
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
-
def
の前にasync
を付けた関数はコルーチンとして定義されます。 - コルーチンを実行する場合には呼び出しの前に
await
を付けます。 - 大元のコルーチンを実行する場合は
asyncio.run
を使用します。
このコードは、関数をコルーチンとして定義した以外は普通に逐次的に1秒・2秒ずつスリープするだけのコードです。このコードを実行すると以下のような結果になります。
実行結果:
started at 14:35:27
hello at 14:35:28
world at 14:35:30
finished at 14:35:30
JavaScriptなどでasync
・await
を使いこなしている方にはお馴染みの書き方ではないかと思います。ただし、Pythonではawait
無しでコルーチンを実行することはできません。
async def main():
print(f"started at {time.strftime('%X')}")
say_after(1, "hello")
say_after(2, "world")
print(f"finished at {time.strftime('%X')}")
実行結果:
started at 14:38:32
code2_2.py:11: RuntimeWarning: coroutine 'say_after' was never awaited
say_after(1, 'hello')
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
code2_2.py:12: RuntimeWarning: coroutine 'say_after' was never awaited
say_after(2, 'world')
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
finished at 14:38:32
タスクの定義と並行実行
asyncioで並行処理を実現するにはasyncio.create_task
によってコルーチンをタスクにします。
import asyncio
import time
async def say_after(delay, what):
print(f"prepare {what} at {time.strftime('%X')}")
await asyncio.sleep(delay)
print(f"{what} at {time.strftime('%X')}")
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
await task1
print(f"returned from await task1 at {time.strftime('%X')}")
await task2
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
実行結果:
started at 15:48:05
prepare hello at 15:48:05
prepare world at 15:48:05
hello at 15:48:06
returned from await task1 at 15:48:06
world at 15:48:07
finished at 15:48:07
一つ前のサンプルでは全ての処理が完了するまで合計3秒かかりましたが、sleep
の待ち時間を利用して処理が並行実行されたために、started
から2秒後にfinished
の出力が実行されています。
await
の戻り値
なお、await
の評価結果はコルーチンの戻り値となります。
import asyncio
async def return42():
return 42
async def main():
result = await return42()
print(f"{result=}")
asyncio.run(main())
result=42
3.タスクの並行実行の仕組み
仕組みの解説
タスクがどのような原理でシングルスレッド上で並行実行されているのかは中々分かりにくいのですが、結論から言うと以下の点を抑えれば十分です。
- タスクは
create_task
の時点で実行がスケジュールされる。 - しかし制御が移るまでスケジュールされたタスクは実行されない。
-
await (Futureオブジェクト)
を実行した時に限り、制御が他のタスクに移動し、await
を実行したタスクはFuture
オブジェクトの終了を待つ。 -
Future
が突然出てきたが、Task
はFuture
の一種である。 - 全てのタスクが
Future
を待っている場合は、最初にFuture
が完了したタスクに制御が移る。
すぐ次の節で例を交えて検証を行います。
この仕組みの通りであれば、変に中途半端なタイミングでタスクが切り替わるということがないため、確かにロックやスレッドセーフを気にする必要はあまりなさそうです。
それぞれの根拠を捕捉すると、まず、タスクのスケジューリングについては公式ページに次のように記載されています。
asyncio.create_task() のような関数で、コルーチンが Task にラップされているとき、自動的にコルーチンは即時実行されるようにスケジュールされます
制御の移動については、こちらの記事が参考になりました。
どうやらpythonのawaitはfutureとcoroutineの2つが取れるらしいが、挙動が微妙に違う。
await futureの方はsuspends、つまり制御を手放すのに対してawait coroutineの方は単にcoroutineの終了を待つだけに見える。
cloudfit-public-docs Python Asyncio Part 2 – Awaitables, Tasks, and Futures
One of the most important points to get across is that the currently executing Task cannot be paused by any means other than awaiting a future
Task
がFuture
の一種であることは公式ページに記載があります。
Python コルーチン を実行する Future 類 オブジェクトです。
asyncio.Task は、Future.set_result() と Future.set_exception() を除いて、Future の API をすべて継承しています。
仕組みの確認のための具体例
一つ前のコードのsay_after
で
async def say_after(delay, what):
print(f"prepare {what} at {time.strftime('%X')}")
time.sleep(delay)
print(f"{what} at {time.strftime('%X')}")
のように、asyncio.sleep
をtime.sleep
に置き換えると結果は以下のようになります。
started at 16:49:38
prepare hello at 16:49:38
hello at 16:49:39
prepare world at 16:49:39
world at 16:49:41
return from task1 at 16:49:41
finished at 16:49:41
await task1
の時点でmain
からtask1
に制御が移り、await (Futureオブジェクト)
が無くなったため、task1
は終了までまとめて実行され、main
に制御が戻ってきてからawait task2
でようやくtask2
が実行されます。
sleep
やIO処理なら何でもかんでも自動的に制御が切り替わるわけではない、という根拠の一つになっています。
また、say_after
は戻して、今度はmain
を次のコード
async def main():
task1 = asyncio.create_task(say_after(1, 'hello'))
task2 = asyncio.create_task(say_after(2, 'world'))
await asyncio.sleep(0.5)
print(f"started at {time.strftime('%X')}")
await task1
print(f"returned from await task1 at {time.strftime('%X')}")
await task2
print(f"finished at {time.strftime('%X')}")
のように書き換えると、結果は以下のようになります。
prepare hello at 16:54:49
prepare world at 16:54:49
started at 16:54:50
hello at 16:54:50
returned from await task1 at 16:54:50
world at 16:54:51
finished at 16:54:51
main
内のawait asyncio.sleep(0.5)
で制御がtask1
に移り、task1
のsleep
で制御がtask2
に移り、task2
のsleep
以降は最も早く目覚めるmain
に制御が戻ります。なお、main
のsleep
の時間を長くすると出力の順序も変わります。
await task1
のように明示的にタスクをawait
しなくともタスクは既にスケジューリングされており、asyncio.sleep
などのタイミングで制御が切り替わる例となっています。
4. 例外発生時の挙動
例外発生時には通常通り上に向かって例外がスローされます。
import asyncio
async def hello_after(delay, name):
await asyncio.sleep(delay)
if name == "Jiro":
raise
print(f"Hello, {name}")
async def main():
task1 = asyncio.create_task(hello_after(1, "Taro"))
task2 = asyncio.create_task(hello_after(2, "Jiro"))
task3 = asyncio.create_task(hello_after(3, "Saburo"))
await task1
await task2
await task3
asyncio.run(main())
Hello, Taro
Traceback (most recent call last):
File "code3_4.py", line 18, in <module>
asyncio.run(main())
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "code3_4.py", line 15, in main
await task2
File "code3_4.py", line 6, in hello_after
raise
RuntimeError: No active exception to reraise
コルーチンとその実行を管理するスケジューラの処理が挟まっているため多少複雑にはなっていますが、通常通りコールスタックが形成されており、例外発生時にはそれが出力されています。
また、よって、タスクの呼び出し側でキャッチもできます。
async def main():
task1 = asyncio.create_task(hello_after(1, "Taro"))
task2 = asyncio.create_task(hello_after(2, "Jiro"))
task3 = asyncio.create_task(hello_after(3, "Saburo"))
try:
await task1
await task2
await task3
except Exception as e:
print(f"catched \"{e}\"")
Hello, Taro
catched "No active exception to reraise"
呼び出し側のコルーチンが寝ている場合でも(起きてから)キャッチできますが、並行処理の切り替えによって実行順序は変化する場合があります。
async def main():
task1 = asyncio.create_task(hello_after(1, "Taro"))
task2 = asyncio.create_task(hello_after(2, "Jiro"))
task3 = asyncio.create_task(hello_after(3, "Saburo"))
try:
await asyncio.sleep(4)
await task1
await task2
await task3
except Exception as e:
print(f"catched \"{e}\"")
Hello, Taro
Hello, Saburo
catched "No active exception to reraise"
5. 複数のタスクの終了を待つ方法と例外発生時の挙動
asyncio.gather
(Python 3.11からは後で紹介するTaskGroup
という機能の利用を推奨されていますが)asyncio.gather
は複数のタスクを並行実行して、戻り値をリストに集約する関数です。
import asyncio
from datetime import datetime
async def say_after(delay, what):
start = datetime.now()
print(f"prepare {what} at {start.strftime('%X')}")
await asyncio.sleep(delay)
end = datetime.now()
print(f"{what} at {end.strftime('%X')}")
return end - start
async def main():
print(f"started at {datetime.now().strftime('%X')}")
elapsed = await asyncio.gather(
asyncio.create_task(say_after(1, 'hello')),
asyncio.create_task(say_after(2, 'world')),
)
print(f"finished at {datetime.now().strftime('%X')}")
print(elapsed)
asyncio.run(main())
実行結果
started at 17:06:52
prepare hello at 17:06:52
prepare world at 17:06:52
hello at 17:06:53
world at 17:06:54
finished at 17:06:54
[datetime.timedelta(seconds=1, microseconds=1577), datetime.timedelta(seconds=2, microseconds=1440)]
例外発生時の挙動
gather
使用時にコルーチンの中で例外が発生した場合、実行結果は次のようになります。
import asyncio
async def hello_after(delay, name):
await asyncio.sleep(delay)
if name == "Jiro":
raise
print(f"Hello, {name}")
async def main():
results = await asyncio.gather(
asyncio.create_task(hello_after(1, "Taro")),
asyncio.create_task(hello_after(2, "Jiro")),
asyncio.create_task(hello_after(3, "Saburo")),
)
asyncio.run(main())
Traceback (most recent call last):
File "code4_2.py", line 16, in <module>
asyncio.run(main())
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "code4_2.py", line 10, in main
results = await asyncio.gather(
File "code4_2.py", line 6, in hello_after
raise
RuntimeError: No active exception to reraise
ただし、次のコード
async def main():
results = await asyncio.gather(
asyncio.create_task(hello_after(1, "Taro")),
asyncio.create_task(hello_after(2, "Jiro")),
asyncio.create_task(hello_after(3, "Saburo")),
return_exceptions=True,
)
print(results)
のようにgather
の引数にreturn_exceptions=True
を指定すると、実行結果は次のようになります。
Hello, Taro
Hello, Saburo
[None, RuntimeError('No active exception to reraise'), None]
例外がスローされたタスクでは、戻り値がその例外オブジェクトになります。
asyncio.TaskGroup
前述の通り、Python 3.11からはTaskGroup
の利用が推奨されています。
import asyncio
async def hello_after(delay, name):
await asyncio.sleep(delay)
print(f"Hello, {name}")
return name
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(hello_after(1, "Taro"))
task2 = tg.create_task(hello_after(2, "Jiro"))
task3 = tg.create_task(hello_after(3, "Saburo"))
print(task1)
print(task1.result())
asyncio.run(main())
Hello, Taro
Hello, Jiro
Hello, Saburo
<Task finished name='Task-2' coro=<hello_after() done, defined at code5_4.py:3> result='Taro'>
Taro
async with asyncio.TaskGroup()
ブロック内で作成されたタスクがそれぞれ実行され、全てが完了すると次の行に処理が移動します。タスク内のコルーチンの戻り値は、Task
オブジェクトのresult
メソッドで取得できます。
TaskGroup
を利用する場合、あるタスクで例外が発生すると、他の未完了状態のタスクはキャンセルされます。以下の例では、2秒の時点で例外が送出され、終了まで3秒かかるタスクがキャンセルされています。
import asyncio
async def hello_after(delay, name):
await asyncio.sleep(delay)
if name == "Jiro":
raise ValueError()
print(f"Hello, {name}")
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(hello_after(1, "Taro"))
task2 = tg.create_task(hello_after(2, "Jiro"))
task3 = tg.create_task(hello_after(3, "Saburo"))
except* Exception as err:
print(f"{err.exceptions=}")
print(f"{task1._state=}")
print(f"{task2._state=}")
print(f"{task3._state=}")
asyncio.run(main())
Hello, Taro
err.exceptions=(ValueError(),)
task1._state='FINISHED'
task2._state='FINISHED'
task3._state='CANCELLED'
なお、except
にアスタリスクがついていますが、これはPython3.11から導入されたExceptionGroup
という機能です。同時に複数の例外がスローされた場合には、それらをまとめて捕捉することができます。
例えば、TypeError
とValueError
を同時期に発生させた場合は以下の通りです。
import asyncio
async def hello_after(delay, name):
await asyncio.sleep(delay)
if type(name) is not str:
raise TypeError()
if name == "Jiro":
raise ValueError()
print(f"Hello, {name}")
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(hello_after(1, "Taro"))
task2 = tg.create_task(hello_after(2, "Jiro"))
task3 = tg.create_task(hello_after(2, 1))
except* Exception as err:
print(f"{err.exceptions=}")
print(f"{task1._state=}")
print(f"{task2._state=}")
print(f"{task3._state=}")
asyncio.run(main())
Hello, Taro
err.exceptions=(ValueError(), TypeError())
task1._state='FINISHED'
task2._state='FINISHED'
task3._state='FINISHED'
また、例外の発生したタスクより後に処理が完了するタスクでも、await
が無く制御の切り替えが発生しない場合は最後まで実行される場合もあります。
import asyncio
from time import sleep
async def hello_after(delay, name):
if name == "Saburo":
sleep(delay)
else:
await asyncio.sleep(delay)
if name == "Jiro":
raise ValueError()
print(f"Hello, {name}")
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(hello_after(1, "Taro"))
task2 = tg.create_task(hello_after(2, "Jiro"))
task3 = tg.create_task(hello_after(3, "Saburo"))
except* Exception as err:
print(f"{err.exceptions=}")
print(f"{task1._state=}")
print(f"{task2._state=}")
print(f"{task3._state=}")
asyncio.run(main())
Hello, Saburo
Hello, Taro
err.exceptions=(ValueError(),)
task1._state='FINISHED'
task2._state='FINISHED'
task3._state='FINISHED'
time.sleep
でタスクが切り替わらないため、出力の順番も変わっています。
この他、まだ開始されていないタスクは'PENDING'
という_state
になります。
6. 低レベルAPI
asyncioについて調べていると、get_running_loop
やget_event_loop
などを利用したサンプルコードが頻繁に登場します。
しかし、公式ドキュメントによれば、これらは前述の各種高レベルAPIにラップされた低レベルAPIであり、通常のアプリケーション開発者は使用する必要がなさそうとのことです。
アプリケーション開発者は通常 asyncio.run() のような高水準の ayncio 関数だけを利用し、ループオブジェクトを参照したり、ループオブジェクトのメソッドを呼び出したりすることはほとんどありません。この節は、イベントループの振る舞いに対して細かい調整が必要な、低水準のコード、ライブラリ、フレームワークの開発者向けです。
筆者はasyncioの変更の経緯までは詳しく追えていませんが、昔は高レベルAPIが今よりも未発達で、一部低レベルAPIの利用が必須だったのかもしれません。例えばcreate_task
はPython 3.7で追加されたとか。
また、Future
も低レベルAPIの一つであり、公式のこちらのページによれば、現在は明示的に使用する必要はないとのことです。
Future は、非同期処理の 最終結果 を表現する特別な 低レベルの awaitable オブジェクトです。
通常、アプリケーション水準のコードで Future オブジェクトを作る 必要はありません 。
Discussion