Zenn
Open8

python-asyncio

meshidennmeshidenn

実行

  • asyncio.run(a_coroutine):コルーチンの実行
  • asyncio.Task(a_coroutine): タスクを生成して、実行。
  • asyncio.gather([tasks]): タスクが終了するまで待つ。coroutineが返り値を持つ場合は、taskの入力順に返り値が入ったリストを返す。
  • asyncio.gather([coroutines]):コルーチンを実行し、終了するまで待つ。 coroutineが返り値を持つ場合は、taskの入力順に返り値が入ったリストを返す
  • asyncio.as_completed(tasks): タスクが終了した順に、タスクのコルーチンを返す。コルーチンの返り値はawaitで取り出す。
for coroutine in asyncio.as_completed(tasks):
    a, b = await coroutine
meshidennmeshidenn
  • async with: asyncのコンテキストマネージャ。
    • async with aiohttp.ClientSession() as session:
  • async for: asyncのイテレーターを使うとき。async defした関数をgeneratorとして使う場合などに使用。

Async Context Manager

class contextlib.AbstractContextManager
object.enter() と object.exit() の2つのメソッドを実装した抽象基底クラス (abstract base class) です。 object.enter() は self を返すデフォルトの実装が提供されるいっぽう、 object.exit() はデフォルトで None を返す抽象メソッドです。 コンテキストマネージャ型 の定義も参照してください。

class contextlib.AbstractAsyncContextManager
object.aenter() と object.aexit() の2つのメソッドを実装するクラスのための抽象基底クラス (abstract base class) です。 object.aenter() は self を返すデフォルト実装が提供されるいっぽう、 object.aexit() はデフォルトで None を返す抽象メソッドです。 非同期コンテキストマネージャ (Asynchronous Context Manager) の定義も参照してください。

Async Iterator

イテレータオブジェクト自体は以下の 2 つのメソッドをサポートする必要があります。これらのメソッドは 2 つ合わせて iterator protocol: (イテレータプロトコル) を成します:
iterator.iter()
iterator オブジェクト自体を返します。このメソッドはコンテナとイテレータの両方を for および in 文で使えるようにするために必要です。このメソッドは Python/C API において Python オブジェクトを表す型構造体の tp_iter スロットに対応します。
iterator.next()
iterator の次のアイテムを返します。もしそれ以上アイテムが無ければ StopIteration 例外を送出します。 このメソッドは Python/C APIでのPythonオブジェクトの型構造体の tp_iternext スロットに対応します。

(非同期イテレータ) aiter() と anext() メソッドを実装したオブジェクトです。 anext() は awaitable オブジェクトを返さなければなりません。 async for は StopAsyncIteration 例外を送出するまで、非同期イテレータの anext() メソッドが返す awaitable を解決します。 PEP 492 で導入されました。

meshidennmeshidenn

コンテキスト変数

import contextvars
var = contextvars.ContextVar("Var", default="blank")

公式文書より

コンテキストローカルな状態を管理し、保持し、アクセスするための API を提供します。 ContextVar クラスは コンテキスト変数 を宣言し、取り扱うために使われます。

コンテキストローカルな状態を保持している例

import contextvars
var = contextvars.ContextVar("Var", default="blank")

async def func_c(id):
    print(id_var.get()) # func_b(1)で実行されたfunc_cは1, func_b(2)で実行されたfunc_cは2
    id_var.set(id)
    print(id_var.get()) # func_c((1,1))では(1,1)、func_c((1,2))では(1,2)、func_c((2,1))では(2,1)、func_c((2,2))では(2,2)

async def func_b(id):
    id_var.set(id)
    print(id_var.get()) # func_b(1)では1、func_b(2)では2
    await asyncio.gather(func_c((id, 1)), func_c((id, 2)))
    print(id_var.get()) # func_b(1)では1、func_b(2)では2。func_cの実行でsetされるid_varの影響を受けない。

async def func_a():
    await asyncio.gather(func_b(1), func_b(2))

asyncio.run(func_a)
  • 例えば、func_bが違うDBにアクセスしているときなどに有効

その他

ContextVarについて、公式ドキュメントより

このクラスは新しいコンテキスト変数を宣言するのに使われます。例えば、次の通りです:
var: ContextVar[int] = ContextVar('var', default=42)
必須のパラメータの name は内観やデバッグの目的で使われます。
オプションのキーワード専用引数 default は、現在のコンテキストにその変数の値が見付からなかったときに ContextVar.get() から返されます。
重要: コンテキスト変数は、モジュールのトップレベルで生成する必要があり、クロージャの中で作成すべきではありません。Context オブジェクトはコンテキスト変数への強参照を持っており、コンテキスト変数がガーベジコレクトされるのを防ぎます。

meshidennmeshidenn

Queues

  • pre-emptive[1]なマルチタスクに使用

    • 競合状態[2]を防止する
    • 複数のワーカーを使うときにも有効
  • 代表的なメソッド(公式ドキュメントより)

    • class asyncio.Queue(maxsize=0): 先入れ先出し (FIFO) キュー。maxsize がゼロ以下の場合、キューは無限長になる。 0 より大きい整数の場合、キューが maxsize に達すると await put() は get() によってキューの要素が除去されるまでブロックする。標準ライブラリにおけるスレッドベースの queue モジュールと異なり、キューのサイズは常に既知であり、 qsize() メソッドを呼び出すことによって取得することができる。
    • get(): キューから要素を削除して返す。キューが空の場合項目が利用可能になるまで待機
    • put(item): 要素をキューに入力。キューが満杯の場合、要素を追加する前に空きスロットが利用できるようになるまで待機。
    • join(): キューにある全ての要素が取得され、処理されるまでブロック。未完了のタスクのカウント値は、キューにアイテムが追加されるときは常に加算され、キューの要素を消費するコルーチンが要素を取り出し、処理を完了したことを通知するために task_done() を呼び出すと減算される。未完了のタスクのカウント値がゼロになると、 join() のブロックは解除される。
    • task_done(): queueから取り出したitemについて処理が完了したことを通知。
  • 使い方の例

import asyncio
import aiohttp
from bs4 import BeautifulSoup

BASE_URL = "https://en.wikipedia.org"
async def worker(task_queue visited):
    async with aiohttp.ClientSession() as session:
        while True:
            url = await task_queue.get()
            if url in visited:
                task_queue.task_done()
                continue
            async with session.get(url) as response:
                text = await response.text()
            soup = BeautifulSoup(text, "html.parase")
            main_body = soup.find(id="target-id")
            for link in main_body.find_all("a"):
                if url := link.get("href"):
                    await task_queue.put(BASE_URL + url)
            task_queue.task_done()

async def main():
    task_queue = asyncio.Queue()
    visited = set()
    url = BASE_URL + hogefuga
    workers = [ worker(task_queue, visited)  for _ in range(5)]
    await task_queue.joijn() # すべてのタスクが終わるまで待つ。
脚注
  1. いつでも切り替わる可能性があるということ。実際にはタイマー割り込みや入出力イベントなどで切り替わる。 ↩︎

  2. レース・コンディション(Race Condition)は、複数のプロセスが同一のリソースにアクセスし操作しようとしたときに発生する事象であり脆弱性です。レース・コンディション(競合状態)が発生すると、実行結果はアクセスが行われたときの順序に依存します。 ↩︎

meshidennmeshidenn

同期プリミティブ

-公式ドキュメント

Lock

  • 競合状態を回避するために使用。共有リソースに対する排他的なアクセスを保証するために使う。
  • 使い方例
import asyncio
total = 0
total_lock = None

async def counter():
    global total
    async with total_lock:
        ...

async def main():
    global totla_lock
    total_lock = asyncio.lock()
    ...

Semaphore

  • 並行数を制御するために使用[1]
  • acquire() メソッドが呼び出された時にカウンターがゼロになっていると、セマフォは処理をブロックし、他のタスクが release() メソッドを呼び出すまで待機。
    • withで呼び出せば、これらをやっておいてくれる
  • 使用法例
import asyncio

connection_semaphore = None
connections = []
limit_connection_num = 5

async def some_proc_to_connection(item):
    async with connection_semaphore:
        connections.append(item)
        connection.remove(item)

async def main():
    global connection_semaphore
    connection_semaphore = asyncio.Semaphore(limit_connection_num)
    coroutines = [some_proc_to_connection(item) for item in some_items]
    await asyncio.gather(*coroutines)

Events

  • Eventが起きるまで、複数の asyncio タスクを待たせて、イベントが起きた後に処理したいときに使う。
  • 公式ドキュメントのサンプル
async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
脚注
  1. セマフォ(semaphore)は、整数値のカウンタと、スレッドキューを組み合わせた抽象データ型。らしい ↩︎

meshidennmeshidenn

ブロッキングな処理を組み合わせたい場合

  • 別のthreadで処理すれば良い。そのために、 asyncio.to_thread(some_blocking_func)と書くと良い。
  • 公式ドキュメントの使用例
def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())
meshidennmeshidenn

Logging

  • asyncioはasyncio用のloggerを使う
  • 使い方 from 公式: logging.getLogger("asyncio").setLevel(logging.WARNING)
    • loglevelも変えられるよ
  • loggerの処理はblockingなので、logが大量な場合はqueueと組み合わせて使うと良い
import asyncio
import logging
import queue

logger = logging.getLogger("asyncio")
log_queue = queue.SimpleQueue()
queue_handler = logging.hander.QueueHandler(log_queue)
file_handler = logging.Filehandler("log_file.txt")
listener = logging.handlers.QueueListener(log_queue, file_handler)
listener.start()
logging.addhandler(queue_handler)
logger.setLevel(logging.DEBUG)
...

Debugモード

Test

@pytest.mark.asyncio
async def test_some_asyncio_code():
    res = await library.do_something()
    assert b"expected result" == res

例外処理

  • gather()の場合: gather(coroutines, return_exceptions=True) を使う
    • try-exceptだと、最初の1回しか補足市内。
  • as_completed()の場合: for文中で、try-exceptを使えば良い。完了したものから返すので。
for coroutine in asyncio.as_completed(corutines):
    try:
        await coroutine
    except Exception:
        print("Exception")

タスクのキャンセル

  • task.cancel()でキャンセルできる。
  • asyncio.shield()でキャンセルから守ることができる。

Timeout

作成者以外のコメントは許可されていません