🐍

共有メモリにNumpy配列を載せてProcessPoolExecutorに渡す

2022/09/27に公開

やりたいこと

  • NumPy による数値計算を並列化したい。
  • 各ワーカープロセスへの入力データは共通しているので、共有メモリを使って全体のメモリ使用量を減らしたい。
  • 自分でプロセスプールを管理するのは面倒なので ProcessPoolExecutor を使いたい!!!

結論

ProcessPoolExecutor()initializer= 経由で、
ワーカープロセスに multiprocessing.RawValue を渡せばいい。

サンプルコード

import concurrent.futures
import multiprocessing as mp

import numpy as np


def ndarray_to_shared_memory(data):
    """NumPy配列を共有メモリに載せる.
    
    Paramerters
    -----------
    data: numpy.ndarray
    
    Returns
    -------
    (multiprocessing.RawValue, numpy.dtype)
        共有メモリ上のデータを参照するときに使う情報.
    """
    data = np.asarray(data)
    
    # 共有メモリに載せるデータの型を計算する.
    value_type = np.ctypeslib.as_ctypes_type(np.uint8)
    if data.ndim >= 1:
        value_type *= data.itemsize * data.shape[-1]
    for dim in data.shape[:-1][::-1]:
        value_type *= dim
    
    # 共有メモリを確保する.
    value = mp.RawValue(value_type)
    
    # 共有メモリにデータをコピーする.
    try:
        np.ctypeslib.as_array(value)[:] = data.view(np.uint8)
    except TypeError:
        raise TypeError(f'unsupported dtype: {data.dtype!r}')

    return value, data.dtype


def shared_memory_to_ndarray(data):
    """共有メモリ上のデータをNumPy配列として参照する.
    排他制御はされないので必要に応じてmultiprocessing.Lockなどを使う.
    
    Paramerters
    -----------
    data: (multiprocessing.RawValue, numpy.dtype)
        共有メモリ上のデータを参照するときに使う情報.
        ndarray_to_shared_memory()の戻り値をそのまま渡せばOK.
    
    Returns
    -------
    numpy.ndarray
    """
    value, dtype = data
    return np.ctypeslib.as_array(value).view(dtype)


class MySharedMemory:
    """ワーカープロセスと共有するデータ"""
    dataset = None
    result = None


def task(dataset, result, i):
    """並列で処理したいタスクのメイン部分"""
    # for文でまわすしかなさそうな処理の例
    result[i] = dataset.ravel()[::i + 1].mean()


def task_parallel(i):
    """並列で処理したいタスク"""
    dataset = shared_memory_to_ndarray(MySharedMemory.dataset)
    result = shared_memory_to_ndarray(MySharedMemory.result)
    task(dataset, result, i)


def _main():
    # 共有したいNumPy配列を用意する
    dataset = np.random.random([100, 1000_000]).astype(np.float16)
    result = np.zeros([len(dataset)]).astype(np.float16)

    # NumPy配列を共有メモリにコピーする
    MySharedMemory.dataset = ndarray_to_shared_memory(dataset)
    MySharedMemory.result = ndarray_to_shared_memory(result)

    # プロセスプールを用意する
    executor = concurrent.futures.ProcessPoolExecutor(
        # ワーカープロセスの数 (== 使用するCPUの数).
        # Noneを指定するとCPUのコア数最大まで使う.
        max_workers=None,
        # 上で準備した共有メモリをワーカープロセスたちと共有する.
        initializer=lambda: globals().update(dict(
            MySharedMemory=MySharedMemory,
        )),
    )

    with executor:
        # ワーカープロセスたちにタスクを投げる
        futures = []
        for i in range(len(dataset)):
            executor.submit(task_parallel, i)
        
        # タスクが終わるのを待つ
        for future in concurrent.futures.as_completed(futures):
            # 実行中に例外が発生した場合はここで except できる.
            future.result()

    # 共有メモリから結果を読み出す
    result = shared_memory_to_ndarray(MySharedMemory.result)

    # 後処理
    result_ = np.ndarray(result.shape)
    for i in range(len(dataset)):
        task(dataset, result_, i)
    assert np.allclose(result, result_)
    print('ok!')


if __name__ == '__main__':
    _main()

解説

NumPy + ProcessPoolExecutor だけなら簡単だ

実際に動くサンプルコード
import concurrent.futures
import numpy as np


def task(data, i):
    return data[i].sum()


def _main():
    # データを用意して...
    data = np.random.random([100, 100])

    # ProcessPoolExecutor を作って...
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # タスクを投げて...
        futures = [
            executor.submit(task, data, i)
            for i in range(len(data))
        ]
        # 結果を読み込む
        result = np.array([future.result() for future in futures])

    assert np.allclose(result, data.sum(axis=1))


if __name__ == '__main__':
    _main()

NumPy + 共有メモリ も調べたらわかった

実際に動くサンプルコード
import multiprocessing as mp
import numpy as np


def ndarray_to_shared_memory(data):
    """NumPy配列を共有メモリに載せる.
    
    Paramerters
    -----------
    data: numpy.ndarray
    
    Returns
    -------
    (multiprocessing.RawValue, numpy.dtype)
        共有メモリ上のデータを参照するときに使う情報.
    """
    data = np.asarray(data)
    
    # 共有メモリに載せるデータの型を計算する.
    value_type = np.ctypeslib.as_ctypes_type(np.uint8)
    if data.ndim >= 1:
        value_type *= data.itemsize * data.shape[-1]
    for dim in data.shape[:-1][::-1]:
        value_type *= dim
    
    # 共有メモリを確保する.
    value = mp.RawValue(value_type)
    
    # 共有メモリにデータをコピーする.
    try:
        np.ctypeslib.as_array(value)[:] = data.view(np.uint8)
    except TypeError:
        raise TypeError(f'unsupported dtype: {data.dtype!r}')

    return value, data.dtype


def shared_memory_to_ndarray(data):
    """共有メモリ上のデータをNumPy配列として参照する.
    排他制御はされないので必要に応じてmultiprocessing.Lockなどを使う.
    
    Paramerters
    -----------
    data: (multiprocessing.RawValue, numpy.dtype)
        共有メモリ上のデータを参照するときに使う情報.
        ndarray_to_shared_memory()の戻り値をそのまま渡せばOK.
    
    Returns
    -------
    numpy.ndarray
    """
    value, dtype = data
    return np.ctypeslib.as_array(value).view(dtype)


def task(data, result, i):
    data = shared_memory_to_ndarray(data)
    result = shared_memory_to_ndarray(result)
    result[i] = data[i].sum()


def _main():
    # データを用意して...
    data = np.random.random([100, 100])
    result = np.random.random([100])

    # 共有メモリに載せて...
    data_shared = ndarray_to_shared_memory(data)
    result_shared = ndarray_to_shared_memory(result)

    # Process を作って...
    processes = [
        mp.Process(target=task, args=[data_shared, result_shared, i])
        for i in range(len(data))
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # 結果を読み込む
    result = shared_memory_to_ndarray(result_shared)

    assert np.allclose(result, data.sum(axis=1))

NumPy + ProcessPoolExecutor + 共有メモリ は...

上の2つを組み合わせるだけでは動きませんでした。
細かい理由は分かりませんが、multiprocessing.RawValueProcessPoolExecutor.submit() に渡すと下のようなエラーで拒否されます。

RuntimeError: c_ubyte_Array_800_Array_100 objects should only be shared between processes through inheritance
動かないサンプルコード
import multiprocessing as mp
import numpy as np


def ndarray_to_shared_memory(data):
    """NumPy配列を共有メモリに載せる.
    
    Paramerters
    -----------
    data: numpy.ndarray
    
    Returns
    -------
    (multiprocessing.RawValue, numpy.dtype)
        共有メモリ上のデータを参照するときに使う情報.
    """
    data = np.asarray(data)
    
    # 共有メモリに載せるデータの型を計算する.
    value_type = np.ctypeslib.as_ctypes_type(np.uint8)
    if data.ndim >= 1:
        value_type *= data.itemsize * data.shape[-1]
    for dim in data.shape[:-1][::-1]:
        value_type *= dim
    
    # 共有メモリを確保する.
    value = mp.RawValue(value_type)
    
    # 共有メモリにデータをコピーする.
    try:
        np.ctypeslib.as_array(value)[:] = data.view(np.uint8)
    except TypeError:
        raise TypeError(f'unsupported dtype: {data.dtype!r}')

    return value, data.dtype


def shared_memory_to_ndarray(data):
    """共有メモリ上のデータをNumPy配列として参照する.
    排他制御はされないので必要に応じてmultiprocessing.Lockなどを使う.
    
    Paramerters
    -----------
    data: (multiprocessing.RawValue, numpy.dtype)
        共有メモリ上のデータを参照するときに使う情報.
        ndarray_to_shared_memory()の戻り値をそのまま渡せばOK.
    
    Returns
    -------
    numpy.ndarray
    """
    value, dtype = data
    return np.ctypeslib.as_array(value).view(dtype)


def task(data, result, i):
    data = shared_memory_to_ndarray(data)
    result = shared_memory_to_ndarray(result)
    result[i] = data[i].sum()


def _main():
    # データを用意して...
    data = np.random.random([100, 100])
    result = np.random.random([100])

    # 共有メモリに載せて...
    data_shared = ndarray_to_shared_memory(data)
    result_shared = ndarray_to_shared_memory(result)

    # Process を作って...
    processes = [
        mp.Process(target=task, args=[data_shared, result_shared, i])
        for i in range(len(data))
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()

    # 結果を読み込む
    result = shared_memory_to_ndarray(result_shared)

    assert np.allclose(result, data.sum(axis=1))
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/conda/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/opt/conda/lib/python3.10/multiprocessing/sharedctypes.py", line 129, in reduce_ctype
    assert_spawning(obj)
  File "/opt/conda/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
    raise RuntimeError(
RuntimeError: c_ubyte_Array_800_Array_100 objects should only be shared between processes through inheritance
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/dst/numpy_parallel.py", line 90, in <module>
    _main()
  File "/dst/numpy_parallel.py", line 81, in _main
    future.result()
  File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.10/multiprocessing/queues.py", line 244, in _feed
    obj = _ForkingPickler.dumps(obj)
  File "/opt/conda/lib/python3.10/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/opt/conda/lib/python3.10/multiprocessing/sharedctypes.py", line 129, in reduce_ctype
    assert_spawning(obj)
  File "/opt/conda/lib/python3.10/multiprocessing/context.py", line 373, in assert_spawning
    raise RuntimeError(
RuntimeError: c_ubyte_Array_800_Array_100 objects should only be shared between processes through inheritance

しかし、Stack Overflowに関連していそうな質問を発見しました。
どうやら、ワーカープロセスを初期化するときにオブジェクトを渡すなら、このエラーに遭わずにすみそうです。

という感じで行き着いたのが冒頭のサンプルコードです。
StackOverflowの回答では裸のグローバル変数を使っているのが気になったので、一応共有メモリの情報はクラスの中に収めています。

Discussion