Closed4

Pythonで並行・並列処理のメモ

kun432kun432

並行処理と並列処理

  • 並行処理 (Concurrency)
    • 複数のタスクが「同時進行しているように」扱われること。必ずしも同時に実行されなくても、タスク間の切り替えで効率的に処理を進められます。
    • 例: ユーザー入力の待機中に他の処理を進める。
  • 並列処理 (Parallelism)
    • 複数のタスクが物理的に同時に実行されること。
    • 例: マルチコアCPUを活用して、CPU負荷の高い処理を同時に実行する。

Pythonの場合

  • グローバルインタプリタロック (GIL) によって、CPUバウンドな処理ではスレッドを使っても完全な並列実行は難しい
    • 一般的にプロセスを利用する
  • I/Oバウンドな処理(外部API呼び出し、ファイル操作、ネットワーク通信など)
    • スレッドや非同期処理を利用する

並行処理・並列処理に関するドキュメントは以下にまとまっている

https://docs.python.org/ja/3.13/library/concurrency.html

以下とも関連する

https://zenn.dev/kun432/scraps/7977beda32f468

kun432kun432

スレッド

threadingを使う

https://docs.python.org/ja/3.13/library/threading.html

以下の例がわかりやすい

https://qiita.com/tchnkmr/items/b05f321fa315bbce4f77

https://zenn.dev/nekoallergy/articles/py-advance-threading-01

import time
import threading

# 並行で実行したい処理1
def boil_udon():
  print("  現在のスレッド:", threading.current_thread().name)
  print('  うどんを茹でます。')
  time.sleep(3)
  print('  うどんが茹であがりました。')

# 並行で実行したい処理2
def make_tuyu():
  print("  現在のスレッド:", threading.current_thread().name)
  print('  ツユをつくります。')
  time.sleep(2)
  print('  ツユができました。')

print('うどんを作ります。')
print("現在のスレッド:", threading.current_thread().name)

# それぞれの処理ごとにスレッドを作成
thread1 = threading.Thread(target=boil_udon)
thread2 = threading.Thread(target=make_tuyu)

# 各スレッドを開始
thread1.start()
thread2.start()

# 各スレッドの終了を待つ
thread1.join()
thread2.join()

print('盛り付けます。')
print('うどんができました。')
出力
うどんを作ります。
現在のスレッド: MainThread
  現在のスレッド: Thread-20 (boil_udon)
  うどんを茹でます。
  現在のスレッド: Thread-21 (make_tuyu)
  ツユをつくります。
  ツユができました。
  うどんが茹であがりました。
盛り付けます。
うどんができました。

CPUバウンドではなく、外部リソース(ネットワーク、ディスク、ユーザからの入力等)のI/Oバウンドなタスクに向いている。

並列ではなく並行処理である。

kun432kun432

マルチプロセス

multiprocessingを使う

https://docs.python.org/ja/3.13/library/multiprocessing.html

上と同じ例で書いてみた。

import time
from multiprocessing import Process, current_process

# 並列で実行したい処理1
def boil_udon():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  うどんを茹でます。')
    time.sleep(3)
    print('  うどんが茹であがりました。')

# 並列で実行したい処理2
def make_tuyu():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  ツユをつくります。')
    time.sleep(2)
    print('  ツユができました。')

if __name__ == '__main__':
    print('うどんを作ります。')
    print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")

    # 各処理ごとにプロセスを作成
    process1 = Process(target=boil_udon)
    process2 = Process(target=make_tuyu)

    # 各プロセスを開始
    process1.start()
    process2.start()

    # 各プロセスの終了を待つ
    process1.join()
    process2.join()

    print('盛り付けます。')
    print('うどんができました。')
出力
うどんを作ります。
現在のプロセス名(プロセスID): MainProcess (528)
  現在のプロセス名(プロセスID): Process-5 (41501)
  うどんを茹でます。
  現在のプロセス名(プロセスID): Process-6 (41504)
  ツユをつくります。
  ツユができました。
  うどんが茹であがりました。
盛り付けます。
うどんができました。

こちらは異なるプロセスでそれぞれが動作するので、「並列」処理となる。

kun432kun432

concurrent.futuresモジュール

https://docs.python.org/ja/3.13/library/concurrent.futures.html

concurrent.futuresモジュールは、スレッドやプロセスを非同期に並行・並列実行を行いやすくするための高レベルインタフェースを提供してくれる。

あらかじめスレッドやプロセスの「プール」(使い回し可能なリソースの集合体)をあらかじめ確保しておいて、必要なタスクをその中で使う。これにより

  • リソース作成時のオーバーヘッドを軽減
  • 同時実行数を制御してシステムリソースの無駄な消費を防ぐ
  • タスクの管理やスケジューリングが容易になる

というメリットがあるらしい。

でこれらを行ってくれるのが、ThreadPoolExecutorProcessPoolExecutor になる。その名の通り、スレッド向けとプロセス向けになっている。

ThreadPoolExecutorの例

import time
from concurrent.futures import ThreadPoolExecutor
import threading

def boil_udon():
    print("  現在のスレッド:", threading.current_thread().name)
    print('  うどんを茹でます。')
    time.sleep(3)
    print('  うどんが茹であがりました。')

def make_tuyu():
    print("  現在のスレッド:", threading.current_thread().name)
    print('  ツユをつくります。')
    time.sleep(2)
    print('  ツユができました。')

def main():
    print("現在のスレッド:", threading.current_thread().name)
    print('うどんを作ります。')

    # ThreadPoolExecutor を生成
    executor = ThreadPoolExecutor(max_workers=2)

    # それぞれの処理を非同期に実行
    executor.submit(boil_udon)
    executor.submit(make_tuyu)
    
    # shutdown(wait=True) で、すべてのタスクが完了するまで待機
    executor.shutdown(wait=True)
    
    print('盛り付けます。')
    print('うどんができました。')

if __name__ == '__main__':
    main()
出力
現在のスレッド: MainThread
うどんを作ります。
   現在のスレッド: ThreadPoolExecutor-10_0
  うどんを茹でます。
   現在のスレッド: ThreadPoolExecutor-10_1
  ツユをつくります。
  ツユができました。
  うどんが茹であがりました。
盛り付けます。
うどんができました。

ProcessPoolExecutorの例

import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process

def boil_udon():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  うどんを茹でます。')
    time.sleep(3)
    print('  うどんが茹であがりました。')

def make_tuyu():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  ツユをつくります。')
    time.sleep(2)
    print('  ツユができました。')

def main():
    print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('うどんを作ります。')

    # ProcessPoolExecutor 生成
    executor = ProcessPoolExecutor(max_workers=2)
    
    # それぞれの処理を非同期に実行
    executor.submit(boil_udon)
    executor.submit(make_tuyu)
    
    # shutdown(wait=True) を呼ぶことで、すべてのタスクが完了するまで待機
    executor.shutdown(wait=True)
    
    print('盛り付けます。')
    print('うどんができました。')

if __name__ == '__main__':
    main()
出力
現在のプロセス名(プロセスID): MainProcess (528)
うどんを作ります。
  現在のプロセス名(プロセスID): ForkProcess-37 (57408)
  うどんを茹でます。
  現在のプロセス名(プロセスID): ForkProcess-38 (57409)
  ツユをつくります。
  ツユができました。
  うどんが茹であがりました。
盛り付けます。
うどんができました。

基本的な流れ

  • ThreadPoolExecutor/ProcessPoolExecutorを使ってExecutorインスタンスを作成
    • max_workersで同時実行数を指定可能
  • .submitでタスクを投入
  • .shutdown(wait=True)ですべてのタスクの終了を待つ

with句でExecutorインスタンスを作成すると、ブロック終了時にshutdown(wait=True)(デフォルトの場合)が呼ばれるので、それぞれ以下のように書ける。

import time
from concurrent.futures import ThreadPoolExecutor
import threading

def boil_udon():
    print("  現在のスレッド:", threading.current_thread().name)
    print('  うどんを茹でます。')
    time.sleep(3)
    print('  うどんが茹であがりました。')

def make_tuyu():
    print("  現在のスレッド:", threading.current_thread().name)
    print('  ツユをつくります。')
    time.sleep(2)
    print('  ツユができました。')

def main():
    print("現在のスレッド:", threading.current_thread().name)
    print('うどんを作ります。')

    # ThreadPoolExecutor を生成
    with ThreadPoolExecutor(max_workers=2) as executor:
        # それぞれの処理を非同期に実行
        executor.submit(boil_udon)
        executor.submit(make_tuyu)
        # withブロックを終了する場合にはタスクがすべて終了されていることが保障される
    
    print('盛り付けます。')
    print('うどんができました。')

if __name__ == '__main__':
    main()
import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process

def boil_udon():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  うどんを茹でます。')
    time.sleep(3)
    print('  うどんが茹であがりました。')

def make_tuyu():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  ツユをつくります。')
    time.sleep(2)
    print('  ツユができました。')

def main():
    print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('うどんを作ります。')

    # ProcessPoolExecutor 生成
    with ProcessPoolExecutor(max_workers=2) as executor:
        # それぞれの処理を非同期に実行
        executor.submit(boil_udon)
        executor.submit(make_tuyu)
        # withブロックを終了する場合にはタスクがすべて終了されていることが保障される
    
    print('盛り付けます。')
    print('うどんができました。')

if __name__ == '__main__':
    main()

タスクをsubmit()するとFuture オブジェクトというのが返される。これは完了していないタスクの結果をあとから受け取るためのハンドルのようなものらしい。つまり、タスクの結果を受け取るにはこのFutureオブジェクト経由で受け取ることになる。

Futureオブジェクトには以下のようなメソッド・属性がある。

  • result(timeout=None): タスクの完了を待ち、結果を取得する。タイムアウトを設定可。
  • done(): タスクが完了しているかどうかをboolで取得する。
  • exception(timeout=None): タスク実行中の例外を取得する。タイムアウトを設定可。

また、複数のFutureオブジェクトの状態を取得したり管理することもできる。

  • concurrent.futures.as_completed()
  • concurrent.futures.wait()

以下はタスクの実行結果を受け取った場合

ThreadPoolExecutorの例

import time
from concurrent.futures import ThreadPoolExecutor
import threading

def boil_udon():
    print("  現在のスレッド:", threading.current_thread().name)
    print('  うどんを茹でます。')
    time.sleep(3)
    print('  うどんが茹であがりました。')
    return 'うどん完了'

def make_tuyu():
    print("  現在のスレッド:", threading.current_thread().name)
    print('  ツユをつくります。')
    time.sleep(2)
    print('  ツユができました。')
    return 'ツユ完了'

def main():
    print("現在のスレッド:", threading.current_thread().name)
    print('うどんを作ります。')

    # ThreadPoolExecutor を生成
    executor = ThreadPoolExecutor(max_workers=2)
    
    # それぞれの処理を非同期に実行
    future_udon = executor.submit(boil_udon)
    future_tuyu = executor.submit(make_tuyu)

    # すべてのタスクが完了するのを待つ
    executor.shutdown(wait=True)

    # タスクの結果を取得
    print(future_udon.result())
    print(future_tuyu.result())

    print('盛り付けます。')
    print('うどんができました。')

if __name__ == '__main__':
    main()
出力
現在のスレッド: MainThread
うどんを作ります。
  現在のスレッド: ThreadPoolExecutor-29_0
  うどんを茹でます。
  現在のスレッド: ThreadPoolExecutor-29_1
  ツユをつくります。
  ツユができました。
  うどんが茹であがりました。
うどん完了
ツユ完了
盛り付けます。
うどんができました。

ProcessPoolExecutorの例

import time
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import current_process

def boil_udon():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  うどんを茹でます。')
    time.sleep(3)
    return 'うどん完了'

def make_tuyu():
    print(f"  現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('  ツユをつくります。')
    time.sleep(2)
    return 'ツユ完了'

def main():
    print(f"現在のプロセス名(プロセスID): {current_process().name} ({current_process().pid})")
    print('うどんを作ります。')

    # ProcessPoolExecutor 生成
    executor = ProcessPoolExecutor(max_workers=2)
    
    # それぞれの処理を非同期に実行
    future_udon = executor.submit(boil_udon)
    future_tuyu = executor.submit(make_tuyu)
    
    # すべてのタスクが完了するのを待つ
    executor.shutdown(wait=True)

    # タスクの結果を取得
    print(future_udon.result())
    print(future_tuyu.result())

    print('盛り付けます。')
    print('うどんができました。')


if __name__ == '__main__':
    main()
出力
現在のプロセス名(プロセスID): MainProcess (528)
うどんを作ります。
  現在のプロセス名(プロセスID): ForkProcess-51 (65774)
  うどんを茹でます。
  現在のプロセス名(プロセスID): ForkProcess-52 (65775)
  ツユをつくります。
うどん完了
ツユ完了
盛り付けます。
うどんができました。
このスクラップは2025/02/09にクローズされました