[Python] asyncio.TaskGroupの動きまとめ
Python3.11から追加されているTaskGroupについて、触ったことがなかったので実際に動かしながら振る舞いを確認してみました。
参考にしたページ
TaskGroupとは
従来、Pythonのasyncioでタスクを並行実行するには、
- asyncio.create_task()で単一のタスクを作成する
- asyncio.gather()で複数タスクを実行する
の大きく2つの方法がありました。ただしキャンセルや例外発生時など、気をつけることが多く使う上では注意が必要でした。
task = asyncio.create_task(some_task()) # タスク化
await other_task() # ここで外からキャンセルされてもtaskはキャンセルされない
await task
そんな中で、Python3.11から新しく追加されたのがasyncio.TaskGroupです。書き方は以前のふたつに比べて多少ややこしくなるのですが、タスクの並行実行に関してより安全な手段を提供しているということだったので、動きを確認してみました。
実行するスクリプト
今回のテスト用にスクリプトを作成しました。
Taskとしてのasync関数
import asyncio
import logging
async def task(num: int, delay: float):
logging.info("Task %d start", num)
try:
await asyncio.sleep(delay)
logging.info("Task %d finished", num)
except asyncio.CancelledError:
logging.info("Task %d cancelled", num)
raise
一定時間sleepした後にメッセージをlog出力して終了します。
タスクが中断されたことがわかるように、CancelledErrorのexceptとlog表示を追加しています。
main.py
import asyncio
import logging
# taskの定義は上述の通り
async def main():
pass # 確認用のコードを書いていく
if __name__ == "__main__":
logging.basicConfig(format="[%(relativeCreated)04d] %(message)s", level=logging.INFO)
asyncio.run(main())
main関数の中でTaskGroupを動かして、挙動を確認していく想定です。
loggingのフォーマットで、loggingモジュールが読み込まれてからの相対時間を表示してタイミングをわかりやすくしています。
各パターンの動作確認
いよいよ本編です。main()の内容を変えて各パターンでの挙動を見ていきます。
Pythonのバージョンは3.13.1で確認しました。
$ python3 --version
Python 3.13.1
タスクの実行
基本の形です。すべてのタスクが終わるまで待ってからcontext managerを抜けます。
async def main():
logging.info("Enter TaskGroup")
async with asyncio.TaskGroup() as tg:
tg.create_task(task(num=1, delay=1))
tg.create_task(task(num=2, delay=0.5))
logging.info("Exit TaskGroup")
[0083] Enter TaskGroup
[0083] Task 1 start
[0083] Task 2 start
[0584] Task 2 finished
[1084] Task 1 finished
[1085] Exit TaskGroup

context manager内でのasync処理の実行
context manager内で長い処理が動いている場合はそちらが終わってから抜けます。
async def main():
logging.info("Enter TaskGroup")
async with asyncio.TaskGroup() as tg:
tg.create_task(task(num=1, delay=1))
await task(num=2, delay=2)
logging.info("Exit TaskGroup")
[0080] Enter TaskGroup
[0080] Task 2 start
[0080] Task 1 start
[1082] Task 1 finished
[2082] Task 2 finished
[2082] Exit TaskGroup

タスクのキャンセル
ひとつのタスクがキャンセルされても他のタスクはキャンセルされません。
async def main():
logging.info("Enter TaskGroup")
async with asyncio.TaskGroup() as tg:
tg.create_task(task(num=1, delay=1))
task2 = tg.create_task(task(num=2, delay=0.5))
await asyncio.sleep(0.1)
task2.cancel()
logging.info("Exit TaskGroup")
[0082] Enter TaskGroup
[0083] Task 1 start
[0083] Task 2 start
[0184] Task 2 cancelled
[1084] Task 1 finished
[1085] Exit TaskGroup

タスクのエラー
例外発生時の挙動を確かめるためにtask関数とmain関数を少しアップデートしました。
task関数の引数にexceptionを追加して、指定がある場合はraiseしています。
async def task(num: int, delay: float, exception: Exception | None = None):
logging.info("Task %d start", num)
try:
await asyncio.sleep(delay)
if exception is not None: # 例外を上げられるようにする
logging.info("Task %d raising exception", num)
raise exception
logging.info("Task %d finished", num)
except asyncio.CancelledError:
logging.info("Task %d cancelled", num)
raise
async def main():
logging.info("Enter TaskGroup")
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task(num=1, delay=1))
tg.create_task(task(num=2, delay=0.5, exception=RuntimeError("test")))
await task(num=3, delay=1)
except* Exception as e: # 例外のハンドリングを追加
logging.error("TaskGroup error: %s", repr(e))
logging.info("Exit TaskGroup")
[0095] Enter TaskGroup
[0095] Task 3 start
[0095] Task 1 start
[0095] Task 2 start
[0597] Task 2 raising exception
[0597] Task 1 cancelled
[0598] Task 3 cancelled
[0598] TaskGroup error: ExceptionGroup('unhandled errors in a TaskGroup', [RuntimeError('test')])
[0598] Exit TaskGroup
例外発生時は、TaskGroup内の他のタスクやcontext manager内の処理はすべてキャンセルされるようです。

context manager内でのエラー
context manager内部でエラーが起きたときの動作も同様で、実行中のタスクはキャンセルされるようです。
async def main():
logging.info("Enter TaskGroup")
try:
async with asyncio.TaskGroup() as tg:
tg.create_task(task(num=1, delay=1))
await task(num=3, delay=0.5, exception=RuntimeError("test"))
except* Exception as e:
logging.error("TaskGroup error: %s", repr(e))
logging.info("Exit TaskGroup")
[0096] Enter TaskGroup
[0096] Task 3 start
[0096] Task 1 start
[0597] Task 3 raising exception
[0597] Task 1 cancelled
[0597] TaskGroup error: ExceptionGroup('unhandled errors in a TaskGroup', [RuntimeError('test')])
[0597] Exit TaskGroup

TaskGroupのキャンセル
ややこしくなってきました。
TaskGroup自体がキャンセルされた場合は、実行中のタスクもcontext manager内の処理もキャンセルされます。
async def run_taskgroup():
async with asyncio.TaskGroup() as tg:
tg.create_task(task(num=1, delay=1))
await task(num=2, delay=1)
async def main():
logging.info("Enter TaskGroup")
async with asyncio.TaskGroup() as tg:
inner_task = tg.create_task(run_taskgroup())
await asyncio.sleep(0.5)
inner_task.cancel()
logging.info("Exit TaskGroup")
[0080] Enter TaskGroup
[0081] Task 2 start
[0081] Task 1 start
[0582] Task 2 cancelled
[0583] Task 1 cancelled
[0583] Exit TaskGroup

タスクのキャンセル(finally節)
個人的にここが重要なポイントで、タスクがfinally節の中でawaitを呼び出すとき、きちんと待ってからcontext managerを抜けてくれます。
async def task(num: int, delay: float, exception: Exception | None = None):
logging.info("Task %d start", num)
try:
await asyncio.sleep(delay)
if exception is not None:
logging.info("Task %d raising exception", num)
raise exception
logging.info("Task %d finished", num)
except asyncio.CancelledError:
logging.info("Task %d cancelled", num)
raise
finally:
await asyncio.sleep(delay)
logging.info("Task %d gracefully cancelled", num)
async def main():
logging.info("Enter TaskGroup")
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(task(num=1, delay=1))
await asyncio.sleep(0.5)
task1.cancel()
logging.info("Exit TaskGroup")
[0079] Enter TaskGroup
[0079] Task 1 start
[0581] Task 1 cancelled
[1583] Task 1 gracefully cancelled
[1583] Exit TaskGroup

通常のcreate_taskのTaskをキャンセルして終了を待とうとすると、別途awaitする場合があります。
迷った末以下の書き方をしていますが、他に方法がありそうとも思っているのでご存知の方は是非コメント下さい。
async def run_create_task():
task1 = asyncio.create_task(task(num=1, delay=1))
await asyncio.sleep(0.5)
task1.cancel() # この時点ではまだfinally節は完了していない
try:
await task1 # finallyの完了を待つ必要がある
except asyncio.CancelledError: # キャンセルされたtaskをawaitすると例外が上がるので無視する必要がある
pass
logging.info("run_create_task() finished")
finally節のキャンセル
上記のCancelledErrorをpassする実装をしてしまうと、運悪くtaskの終了を待っている間にキャンセルされると無視されてしまうという問題があります。
念の為TaskGroupでの動作を確認しましたが、きちんと中断できていることを確認しました。
async def run_taskgroup():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(task(num=1, delay=1))
await asyncio.sleep(0.5)
task1.cancel()
logging.info("run_taskgroup() finished")
async def main():
logging.info("Enter TaskGroup")
async with asyncio.TaskGroup() as tg:
inner_task = tg.create_task(run_taskgroup())
await asyncio.sleep(0.7)
inner_task.cancel()
logging.info("Exit TaskGroup")
[0073] Enter TaskGroup
[0073] Task 1 start
[0575] Task 1 cancelled
[0775] Exit TaskGroup

mainの中で上記のrun_create_task()を呼び出した場合はキャンセルが無視されて最後まで到達してしまっています。
[0085] Enter TaskGroup
[0085] Task 1 start
[0586] Task 1 cancelled
[0787] run_create_task() finished #キャンセルできていない
[0790] Exit TaskGroup
まとめ
まとめると、context managerを抜けるときはタスクの成功/失敗によらず、全てのタスクが(finally節含め)完了していると思って良さそうです。
ドキュメントでもasyncio.gather以外の選択肢としてオススメされているようで、コード上も見通しが良くなるので積極的に使っていこうかなと思います。
A new alternative to create and run tasks concurrently and wait for their completion is
asyncio.TaskGroup. TaskGroup provides stronger safety guarantees than gather for scheduling a nesting of subtasks: if a task (or a subtask, a task scheduled by a task) raises an exception, TaskGroup will, while gather will not, cancel the remaining scheduled tasks).
使ったことはないのですが、trio等の非同期ライブラリでも同様の機能はあり、structured concurrencyというコンセプトが提唱されているようです。
Discussion