😽
Pythonの並行処理パターン:実測比較
はじめに
Pythonの並行処理には複数のアプローチがあり、それぞれ適した場面が異なります。本記事では実際に測定した結果を基に各パターンの特性を比較します。
間違いがございましたら、おっしゃってください.
検証環境
- Python 3.13.5
- macOS Darwin 24.5.0
- CPU: 10コア
I/Oバウンドタスクの比較
タスク内容: httpbin.orgへの5つのHTTPリクエスト(各リクエスト約3.5秒)
| パターン | 実行時間 | 説明 |
|---|---|---|
| 逐次処理 | 17.37秒 | 1つずつ順番に実行 |
| Threading | 5.20秒 | 複数スレッドで並行実行 |
| Asyncio | 0.38秒 | 非同期I/Oで効率的に処理 |
| ThreadPoolExecutor | 0.38秒 | スレッドプールで並行実行 |
結果分析
- 逐次処理: 予想通り最も遅い(5リクエスト×約3.5秒≒17秒)
- Threading: GILの影響を受けるが、I/O待機中は他スレッドが実行可能
- Asyncio: 最も効率的。単一スレッドで全リクエストを並行処理
- ThreadPoolExecutor: Asyncioと同等の性能
CPUバウンドタスクの比較
タスク内容: 素数計算
| パターン | 実行時間 | 説明 |
|---|---|---|
| 逐次処理 | 0.02秒 | ベースライン |
| Threading | 0.02秒 | GILのため改善なし |
| Multiprocessing | 0.20秒 | プロセス起動オーバーヘッド |
結果分析
- Threading: GILにより真の並列処理ができない
- Multiprocessing: 小規模タスクではオーバーヘッドが大きい
- CPUバウンドタスクが大規模な場合のみMultiprocessingが有効
FastAPIでの実測結果
デフォルトのThreadPoolExecutor
50個の同時ブロッキングリクエスト(10秒):
- 最初の40個: 10.17秒で完了
- 残り10個: 20.04秒で完了
-
FastAPIのデフォルトThreadPoolは40スレッド
- FastAPI 0.100+では、同期関数用にStarletteのThreadPoolExecutorを使用
- デフォルトサイズは40(実測により確認)
asyncio.to_threadの制限
15人同時アクセス(60秒処理):
- 14人: 60秒で完了(並列処理)
- 1人: 120秒で完了(スレッド待ち)
-
デフォルトスレッドプール: min(32, CPU数+4) = 14
- Python 3.9以降の
asyncio.to_threadのデフォルト設定 - 公式ドキュメント参照
- Python 3.9以降の
各パターンの使い分け
1. 逐次処理
- 処理順序が重要な場合
- デバッグ時
2. Threading
- I/Oバウンドタスク
- レスポンス時間が重要な場合
- 注意: スレッドプールサイズの制限
3. Multiprocessing
- CPUバウンドタスク
- 大規模な計算処理
- 注意: メモリ使用量とプロセス起動コスト
4. Asyncio
- 大量のI/O処理
- ネットワーク通信が多い場合
- 最も効率的なI/O処理
実装例
I/Oバウンド - Asyncio(推奨)
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
CPUバウンド - Multiprocessing
from multiprocessing import Pool
def cpu_task(n):
# CPU集約的な処理
return result
with Pool() as pool:
results = pool.map(cpu_task, range(10))
FastAPIでの長時間処理
from concurrent.futures import ThreadPoolExecutor
# 専用のexecutorを作成(重要)
executor = ThreadPoolExecutor(max_workers=10)
@app.post("/long-task")
async def long_task():
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor, # 専用executor使用
blocking_function
)
return result
まとめ
- I/Oバウンド: Asyncioが最も効率的(0.38秒 vs 逐次処理17.37秒)
- CPUバウンド: 小規模ならThreadingで十分、大規模ならMultiprocessing
- FastAPI: デフォルトThreadPoolは40スレッド(実測値)
- asyncio.to_thread: デフォルト min(32, CPU数+4) スレッド(Python 3.9+)
- 長時間処理: 専用ThreadPoolExecutor必須
Discussion