🙄

Semaphoreを使ってPythonの非同期処理の平行処理数をコントロールする

2024/03/11に公開

先に結論

import asyncio

class AsyncResource:
    def __init__(self, concurrency: int = 10):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def task(self):
        async with self.semaphore:  # 11個目以降のタスクはここでブロックされる
            print("Hello")
            await self.call_api()
            print("World")

    async def call_api(self):
        await asyncio.sleep(1)

async def main():
    async_resource = AsyncResource()
    tasks = [async_resource.task() for _ in range(100)]  # 100個実行するように命令
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main()) # >> HelloとWorldが10個ずつ出力される

Pythonの非同期処理

プログラムは特に何もしないとシングルスレッドで動きます。マルチタスクはできません。これにはいくつかの限界があります。
Pythonにおけるマルチタスクの方法は

  • multiprocessingを使った並列処理
  • threadingを使った並行処理
  • async-awaitを使った並行処理
    の3択があります。

並列処理と並行処理の違いをおさらいしておきましょう。
並列処理は難しい計算を早く実施するために使います。たくさんのCPUを使うので数万桁の素因数分解などに利用できます。こうした難しい計算はCPU-Boundと呼ばれます。
並行処理は待ち時間を有効活用するために使います。通信などのI/O処理が目立つ処理ではプロセスのほとんどが通信結果を待つ時間になってしまいますから、非常に効率的です。こうした待ち時間の多い計算はIO-Boundと呼ばれます。今日の主役はこっちです。

例えばOpenAIのような外部のAPIと10回通信するとしたこんなコードになるはずです。

import time

def call_api():
    time.sleep(1)  # 実際にはここでrequests.get()のようなAPIコールをする想定

def task():
    print("Hello")
    call_api()
    print("World")

def main():
    for _ in range(10):
        task()

if __name__ == "__main__":
    main()

もちろんこのコードの実行は10秒かかるわけですが、10秒もかかるというのにCPUはほとんど仕事をしておらず待機中暇を持て余しています。

async-await

そこで待ち時間を有効活用するために並行処理を導入します。特別な事情がなければ現代のプログラミングにおいて並行処理はasync-awaitと呼ばれるプログラミングパラダイムで行われます。1つ重要な点は、並行処理ができるのは待ち時間が発生する処理だけです。さらに並行処理される関数はそのために専用にasyncキーワードをつけて定義されなければいけません。一般的にはOpenAIなどの主要な通信ライブラリはこの機能を搭載しています。バージョン1.0以降ではopenai.AsyncOpenAI()モジュールが、バージョン0.*ではopenai.ChatCompletion.acreate関数が非同期呼び出しに対応しているのでドキュメントを参照してください。

ここではcall_api関数がasyncで定義されたとしましょう。するとこのように書くと

import asyncio

async def call_api():
    await asyncio.sleep(1)

async def concurrent_task():
    print("Hello")
    await call_api()
    print("World")

async def main():
    for _ in range(10):
        await concurrent_task()

if __name__ == "__main__":
    asyncio.run(main())

concurrent_taskが並行して実行されません。上のコードではasyncキーワードを使って「処理を並行に実施してもいいよ」と印をつけただけで、並行で処理しろ!という命令は出していません。

並行に処理するにはこうします。asyncio.gather(*[coroutines])を使ってこう書きます。

import asyncio

async def call_api():
    await asyncio.sleep(1)

async def concurrent_task():
    print("Hello")
    await call_api()
    print("World")

async def main():
    tasks = [concurrent_task() for _ in range(10)]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

Semaphoreを使って並行処理数をコントロールする

並行処理は処理によっては早すぎることがあります。特にWebリクエストなどではリクエストを送り付けすぎると429 RateLimit Errorがリターンされたり、DNSサーバーから追い返されたりします。そんなときはasyncio.Semaphoreを使って並行処理数を制限できます。

やり方はsemaphore = asyncio.Semaphore(n)として生成したSemaphoreインスタンスを使ってasync with semaphoreとするだけ。こうするだけでasync withブロック内に同時に入れる処理をn個に制約できます。

async withの書き忘れを防ぐためにも以下のように同時にsemaphoreをクラスの中に内包する形で使うとよいでしょう。

import asyncio

class AsyncResource:
    def __init__(self, concurrency: int = 10):
        self.semaphore = asyncio.Semaphore(concurrency)

    async def task(self):
        async with self.semaphore:  # 11個目以降のタスクはここでブロックされる
            print("Hello")
            await self.call_api()
            print("World")

    async def call_api(self):
        await asyncio.sleep(1)

async def main():
    async_resource = AsyncResource()
    tasks = [async_resource.task() for _ in range(100)]  # 100個実行するように命令
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main()) # >> HelloとWorldが10個ずつ出力される

まとめ

LLMの台頭で非同期プログラミングはPythonエンジニアにマストの道具になりつつあります。ここでしっかりマスターしてしまいましょう。

Discussion