Closed8

Python のマルチプロセッシング、マルチスレッディング実装による並列処理

fujimotoshinjifujimotoshinji

Python で並列処理するにはマルチコア、マルチスレッドの大きく 2パターンがあります。
(async/await でも並列処理っぽいことができますが、目的が非同期処理なのでここでは取り扱いません)

https://docs.python.org/ja/3.10/library/concurrent.futures.html

マルチプロセッシング

マルチプロセッシングは複数のサブプロセスに分割することで並列処理を実現します。
CPU バウンドな並列処理が目的であればマルチプロセッシングの方が向いています。

特徴としては

  • メリット
    • 完全な並列処理が可能
    • 複数の CPU を使用可能
  • デメリット
    • 処理やメモリのオーバーヘッドが大きめ

マルチスレッディング

マルチプロセッシングは 1つのプロセス内でスレッドに分割することで並列処理を実現します。
I/O バウンドな並列処理が目的であればマルチスレッディングの方が向いています。

特徴としては

  • メリット
    • 擬似的な並列処理が可能
    • 複数の CPU を使用可能
    • マルチプロセッシングよりはオーバーヘッドが小さい
  • デメリット
    • GIL により完全な並列処理ができない

非同期処理(async/await)

非同期処理は 1プロセス、1スレッド内で coroutine に分割して処理を切り替えながら実行します。

特徴としては

  • メリット
    • オーバーヘッドが小さい
  • デメリット
    • 複数の CPU を使用できない
    • 実装が複雑になったり、考えることが多い
fujimotoshinjifujimotoshinji

マルチプロセッシング

ProcessPoolExecutor クラスを利用することで簡単に実装できます。

from concurrent.futures import ProcessPoolExecutor
from itertools import repeat
import time
from datetime import datetime

def any_processing(sleep_time: int, starttime: float):
    time.sleep(sleep_time)
    nowtime = datetime.now().timestamp()
    print(f"{nowtime - starttime} slept {sleep_time}s")

if __name__ == "__main__":
    starttime = datetime.now().timestamp()
    with ProcessPoolExecutor() as executor:
        sleep_times = [1, 2, 3]
        executor.map(any_processing, sleep_times, repeat(starttime))

実行結果です。起動のオーバーヘッドが大きいです。

2.4808850288391113 slept 1s
3.4827048778533936 slept 2s
4.492196083068848 slept 3s
fujimotoshinjifujimotoshinji

マルチスレッディング

ThredPoolExecutor クラスに変更するだけです。

from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time
from datetime import datetime

def any_processing(sleep_time: int, starttime: float):
    time.sleep(sleep_time)
    nowtime = datetime.now().timestamp()
    print(f"{nowtime - starttime} slept {sleep_time}s")

if __name__ == "__main__":
    starttime = datetime.now().timestamp()
    with ThreadPoolExecutor() as executor:
        sleep_times = [1, 2, 3]
        executor.map(any_processing, sleep_times, repeat(starttime))

実行結果です。マルチプロセッシングと違って、オーバーヘッドはかなり小さいです。

1.006422996520996 slept 1s
2.004075050354004 slept 2s
3.0070250034332275 slept 3s
fujimotoshinjifujimotoshinji

map の戻り値を受け取る

並列実行の戻り値を受け取ります。map 関数であれば戻り値のリストを返します。
ProcessPoolExecutorThreadPoolExecutor は使い方は同じです。

from concurrent.futures import ThreadPoolExecutor
from itertools import repeat
import time
from datetime import datetime

def any_processing(sleep_time: int, starttime: float) -> str:
    time.sleep(sleep_time)
    nowtime = datetime.now().timestamp()
    return f"{nowtime - starttime} slept {sleep_time}s"

if __name__ == "__main__":
    starttime = datetime.now().timestamp()
    with ThreadPoolExecutor() as executor:
        sleep_times = [1, 2, 3]
        messages = executor.map(any_processing, sleep_times, repeat(starttime))
        for message in messages:
            print(message)

実行結果です。

1.0043740272521973 slept 1s
2.0039401054382324 slept 2s
3.0080950260162354 slept 3s
fujimotoshinjifujimotoshinji

submit の戻り値を受け取る

map は簡単に使えますが、並行処理の間に別の処理を実行したい場合や、もう少し細かく実行を制御したい場合、 submit を利用します。
submit で戻り値を受け取る場合、submit が返す Future オブジェクトの終了を待って、結果を取り出します。

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime

def any_processing(sleep_time: int, starttime: float) -> str:
    time.sleep(sleep_time)
    nowtime = datetime.now().timestamp()
    return f"{nowtime - starttime} slept {sleep_time}s"

if __name__ == "__main__":
    starttime = datetime.now().timestamp()
    with ThreadPoolExecutor() as executor:
        sleep_times = [1, 2, 3]
        futures = [executor.submit(any_processing, sleep_time, starttime) for sleep_time in sleep_times]
        for future in as_completed(futures):
            print(future.result())

実行結果です。

1.0063040256500244 slept 1s
2.0031981468200684 slept 2s
3.0059750080108643 slept 3s
fujimotoshinjifujimotoshinji

例外をハンドリングする

map の場合、並列処理で例外が発生すると、他の実行中の処理を中断して例外を raise します。
submit の場合、いくつかのパターンがあります。

以下は例外が発生しても中断せず、例外が発生しなかった処理は結果を、例外が発生した処理は例外メッセージを出力します。

from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
import time
from datetime import datetime

def any_processing(sleep_time: int, starttime: float) -> str:
    if sleep_time == 2:
        raise ValueError("value error")
    time.sleep(sleep_time)
    nowtime = datetime.now().timestamp()
    return f"{nowtime - starttime} slept {sleep_time}s"

if __name__ == "__main__":
    starttime = datetime.now().timestamp()
    with ThreadPoolExecutor() as executor:
        sleep_times = [1, 2, 3]
        futures = [executor.submit(any_processing, sleep_time, starttime) for sleep_time in sleep_times]
        for done_and_not_done_futures in wait(futures, return_when=FIRST_COMPLETED):
            for future in done_and_not_done_futures:
                if future.exception():
                    print(future.exception())
                else:
                    print(future.result())

実行結果です。sleep しない例外の処理が最初に結果を返し、中断せずに他の処理は結果を出力します。

value error
1.003169059753418 slept 1s
3.0031561851501465 slept 3s
fujimotoshinjifujimotoshinji

同時実行数の指定

Executor クラスの max_worker 引数で指定できます。
デフォルト値は 32 か、CPU コア数 + 4 です。

with ThreadPoolExecutor(max_workers=4) as executor:
fujimotoshinjifujimotoshinji

まとめ

  • Python の標準ライブラリでマルチプロセッシング、マルチスレッディングの実装は共通化されている
  • 戻り値の制御、例外の制御など簡単に柔軟にできる
このスクラップは2022/07/14にクローズされました