Python Concurrency with asyncioの読書メモとasyncioの3.11–3.13変更点
はじめに
Python Concurrency with asyncioというPythonの非同期処理を専門とした書籍です。
既存または新規のPythonアプリケーションにおける並行処理の理解を深め、活用したいと考えている開発者には有用な書籍です。
書籍中のソースコードによると、本書のコードが動く環境はPython3.10になります。
本書にはいくつか古い実装方法が残っているので留意してください。
asyncioについて3.11以降に変更された点はバージョン差による注意にまとめてあります。
各章のまとめ
1 Getting to know asyncio
- Pythonにおける並行処理の基礎知識
- I/OバウンドとCPUバウンドの違い
- 並行性と並列性の違い
- マルチプロセスとマルチスレッド
- GILの理解
- シングルスレッド同時実行の仕組み
- イベントループ
2 asyncio basics
- asyncio コルーチンの基礎
- コルーチン関数とコルーチンオブジェクト
- タスク
- タスクとコルーチンとフューチャーの関係
- コルーチンとタスクの落とし穴
- CPU依存のコード
- ブロッキングするAPI
- イベントループの操作
- デバッグモードを使用したasyncio.run
3 A first asyncio application
- ブロッキングするソケットの操作
- 非ブロッキングのソケット操作
- asyncioを使用したエコーサーバーのサンプル
4 Concurrent web requests
- 複数のWebリクエストを並行して送信する方法
- aiohttpを使用した非同期対応
- 非同期コンテキストマネージャー async with
- gatherによる同時実行
- waitによる制御
注意:
おてがるにAPI叩くだけならhttpxという選択肢もある
サンプル:
https://github.com/mima3/test_asyncio/tree/main/py313/rest
5 Non-blocking database drivers
- データベースにおける非同期処理
- asyncpgによるPostgresSQLの非同期処理
- queryの実行例
- 接続プールの利用例
- transactionの利用例
- 非同期ジェネレータと結果セットの取得
注意:asyncpgの話しかしてないが、同じような実装でasyncmyなどでMySqlも利用できる。
参考:
https://github.com/mima3/test_asyncio/tree/main/py313/db
6 Handling CPU-bound work
- CPU負荷の高い処理をasyncioと組み合わせて処理する方法
- multiprocessingライブラリの説明
- プロセスプールの使用方法をasyncioで動かす
- asyncioによるMappingとreducing
- データの共有方法/ロックによる同期
7 Handling blocking work with threads
- asyncioを用いてブロッキングI/Oを処理する方法
- Threadクラスの紹介
- ThreadPoolExecutorクラス紹介
- asyncioを使用したスレッド操作
- threading.Lock, threading.Rlock, deadlock
-
Tkinterの紹介
- asyncioとスレッドを利用してUIをフリーズさせない
- 低レベルなAPIはGILを解放するケースがあり、スレッド+asyncioが効く
- hashlib
- numpy(執筆時点ではスレッドのメリットに関するドキュメントが不足してたとのこと)
8 Streams
- チャットシステムのクライアント・サーバーを作る
- 低レベルのトランスポートAPIとプロトコルAPIの使い方
- asyncioのStreamReader/StreamWriter
- tty
9 Web applications
-
aiohttpを使用したWebアプリケーションの作成
- 簡単なRESTAPIを作る
- DBにも繋げる
- aiohttpとFlaskの比較
- WSGIとASGI
- Starlette紹介
- Djangoの非同期ビュー
10 Microservices
- asyncio ベースの Web APIの提供
- マイクロサービスの基礎
- backend-for-frontend pattern
- circuit breaker pattern
11 Synchronization
- 同期における問題と解決策
- シングルスレッドの同時実行のバグ
- asyncio.Lock
- asyncio.Semaphore/ asyncio.BoundedSemaphore
- asyncio.Event
- asyncio.Condition
12 Asynchronous queues
- 非同期キュー
- asyncio.Queue
- Webアプリでキューを使用する例
- Webクローラーでキューを使用する例
- asyncio.PriorityQueue
- asyncio.LifoQueue
13 Managing subprocesses
- サブプロセスの作成と管理
-
asyncio.subprocess
- create_subprocess_shell vs create_subprocess_exec(推奨)
- パイプを扱う際にはデッドロックに気をつける
- communicate()の使用を検討
14 Advanced asyncio
- asyncio APIの設計者や、asyncioイベントループの内部動作
- iscoroutineとinspect.iscoroutinefunctionを使用してコルーチンオブジェクトかチェックして通常関数とコルーチンオブジェクトの両方を処理できる
- ContextVarsで特定のタスクに対してローカルな変数を作成することを検討する
- 意識的に制御をイベントループに返したい場合は
asyncio.sleep(0)
を使う - uvloopなどを使用してイベントループの実装の変更を検討する
- 独自のイベントループを作る方法
バージョン差による注意
Python 3.11以降も非同期処理はバージョンアップしています。ここでは書籍の対象外の主な機能を紹介します。
TaskGroupの関係
TaskGroupは3.11に導入されたものです。本書籍には載っていませんがgatherを置き換えた方が無難です。
従来
results = await asyncio.gather(ok("A", 0.30), ok("B", 0.10), ok("C", 0.20))
print("results:", results)
新しい書き方
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(ok("A", 0.30))
t2 = tg.create_task(ok("B", 0.10))
t3 = tg.create_task(ok("C", 0.20))
# ここまで来たら全タスク完了済み。各 Task から result() で回収する
print("results:", t1.result(), t2.result(), t3.result())
asyncio.timeout() / timeout_at()
timeoutとtimeout_at3.11に導入されたものです。wait_forの代替となります
従来
try:
print("start. wait_for")
await asyncio.wait_for(asyncio.sleep(5), 1)
except TimeoutError:
print(' wait_for...')
新しい書き方
try:
print("start. timeout")
async with asyncio.timeout(1):
await asyncio.sleep(5)
except asyncio.TimeoutError:
print(' timeout...')
try:
print("start. timeout_at")
deadline = asyncio.get_running_loop().time() + 1
async with asyncio.timeout_at(deadline):
await asyncio.sleep(5)
except asyncio.TimeoutError:
print(' timeout_at...')
Barrierのサポート
Barrier指定された数のタスクが待機するまでブロックすることを可能にします。Python3.11で追加されました。
import asyncio, random
async def worker(i: int, barrier: asyncio.Barrier):
prep = random.uniform(0.05, 0.25) # 準備時間はバラバラ
await asyncio.sleep(prep)
print(f"[W{i}] ready (prep {prep:.2f}s)")
# 待機者が parties 人に達した瞬間に全員解除される
pos = await barrier.wait()
print(f"[W{i}] run...", pos)
async def main():
parties = 5
barrier = asyncio.Barrier(parties)
async with asyncio.TaskGroup() as tg:
for i in range(parties):
tg.create_task(worker(i, barrier))
if __name__ == "__main__":
asyncio.run(main())
Runnerクラス
3.11より、asyncio.runの代わりにRunnerクラスを使用できるようになりました。
def main():
with asyncio.Runner(debug=True) as runner:
runner.run(test("1番目", 0.5))
runner.run(test("2番目", 2.5)) # 同じループ・同じContextVarsで続けて実行
if __name__ == "__main__":
main()
Eager Task Factory
eager_task_factoryではタスク生成時に同期的にコルーチンを走らせ、ブロックしたら初めてループに乗せることがPython 3.12から可能になりました。
一般的には I/O を回避するためにキャッシュをするコルーチンで活用されます
import asyncio
async def maybe_cached(x):
return x * 2 # await を含まない → eager なら即完了する
async def main():
loop = asyncio.get_running_loop() # 既存ループを取得
loop.set_task_factory(asyncio.eager_task_factory) # ★このループに設定
try:
t = asyncio.create_task(maybe_cached(21))
assert t.done() # ← eager なら True
print(await t) # 42
finally:
loop.set_task_factory(None) # 片付け(元に戻す)
if __name__ == "__main__":
asyncio.run(main())
asyncio.iscoroutinefunctionについて
3.13で非推奨になりました。
inspect.iscoroutinefunction(object)に置き換えてください。
Discussion