🦅

【python】雑に始める並列処理(multiprocessing)

に公開

雑なメモ書きです(後でもう少しまとめます)

1. はじめに

  • multiprocessingは、pythonにて並列処理を実装するための標準ライブラリ
  • threadingやasyncと比較して、高速化の恩恵が大きい
  • 実行する関数自体が独立であり、内部で並列処理が行われていないかは、注意しておく必要がある

multiprocessing --- プロセスベースの並列処理
https://docs.python.org/ja/3/library/multiprocessing.html

multiprocess, threading, asyncの違い・使い分けについては、下記の記事が分かりやすいです。
https://qiita.com/Toyo_m/items/992b0dcf765ad3082d0b

1.1 PCのスペックなど

  • OS : Windows11 Home

2. 実際に動かしてみる

  • Python 3.12.5

2.1 基本のパターン

以下のパターン、超基本

  • 引数:1変数
  • 返値:1変数

プログラム例

from multiprocessing import Pool, cpu_count


def function_A(x):
    """
    1つの引数 で 1つの変数 を返す場合
    """
    print("function_A : ", x)
    return x**2


if __name__ == "__main__":
    """
    標準ライブラリmultiprocessingを使用して、並列化を試すサンプル
    """
    n = cpu_count()  # システムの CPU 数を取得
    print("このPCで並列処理できる最大の個数 : ", n)

    print("=== function Aを並列処理する ===")
    args_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]  # 引数のlistは事前に準備しておく
    with Pool(processes=4) as p:
        results = p.map(function_A, args_list)
    print("results : ", type(results), results)  # list[int等]で返ってくる

出力

2.2 返値が複数ある関数の場合

以下のパターン、これもよくある

  • 引数:1変数
  • 返値:複数の変数

プログラム例

from multiprocessing import Pool, cpu_count


def function_B(x):
    """
    1つの引数 で 複数の変数 を返す場合
    """
    print("function_B : ", x)
    return x**2, x**3


if __name__ == "__main__":
    """
    標準ライブラリmultiprocessingを使用して、並列化を試すサンプル
    """
    n = cpu_count()  # システムの CPU 数を取得
    print("このPCで並列処理できる最大の個数 : ", n)

    print("=== function Bを並列処理する ===")
    args_list = [1, 2, 3, 4, 5, 6, 7, 8, 9]  # 引数のlistは事前に準備しておく
    with Pool(n) as p:
        results = p.map(function_B, args_list)
    print("results : ", type(results), results)  # list[tuple]で返ってくる

出力

p.mapの返り値であるlistの中身(要素)が、tupleになります

2.2 複数の引数を渡す場合(tuple)

この場合、p.statmapを使ってもよいが、statmapの存在を知らない人だと挙動が理解しずらい
チームのレベルに合わせて、可読性重視で愚直に実装したほうが良い場合もあるかと思います

プログラム例

from multiprocessing import Pool, cpu_count


def function_C(tuple_):
    """
    複数の引数を与えたい場合(tupleで1変数にする)
    """
    print("function_C : ", tuple_)
    x, y, z = tuple_  # tupeの要素を分解する
    return x+y+z


if __name__ == "__main__":
    """
    標準ライブラリmultiprocessingを使用して、並列化を試すサンプル
    """
    n = cpu_count()  # システムの CPU 数を取得
    print("このPCで並列処理できる最大の個数 : ", n)

    print("=== function Cを並列処理する ===")
    args_tuple_list = [ (x, y, z) for x, y, z in zip(range(10), range(1, 11), range(2, 21, 2))]  # listの要素をtupleにする
    with Pool(n) as p:
        results = p.map(function_C, args_tuple_list)  # tuple自体は1変数扱いなので渡せる
    print("results : ", type(results), results)

出力

2.4 複数の引数を渡す場合(dict)

プログラム例

from multiprocessing import Pool, cpu_count


def function_D(dict_):
    """
    複数の引数を与えたい場合(dictで1変数にする)
    """
    print("function_D : ", dict_)
    x = dict_["x"]
    y = dict_["y"]
    z = dict_["z"]
    return x*y*z


if __name__ == "__main__":
    """
    標準ライブラリmultiprocessingを使用して、並列化を試すサンプル
    """
    n = cpu_count()  # システムの CPU 数を取得
    print("このPCで並列処理できる最大の個数 : ", n)

    print("=== function Dを並列処理する ===")
    args_dict_list = [ {"x":x, "y":y, "z":z} for x, y, z in zip(range(10), range(1, 11), range(2, 21, 2))]  # listの要素をdictにする
    with Pool(n) as p:
        results = p.map(function_D, args_dict_list)  # dict自体は1変数扱いなので渡せる
    print("results : ", type(results), results)

2.5 キーワード引数のある関数

他のライブラリの関数等を使う場合、wrapper関数を作って、wrapper関数経由で呼び出すとよいです

プログラム例

from multiprocessing import Pool, cpu_count


def function_E(a, b=0, c=2):
    """
    キーワード引数が必要な場合(既存関数を使う)
    """
    print("function_E : ", a, b, c)
    return f"{a=}, {b=}, {c=}"


def wrapper_for_E(args):
    """
    function_Eに与える引数を分割するラッパー
    return function_E(a=args["a"], b=args["b"], c=args["c"])
    とキーワード引数を明記したほうが、可読性は高いかも?
    """
    return function_E(**args)  # **argsで、dictのkeyを引数の変数名、valueを値として関数を実行できる


if __name__ == "__main__":
    """
    標準ライブラリmultiprocessingを使用して、並列化を試すサンプル
    """
    n = cpu_count()  # システムの CPU 数を取得
    print("このPCで並列処理できる最大の個数 : ", n)

    print("=== function Eを並列処理する ===")
    keyargs_dict_list = [ {"a":a, "b":b, "c":c} for a, b, c in zip(range(10), range(1, 11), range(2, 21, 2))]  # listの要素をdictにする
    with Pool(n) as p:
        results = p.map(wrapper_for_E, keyargs_dict_list)  # wrapper_for_Eを呼び出す
    print("results : ", type(results), results)


    # おまけ(statmap)
    # print("=== (別実装)function Eを並列処理する ===")
    # with Pool(n) as p:
    #     results = p.starmap(function_E, args_tuple_list)  # starmapなら直接関数を実行できる
    # print("results : ", type(results), results)

出力

3. (検証)具体事例での速度比較

せっかくなので、並列処理の事例を紹介しておきます

3.1 概要

  • csvファイルのセンサデータ等から、特徴量を個別に計算する
  • 各処理は独立である(他の処理に影響を与えない)

3.2 プログラム

逐次処理 と 並列処理 をそれぞれ行い、処理時間を計測し表示するプログラムを作成
依存ライブラリは以下の通り

  • pandas : 2.3.0
  • numpy : 2.3.0
長いので折り畳み
from multiprocessing import Pool, cpu_count
from pathlib import Path
from time import perf_counter
import numpy as np
import pandas as pd


def make_csv():
    """
    適当なcsvファイルを作成する関数
    """
    output_folder = Path('./input/')
    output_folder.mkdir(exist_ok=True)  # 出力先フォルダを作成
    feature_name_list = ["A", "B", "C"]  # DataFrameの列名を定義
    size = (100000, len(feature_name_list))  # 生成する配列の大きさ
    for i in range(100):
        print(i)
        df_tmp = pd.DataFrame(np.random.random(size),
                              columns=feature_name_list)
        df_tmp.to_csv(output_folder/"{:04d}.csv".format(i))


def calc_features(csv_path):
    """
    引数のcsvファイルを読み込んで、特徴量を計算する関数
    """
    print(f"=== {csv_path} ===")
    input_data = pd.read_csv(csv_path, index_col=0)
    # 移動平均を計算
    features = input_data.rolling(7).mean()  # 直近7件で移動平均を計算
    # print(features)  # デバッグ用
    # features.to_csv(path.stem+"_features.csv")  # 保存


if __name__ == "__main__":
    """
    実際の使用例
    """
    # make_csv()  # 初回のみ実行 617MBくらい生成されるので注意!

    # 以下、本題
    input_folder_path = Path('./input/')
    args_list = [ p for p in input_folder_path.glob("*.csv")]

    # 1.逐次処理
    print("=== 1.逐次処理 ===")
    start = perf_counter()  # 開始時刻の取得
    for path in args_list:
        calc_features(path)  # 通常通り
    end = perf_counter()  # 開始時刻の取得
    single_elapsed_time = end - start  # 処理時間を産出


    # 2.並列処理
    print("\n=== 2.並列処理 ===")
    start = perf_counter()  # 開始時刻の取得
    with Pool(processes=cpu_count()) as p:
        p.map(calc_features, args_list)  # 並列処理
    end = perf_counter()  # 開始時刻の取得
    parallel_elapsed_time = end - start  # 処理時間を産出

    # 結果を表示
    print("1.逐次処理 : {}秒".format(round(single_elapsed_time, 4)))
    print("2.並列処理 : {}秒".format(round(parallel_elapsed_time, 4)))



3.3 計測結果

毎回、多少処理時間が変わりますが、およそ3倍くらい高速になりました

  1. 逐次処理 : 15.4149秒
  2. 並列処理 : 5.3584秒

逐次処理

並列処理

また、並列処理中は、CPUの使用率が100%に張り付きます
(これに快感&安心感を覚える人は、処理効率化の素質があると思います!!)

4. まとめ

  • 「並列処理」と聞くと難しそうだが、pythonの場合3行くらいの変更でかなり高速化が見込める
  • 機械学習の特徴量計算など、時間のかかる処理は並列化を前提に作成するとよい
  • 外部APIからのレスポンス待ちなどはasync、ある程度リアルタイム性が必要で別々のタスクを行いたい場合はthreadingと使い分けをしよう

Discussion