🐍
共有メモリにNumpy配列を載せてProcessPoolExecutorに渡す
やりたいこと
- NumPy による数値計算を並列化したい。
- 各ワーカープロセスへの入力データは共通しているので、共有メモリを使って全体のメモリ使用量を減らしたい。
- 自分でプロセスプールを管理するのは面倒なので ProcessPoolExecutor を使いたい!!!
結論
ProcessPoolExecutor()
の initializer=
経由で、
ワーカープロセスに multiprocessing.RawValue
を渡せばいい。
サンプルコード
import concurrent.futures
import multiprocessing as mp
import numpy as np
def ndarray_to_shared_memory(data):
"""NumPy配列を共有メモリに載せる.
Paramerters
-----------
data: numpy.ndarray
Returns
-------
(multiprocessing.RawValue, numpy.dtype)
共有メモリ上のデータを参照するときに使う情報.
"""
data = np.asarray(data)
# 共有メモリに載せるデータの型を計算する.
value_type = np.ctypeslib.as_ctypes_type(np.uint8)
if data.ndim >= 1:
value_type *= data.itemsize * data.shape[-1]
for dim in data.shape[:-1][::-1]:
value_type *= dim
# 共有メモリを確保する.
value = mp.RawValue(value_type)
# 共有メモリにデータをコピーする.
try:
np.ctypeslib.as_array(value)[:] = data.view(np.uint8)
except TypeError:
raise TypeError(f'unsupported dtype: {data.dtype!r}')
return value, data.dtype
def shared_memory_to_ndarray(data):
"""共有メモリ上のデータをNumPy配列として参照する.
排他制御はされないので必要に応じてmultiprocessing.Lockなどを使う.
Paramerters
-----------
data: (multiprocessing.RawValue, numpy.dtype)
共有メモリ上のデータを参照するときに使う情報.
ndarray_to_shared_memory()の戻り値をそのまま渡せばOK.
Returns
-------
numpy.ndarray
"""
value, dtype = data
return np.ctypeslib.as_array(value).view(dtype)
class MySharedMemory:
"""ワーカープロセスと共有するデータ"""
dataset = None
result = None
def task(dataset, result, i):
"""並列で処理したいタスクのメイン部分"""
# for文でまわすしかなさそうな処理の例
result[i] = dataset.ravel()[::i + 1].mean()
def task_parallel(i):
"""並列で処理したいタスク"""
dataset = shared_memory_to_ndarray(MySharedMemory.dataset)
result = shared_memory_to_ndarray(MySharedMemory.result)
task(dataset, result, i)
def _main():
# 共有したいNumPy配列を用意する
dataset = np.random.random([100, 1000_000]).astype(np.float16)
result = np.zeros([len(dataset)]).astype(np.float16)
# NumPy配列を共有メモリにコピーする
MySharedMemory.dataset = ndarray_to_shared_memory(dataset)
MySharedMemory.result = ndarray_to_shared_memory(result)
# プロセスプールを用意する
executor = concurrent.futures.ProcessPoolExecutor(
# ワーカープロセスの数 (== 使用するCPUの数).
# Noneを指定するとCPUのコア数最大まで使う.
max_workers=None,
# 上で準備した共有メモリをワーカープロセスたちと共有する.
initializer=lambda: globals().update(dict(
MySharedMemory=MySharedMemory,
)),
)
with executor:
# ワーカープロセスたちにタスクを投げる
futures = []
for i in range(len(dataset)):
executor.submit(task_parallel, i)
# タスクが終わるのを待つ
for future in concurrent.futures.as_completed(futures):
# 実行中に例外が発生した場合はここで except できる.
future.result()
# 共有メモリから結果を読み出す
result = shared_memory_to_ndarray(MySharedMemory.result)
# 後処理
result_ = np.ndarray(result.shape)
for i in range(len(dataset)):
task(dataset, result_, i)
assert np.allclose(result, result_)
print('ok!')
if __name__ == '__main__':
_main()
解説
NumPy + ProcessPoolExecutor だけなら簡単だ
実際に動くサンプルコード
import concurrent.futures
import numpy as np
def task(data, i):
return data[i].sum()
def _main():
# データを用意して...
data = np.random.random([100, 100])
# ProcessPoolExecutor を作って...
with concurrent.futures.ProcessPoolExecutor() as executor:
# タスクを投げて...
futures = [
executor.submit(task, data, i)
for i in range(len(data))
]
# 結果を読み込む
result = np.array([future.result() for future in futures])
assert np.allclose(result, data.sum(axis=1))
if __name__ == '__main__':
_main()
NumPy + 共有メモリ も調べたらわかった
実際に動くサンプルコード
import multiprocessing as mp
import numpy as np
def ndarray_to_shared_memory(data):
"""NumPy配列を共有メモリに載せる.
Paramerters
-----------
data: numpy.ndarray
Returns
-------
(multiprocessing.RawValue, numpy.dtype)
共有メモリ上のデータを参照するときに使う情報.
"""
data = np.asarray(data)
# 共有メモリに載せるデータの型を計算する.
value_type = np.ctypeslib.as_ctypes_type(np.uint8)
if data.ndim >= 1:
value_type *= data.itemsize * data.shape[-1]
for dim in data.shape[:-1][::-1]:
value_type *= dim
# 共有メモリを確保する.
value = mp.RawValue(value_type)
# 共有メモリにデータをコピーする.
try:
np.ctypeslib.as_array(value)[:] = data.view(np.uint8)
except TypeError:
raise TypeError(f'unsupported dtype: {data.dtype!r}')
return value, data.dtype
def shared_memory_to_ndarray(data):
"""共有メモリ上のデータをNumPy配列として参照する.
排他制御はされないので必要に応じてmultiprocessing.Lockなどを使う.
Paramerters
-----------
data: (multiprocessing.RawValue, numpy.dtype)
共有メモリ上のデータを参照するときに使う情報.
ndarray_to_shared_memory()の戻り値をそのまま渡せばOK.
Returns
-------
numpy.ndarray
"""
value, dtype = data
return np.ctypeslib.as_array(value).view(dtype)
def task(data, result, i):
data = shared_memory_to_ndarray(data)
result = shared_memory_to_ndarray(result)
result[i] = data[i].sum()
def _main():
# データを用意して...
data = np.random.random([100, 100])
result = np.random.random([100])
# 共有メモリに載せて...
data_shared = ndarray_to_shared_memory(data)
result_shared = ndarray_to_shared_memory(result)
# Process を作って...
processes = [
mp.Process(target=task, args=[data_shared, result_shared, i])
for i in range(len(data))
]
for p in processes:
p.start()
for p in processes:
p.join()
# 結果を読み込む
result = shared_memory_to_ndarray(result_shared)
assert np.allclose(result, data.sum(axis=1))
NumPy + ProcessPoolExecutor + 共有メモリ は...
上の2つを組み合わせるだけでは動きませんでした。
細かい理由は分かりませんが、multiprocessing.RawValue
を ProcessPoolExecutor.submit()
に渡すと下のようなエラーで拒否されます。
RuntimeError: c_ubyte_Array_800_Array_100 objects should only be shared between processes through inheritance
動かないサンプルコード
import multiprocessing as mp
import numpy as np
def ndarray_to_shared_memory(data):
"""NumPy配列を共有メモリに載せる.
Paramerters
-----------
data: numpy.ndarray
Returns
-------
(multiprocessing.RawValue, numpy.dtype)
共有メモリ上のデータを参照するときに使う情報.
"""
data = np.asarray(data)
# 共有メモリに載せるデータの型を計算する.
value_type = np.ctypeslib.as_ctypes_type(np.uint8)
if data.ndim >= 1:
value_type *= data.itemsize * data.shape[-1]
for dim in data.shape[:-1][::-1]:
value_type *= dim
# 共有メモリを確保する.
value = mp.RawValue(value_type)
# 共有メモリにデータをコピーする.
try:
np.ctypeslib.as_array(value)[:] = data.view(np.uint8)
except TypeError:
raise TypeError(f'unsupported dtype: {data.dtype!r}')
return value, data.dtype
def shared_memory_to_ndarray(data):
"""共有メモリ上のデータをNumPy配列として参照する.
排他制御はされないので必要に応じてmultiprocessing.Lockなどを使う.
Paramerters
-----------
data: (multiprocessing.RawValue, numpy.dtype)
共有メモリ上のデータを参照するときに使う情報.
ndarray_to_shared_memory()の戻り値をそのまま渡せばOK.
Returns
-------
numpy.ndarray
"""
value, dtype = data
return np.ctypeslib.as_array(value).view(dtype)
def task(data, result, i):
data = shared_memory_to_ndarray(data)
result = shared_memory_to_ndarray(result)
result[i] = data[i].sum()
def _main():
# データを用意して...
data = np.random.random([100, 100])
result = np.random.random([100])
# 共有メモリに載せて...
data_shared = ndarray_to_shared_memory(data)
result_shared = ndarray_to_shared_memory(result)
# Process を作って...
processes = [
mp.Process(target=task, args=[data_shared, result_shared, i])
for i in range(len(data))
]
for p in processes:
p.start()
for p in processes:
p.join()
# 結果を読み込む
result = shared_memory_to_ndarray(result_shared)
assert np.allclose(result, data.sum(axis=1))
concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
File "/opt/conda/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/opt/conda/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "/opt/conda/lib/python3.10/multiprocessing/sharedctypes.py", line 129, in reduce_ctype
assert_spawning(obj)
File "/opt/conda/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
raise RuntimeError(
RuntimeError: c_ubyte_Array_800_Array_100 objects should only be shared between processes through inheritance
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/dst/numpy_parallel.py", line 90, in <module>
_main()
File "/dst/numpy_parallel.py", line 81, in _main
future.result()
File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 458, in result
return self.__get_result()
File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/opt/conda/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
obj = _ForkingPickler.dumps(obj)
File "/opt/conda/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "/opt/conda/lib/python3.10/multiprocessing/sharedctypes.py", line 129, in reduce_ctype
assert_spawning(obj)
File "/opt/conda/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
raise RuntimeError(
RuntimeError: c_ubyte_Array_800_Array_100 objects should only be shared between processes through inheritance
しかし、Stack Overflowに関連していそうな質問を発見しました。
どうやら、ワーカープロセスを初期化するときにオブジェクトを渡すなら、このエラーに遭わずにすみそうです。
という感じで行き着いたのが冒頭のサンプルコードです。
StackOverflowの回答では裸のグローバル変数を使っているのが気になったので、一応共有メモリの情報はクラスの中に収めています。
Discussion