SODA Engineering Blog
🛑

PythonでI/Oバウンドな並行処理を一括でタイムアウトさせる

2024/12/23に公開

スニダンを開発しているSODA inc.Advent Calendar 2024 23日目の記事です!!!/

概要

PythonでI/Oバウンドな処理を並行で実行し、一定時間経過後に一括でタイムアウトさせる方法を紹介します。
PythonでI/Oバウンドな処理を並行で実行する際によく利用されるコンポーネントとして、スレッド(threading)とコルーチン(asyncio)があります。スレッドベースの並行処理は、スレッドを停止させる機能が提供されていないため、一括で処理をタイムアウトさせることが難しいです。一方、asyncioで用いられるコルーチンを使うと比較的楽に一括のタイムアウトを実現できます。ただし、I/O処理を行うライブラリが非同期I/Oに対応している必要があります。

前提

この記事では、処理系としてCPythonを前提としています。
また、Pythonの並行処理ではプロセスを使う方法もありますが、I/Oバウンドな処理でプロセスを使って並行処理をするのはメモリ効率的に好ましくなく、一般的ではないため触れません。
検証はPythonのバージョン3.13.0で行っています。

スレッドベースの並行処理

スレッドベースの並行処理は、threadingモジュールを使って自分で直接Threadを生成する方法する方法もありますが、concurrent.futuresを使うとより楽に実行することができます。concurrent.futuresに含まれるThreadPoolExecutorは、内部にスレッドプールを保持しており、スレッドを管理してくれます。
ThreadPoolExecutormapメソッドは、スレッドで実行したい関数とその引数を渡すと、これらをExecutor内部のスレッドプールで並行実行します。そして、渡した引数の順番で結果を返すイテレータを返します。また、mapメソッドにはタイムアウトの秒数を設定することができます。タイムアウトが設定された場合、mapメソッドが呼ばれてからイテレータの__next__メソッドが呼ばれるまでの間に設定した秒数を経過していると、タイムアウトの例外を投げます。この機能を使えばスレッドベースの並行実行を一定時間経過後に一括でタイムアウトできそうですが、うまくいかないケースがあります。
次のコードは、、mapメソッドで10秒と3秒のI/O待ちが発生する処理を並行処理する例です。5秒後のタイムアウトを設定していますが、5秒でうまくタイムアウトされません。10秒後にタイムアウトします。

import time
from concurrent.futures import ThreadPoolExecutor

def io_bound(n: int) -> int:
    """スレッドの実行をブロックするI/O処理を模した関数"""
    print(f"io_bound started {n}sec")
    time.sleep(n)
    print(f"io_bound finished {n}sec")
    return n

def main() -> None:
    with ThreadPoolExecutor(max_workers=2) as executor:
        results = executor.map(io_bound, (10, 3), timeout=5)
        # resultsからresultを取り出すときにタイムアウトのチェックが入る
        for result in results:
            print(result)

main()
# 実行結果
io_bound started 10sec
io_bound started 3sec
io_bound finished 3sec
io_bound finished 10sec
Traceback (most recent call last):
(省略)
TimeoutError

結果からもわかるとおり、10秒かかる処理の実行を待ってからタイムアウトしています。このような挙動になるのは、Executorが内部でスレッド自体をタイムアウトさせているわけではなく、結果を返すイテレータが結果を返すタイミングでタイムアウトのチェックを行っているからです。イテレータは開始済のスレッドが終了するのを待って、この結果を取り出すタイミングでタイムアウトのチェックを行います。これはタイムアウトが遅延する可能性があることを示しています。
そもそも、Pythonのスレッドは、自スレッドから他スレッドから停止させる機能を提供していません。したがって、スレッドを一定時間経過後に外から停止させることはできません。
スレッドを使って並行処理をする場合は、処理全体のタイムアウトを設定することは難しいです。1つ1つのI/O処理にタイムアウトを設定して、全体として処理時間が長くなりすぎないように調整する必要があります[1]

コルーチンベースの並行処理

コルーチンとは、処理を中断したり再開したりすることができる関数やメソッドのことです。コルーチンベースの並行処理は、async/awaitasyncioライブラリを利用することで実行することができます。asyncのついた関数はコルーチンを返します。コルーチンをイベントループに渡すと、イベントループはこれを実行します。イベントループは、シングルスレッドで動いており、実行中のコルーチンがawaitで中断した場合、別のコルーチンを実行します。このように、コルーチン同士が中断したり再開したりしながら、シングルスレッドで協調しあって並行で処理が進みます。
さきほどのスレッドベースの例と同様の処理をコルーチンベースで行うと、次のようになります。

import asyncio

async def io_bound(n: int) -> int:
    """スレッドの実行をブロックしない非同期I/O処理を模した関数"""
    print(f"io_bound started {n}sec")
    await asyncio.sleep(n)
    print(f"io_bound finished {n}sec")
    return n

async def main() -> None:
    async with asyncio.timeout(5):
        tasks = []
        tasks.append(asyncio.create_task(io_bound(10)))
        tasks.append(asyncio.create_task(io_bound(3)))
        for task in tasks:
            print(await task)

# イベントループに渡す
asyncio.run(main())
io_bound started 10sec
io_bound started 3sec
io_bound finished 3sec
Traceback (most recent call last):
(省略)
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
(省略)
TimeoutError

タイムアウトが5秒に設定されており、10秒のI/O待ちが発生する処理が約5秒でキャンセルされました。このように、コルーチンを使ったI/Oバウンドな処理では、処理全体を一定時間経過後にキャンセルすることが比較的簡単にできます[2]
ここで大切なのは、コルーチンで行う処理が非同期I/Oに対応していることです。asyncioモジュールのsleep関数は、timeモジュールのsleep関数の非同期版です。この例では非同期I/Oの待ちの代わりとして利用しています。実際のネットワーク通信では非同期I/Oに対応したネットワークライブラリを使う必要があります。
ちなみに、コルーチンの内部で非同期I/Oではなく同期I/Oのライブラリを使用すると、タイムアウトされないどころか、並行実行すらされないので注意が必要です。

import asyncio
import time

async def io_bound(n: int) -> int:
    """スレッドの実行をブロックしない非同期I/O処理を模した関数"""
    print(f"io_bound started {n}sec")
    # await asyncio.sleep(n)
    # 同期処理に置き換える
    time.sleep(n)
    print(f"io_bound finished {n}sec")
    return n

async def main() -> None:
    async with asyncio.timeout(5):
        tasks = []
        tasks.append(asyncio.create_task(io_bound(10)))
        tasks.append(asyncio.create_task(io_bound(3)))
        for task in tasks:
            print(await task)

asyncio.run(main())
io_bound started 10sec
io_bound finished 10sec
io_bound started 3sec
io_bound finished 3sec
10
3

タイムアウトせず、直列に実行されていることがわかります。awaitできずにスレッドの実行をブロックしています。コルーチンの実行が移り変わることがなく、また、イベントループはシングルスレッドで動作しているので、結果として直列で実行されます。

まとめ

  • スレッドベースの並行処理では、標準でスレッドを停止させる機能が提供されていないのでタイムアウトが難しい
  • コルーチンを使った並行処理では、比較的簡単にタスクをタイムアウトさせることができるが、非同期I/Oに対応している必要がある

参考資料

脚注
  1. 処理全体のタイムアウトとは関係なく、I/O処理にはタイムアウトを設定すべきです。タイムアウトを設定しない場合、I/O処理に問題が起きた場合、joinしているスレッドがブロックされ続ける問題があります。また、スレッドを投げっぱなしの場合でも、スレッドが終了せずにリークするため、余計なメモリを消費してしまいます。スレッド数によってはメモリを逼迫する問題が起きるかもしれません。 ↩︎

  2. CPUバウンドな処理や同期処理がコルーチンの処理を大半を締めている場合は、イベントループがコルーチンの実行を切り替えられないので、この限りではありません ↩︎

SODA Engineering Blog
SODA Engineering Blog

Discussion