Closed6

Pythonでマルチプロセスなパイプラインを作成するツールキット「MPipe」を試す

kun432kun432

ドキュメント

https://vmlaker.github.io/mpipe/index.html

MPipe: Python向けマルチプロセスパイプラインツールキット

MPipeは、標準のmultiprocessingパッケージの上に薄い層を追加した、非常に小さなPythonモジュールです。これにより、並列かつ多段階のパイプラインアルゴリズムを驚くほど簡単に記述することができます。

GitHubレポジトリ

https://github.com/vmlaker/mpipe

kun432kun432

インストール

https://vmlaker.github.io/mpipe/download.html

Colaboratoryで。

パッケージインストール。確認用にloguruも。

!pip install mpipe
!pip freeze | grep -i mpipe
出力
mpipe==1.0.8

ドキュメントのTOPにあるコードを実行してみる。

from mpipe import OrderedStage, Pipeline
from loguru import logger


def increment(value):
    """入力値に1インクリメントする関数"""
    logger.info("入力: {}", value)
    ret = value + 1
    logger.info("出力: {} -> {}", value, ret)
    return ret


def double(value):
    """入力値を2倍にする関数"""
    logger.info("入力: {}", value)
    ret = value * 2
    logger.info("出力: {} -> {}", value, ret)
    return ret


# パイプラインのステージを定義
stage1 = OrderedStage(increment, 3)  # ステージ1ではincremntを呼ぶ
stage2 = OrderedStage(double, 3)     # ステージ2ではdoubleを呼ぶ

# パイプラインで、各ステージ間のつながりを定義する。
pipe = Pipeline(stage1.link(stage2))

# 実行
for number in range(10):
    pipe.put(number)

# 終了
pipe.put(None)

# 結果
for result in pipe.results():
    print(result)
出力
2025-02-18 18:01:08.623 | INFO     | __main__:increment:7 - 入力: 0
2025-02-18 18:01:08.626 | INFO     | __main__:increment:7 - 入力: 2
2025-02-18 18:01:08.624 | INFO     | __main__:increment:7 - 入力: 1
2025-02-18 18:01:08.636 | INFO     | __main__:increment:9 - 出力: 2 -> 3
2025-02-18 18:01:08.633 | INFO     | __main__:increment:9 - 出力: 0 -> 1
2025-02-18 18:01:08.639 | INFO     | __main__:increment:9 - 出力: 1 -> 2
2025-02-18 18:01:08.648 | INFO     | __main__:double:14 - 入力: 1
2025-02-18 18:01:08.650 | INFO     | __main__:increment:7 - 入力: 5
2025-02-18 18:01:08.651 | INFO     | __main__:double:14 - 入力: 2
2025-02-18 18:01:08.654 | INFO     | __main__:double:14 - 入力: 3
2025-02-18 18:01:08.660 | INFO     | __main__:double:16 - 出力: 1 -> 2
2025-02-18 18:01:08.658 | INFO     | __main__:increment:9 - 出力: 5 -> 6
2025-02-18 18:01:08.647 | INFO     | __main__:increment:7 - 入力: 3
2025-02-18 18:01:08.650 | INFO     | __main__:increment:7 - 入力: 4
2025-02-18 18:01:08.670 | INFO     | __main__:double:16 - 出力: 2 -> 4
2025-02-18 18:01:08.672 | INFO     | __main__:double:16 - 出力: 3 -> 6
2025-02-18 18:01:08.677 | INFO     | __main__:increment:9 - 出力: 3 -> 4
2025-02-18 18:01:08.678 | INFO     | __main__:increment:9 - 出力: 4 -> 5
2025-02-18 18:01:08.685 | INFO     | __main__:increment:7 - 入力: 6
2025-02-18 18:01:08.686 | INFO     | __main__:double:14 - 入力: 4
2025-02-18 18:01:08.691 | INFO     | __main__:increment:7 - 入力: 8
2025-02-18 18:01:08.690 | INFO     | __main__:increment:7 - 入力: 7
2025-02-18 18:01:08.692 | INFO     | __main__:double:14 - 入力: 6
2025-02-18 18:01:08.694 | INFO     | __main__:increment:9 - 出力: 7 -> 8
2025-02-18 18:01:08.690 | INFO     | __main__:double:14 - 入力: 5
2025-02-18 18:01:08.701 | INFO     | __main__:increment:9 - 出力: 8 -> 9
2025-02-18 18:01:08.700 | INFO     | __main__:increment:9 - 出力: 6 -> 7
2025-02-18 18:01:08.699 | INFO     | __main__:double:16 - 出力: 4 -> 8
2025-02-18 18:01:08.708 | INFO     | __main__:double:16 - 出力: 6 -> 12
2025-02-18 18:01:08.711 | INFO     | __main__:double:16 - 出力: 5 -> 10
2025-02-18 18:01:08.713 | INFO     | __main__:double:14 - 入力: 7
2025-02-18 18:01:08.724 | INFO     | __main__:double:14 - 入力: 9
2025-02-18 18:01:08.723 | INFO     | __main__:double:14 - 入力: 8
2025-02-18 18:01:08.732 | INFO     | __main__:double:16 - 出力: 8 -> 16
2025-02-18 18:01:08.731 | INFO     | __main__:double:16 - 出力: 9 -> 18
2025-02-18 18:01:08.707 | INFO     | __main__:increment:7 - 入力: 9
2025-02-18 18:01:08.746 | INFO     | __main__:increment:9 - 出力: 9 -> 10
2025-02-18 18:01:08.746 | INFO     | __main__:double:16 - 出力: 7 -> 14
2025-02-18 18:01:08.749 | INFO     | __main__:double:14 - 入力: 10
2025-02-18 18:01:08.761 | INFO     | __main__:double:16 - 出力: 10 -> 20
2
4
6
8
10
12
14
16
18
20
kun432kun432

Cookbook

使い方はCookbookを見るのが良さそう。

https://vmlaker.github.io/mpipe/cookbook.html

上から順に見ていく。

ワーカーの定義

各ステージで実行するワーカーを定義する。一番シンプルな定義は単一の引数(タスク)を受け取り、その処理結果を返すPython関数で定義する。上のサンプルコードだとこれ。

def increment(value):
    """入力値に1インクリメントする関数"""
    ret = value + 1
    return ret

結果を返さないと、そのワーカーのステージは何も出力しない行き止まりのステージとなってしまうため、処理結果は必ず返す必要がある。

別の方法として、OrderedWorkerUnorderedWorkerというクラスのサブクラスを定義するやり方もある。この場合は、実行したい処理をdoTask()メソッドとして実装し、結果をこのメソッドから返すようにする。

class IncrementWorker(mpipe.OrderedWorker):
    def doTask(value):
        ret = value + 1
        return ret

また結果を返したあとも処理を行いたいという場合はputResult()を使うこともできる。

class IncrementWorker(mpipe.OrderedWorker):
    def doTask(value):
        ret = value + 1
        self.putResult(result)
        # 続けて実行したい処理を書く・・・

OrderedWorkerUnorderedWorkerの違いは、その名にある通り、タスクの実行順序を維持するかどうか、になるので、処理したいタスクの内容に応じて選択することになる。

https://vmlaker.github.io/mpipe/api.html#mpipe.OrderedWorker

class mpipe.OrderedWorker

OrderedWorkerオブジェクトは、出力結果の順序が常に該当する入力タスクの順序と一致するステージで動作します。
ワーカーは、直前のワーカーと次のワーカーという2つの最も近い隣接ワーカーにリンクされ、ステージ内のすべてのワーカーがこのように環状に接続されます。入力タスクは、この順序で取得されます。ワーカーは、結果を公開する前に、まず直前の隣接ワーカーが同じことを行うのを待ちます。

https://vmlaker.github.io/mpipe/api.html#mpipe.UnorderedWorker

class mpipe.UnorderedWorker

UnorderedWorkerオブジェクトは、そのステージの他のワーカーとは独立して動作し、最初の利用可能なタスクを取得し、完了次第、その結果を公開します(近隣のワーカーと調整することなく)。したがって、出力結果の順序は、対応する入力タスクの順序と一致しない場合があります。

ステージオブジェクトの作成

次にステージオブジェクトの作成して、ワーカーと紐づける。ワーカーをPython関数で定義した場合は、
OrderedStageUnorderedStageを使う。2番目の引数はそのステージ内で同時に実行されるワーカーの数になる。

stage = mpipe.OrderedStage(increment, 3)

OrderedStageUnorderedStageの違いはワーカーと同じで、実行順序を制御するかどうか。

ただし、ワーカーでも実行順序の違いによって OrderedWorkerUnorderedWorkerが指定できていたので、ステージでも設定できるとなると矛盾しないのか?というところで、 OrderedWorkerUnorderedWorkerでワーカーを定義した場合は、mpipe.Stage()を使う

class IncrementWorker(mpipe.OrderedWorker):
    def doTask(value):
        ret = value + 1
        return ret

stage = mpipe.Stage(IncrementWorker, 3)

内部的には、ステージでOrderedStageを設定すると内部でOrderedWorkerオブジェクトを生成、UnorderedStageの場合はUnorderedWorkerオブジェクトを作成しているので、要は実行順序の制御をワーカー側でやるかステージ側でやるか、ということだと思う。

ステージ間の紐づけ

ステージが複数ある場合のステージ間の遷移はlink()メソッドを使う。例えば、

stage1.link(stage2)
stage2.link(stage3)

と指定すると、ステージ1→ステージ2→ステージ3の順で実行され、それぞれの結果が受け渡されていく。なお、上記は以下のようにも書ける。

stage1.link(stage2.link(stage3))

1つのステージの結果を複数の下流ステージに受け渡したい場合、例えば、

stage1.link(stage2)
stage1.link(stage3)
stage1.link(stage4)

と書くと、ステージ1の結果が、ステージ2・3・4にそれぞれ受け渡されて、分岐して並列処理することができる。

ドキュメントに記載はないのだけど、これを結合することはできるのだろうか???

パイプラインオブジェクトの作成

遷移する複数ステージの、起点となるステージをPipeline()に渡すことで、パイプラインオブジェクトが作成され、パイプラインを実行することができるようになる。

例えば、以下。

stage1.link(stage2)

pipe = Pipeline(stage1)

以下のように書いても同じだった。

stages = stage1.link(stage2)

pipe = Pipeline(stages)
pipe = Pipeline(stage1.link(stage2))

パイプラインの操作

作成したパイプラインにデータを入力するにはput()を使う。

pipe.put(1)

結果はget()を使って1つ取り出すか、results()を使って全部まとめて取り出すか。

result = pipe.get()
for result in pipe.results():
   print(result)

パイプラインにNoneを渡すと入力の終了シグナルとなる。

pipe.put(None)

Noneはパイプラインの各ステージに伝播され、各ワーカーは現在処理中のタスクを完了させた上で停止するよう指示を受ける。なお、None投入後は既存のタスクは最後まで処理されるものの、その後に新たに投入されたタスクは無視され、最終的にパイプラインが空になるまで結果を取得できる仕組みとなっている。

kun432kun432

まとめ

ここしばらくPythonで並列・並行処理を使いやすくするようなライブラリをいくつか見てたけど、実行順序を制御できるという点で、個人的にとても使い勝手が良さそうに感じた。

このスクラップは2025/02/19にクローズされました