PythonでI/Oバウンドな並行処理を一括でタイムアウトさせる
\スニダンを開発しているSODA inc.の Advent Calendar 2024 23日目の記事です!!!/
概要
PythonでI/Oバウンドな処理を並行で実行し、一定時間経過後に一括でタイムアウトさせる方法を紹介します。
PythonでI/Oバウンドな処理を並行で実行する際によく利用されるコンポーネントとして、スレッド(threading
)とコルーチン(asyncio
)があります。スレッドベースの並行処理は、スレッドを停止させる機能が提供されていないため、一括で処理をタイムアウトさせることが難しいです。一方、asyncio
で用いられるコルーチンを使うと比較的楽に一括のタイムアウトを実現できます。ただし、I/O処理を行うライブラリが非同期I/Oに対応している必要があります。
前提
この記事では、処理系としてCPythonを前提としています。
また、Pythonの並行処理ではプロセスを使う方法もありますが、I/Oバウンドな処理でプロセスを使って並行処理をするのはメモリ効率的に好ましくなく、一般的ではないため触れません。
検証はPythonのバージョン3.13.0で行っています。
スレッドベースの並行処理
スレッドベースの並行処理は、threading
モジュールを使って自分で直接Thread
を生成する方法する方法もありますが、concurrent.futures
を使うとより楽に実行することができます。concurrent.futures
に含まれるThreadPoolExecutor
は、内部にスレッドプールを保持しており、スレッドを管理してくれます。
ThreadPoolExecutor
のmap
メソッドは、スレッドで実行したい関数とその引数を渡すと、これらをExecutor内部のスレッドプールで並行実行します。そして、渡した引数の順番で結果を返すイテレータを返します。また、map
メソッドにはタイムアウトの秒数を設定することができます。タイムアウトが設定された場合、map
メソッドが呼ばれてからイテレータの__next__
メソッドが呼ばれるまでの間に設定した秒数を経過していると、タイムアウトの例外を投げます。この機能を使えばスレッドベースの並行実行を一定時間経過後に一括でタイムアウトできそうですが、うまくいかないケースがあります。
次のコードは、、map
メソッドで10秒と3秒のI/O待ちが発生する処理を並行処理する例です。5秒後のタイムアウトを設定していますが、5秒でうまくタイムアウトされません。10秒後にタイムアウトします。
import time
from concurrent.futures import ThreadPoolExecutor
def io_bound(n: int) -> int:
"""スレッドの実行をブロックするI/O処理を模した関数"""
print(f"io_bound started {n}sec")
time.sleep(n)
print(f"io_bound finished {n}sec")
return n
def main() -> None:
with ThreadPoolExecutor(max_workers=2) as executor:
results = executor.map(io_bound, (10, 3), timeout=5)
# resultsからresultを取り出すときにタイムアウトのチェックが入る
for result in results:
print(result)
main()
# 実行結果
io_bound started 10sec
io_bound started 3sec
io_bound finished 3sec
io_bound finished 10sec
Traceback (most recent call last):
(省略)
TimeoutError
結果からもわかるとおり、10秒かかる処理の実行を待ってからタイムアウトしています。このような挙動になるのは、Executorが内部でスレッド自体をタイムアウトさせているわけではなく、結果を返すイテレータが結果を返すタイミングでタイムアウトのチェックを行っているからです。イテレータは開始済のスレッドが終了するのを待って、この結果を取り出すタイミングでタイムアウトのチェックを行います。これはタイムアウトが遅延する可能性があることを示しています。
そもそも、Pythonのスレッドは、自スレッドから他スレッドから停止させる機能を提供していません。したがって、スレッドを一定時間経過後に外から停止させることはできません。
スレッドを使って並行処理をする場合は、処理全体のタイムアウトを設定することは難しいです。1つ1つのI/O処理にタイムアウトを設定して、全体として処理時間が長くなりすぎないように調整する必要があります[1]。
コルーチンベースの並行処理
コルーチンとは、処理を中断したり再開したりすることができる関数やメソッドのことです。コルーチンベースの並行処理は、async/await
とasyncio
ライブラリを利用することで実行することができます。async
のついた関数はコルーチンを返します。コルーチンをイベントループに渡すと、イベントループはこれを実行します。イベントループは、シングルスレッドで動いており、実行中のコルーチンがawait
で中断した場合、別のコルーチンを実行します。このように、コルーチン同士が中断したり再開したりしながら、シングルスレッドで協調しあって並行で処理が進みます。
さきほどのスレッドベースの例と同様の処理をコルーチンベースで行うと、次のようになります。
import asyncio
async def io_bound(n: int) -> int:
"""スレッドの実行をブロックしない非同期I/O処理を模した関数"""
print(f"io_bound started {n}sec")
await asyncio.sleep(n)
print(f"io_bound finished {n}sec")
return n
async def main() -> None:
async with asyncio.timeout(5):
tasks = []
tasks.append(asyncio.create_task(io_bound(10)))
tasks.append(asyncio.create_task(io_bound(3)))
for task in tasks:
print(await task)
# イベントループに渡す
asyncio.run(main())
io_bound started 10sec
io_bound started 3sec
io_bound finished 3sec
Traceback (most recent call last):
(省略)
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
(省略)
TimeoutError
タイムアウトが5秒に設定されており、10秒のI/O待ちが発生する処理が約5秒でキャンセルされました。このように、コルーチンを使ったI/Oバウンドな処理では、処理全体を一定時間経過後にキャンセルすることが比較的簡単にできます[2]。
ここで大切なのは、コルーチンで行う処理が非同期I/Oに対応していることです。asyncio
モジュールのsleep
関数は、time
モジュールのsleep
関数の非同期版です。この例では非同期I/Oの待ちの代わりとして利用しています。実際のネットワーク通信では非同期I/Oに対応したネットワークライブラリを使う必要があります。
ちなみに、コルーチンの内部で非同期I/Oではなく同期I/Oのライブラリを使用すると、タイムアウトされないどころか、並行実行すらされないので注意が必要です。
import asyncio
import time
async def io_bound(n: int) -> int:
"""スレッドの実行をブロックしない非同期I/O処理を模した関数"""
print(f"io_bound started {n}sec")
# await asyncio.sleep(n)
# 同期処理に置き換える
time.sleep(n)
print(f"io_bound finished {n}sec")
return n
async def main() -> None:
async with asyncio.timeout(5):
tasks = []
tasks.append(asyncio.create_task(io_bound(10)))
tasks.append(asyncio.create_task(io_bound(3)))
for task in tasks:
print(await task)
asyncio.run(main())
io_bound started 10sec
io_bound finished 10sec
io_bound started 3sec
io_bound finished 3sec
10
3
タイムアウトせず、直列に実行されていることがわかります。await
できずにスレッドの実行をブロックしています。コルーチンの実行が移り変わることがなく、また、イベントループはシングルスレッドで動作しているので、結果として直列で実行されます。
まとめ
- スレッドベースの並行処理では、標準でスレッドを停止させる機能が提供されていないのでタイムアウトが難しい
- コルーチンを使った並行処理では、比較的簡単にタスクをタイムアウトさせることができるが、非同期I/Oに対応している必要がある
参考資料
- threading - https://docs.python.org/ja/3.13/library/threading.html
- asyncio - https://docs.python.org/ja/3.13/library/asyncio.html
- concurrent.futures - https://docs.python.org/ja/3.13/library/concurrent.futures.html
株式会社SODAの開発組織がお届けするZenn Publicationです。 是非Entrance Bookもご覧ください! → recruit.soda-inc.jp/engineer
Discussion