🚀

高性能Python: Asyncio

2024/12/28に公開

並行プログラミングは、複数のタスクを同時に実行するプログラミング方法です。Pythonにおいて、asyncioは非同期プログラミングを実現するための強力なツールです。asyncioはコルーチン(coroutine)の概念に基づいており、I/O集中型タスクを効率的に処理することができます。本文ではasyncioの基本原理と使い方を紹介します。

なぜasyncioが必要なのか

I/O操作を処理する際、マルチスレッドを使うと、通常のシングルスレッドと比べて、効率が大幅に向上します。では、なぜAsyncioが必要なのでしょうか?
マルチスレッドには多くの利点があり、広く利用されていますが、一定の制限もあります:

  • たとえば、マルチスレッドの実行中は割り込みが発生しやすく、そのため、「レースコンディション(race condition)」の状況が起こり得ます;
  • また、スレッドの切り替え自体には一定のオーバーヘッドがあり、スレッド数を無限に増やすことはできません。そのため、I/O操作が非常に重たい場合、マルチスレッドでは高効率、高品質の要件を満たせない可能性が高いです。
    これらの問題を解決するため、Asyncioが登場しました。
    同期(Sync)VS非同期(Async)
    まず、Sync(同期)とAsync(非同期)の概念を区別してみましょう。
  • Syncとは、操作が1つずつ順番に実行され、次の操作は前の操作が完了してから実行できることを指します。
  • 一方、Asyncは、異なる操作が相互に交互に実行されることを指します。もしその中のある操作がブロックされても、プログラムは待たず、実行可能な操作を見つけて続けて実行します。

Asyncioの動作原理

  1. コルーチン(Coroutines):Asyncioはコルーチンを使って非同期操作を実現します。コルーチンはasyncキーワードで定義される特殊な関数です。コルーチン内では、awaitキーワードを使って現在のコルーチンの実行を一時停止し、非同期操作の完了を待つことができます。
  2. イベントループ(Event Loop):イベントループはAsyncioの核心メカニズムの1つです。それはコルーチンのスケジューリングと実行を担当し、コルーチン間の切り替えを処理します。イベントループは常に実行可能なタスクをポーリングしており、タスクが準備完了した場合(たとえばI/O完了やタイマーが期限切れになった場合)、そのタスクを実行キューに入れて、次のタスクを続けて処理します。
  3. 非同期タスク(Async Tasks):Asyncioでは、非同期タスクを作成してコルーチンを実行します。非同期タスクはasyncio.create_task()関数で作成され、コルーチンを待機可能なオブジェクトにラップして、イベントループに送信して処理されます。
  4. 非同期I/O操作:Asyncioは一連の非同期I/O操作(たとえばネットワークリクエスト、ファイルの読み書きなど)を提供します。これらの操作はコルーチンとイベントループと無縫に統合できます。非同期I/O操作を使うことで、I/O完了を待つ間のブロックを回避し、プログラムの性能と並行性を向上させることができます。
  5. コールバック(Callbacks):Asyncioは非同期操作の結果をコールバック関数で処理することもサポートしています。asyncio.ensure_future()関数を使ってコールバック関数を待機可能なオブジェクトにラップして、イベントループに送信して処理することができます。
  6. 並行実行:Asyncioは複数のコルーチンタスクを並行して実行できます。イベントループはタスクの準備状況に応じて自動的にコルーチンの実行をスケジューリングし、効率的な並行プログラミングを実現します。
    要するに、Asyncioの動作原理はコルーチンとイベントループのメカニズムに基づいています。コルーチンを使って非同期操作を行い、イベントループがコルーチンのスケジューリングと実行を担当することで、Asyncioは効率的な非同期プログラミングモデルを実現しています。

コルーチンと非同期プログラミング

コルーチンはasyncioの重要な概念で、軽量な実行単位です。スレッドの切り替えのオーバーヘッドなしに、タスク間で迅速に切り替えることができます。コルーチンはasyncキーワードで定義され、awaitキーワードはコルーチンの実行を一時停止し、ある操作が完了してから続けて実行するために使われます。

以下は、コルーチンを使って非同期プログラミングを行う簡単なサンプルコードです:

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 時間のかかる操作をシミュレート
    print("World")

# イベントループを作成
loop = asyncio.get_event_loop()

# コルーチンをイベントループに追加して実行
loop.run_until_complete(hello())

このサンプルでは、関数hello()はコルーチンで、asyncキーワードで定義されています。コルーチン内部では、awaitを使ってコルーチンの実行を一時停止し、ここではasyncio.sleep(1)を使って時間のかかる操作をシミュレートしています。run_until_complete()メソッドを使って、コルーチンをイベントループに追加して実行します。

非同期I/O操作

asyncioは主にI/O集中型タスク、たとえばネットワークリクエスト、ファイルの読み書きなどの操作を処理するために使われます。それは一連の非同期I/O操作APIを提供し、awaitキーワードと組み合わせて使うことで、簡単に非同期プログラミングを実現できます。
以下は、asyncioを使って非同期ネットワークリクエストを行う簡単なサンプルコードです:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'https://www.example.com')
        print(html)

# イベントループを作成
loop = asyncio.get_event_loop()

# コルーチンをイベントループに追加して実行
loop.run_until_complete(main())

このサンプルでは、aiohttpライブラリを使ってネットワークリクエストを行っています。関数fetch()はコルーチンで、session.get()メソッドを使って非同期GETリクエストを送信し、awaitキーワードを使って応答を待ちます。関数main()は別のコルーチンで、内部でClientSessionオブジェクトを作成して再利用し、fetch()メソッドを呼び出してウェブコンテンツを取得して表示します。
注意:
ここでaiohttpを使っていてrequestsライブラリを使っていないのは、requestsライブラリがAsyncioと互換性がないからです。一方、aiohttpライブラリは互換性があります。
Asyncioをうまく使い、特にその強力な機能を発揮するには、多くの場合、対応するPythonライブラリのサポートが必要です。

複数のタスクを並行実行

asyncioは複数のタスクを並行実行するためのいくつかのメカニズムも提供しており、たとえばasyncio.gather()asyncio.wait()などです。以下は、これらのメカニズムを使って複数のコルーチンタスクを並行実行するサンプルコードです:

import asyncio

async def task1():
    print("Task 1 started")
    await asyncio.sleep(1)
    print("Task 1 finished")

async def task2():
    print("Task 2 started")
    await asyncio.sleep(2)
    print("Task 2 finished")

async def main():
    await asyncio.gather(task1(), task2())

# イベントループを作成
loop = asyncio.get_event_loop()

# コルーチンをイベントループに追加して実行
loop.run_until_complete(main())

このサンプルでは、2つのコルーチンタスクtask1()task2()を定義しています。それらはいずれも時間のかかる操作を行っています。コルーチンmain()asyncio.gather()を使ってこれら2つのタスクを同時に起動し、完了を待ちます。並行実行することで、プログラムの実行効率を向上させることができます。

どう選ぶか?

実際のプロジェクトでは、マルチスレッドとasyncioのどちらを選ぶべきでしょうか?ある大物がこんな風に要約しており、とても分かりやすいです。

if io_bound:
    if io_slow:
        print('Use Asyncio')
    else:
        print('Use multi-threading')
else if cpu_bound:
    print('Use multi-processing')
  • I/Oバウンドで、かつI/O操作が遅く、多くのタスク/スレッドが協調して実現する必要がある場合、Asyncioの方が適切です。
  • I/Oバウンドで、I/O操作が速く、限られた数のタスク/スレッドで済む場合は、マルチスレッドで十分です。
  • CPUバウンドの場合は、マルチプロセスを使ってプログラムの実行効率を高める必要があります。

実践

入力としてリストを受け取り、そのリストの各要素に対して、0からその要素までのすべての整数の二乗和を計算したいとします。
同期実装

import time

def cpu_bound(number):
    return sum(i * i for i in range(number))

def calculate_sums(numbers):
    for number in numbers:
        cpu_bound(number)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))

if __name__ == '__main__':
    main()

実行時間はCalculation takes 16.00943413000002 secondsです。

非同期実装
concurrent.futuresでの実装

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def cpu_bound(number):
    return sum(i * i for i in range(number))

def calculate_sums(numbers):
    with ProcessPoolExecutor() as executor:
        results = executor.map(cpu_bound, numbers)
        results = [result for result in results]
    print(results)

def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))

if __name__ == '__main__':
    main()

実行時間はCalculation takes 7.314132894999999 secondsです。この改良版のコードでは、concurrent.futures.ProcessPoolExecutorを使ってプロセスプールを作成し、executor.map()メソッドを使ってタスクを送信し、結果を取得しています。
注意点として、executor.map()を使った後、結果を取得するには、結果をリストにイテレートするか、他の方法で結果を処理する必要があります。

multiprocessingでの実装

import time
import multiprocessing

def cpu_bound(number):
```python
    return sum(i * i for i in range(number))


def calculate_sums(numbers):
    with multiprocessing.Pool() as pool:
        pool.map(cpu_bound, numbers)


def main():
    start_time = time.perf_counter()
    numbers = [10000000 + x for x in range(20)]
    calculate_sums(numbers)
    end_time = time.perf_counter()
    print('Calculation takes {} seconds'.format(end_time - start_time))


if __name__ == '__main__':
    main()

実行に要する時間はCalculation takes 5.024221667 secondsです。concurrent.futures.ProcessPoolExecutormultiprocessingは、Pythonでマルチプロセス並行処理を実現するためのライブラリで、いくつかの違いがあります。

  1. インターフェースに基づくラッピング:concurrent.futures.ProcessPoolExecutorconcurrent.futuresモジュールが提供する高級インターフェースで、低レベルのマルチプロセス機能をラップしており、マルチプロセスコードの記述をより簡単にします。一方、multiprocessingはPythonの標準ライブラリの1つで、完全なマルチプロセスサポートを提供し、直接プロセスを操作することができます。
  2. APIの使い方:concurrent.futures.ProcessPoolExecutorの使い方はスレッドプールに似ており、呼び出し可能なオブジェクト(たとえば関数)をプロセスプールに送信して実行し、実行結果を取得するためのFutureオブジェクトを返します。一方、multiprocessingはより低レベルのプロセス管理と通信インターフェースを提供し、明示的にプロセスを作成、起動、制御することができ、複数のプロセス間でキューやパイプを使って通信することができます。
  3. 拡張性と柔軟性:multiprocessingがより低レベルのインターフェースを提供するため、concurrent.futures.ProcessPoolExecutorに比べてより柔軟です。直接プロセスを操作することで、各プロセスに対してより細かい制御が可能で、たとえばプロセスの優先度設定、プロセス間でのデータ共有などができます。一方、concurrent.futures.ProcessPoolExecutorは、単純なタスク並列化に適しており、多くの低レベルの詳細を隠蔽しており、マルチプロセスコードの記述をより簡単かつ使いやすくしています。
  4. クロスプラットフォームサポート:concurrent.futures.ProcessPoolExecutormultiprocessingはどちらもクロスプラットフォームのマルチプロセスサポートを提供し、様々なオペレーションシステムで使用することができます。

以上のことから、concurrent.futures.ProcessPoolExecutorは高級インターフェースで、低レベルのマルチプロセス機能をラップしており、簡単なマルチプロセスタスク並列化に適しています。一方、multiprocessingはより低レベルのライブラリで、より多くの制御と柔軟性を提供し、プロセスを細かく制御する必要があるシーンに適しています。
具体的な要件に応じて適切なライブラリを選択する必要があります。単純なタスク並列化の場合は、concurrent.futures.ProcessPoolExecutorを使ってコードを簡略化することができます。より低レベルの制御と通信が必要な場合は、multiprocessingライブラリを使うことができます。

総括

マルチスレッドとは異なり、Asyncioはシングルスレッドですが、内部のイベントループメカニズムにより、複数の異なるタスクを並行して実行することができ、マルチスレッドよりも大きな自主制御権を持っています。
Asyncio内のタスクは、実行中に割り込まれることがなく、そのため、「レースコンディション」の状況が起こりえません。
特にI/O操作が多いシーンでは、Asyncioはマルチスレッドよりも実行効率が高いです。なぜなら、Asyncio内部のタスク切り替えのオーバーヘッドは、スレッド切り替えのオーバーヘッドよりもはるかに小さく、また、Asyncioで起動できるタスク数は、マルチスレッドのスレッド数よりもはるかに多いからです。
ただし、多くの場合、Asyncioを使うには特定のサードパーティライブラリのサポートが必要です。前述のサンプルのaiohttpのように。もしI/O操作が速く、多量ではない場合は、マルチスレッドを使っても問題を十分に解決することができます。

  • asyncioはPythonの非同期プログラミングを実現するライブラリです。
  • コルーチンはasyncioの核心概念で、asyncawaitキーワードを使って非同期操作を実現します。
  • asyncioは強力な非同期I/O操作APIを提供し、I/O集中型タスクを簡単に処理することができます。
  • asyncio.gather()などのメカニズムを通じて、複数のコルーチンタスクを並行して実行することができます。

Leapcell: Flask/FastAPIその他Pythonアプリケーション向けの理想的なプラットフォーム

最後に、Flask/FastAPIをデプロイする理想的なプラットフォームであるLeapcellを紹介します。

Leapcellは、現代の分散アプリケーション向けに設計されたクラウドコンピューティングプラットフォームです。その使いたい分だけ支払う(pay-as-you-go)価格モデルにより、アイドルコストが発生しません。つまり、ユーザーは実際に使ったリソースだけを支払うことになります。

WSGI/ASGIアプリケーションにとってのLeapcellのユニークな利点:

  1. 多言語サポート
    • JavaScript、Python、Go、またはRustでの開発をサポートします。
  2. 無制限のプロジェクトの無料デプロイ
    • 使用量に基づいて課金されます。リクエストがない場合、料金はかかりません。
  3. 比類なきコスト効率
    • 使いたい分だけ支払う方式で、アイドル料金はありません。
    • たとえば、25ドルで694万件のリクエストをサポートし、平均応答時間は60ミリ秒です。
  4. 簡素化された開発者体験
    • 直感的なユーザーインターフェースで、簡単なセットアップが可能です。
    • 完全自動化されたCI/CDパイプラインとGitOps統合。
    • リアルタイムのメトリックとログで、アクション可能な洞察を提供します。
  5. 簡単なスケーラビリティと高性能
    • 自動的なスケーリングで、高い並行性を簡単に処理することができます。
    • オペレーションオーバーヘッドがゼロで、開発者は開発に集中することができます。

詳細はDocsを参照してください!

Leapcell Twitter: https://x.com/LeapcellHQ

Discussion