python-asyncio
実行
- asyncio.run(a_coroutine):コルーチンの実行
- asyncio.Task(a_coroutine): タスクを生成して、実行。
- asyncio.gather([tasks]): タスクが終了するまで待つ。coroutineが返り値を持つ場合は、taskの入力順に返り値が入ったリストを返す。
- asyncio.gather([coroutines]):コルーチンを実行し、終了するまで待つ。 coroutineが返り値を持つ場合は、taskの入力順に返り値が入ったリストを返す
- asyncio.as_completed(tasks): タスクが終了した順に、タスクのコルーチンを返す。コルーチンの返り値はawaitで取り出す。
for coroutine in asyncio.as_completed(tasks):
a, b = await coroutine
- async with: asyncのコンテキストマネージャ。
async with aiohttp.ClientSession() as session:
- async for: asyncのイテレーターを使うとき。async defした関数をgeneratorとして使う場合などに使用。
Async Context Manager
- 普通のコンテキストマネージャ。以下公式ドキュメントより
class contextlib.AbstractContextManager
object.enter() と object.exit() の2つのメソッドを実装した抽象基底クラス (abstract base class) です。 object.enter() は self を返すデフォルトの実装が提供されるいっぽう、 object.exit() はデフォルトで None を返す抽象メソッドです。 コンテキストマネージャ型 の定義も参照してください。
- Asyncのコンテキストマネージャ。以下公式ドキュメントより
class contextlib.AbstractAsyncContextManager
object.aenter() と object.aexit() の2つのメソッドを実装するクラスのための抽象基底クラス (abstract base class) です。 object.aenter() は self を返すデフォルト実装が提供されるいっぽう、 object.aexit() はデフォルトで None を返す抽象メソッドです。 非同期コンテキストマネージャ (Asynchronous Context Manager) の定義も参照してください。
Async Iterator
- 普通のイテレーター。以下、公式ドキュメントより
イテレータオブジェクト自体は以下の 2 つのメソッドをサポートする必要があります。これらのメソッドは 2 つ合わせて iterator protocol: (イテレータプロトコル) を成します:
iterator.iter()
iterator オブジェクト自体を返します。このメソッドはコンテナとイテレータの両方を for および in 文で使えるようにするために必要です。このメソッドは Python/C API において Python オブジェクトを表す型構造体の tp_iter スロットに対応します。
iterator.next()
iterator の次のアイテムを返します。もしそれ以上アイテムが無ければ StopIteration 例外を送出します。 このメソッドは Python/C APIでのPythonオブジェクトの型構造体の tp_iternext スロットに対応します。
- Asyucイテレーター。以下、公式ドキュメント
(非同期イテレータ) aiter() と anext() メソッドを実装したオブジェクトです。 anext() は awaitable オブジェクトを返さなければなりません。 async for は StopAsyncIteration 例外を送出するまで、非同期イテレータの anext() メソッドが返す awaitable を解決します。 PEP 492 で導入されました。
コンテキスト変数
import contextvars
var = contextvars.ContextVar("Var", default="blank")
公式文書より
コンテキストローカルな状態を管理し、保持し、アクセスするための API を提供します。 ContextVar クラスは コンテキスト変数 を宣言し、取り扱うために使われます。
コンテキストローカルな状態を保持している例
import contextvars
var = contextvars.ContextVar("Var", default="blank")
async def func_c(id):
print(id_var.get()) # func_b(1)で実行されたfunc_cは1, func_b(2)で実行されたfunc_cは2
id_var.set(id)
print(id_var.get()) # func_c((1,1))では(1,1)、func_c((1,2))では(1,2)、func_c((2,1))では(2,1)、func_c((2,2))では(2,2)
async def func_b(id):
id_var.set(id)
print(id_var.get()) # func_b(1)では1、func_b(2)では2
await asyncio.gather(func_c((id, 1)), func_c((id, 2)))
print(id_var.get()) # func_b(1)では1、func_b(2)では2。func_cの実行でsetされるid_varの影響を受けない。
async def func_a():
await asyncio.gather(func_b(1), func_b(2))
asyncio.run(func_a)
- 例えば、func_bが違うDBにアクセスしているときなどに有効
その他
ContextVarについて、公式ドキュメントより
このクラスは新しいコンテキスト変数を宣言するのに使われます。例えば、次の通りです:
var: ContextVar[int] = ContextVar('var', default=42)
必須のパラメータの name は内観やデバッグの目的で使われます。
オプションのキーワード専用引数 default は、現在のコンテキストにその変数の値が見付からなかったときに ContextVar.get() から返されます。
重要: コンテキスト変数は、モジュールのトップレベルで生成する必要があり、クロージャの中で作成すべきではありません。Context オブジェクトはコンテキスト変数への強参照を持っており、コンテキスト変数がガーベジコレクトされるのを防ぎます。
Queues
-
pre-emptive[1]なマルチタスクに使用
- 競合状態[2]を防止する
- 複数のワーカーを使うときにも有効
-
- class asyncio.Queue(maxsize=0): 先入れ先出し (FIFO) キュー。maxsize がゼロ以下の場合、キューは無限長になる。 0 より大きい整数の場合、キューが maxsize に達すると await put() は get() によってキューの要素が除去されるまでブロックする。標準ライブラリにおけるスレッドベースの queue モジュールと異なり、キューのサイズは常に既知であり、 qsize() メソッドを呼び出すことによって取得することができる。
- get(): キューから要素を削除して返す。キューが空の場合項目が利用可能になるまで待機
- put(item): 要素をキューに入力。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機。
- join(): キューにある全ての要素が取得され、処理されるまでブロック。未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために task_done() を呼び出すと減算される。未完了のタスクのカウント値がゼロになると、 join() のブロックは解除される。
- task_done(): queueから取り出したitemについて処理が完了したことを通知。
-
使い方の例
import asyncio
import aiohttp
from bs4 import BeautifulSoup
BASE_URL = "https://en.wikipedia.org"
async def worker(task_queue visited):
async with aiohttp.ClientSession() as session:
while True:
url = await task_queue.get()
if url in visited:
task_queue.task_done()
continue
async with session.get(url) as response:
text = await response.text()
soup = BeautifulSoup(text, "html.parase")
main_body = soup.find(id="target-id")
for link in main_body.find_all("a"):
if url := link.get("href"):
await task_queue.put(BASE_URL + url)
task_queue.task_done()
async def main():
task_queue = asyncio.Queue()
visited = set()
url = BASE_URL + hogefuga
workers = [ worker(task_queue, visited) for _ in range(5)]
await task_queue.joijn() # すべてのタスクが終わるまで待つ。
同期プリミティブ
Lock
- 競合状態を回避するために使用。共有リソースに対する排他的なアクセスを保証するために使う。
- 使い方例
import asyncio
total = 0
total_lock = None
async def counter():
global total
async with total_lock:
...
async def main():
global totla_lock
total_lock = asyncio.lock()
...
Semaphore
- 並行数を制御するために使用[1]
- acquire() メソッドが呼び出された時にカウンターがゼロになっていると、セマフォは処理をブロックし、他のタスクが release() メソッドを呼び出すまで待機。
- withで呼び出せば、これらをやっておいてくれる
- 使用法例
import asyncio
connection_semaphore = None
connections = []
limit_connection_num = 5
async def some_proc_to_connection(item):
async with connection_semaphore:
connections.append(item)
connection.remove(item)
async def main():
global connection_semaphore
connection_semaphore = asyncio.Semaphore(limit_connection_num)
coroutines = [some_proc_to_connection(item) for item in some_items]
await asyncio.gather(*coroutines)
Events
- Eventが起きるまで、複数の asyncio タスクを待たせて、イベントが起きた後に処理したいときに使う。
- 公式ドキュメントのサンプル
async def waiter(event):
print('waiting for it ...')
await event.wait()
print('... got it!')
async def main():
# Create an Event object.
event = asyncio.Event()
# Spawn a Task to wait until 'event' is set.
waiter_task = asyncio.create_task(waiter(event))
# Sleep for 1 second and set the event.
await asyncio.sleep(1)
event.set()
# Wait until the waiter task is finished.
await waiter_task
asyncio.run(main())
ブロッキングな処理を組み合わせたい場合
- 別のthreadで処理すれば良い。そのために、
asyncio.to_thread(some_blocking_func)
と書くと良い。 - 公式ドキュメントの使用例
def blocking_io():
print(f"start blocking_io at {time.strftime('%X')}")
# Note that time.sleep() can be replaced with any blocking
# IO-bound operation, such as file operations.
time.sleep(1)
print(f"blocking_io complete at {time.strftime('%X')}")
async def main():
print(f"started main at {time.strftime('%X')}")
await asyncio.gather(
asyncio.to_thread(blocking_io),
asyncio.sleep(1))
print(f"finished main at {time.strftime('%X')}")
asyncio.run(main())
Logging
- asyncioはasyncio用のloggerを使う
- 使い方 from 公式:
logging.getLogger("asyncio").setLevel(logging.WARNING)
- loglevelも変えられるよ
- loggerの処理はblockingなので、logが大量な場合はqueueと組み合わせて使うと良い
- 例
import asyncio
import logging
import queue
logger = logging.getLogger("asyncio")
log_queue = queue.SimpleQueue()
queue_handler = logging.hander.QueueHandler(log_queue)
file_handler = logging.Filehandler("log_file.txt")
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
logging.addhandler(queue_handler)
logger.setLevel(logging.DEBUG)
...
Debugモード
- asyncio.run(main(), debug=True)
- その他の方法
Test
- pytest-asyncioを使うと簡単
- テストの関数に
@pytest.mark.asyncio
をつけて、async defを使うだけ! - 公式ドキュメントの例
@pytest.mark.asyncio
async def test_some_asyncio_code():
res = await library.do_something()
assert b"expected result" == res
例外処理
- gather()の場合:
gather(coroutines, return_exceptions=True)
を使う- try-exceptだと、最初の1回しか補足市内。
- as_completed()の場合: for文中で、try-exceptを使えば良い。完了したものから返すので。
for coroutine in asyncio.as_completed(corutines):
try:
await coroutine
except Exception:
print("Exception")
タスクのキャンセル
-
task.cancel()
でキャンセルできる。 -
asyncio.shield()
でキャンセルから守ることができる。
Timeout
-
asyncio.wait_for(coroutine(), timeout=time_out_limit)
でタイムアウトできる。ただし、間にblockingな処理が入っていると、 -
with asyncio.Timeout(time_out_liimit)
でもできる模様。