🐍

Pythonのマルチプロセスで巨大なデータを処理する場合のエラーハンドリング

こんにちは、エンジニアの澤田です。

普段の業務で、Pythonのプログラムで外部のAPIへリクエストを送り、そのレスポンスを加工してデータベースへ保存する処理を行っています。
その際、リクエスト数が多いため、リクエストを送って結果を受け取る処理を並列(マルチプロセス)で行っています。
ここで 1つのリクエストを「サブプロセス」と呼ばれる子プロセスが担当していますが、以下のような問題が起こることがあります。

  • サブプロセスが処理の途中でハングする
  • APIのレスポンスが巨大でメモリを大量に使用する場合に、OSの「OOM Killer(Out of Memory Killer)」によってサブプロセスが強制終了させられる

メインプロセス(親プロセス)がサブプロセスの異常を検知できないと、親は「子がまだ処理している」と思ってずっと待ち続けてしまい、全体の処理が止まってしまいます。
そこで今回は、サブプロセスがハングしたり強制終了するなどの異常が発生した場合に、メインプロセスでそれを検知して対処できるようにするエラーハンドリングを紹介しようと思います!

※Python はバージョン 3.12.11 を使用しています。

エラーが発生するタイミング

エラーの種類は多々ありますが、発生するタイミングとしては以下の6つがあります。

  • メインプロセスでエラーが発生する
  • メインプロセスが OOM Killer などの外部の処理によって強制終了させられる
  • サブプロセスでエラーが発生する
  • サブプロセスが OOM Killer などの外部の処理によって強制終了させられる
  • サブプロセスの結果を受け取って処理するコールバック関数でエラーが発生する
    ※コールバック関数を使用する場合
  • プロセス全体の処理がタイムアウトする
    ※タイムアウト値を設定する場合

上記で「メインプロセスが OOM Killer などの外部の処理によって強制終了させられる」場合は、外部のプログラムでプロセスを監視してエラーハンドリングを行う必要がありますが、今回はそれ以外の、メインプロセスで行えるエラーハンドリングをやってみようと思います!

concurrent.futures モジュールで並列処理を行う

Python の標準ライブラリに、マルチプロセスを行うモジュールとして multiprocessingconcurrent.futures がありますが、multiprocessing の Pool は「サブプロセスが OOM Killer などの外部の処理によって強制終了させられる」場合のエラーハンドリングができないため、concurrent.futures の ProcessPoolExecutor を使用することにします。

concurrent.futures の ProcessPoolExecutor は、引数の max_workers に並列処理するプロセス数を指定してインスタンス化して、処理内容をまとめた関数を submit() メソッドで登録して使います。
submit() メソッドで処理待ちのリストへ登録すると、処理を行なっていないサブプロセスがそれを取り出して並列で実行されます。

worker という関数を100回登録して、4つのサブプロセスで並列で実行するコードを書いてみましょう!

def worker():
	large_data = []
	count = 1000000
	while count >= 1:
		large_data.append(1)
		count -= 1
	return large_data

executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
for i in range(100):
	future = executor.submit(worker)
executor.shutdown()

concurrent.futures モジュールの as_completed() メソッドの問題点

上記のコードで、 executorsubmit() を実行すると Future クラスのインスタンスが返されるので、それを future という変数に入れています。
worker の実行が完了すると、 future.result()worker の戻り値を取得することができます。

また、concurrent.futures モジュールには as_completed() というメソッドがあり、以下のコードのように future のリストを渡して実行すると、実行完了した future を逐次的に取得することができます。

futures = []
for i in range(100):
	futures.append(executor.submit(worker))
for future in concurrent.futures.as_completed(futures)
	result = future.result()

ただ、ここで1つ問題があります。
futureworker の戻り値を保持するため、 worker の数が多くて戻り値が巨大なデータの場合、 futures が肥大化してしまうことになります。。

future のリストを保持せずに as_completed() と同様の処理を行う

そこで、 future のリストを保持せずに as_completed() と同様の処理を行なってみましょう!

並列で実行する関数の戻り値をコールバック関数で処理するように変更すればよさそうです。

multiprocess_error_handling.py
def worker():
	large_data = []
	count = 1000000
	while count >= 1:
		large_data.append(1)
		count -= 1
	return large_data

def callback(future):
	result = future.result()
	print(len(result))

if __name__ == '__main__':
	executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
	for i in range(100):
		future = executor.submit(worker)
		future.add_done_callback(callback)  # 並列で実行する関数の戻り値をコールバック関数で処理する
	executor.shutdown()

それでは、本題であるエラーハンドリングをやってみましょう!

実行待ちの future を中断する

エラーが発生した場合に、 future のリストを保持していれば、 future.running() で実行中かどうかを判定して、実行中でなければ future.cancel() を実行して処理を行わないようにすることができますが、 future のリストを保持していないので一筋縄ではいかなそうです。

ProcessPoolExecutorshutdown() メソッドに cancel_futures という引数があるので、 ProcessPoolExecutor は実行待ちの future のリストをどこかで保持している筈です。
concurrent.futures の process.py コード を見てみると、 _pending_work_items というメンバ変数があったので、Protected ではあるものの、こちらが使えそうです!

実行待ちの future を中断するコードを書いてみます。

for work_id, work_item in executor._pending_work_items.items():
	if not work_item.future.running():
		work_item.future.cancel()

実行中の future を強制終了する

実行中の future は中断できず、強制終了するしかないようです。
executor._processes でサブプロセスのリストが取得できるので、それをループで回して強制終了します。

if executor._processes is not None:
	for proc in executor._processes.values():
		proc.terminate()

プロセス間でエラーメッセージを共有する

次に、エラーが発生した場合にメインプロセスがそれを検知する仕組みを考えます。
multiprocessing.Manager().Value() でプロセス間で共有できる変数を生成できるので、それを使ってエラーメッセージを入れることにします。

error_message = multiprocessing.Manager().Value(c_wchar_p, None)

マルチスレッドでエラーメッセージの有無を監視する

error_message にエラーメッセージが入っているかどうかを監視して、入っている場合に実行待ちの future を中断して、実行中の future を強制終了するようにします。
メインプロセスとメモリを共有しつつ監視するため、マルチスレッドで実行することにします。
また、 multiprocessing のドキュメント を見ると、マルチプロセスの開始方式がデフォルトの fork だと、メインプロセスでマルチスレッドで処理を行なっている場合に安全ではないと書かれているので、開始方式に spawn を指定しておきます。
メインの処理は multiprocess_error_handling.py に、監視用の処理は process_monitor.py に分けて記述しました。

process_monitor.py
import threading, time

class ProcessMonitor():
	def __init__(self, error_message):
		self.__error_message = error_message
		self.__stop_event = threading.Event()

		# set at runtime
		self.__monitor_thread = None

	def start(self, executor):
		print(f"Process monitor has started.")
		self.__monitor_thread = threading.Thread(target=self._monitor, args=(executor,))
		self.__monitor_thread.start()

	def stop(self):
		self.__stop_event.set()

	def wait(self):
		self.__stop_event.wait()
		self.__stop_event.clear()
		self.__monitor_thread.join()

		if self.__error_message.value is not None:
			raise Exception(self.__error_message.value)

		print("Process monitor has finished.")

	def _monitor(self, executor):
		while not self.__stop_event.wait(5):  # 5 秒おきにチェックする
			if self.__error_message.value is not None:
				self._cancel_futures(executor)
				self._terminate_executor_processes(executor)
				self.__stop_event.set()

	def _cancel_futures(self, executor):
		for work_id, work_item in executor._pending_work_items.items():
			if not work_item.future.running():
				work_item.future.cancel()

	def _terminate_executor_processes(self, executor):
		if executor._processes is not None:
			for proc in executor._processes.values():
				proc.terminate()
multiprocess_error_handling.py
import multiprocessing, concurrent.futures
from ctypes import c_wchar_p
from process_monitor import ProcessMonitor

def worker(error_message):
	try:
		large_data = []
		count = 1000000
		while count >= 1:
			large_data.append(1)
			count -= 1
		return large_data

	except Exception as e:
		error_message.value = "\n".join([
			"Worker process error has occurred.",
			f">> Error message: {e}"
		])
		return

def callback(error_message, future):
	if error_message.value is not None:
		# 他の並列処理でエラーになっている場合は何もしない
		return

	try:
		result = future.result()
		print(len(result))

	except Exception as e:
		error_message.value = "\n".join([
			"Callback process error has occurred.",
			f">> Error message: {e}"
		])

if __name__ == '__main__':
	mp_context = multiprocessing.get_context('spawn')
	error_message = mp_context.Manager().Value(c_wchar_p, None)
	process_monitor = ProcessMonitor(error_message)
	executor = concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=mp_context)

	process_monitor.start(executor)
	for i in range(100):
		future = executor.submit(worker, error_message)
		future.add_done_callback(lambda future: callback(error_message, future))
	executor.shutdown()
	process_monitor.stop()

	process_monitor.wait()

タイムアウトのエラーハンドリングを追加する

最後に、「プロセス全体の処理がタイムアウトする」場合のエラーハンドリングを追加します。
process_monitor.py に開始時間と経過時間を持たせて、 _monitor() メソッドにタイムアウト値を超えた場合の処理を追加します。

process_monitor.py
@@ -1,15 +1,19 @@
 import threading, time
 
 class ProcessMonitor():
-       def __init__(self, error_message):
+       def __init__(self, error_message, timeout):
+               self.__timeout = timeout
+               self.__elapsed_time = 0
                self.__error_message = error_message
                self.__stop_event = threading.Event()
 
                # set at runtime
+               self.__start_time = None
                self.__monitor_thread = None
 
        def start(self, executor):
-               print(f"Process monitor has started.")
+               self.__start_time = time.time()
+               print(f"Process monitor has started. (timeout threshold: {self.__timeout} sec)")
                self.__monitor_thread = threading.Thread(target=self._monitor, args=(executor,))
                self.__monitor_thread.start()
 
@@ -28,6 +32,14 @@ class ProcessMonitor():
 
        def _monitor(self, executor):
                while not self.__stop_event.wait(5):  # 5 秒おきにチェックする
+                       self.__elapsed_time = time.time() - self.__start_time
+
+                       if self.__elapsed_time > self.__timeout:
+                               self.__error_message.value = " ".join([
+                                       "Process timeout",
+                                       f"(elapsed time: {self.__elapsed_time:.2f} sec / timeout threshold: {self.__timeout} sec)"
+                               ])
+
                        if self.__error_message.value is not None:
                                self._cancel_futures(executor)
                                self._terminate_executor_processes(executor)
multiprocess_error_handling.py
@@ -36,7 +36,7 @@ def callback(error_message, future):
 if __name__ == '__main__':
        mp_context = multiprocessing.get_context('spawn')
        error_message = mp_context.Manager().Value(c_wchar_p, None)
-       process_monitor = ProcessMonitor(error_message)
+       process_monitor = ProcessMonitor(error_message, 60)
        executor = concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=mp_context)
 
        process_monitor.start(executor)
全体のコード
process_monitor.py
import threading, time

class ProcessMonitor():
	def __init__(self, error_message, timeout):
		self.__timeout = timeout
		self.__elapsed_time = 0
		self.__error_message = error_message
		self.__stop_event = threading.Event()

		# set at runtime
		self.__start_time = None
		self.__monitor_thread = None

	def start(self, executor):
		self.__start_time = time.time()
		print(f"Process monitor has started. (timeout threshold: {self.__timeout} sec)")
		self.__monitor_thread = threading.Thread(target=self._monitor, args=(executor,))
		self.__monitor_thread.start()

	def stop(self):
		self.__stop_event.set()

	def wait(self):
		self.__stop_event.wait()
		self.__stop_event.clear()
		self.__monitor_thread.join()

		if self.__error_message.value is not None:
			raise Exception(self.__error_message.value)

		print("Process monitor has finished.")

	def _monitor(self, executor):
		while not self.__stop_event.wait(5):  # 5 秒おきにチェックする
			self.__elapsed_time = time.time() - self.__start_time

			if self.__elapsed_time > self.__timeout:
				self.__error_message.value = " ".join([
					"Process timeout",
					f"(elapsed time: {self.__elapsed_time:.2f} sec / timeout threshold: {self.__timeout} sec)"
				])

			if self.__error_message.value is not None:
				self._cancel_futures(executor)
				self._terminate_executor_processes(executor)
				self.__stop_event.set()

	def _cancel_futures(self, executor):
		for work_id, work_item in executor._pending_work_items.items():
			if not work_item.future.running():
				work_item.future.cancel()

	def _terminate_executor_processes(self, executor):
		if executor._processes is not None:
			for proc in executor._processes.values():
				proc.terminate()
multiprocess_error_handling.py
import multiprocessing, concurrent.futures
from ctypes import c_wchar_p
from process_monitor import ProcessMonitor

def worker(error_message):
	try:
		large_data = []
		count = 1000000
		while count >= 1:
			large_data.append(1)
			count -= 1
		return large_data

	except Exception as e:
		error_message.value = "\n".join([
			"Worker process error has occurred.",
			f">> Error message: {e}"
		])
		return

def callback(error_message, future):
	if error_message.value is not None:
		# 他の並列処理でエラーになっている場合は何もしない
		return

	try:
		result = future.result()
		print(len(result))

	except Exception as e:
		error_message.value = "\n".join([
			"Callback process error has occurred.",
			f">> Error message: {e}"
		])

if __name__ == '__main__':
	mp_context = multiprocessing.get_context('spawn')
	error_message = mp_context.Manager().Value(c_wchar_p, None)
	process_monitor = ProcessMonitor(error_message, 60)
	executor = concurrent.futures.ProcessPoolExecutor(max_workers=4, mp_context=mp_context)

	process_monitor.start(executor)
	for i in range(100):
		future = executor.submit(worker, error_message)
		future.add_done_callback(lambda future: callback(error_message, future))
	executor.shutdown()
	process_monitor.stop()

	process_monitor.wait()

これで一通りエラーハンドリングを行えるようになりました。

実際にエラーを発生させてみる

それでは実際にエラーを発生させて、エラーハンドリングが行われるか見てみましょう!

※「メインプロセスでエラーが発生する」場合は、メインプロセス側でエラーハンドリングできるので、今回は割愛しています

「サブプロセスでエラーが発生する」場合

worker 関数で定義されていない変数 hoge を出力して、意図的にエラーを発生させてみます。

multiprocess_error_handling.py
@@ -9,6 +9,7 @@ def worker(error_message):
                while count >= 1:
                        large_data.append(1)
                        count -= 1
+               print(hoge)
                return large_data
 
        except Exception as e:

実行結果

$ python3 ./multiprocess_error_handling.py
Process monitor has started. (timeout threshold: 60 sec)
Traceback (most recent call last):
  File "/home/forcia/study/python3_multiprocess_error_handling/./multiprocess_error_handling.py", line 50, in <module>
    process_monitor.wait()
  File "/home/forcia/study/python3_multiprocess_error_handling/process_monitor.py", line 29, in wait
    raise Exception(self.__error_message.value)
Exception: Worker process error has occurred.
>> Error message: name 'hoge' is not defined
$

「サブプロセスが OOM Killer などの外部の処理によって強制終了させられる」場合

OSのkillコマンドに、サブプロセスのPID(プロセスID)を指定して実行してみます。

※killコマンドは別のシェルを立ち上げて実行しています

実行結果

$ python3 ./multiprocess_error_handling.py
Process monitor has started. (timeout threshold: 60 sec)
Traceback (most recent call last):
  File "/home/forcia/study/python3_multiprocess_error_handling/./multiprocess_error_handling.py", line 51, in <module>
    process_monitor.wait()
  File "/home/forcia/study/python3_multiprocess_error_handling/process_monitor.py", line 29, in wait
    raise Exception(self.__error_message.value)
Exception: Callback process error has occurred.
>> Error message: A process in the process pool was terminated abruptly while the future was running or pending.
$

「サブプロセスの結果を受け取って処理するコールバック関数でエラーが発生する」場合

callback 関数で定義されていない変数 piyo を出力して、意図的にエラーを発生させてみます。

multiprocess_error_handling.py
@@ -25,6 +25,7 @@ def callback(error_message, future):
 
        try:
                result = future.result()
+               print(piyo)
                print(len(result))
 
        except Exception as e:

実行結果

$ python3 ./multiprocess_error_handling.py
Process monitor has started. (timeout threshold: 60 sec)
Traceback (most recent call last):
  File "/home/forcia/study/python3_multiprocess_error_handling/./multiprocess_error_handling.py", line 50, in <module>
    process_monitor.wait()
  File "/home/forcia/study/python3_multiprocess_error_handling/process_monitor.py", line 29, in wait
    raise Exception(self.__error_message.value)
Exception: Callback process error has occurred.
>> Error message: name 'piyo' is not defined
$

「プロセス全体の処理がタイムアウトする」場合

worker 関数に30秒間処理を停止する処理を入れて、意図的にタイムアウトを発生させてみます。

multiprocess_error_handling.py
@@ -1,6 +1,7 @@
 import multiprocessing, concurrent.futures
 from ctypes import c_wchar_p
 from process_monitor import ProcessMonitor
+import time
 
 def worker(error_message):
        try:
@@ -9,6 +10,7 @@ def worker(error_message):
                while count >= 1:
                        large_data.append(1)
                        count -= 1
+               time.sleep(30)
                return large_data
 
        except Exception as e:

実行結果

$ python3 ./multiprocess_error_handling.py
Process monitor has started. (timeout threshold: 60 sec)
1000000
1000000
1000000
1000000
Traceback (most recent call last):
  File "/home/forcia/study/python3_multiprocess_error_handling/./multiprocess_error_handling.py", line 51, in <module>
    process_monitor.wait()
  File "/home/forcia/study/python3_multiprocess_error_handling/process_monitor.py", line 29, in wait
    raise Exception(self.__error_message.value)
Exception: Process timeout (elapsed time: 60.04 sec / timeout threshold: 60 sec)
$

無事エラーハンドリングできて、全体の処理が終了していますね!

さいごに

一番悩んだのは「実行待ちの future のリストをどう取得するか」でした。
その際に読んだ concurrent.futures の process.py コード がとても分かりやすく、勉強になりました:)

この記事を書いた人

澤田 哲明
大手旅行会社でWebデザイナーとして勤務しつつプログラミングを学び、2012年にフォルシアに入社。
現在は事業推進部に所属して、福利厚生アウトソーシング会社などのシステム開発を担当。
子供の夏休みに、家族で神戸の六甲アイランドへ「みんなで出撃!にゃんこ大戦争展」を見に行ってきました!

FORCIA Tech Blog

Discussion