Pythonで並行・並列処理のメモ
並行処理と並列処理
- 並行処理 (Concurrency)
- 複数のタスクが「同時進行しているように」扱われること。必ずしも同時に実行されなくても、タスク間の切り替えで効率的に処理を進められます。
- 例: ユーザー入力の待機中に他の処理を進める。
- 並列処理 (Parallelism)
- 複数のタスクが物理的に同時に実行されること。
- 例: マルチコアCPUを活用して、CPU負荷の高い処理を同時に実行する。
Pythonの場合
- グローバルインタプリタロック (GIL) によって、CPUバウンドな処理ではスレッドを使っても完全な並列実行は難しい
- 一般的にプロセスを利用する
- I/Oバウンドな処理(外部API呼び出し、ファイル操作、ネットワーク通信など)
- スレッドや非同期処理を利用する
並行処理・並列処理に関するドキュメントは以下にまとまっている
以下とも関連する
スレッド
threading
を使う
以下の例がわかりやすい
import time
import threading
# 並行で実行したい処理1
def boil_udon():
print(" 現在のスレッド:", threading.current_thread().name)
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
# 並行で実行したい処理2
def make_tuyu():
print(" 現在のスレッド:", threading.current_thread().name)
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
print('うどんを作ります。')
print("現在のスレッド:", threading.current_thread().name)
# それぞれの処理ごとにスレッドを作成
thread1 = threading.Thread(target=boil_udon)
thread2 = threading.Thread(target=make_tuyu)
# 各スレッドを開始
thread1.start()
thread2.start()
# 各スレッドの終了を待つ
thread1.join()
thread2.join()
print('盛り付けます。')
print('うどんができました。')
うどんを作ります。
現在のスレッド: MainThread
現在のスレッド: Thread-20 (boil_udon)
うどんを茹でます。
現在のスレッド: Thread-21 (make_tuyu)
ツユをつくります。
ツユができました。
うどんが茹であがりました。
盛り付けます。
うどんができました。
CPUバウンドではなく、外部リソース(ネットワーク、ディスク、ユーザからの入力等)のI/Oバウンドなタスクに向いている。
並列ではなく並行処理である。
マルチプロセス
multiprocessing
を使う
上と同じ例で書いてみた。
import time
from multiprocessing import Process, current_process
# 並列で実行したい処理1
def boil_udon():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
# 並列で実行したい処理2
def make_tuyu():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
if __name__ == '__main__':
print('うどんを作ります。')
print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
# 各処理ごとにプロセスを作成
process1 = Process(target=boil_udon)
process2 = Process(target=make_tuyu)
# 各プロセスを開始
process1.start()
process2.start()
# 各プロセスの終了を待つ
process1.join()
process2.join()
print('盛り付けます。')
print('うどんができました。')
うどんを作ります。
現在のプロセス名(プロセスID): MainProcess (528)
現在のプロセス名(プロセスID): Process-5 (41501)
うどんを茹でます。
現在のプロセス名(プロセスID): Process-6 (41504)
ツユをつくります。
ツユができました。
うどんが茹であがりました。
盛り付けます。
うどんができました。
こちらは異なるプロセスでそれぞれが動作するので、「並列」処理となる。
concurrent.futuresモジュール
concurrent.futuresモジュールは、スレッドやプロセスを非同期に並行・並列実行を行いやすくするための高レベルインタフェースを提供してくれる。
あらかじめスレッドやプロセスの「プール」(使い回し可能なリソースの集合体)をあらかじめ確保しておいて、必要なタスクをその中で使う。これにより
- リソース作成時のオーバーヘッドを軽減
- 同時実行数を制御してシステムリソースの無駄な消費を防ぐ
- タスクの管理やスケジューリングが容易になる
というメリットがあるらしい。
でこれらを行ってくれるのが、ThreadPoolExecutor
とProcessPoolExecutor
になる。その名の通り、スレッド向けとプロセス向けになっている。
ThreadPoolExecutor
の例
import time
from concurrent.futures import ThreadPoolExecutor
import threading
def boil_udon():
print(" 現在のスレッド:", threading.current_thread().name)
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
def make_tuyu():
print(" 現在のスレッド:", threading.current_thread().name)
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
def main():
print("現在のスレッド:", threading.current_thread().name)
print('うどんを作ります。')
# ThreadPoolExecutor を生成
executor = ThreadPoolExecutor(max_workers=2)
# それぞれの処理を非同期に実行
executor.submit(boil_udon)
executor.submit(make_tuyu)
# shutdown(wait=True) で、すべてのタスクが完了するまで待機
executor.shutdown(wait=True)
print('盛り付けます。')
print('うどんができました。')
if __name__ == '__main__':
main()
現在のスレッド: MainThread
うどんを作ります。
現在のスレッド: ThreadPoolExecutor-10_0
うどんを茹でます。
現在のスレッド: ThreadPoolExecutor-10_1
ツユをつくります。
ツユができました。
うどんが茹であがりました。
盛り付けます。
うどんができました。
ProcessPoolExecutor
の例
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
def boil_udon():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
def make_tuyu():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
def main():
print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print('うどんを作ります。')
# ProcessPoolExecutor 生成
executor = ProcessPoolExecutor(max_workers=2)
# それぞれの処理を非同期に実行
executor.submit(boil_udon)
executor.submit(make_tuyu)
# shutdown(wait=True) を呼ぶことで、すべてのタスクが完了するまで待機
executor.shutdown(wait=True)
print('盛り付けます。')
print('うどんができました。')
if __name__ == '__main__':
main()
現在のプロセス名(プロセスID): MainProcess (528)
うどんを作ります。
現在のプロセス名(プロセスID): ForkProcess-37 (57408)
うどんを茹でます。
現在のプロセス名(プロセスID): ForkProcess-38 (57409)
ツユをつくります。
ツユができました。
うどんが茹であがりました。
盛り付けます。
うどんができました。
基本的な流れ
-
ThreadPoolExecutor
/ProcessPoolExecutor
を使ってExecutor
インスタンスを作成-
max_workers
で同時実行数を指定可能
-
-
.submit
でタスクを投入 -
.shutdown(wait=True)
ですべてのタスクの終了を待つ
with
句でExecutor
インスタンスを作成すると、ブロック終了時にshutdown(wait=True)
(デフォルトの場合)が呼ばれるので、それぞれ以下のように書ける。
import time
from concurrent.futures import ThreadPoolExecutor
import threading
def boil_udon():
print(" 現在のスレッド:", threading.current_thread().name)
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
def make_tuyu():
print(" 現在のスレッド:", threading.current_thread().name)
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
def main():
print("現在のスレッド:", threading.current_thread().name)
print('うどんを作ります。')
# ThreadPoolExecutor を生成
with ThreadPoolExecutor(max_workers=2) as executor:
# それぞれの処理を非同期に実行
executor.submit(boil_udon)
executor.submit(make_tuyu)
# withブロックを終了する場合にはタスクがすべて終了されていることが保障される
print('盛り付けます。')
print('うどんができました。')
if __name__ == '__main__':
main()
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
def boil_udon():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
def make_tuyu():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
def main():
print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print('うどんを作ります。')
# ProcessPoolExecutor 生成
with ProcessPoolExecutor(max_workers=2) as executor:
# それぞれの処理を非同期に実行
executor.submit(boil_udon)
executor.submit(make_tuyu)
# withブロックを終了する場合にはタスクがすべて終了されていることが保障される
print('盛り付けます。')
print('うどんができました。')
if __name__ == '__main__':
main()
タスクをsubmit()
するとFuture オブジェクトというのが返される。これは完了していないタスクの結果をあとから受け取るためのハンドルのようなものらしい。つまり、タスクの結果を受け取るにはこのFutureオブジェクト経由で受け取ることになる。
Futureオブジェクトには以下のようなメソッド・属性がある。
-
result(timeout=None)
: タスクの完了を待ち、結果を取得する。タイムアウトを設定可。 -
done()
: タスクが完了しているかどうかをboolで取得する。 -
exception(timeout=None)
: タスク実行中の例外を取得する。タイムアウトを設定可。
また、複数のFutureオブジェクトの状態を取得したり管理することもできる。
concurrent.futures.as_completed()
concurrent.futures.wait()
以下はタスクの実行結果を受け取った場合
ThreadPoolExecutor
の例
import time
from concurrent.futures import ThreadPoolExecutor
import threading
def boil_udon():
print(" 現在のスレッド:", threading.current_thread().name)
print(' うどんを茹でます。')
time.sleep(3)
print(' うどんが茹であがりました。')
return 'うどん完了'
def make_tuyu():
print(" 現在のスレッド:", threading.current_thread().name)
print(' ツユをつくります。')
time.sleep(2)
print(' ツユができました。')
return 'ツユ完了'
def main():
print("現在のスレッド:", threading.current_thread().name)
print('うどんを作ります。')
# ThreadPoolExecutor を生成
executor = ThreadPoolExecutor(max_workers=2)
# それぞれの処理を非同期に実行
future_udon = executor.submit(boil_udon)
future_tuyu = executor.submit(make_tuyu)
# すべてのタスクが完了するのを待つ
executor.shutdown(wait=True)
# タスクの結果を取得
print(future_udon.result())
print(future_tuyu.result())
print('盛り付けます。')
print('うどんができました。')
if __name__ == '__main__':
main()
現在のスレッド: MainThread
うどんを作ります。
現在のスレッド: ThreadPoolExecutor-29_0
うどんを茹でます。
現在のスレッド: ThreadPoolExecutor-29_1
ツユをつくります。
ツユができました。
うどんが茹であがりました。
うどん完了
ツユ完了
盛り付けます。
うどんができました。
ProcessPoolExecutor
の例
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process
def boil_udon():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' うどんを茹でます。')
time.sleep(3)
return 'うどん完了'
def make_tuyu():
print(f" 現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print(' ツユをつくります。')
time.sleep(2)
return 'ツユ完了'
def main():
print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
print('うどんを作ります。')
# ProcessPoolExecutor 生成
executor = ProcessPoolExecutor(max_workers=2)
# それぞれの処理を非同期に実行
future_udon = executor.submit(boil_udon)
future_tuyu = executor.submit(make_tuyu)
# すべてのタスクが完了するのを待つ
executor.shutdown(wait=True)
# タスクの結果を取得
print(future_udon.result())
print(future_tuyu.result())
print('盛り付けます。')
print('うどんができました。')
if __name__ == '__main__':
main()
現在のプロセス名(プロセスID): MainProcess (528)
うどんを作ります。
現在のプロセス名(プロセスID): ForkProcess-51 (65774)
うどんを茹でます。
現在のプロセス名(プロセスID): ForkProcess-52 (65775)
ツユをつくります。
うどん完了
ツユ完了
盛り付けます。
うどんができました。