高性能Python: Asyncio
並行プログラミングは、複数のタスクを同時に実行するプログラミング方法です。Pythonにおいて、asyncio
は非同期プログラミングを実現するための強力なツールです。asyncio
はコルーチン(coroutine)の概念に基づいており、I/O集中型タスクを効率的に処理することができます。本文ではasyncio
の基本原理と使い方を紹介します。
asyncio
が必要なのか
なぜI/O操作を処理する際、マルチスレッドを使うと、通常のシングルスレッドと比べて、効率が大幅に向上します。では、なぜAsyncio
が必要なのでしょうか?
マルチスレッドには多くの利点があり、広く利用されていますが、一定の制限もあります:
- たとえば、マルチスレッドの実行中は割り込みが発生しやすく、そのため、「レースコンディション(race condition)」の状況が起こり得ます;
- また、スレッドの切り替え自体には一定のオーバーヘッドがあり、スレッド数を無限に増やすことはできません。そのため、I/O操作が非常に重たい場合、マルチスレッドでは高効率、高品質の要件を満たせない可能性が高いです。
これらの問題を解決するため、Asyncio
が登場しました。
同期(Sync)VS非同期(Async)
まず、Sync(同期)とAsync(非同期)の概念を区別してみましょう。 - Syncとは、操作が1つずつ順番に実行され、次の操作は前の操作が完了してから実行できることを指します。
- 一方、Asyncは、異なる操作が相互に交互に実行されることを指します。もしその中のある操作がブロックされても、プログラムは待たず、実行可能な操作を見つけて続けて実行します。
Asyncio
の動作原理
- コルーチン(Coroutines):
Asyncio
はコルーチンを使って非同期操作を実現します。コルーチンはasync
キーワードで定義される特殊な関数です。コルーチン内では、await
キーワードを使って現在のコルーチンの実行を一時停止し、非同期操作の完了を待つことができます。 - イベントループ(Event Loop):イベントループは
Asyncio
の核心メカニズムの1つです。それはコルーチンのスケジューリングと実行を担当し、コルーチン間の切り替えを処理します。イベントループは常に実行可能なタスクをポーリングしており、タスクが準備完了した場合(たとえばI/O完了やタイマーが期限切れになった場合)、そのタスクを実行キューに入れて、次のタスクを続けて処理します。 - 非同期タスク(Async Tasks):
Asyncio
では、非同期タスクを作成してコルーチンを実行します。非同期タスクはasyncio.create_task()
関数で作成され、コルーチンを待機可能なオブジェクトにラップして、イベントループに送信して処理されます。 - 非同期I/O操作:
Asyncio
は一連の非同期I/O操作(たとえばネットワークリクエスト、ファイルの読み書きなど)を提供します。これらの操作はコルーチンとイベントループと無縫に統合できます。非同期I/O操作を使うことで、I/O完了を待つ間のブロックを回避し、プログラムの性能と並行性を向上させることができます。 - コールバック(Callbacks):
Asyncio
は非同期操作の結果をコールバック関数で処理することもサポートしています。asyncio.ensure_future()
関数を使ってコールバック関数を待機可能なオブジェクトにラップして、イベントループに送信して処理することができます。 - 並行実行:
Asyncio
は複数のコルーチンタスクを並行して実行できます。イベントループはタスクの準備状況に応じて自動的にコルーチンの実行をスケジューリングし、効率的な並行プログラミングを実現します。
要するに、Asyncio
の動作原理はコルーチンとイベントループのメカニズムに基づいています。コルーチンを使って非同期操作を行い、イベントループがコルーチンのスケジューリングと実行を担当することで、Asyncio
は効率的な非同期プログラミングモデルを実現しています。
コルーチンと非同期プログラミング
コルーチンはasyncio
の重要な概念で、軽量な実行単位です。スレッドの切り替えのオーバーヘッドなしに、タスク間で迅速に切り替えることができます。コルーチンはasync
キーワードで定義され、await
キーワードはコルーチンの実行を一時停止し、ある操作が完了してから続けて実行するために使われます。
以下は、コルーチンを使って非同期プログラミングを行う簡単なサンプルコードです:
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1) # 時間のかかる操作をシミュレート
print("World")
# イベントループを作成
loop = asyncio.get_event_loop()
# コルーチンをイベントループに追加して実行
loop.run_until_complete(hello())
このサンプルでは、関数hello()
はコルーチンで、async
キーワードで定義されています。コルーチン内部では、await
を使ってコルーチンの実行を一時停止し、ここではasyncio.sleep(1)
を使って時間のかかる操作をシミュレートしています。run_until_complete()
メソッドを使って、コルーチンをイベントループに追加して実行します。
非同期I/O操作
asyncio
は主にI/O集中型タスク、たとえばネットワークリクエスト、ファイルの読み書きなどの操作を処理するために使われます。それは一連の非同期I/O操作APIを提供し、await
キーワードと組み合わせて使うことで、簡単に非同期プログラミングを実現できます。
以下は、asyncio
を使って非同期ネットワークリクエストを行う簡単なサンプルコードです:
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
html = await fetch(session, 'https://www.example.com')
print(html)
# イベントループを作成
loop = asyncio.get_event_loop()
# コルーチンをイベントループに追加して実行
loop.run_until_complete(main())
このサンプルでは、aiohttp
ライブラリを使ってネットワークリクエストを行っています。関数fetch()
はコルーチンで、session.get()
メソッドを使って非同期GETリクエストを送信し、await
キーワードを使って応答を待ちます。関数main()
は別のコルーチンで、内部でClientSession
オブジェクトを作成して再利用し、fetch()
メソッドを呼び出してウェブコンテンツを取得して表示します。
注意:
ここでaiohttp
を使っていてrequests
ライブラリを使っていないのは、requests
ライブラリがAsyncio
と互換性がないからです。一方、aiohttp
ライブラリは互換性があります。
Asyncio
をうまく使い、特にその強力な機能を発揮するには、多くの場合、対応するPythonライブラリのサポートが必要です。
複数のタスクを並行実行
asyncio
は複数のタスクを並行実行するためのいくつかのメカニズムも提供しており、たとえばasyncio.gather()
やasyncio.wait()
などです。以下は、これらのメカニズムを使って複数のコルーチンタスクを並行実行するサンプルコードです:
import asyncio
async def task1():
print("Task 1 started")
await asyncio.sleep(1)
print("Task 1 finished")
async def task2():
print("Task 2 started")
await asyncio.sleep(2)
print("Task 2 finished")
async def main():
await asyncio.gather(task1(), task2())
# イベントループを作成
loop = asyncio.get_event_loop()
# コルーチンをイベントループに追加して実行
loop.run_until_complete(main())
このサンプルでは、2つのコルーチンタスクtask1()
とtask2()
を定義しています。それらはいずれも時間のかかる操作を行っています。コルーチンmain()
はasyncio.gather()
を使ってこれら2つのタスクを同時に起動し、完了を待ちます。並行実行することで、プログラムの実行効率を向上させることができます。
どう選ぶか?
実際のプロジェクトでは、マルチスレッドとasyncio
のどちらを選ぶべきでしょうか?ある大物がこんな風に要約しており、とても分かりやすいです。
if io_bound:
if io_slow:
print('Use Asyncio')
else:
print('Use multi-threading')
else if cpu_bound:
print('Use multi-processing')
- I/Oバウンドで、かつI/O操作が遅く、多くのタスク/スレッドが協調して実現する必要がある場合、
Asyncio
の方が適切です。 - I/Oバウンドで、I/O操作が速く、限られた数のタスク/スレッドで済む場合は、マルチスレッドで十分です。
- CPUバウンドの場合は、マルチプロセスを使ってプログラムの実行効率を高める必要があります。
実践
入力としてリストを受け取り、そのリストの各要素に対して、0からその要素までのすべての整数の二乗和を計算したいとします。
同期実装
import time
def cpu_bound(number):
return sum(i * i for i in range(number))
def calculate_sums(numbers):
for number in numbers:
cpu_bound(number)
def main():
start_time = time.perf_counter()
numbers = [10000000 + x for x in range(20)]
calculate_sums(numbers)
end_time = time.perf_counter()
print('Calculation takes {} seconds'.format(end_time - start_time))
if __name__ == '__main__':
main()
実行時間はCalculation takes 16.00943413000002 seconds
です。
非同期実装
concurrent.futures
での実装
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
def cpu_bound(number):
return sum(i * i for i in range(number))
def calculate_sums(numbers):
with ProcessPoolExecutor() as executor:
results = executor.map(cpu_bound, numbers)
results = [result for result in results]
print(results)
def main():
start_time = time.perf_counter()
numbers = [10000000 + x for x in range(20)]
calculate_sums(numbers)
end_time = time.perf_counter()
print('Calculation takes {} seconds'.format(end_time - start_time))
if __name__ == '__main__':
main()
実行時間はCalculation takes 7.314132894999999 seconds
です。この改良版のコードでは、concurrent.futures.ProcessPoolExecutor
を使ってプロセスプールを作成し、executor.map()
メソッドを使ってタスクを送信し、結果を取得しています。
注意点として、executor.map()
を使った後、結果を取得するには、結果をリストにイテレートするか、他の方法で結果を処理する必要があります。
multiprocessing
での実装
import time
import multiprocessing
def cpu_bound(number):
```python
return sum(i * i for i in range(number))
def calculate_sums(numbers):
with multiprocessing.Pool() as pool:
pool.map(cpu_bound, numbers)
def main():
start_time = time.perf_counter()
numbers = [10000000 + x for x in range(20)]
calculate_sums(numbers)
end_time = time.perf_counter()
print('Calculation takes {} seconds'.format(end_time - start_time))
if __name__ == '__main__':
main()
実行に要する時間はCalculation takes 5.024221667 seconds
です。concurrent.futures.ProcessPoolExecutor
とmultiprocessing
は、Pythonでマルチプロセス並行処理を実現するためのライブラリで、いくつかの違いがあります。
- インターフェースに基づくラッピング:
concurrent.futures.ProcessPoolExecutor
はconcurrent.futures
モジュールが提供する高級インターフェースで、低レベルのマルチプロセス機能をラップしており、マルチプロセスコードの記述をより簡単にします。一方、multiprocessing
はPythonの標準ライブラリの1つで、完全なマルチプロセスサポートを提供し、直接プロセスを操作することができます。 - APIの使い方:
concurrent.futures.ProcessPoolExecutor
の使い方はスレッドプールに似ており、呼び出し可能なオブジェクト(たとえば関数)をプロセスプールに送信して実行し、実行結果を取得するためのFuture
オブジェクトを返します。一方、multiprocessing
はより低レベルのプロセス管理と通信インターフェースを提供し、明示的にプロセスを作成、起動、制御することができ、複数のプロセス間でキューやパイプを使って通信することができます。 - 拡張性と柔軟性:
multiprocessing
がより低レベルのインターフェースを提供するため、concurrent.futures.ProcessPoolExecutor
に比べてより柔軟です。直接プロセスを操作することで、各プロセスに対してより細かい制御が可能で、たとえばプロセスの優先度設定、プロセス間でのデータ共有などができます。一方、concurrent.futures.ProcessPoolExecutor
は、単純なタスク並列化に適しており、多くの低レベルの詳細を隠蔽しており、マルチプロセスコードの記述をより簡単かつ使いやすくしています。 - クロスプラットフォームサポート:
concurrent.futures.ProcessPoolExecutor
とmultiprocessing
はどちらもクロスプラットフォームのマルチプロセスサポートを提供し、様々なオペレーションシステムで使用することができます。
以上のことから、concurrent.futures.ProcessPoolExecutor
は高級インターフェースで、低レベルのマルチプロセス機能をラップしており、簡単なマルチプロセスタスク並列化に適しています。一方、multiprocessing
はより低レベルのライブラリで、より多くの制御と柔軟性を提供し、プロセスを細かく制御する必要があるシーンに適しています。
具体的な要件に応じて適切なライブラリを選択する必要があります。単純なタスク並列化の場合は、concurrent.futures.ProcessPoolExecutor
を使ってコードを簡略化することができます。より低レベルの制御と通信が必要な場合は、multiprocessing
ライブラリを使うことができます。
総括
マルチスレッドとは異なり、Asyncio
はシングルスレッドですが、内部のイベントループメカニズムにより、複数の異なるタスクを並行して実行することができ、マルチスレッドよりも大きな自主制御権を持っています。
Asyncio
内のタスクは、実行中に割り込まれることがなく、そのため、「レースコンディション」の状況が起こりえません。
特にI/O操作が多いシーンでは、Asyncio
はマルチスレッドよりも実行効率が高いです。なぜなら、Asyncio
内部のタスク切り替えのオーバーヘッドは、スレッド切り替えのオーバーヘッドよりもはるかに小さく、また、Asyncio
で起動できるタスク数は、マルチスレッドのスレッド数よりもはるかに多いからです。
ただし、多くの場合、Asyncio
を使うには特定のサードパーティライブラリのサポートが必要です。前述のサンプルのaiohttp
のように。もしI/O操作が速く、多量ではない場合は、マルチスレッドを使っても問題を十分に解決することができます。
-
asyncio
はPythonの非同期プログラミングを実現するライブラリです。 - コルーチンは
asyncio
の核心概念で、async
とawait
キーワードを使って非同期操作を実現します。 -
asyncio
は強力な非同期I/O操作APIを提供し、I/O集中型タスクを簡単に処理することができます。 -
asyncio.gather()
などのメカニズムを通じて、複数のコルーチンタスクを並行して実行することができます。
Leapcell: Flask/FastAPIその他Pythonアプリケーション向けの理想的なプラットフォーム
最後に、Flask/FastAPIをデプロイする理想的なプラットフォームであるLeapcellを紹介します。
Leapcellは、現代の分散アプリケーション向けに設計されたクラウドコンピューティングプラットフォームです。その使いたい分だけ支払う(pay-as-you-go)価格モデルにより、アイドルコストが発生しません。つまり、ユーザーは実際に使ったリソースだけを支払うことになります。
WSGI/ASGIアプリケーションにとってのLeapcellのユニークな利点:
- 多言語サポート
- JavaScript、Python、Go、またはRustでの開発をサポートします。
- 無制限のプロジェクトの無料デプロイ
- 使用量に基づいて課金されます。リクエストがない場合、料金はかかりません。
- 比類なきコスト効率
- 使いたい分だけ支払う方式で、アイドル料金はありません。
- たとえば、25ドルで694万件のリクエストをサポートし、平均応答時間は60ミリ秒です。
- 簡素化された開発者体験
- 直感的なユーザーインターフェースで、簡単なセットアップが可能です。
- 完全自動化されたCI/CDパイプラインとGitOps統合。
- リアルタイムのメトリックとログで、アクション可能な洞察を提供します。
- 簡単なスケーラビリティと高性能
- 自動的なスケーリングで、高い並行性を簡単に処理することができます。
- オペレーションオーバーヘッドがゼロで、開発者は開発に集中することができます。
詳細はDocsを参照してください!
Leapcell Twitter: https://x.com/LeapcellHQ
Discussion