📘

Python concurrent.futures.ProcessPoolExecutor で KeyboardInterrupt のメモ

に公開

concurrent.futures.ProcessPoolExecutor で並行処理しているのを, Ctrl-C で止めると子プロセスが残ってつらい...(Linux)

環境

Python 3.8+

原因

Python ProcessPoolExecutor では, fork ベースでプロセス作って IPC しているっぽいです.

そのため, プログラムによっては Ctrl-C で強制終了させると, 親プロセスと通信できなくなってずっと子プロセスが残ってしまうときがあります.

解決

とりま SIGINT handler と try の組み合わせで行います.

https://stackoverflow.com/questions/68902374/processpoolexectur-and-ctrl-c

import sys
import time
import concurrent.futures
import signal

def handler(signum, frame):
    # Gracefull shutfown
    print('Signal handler called with signal', signum)
    
    # sys.exit とかでプロセス終了はうまくいかない
    # sys.exit(-1)

def worker(fname):
    print(fname)
    #time.a(4)
    time.sleep(5)
    #except KeyboardInterrupt:
    #    print("bora")


files = ['a', 'b', 'c', 'd', 'e', 'f', 'g']

if __name__ == '__main__':
    signal.signal(signal.SIGINT, handler)

    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:

        try:

            results = executor.map(worker, files)
            for res in results:
                print("DONE")

        except Exception as exc:
            print(exc)
            executor.shutdown(wait=True, cancel_futures=True)

問題点

ただこれだと task が終わるまでしかし待ちます.
global 変数経由でなんか state 管理したりがよいでしょう.

https://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

あたりを掘り下げるとよりよい解があるでしょう.

その他の話題 executor.submit では worker での例外を catch しない

また, executor.map の結果を使わないときや, executor.submit では worker での例外を catch しないので注意です!

import sys
import time
import concurrent.futures


def worker(fname):
    print(fname)
    time.a(4) # raise


files = ['a', 'b', 'c', 'd', 'e', 'f', 'g']

if __name__ == '__main__':

    with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:
        futures = []
	for f in files:
	    fs = executor.submit(worker, files)
            futures.append(fs)
	    
	# futures を待つ処理をここに

time.a(4) のエラーを catch しません.

https://qiita.com/mms0xf/items/47e08a0f4b2467b4a164

future.result()future.exception() 例外があるまで待つ, を使うとよさそうでしょうか

Discussion