Pythonでマルチプロセスなパイプラインを作成するツールキット「MPipe」を試す
ドキュメント
MPipe: Python向けマルチプロセスパイプラインツールキット
MPipeは、標準のmultiprocessingパッケージの上に薄い層を追加した、非常に小さなPythonモジュールです。これにより、並列かつ多段階のパイプラインアルゴリズムを驚くほど簡単に記述することができます。
GitHubレポジトリ
インストール
Colaboratoryで。
パッケージインストール。確認用にloguruも。
!pip install mpipe
!pip freeze | grep -i mpipe
mpipe==1.0.8
ドキュメントのTOPにあるコードを実行してみる。
from mpipe import OrderedStage, Pipeline
from loguru import logger
def increment(value):
"""入力値に1インクリメントする関数"""
logger.info("入力: {}", value)
ret = value + 1
logger.info("出力: {} -> {}", value, ret)
return ret
def double(value):
"""入力値を2倍にする関数"""
logger.info("入力: {}", value)
ret = value * 2
logger.info("出力: {} -> {}", value, ret)
return ret
# パイプラインのステージを定義
stage1 = OrderedStage(increment, 3) # ステージ1ではincremntを呼ぶ
stage2 = OrderedStage(double, 3) # ステージ2ではdoubleを呼ぶ
# パイプラインで、各ステージ間のつながりを定義する。
pipe = Pipeline(stage1.link(stage2))
# 実行
for number in range(10):
pipe.put(number)
# 終了
pipe.put(None)
# 結果
for result in pipe.results():
print(result)
2025-02-18 18:01:08.623 | INFO | __main__:increment:7 - 入力: 0
2025-02-18 18:01:08.626 | INFO | __main__:increment:7 - 入力: 2
2025-02-18 18:01:08.624 | INFO | __main__:increment:7 - 入力: 1
2025-02-18 18:01:08.636 | INFO | __main__:increment:9 - 出力: 2 -> 3
2025-02-18 18:01:08.633 | INFO | __main__:increment:9 - 出力: 0 -> 1
2025-02-18 18:01:08.639 | INFO | __main__:increment:9 - 出力: 1 -> 2
2025-02-18 18:01:08.648 | INFO | __main__:double:14 - 入力: 1
2025-02-18 18:01:08.650 | INFO | __main__:increment:7 - 入力: 5
2025-02-18 18:01:08.651 | INFO | __main__:double:14 - 入力: 2
2025-02-18 18:01:08.654 | INFO | __main__:double:14 - 入力: 3
2025-02-18 18:01:08.660 | INFO | __main__:double:16 - 出力: 1 -> 2
2025-02-18 18:01:08.658 | INFO | __main__:increment:9 - 出力: 5 -> 6
2025-02-18 18:01:08.647 | INFO | __main__:increment:7 - 入力: 3
2025-02-18 18:01:08.650 | INFO | __main__:increment:7 - 入力: 4
2025-02-18 18:01:08.670 | INFO | __main__:double:16 - 出力: 2 -> 4
2025-02-18 18:01:08.672 | INFO | __main__:double:16 - 出力: 3 -> 6
2025-02-18 18:01:08.677 | INFO | __main__:increment:9 - 出力: 3 -> 4
2025-02-18 18:01:08.678 | INFO | __main__:increment:9 - 出力: 4 -> 5
2025-02-18 18:01:08.685 | INFO | __main__:increment:7 - 入力: 6
2025-02-18 18:01:08.686 | INFO | __main__:double:14 - 入力: 4
2025-02-18 18:01:08.691 | INFO | __main__:increment:7 - 入力: 8
2025-02-18 18:01:08.690 | INFO | __main__:increment:7 - 入力: 7
2025-02-18 18:01:08.692 | INFO | __main__:double:14 - 入力: 6
2025-02-18 18:01:08.694 | INFO | __main__:increment:9 - 出力: 7 -> 8
2025-02-18 18:01:08.690 | INFO | __main__:double:14 - 入力: 5
2025-02-18 18:01:08.701 | INFO | __main__:increment:9 - 出力: 8 -> 9
2025-02-18 18:01:08.700 | INFO | __main__:increment:9 - 出力: 6 -> 7
2025-02-18 18:01:08.699 | INFO | __main__:double:16 - 出力: 4 -> 8
2025-02-18 18:01:08.708 | INFO | __main__:double:16 - 出力: 6 -> 12
2025-02-18 18:01:08.711 | INFO | __main__:double:16 - 出力: 5 -> 10
2025-02-18 18:01:08.713 | INFO | __main__:double:14 - 入力: 7
2025-02-18 18:01:08.724 | INFO | __main__:double:14 - 入力: 9
2025-02-18 18:01:08.723 | INFO | __main__:double:14 - 入力: 8
2025-02-18 18:01:08.732 | INFO | __main__:double:16 - 出力: 8 -> 16
2025-02-18 18:01:08.731 | INFO | __main__:double:16 - 出力: 9 -> 18
2025-02-18 18:01:08.707 | INFO | __main__:increment:7 - 入力: 9
2025-02-18 18:01:08.746 | INFO | __main__:increment:9 - 出力: 9 -> 10
2025-02-18 18:01:08.746 | INFO | __main__:double:16 - 出力: 7 -> 14
2025-02-18 18:01:08.749 | INFO | __main__:double:14 - 入力: 10
2025-02-18 18:01:08.761 | INFO | __main__:double:16 - 出力: 10 -> 20
2
4
6
8
10
12
14
16
18
20
Cookbook
使い方はCookbookを見るのが良さそう。
上から順に見ていく。
ワーカーの定義
各ステージで実行するワーカーを定義する。一番シンプルな定義は単一の引数(タスク)を受け取り、その処理結果を返すPython関数で定義する。上のサンプルコードだとこれ。
def increment(value):
"""入力値に1インクリメントする関数"""
ret = value + 1
return ret
結果を返さないと、そのワーカーのステージは何も出力しない行き止まりのステージとなってしまうため、処理結果は必ず返す必要がある。
別の方法として、OrderedWorker
と UnorderedWorker
というクラスのサブクラスを定義するやり方もある。この場合は、実行したい処理をdoTask()
メソッドとして実装し、結果をこのメソッドから返すようにする。
class IncrementWorker(mpipe.OrderedWorker):
def doTask(value):
ret = value + 1
return ret
また結果を返したあとも処理を行いたいという場合はputResult()
を使うこともできる。
class IncrementWorker(mpipe.OrderedWorker):
def doTask(value):
ret = value + 1
self.putResult(result)
# 続けて実行したい処理を書く・・・
OrderedWorker
と UnorderedWorker
の違いは、その名にある通り、タスクの実行順序を維持するかどうか、になるので、処理したいタスクの内容に応じて選択することになる。
class mpipe.OrderedWorker
OrderedWorkerオブジェクトは、出力結果の順序が常に該当する入力タスクの順序と一致するステージで動作します。
ワーカーは、直前のワーカーと次のワーカーという2つの最も近い隣接ワーカーにリンクされ、ステージ内のすべてのワーカーがこのように環状に接続されます。入力タスクは、この順序で取得されます。ワーカーは、結果を公開する前に、まず直前の隣接ワーカーが同じことを行うのを待ちます。
class mpipe.UnorderedWorker
UnorderedWorkerオブジェクトは、そのステージの他のワーカーとは独立して動作し、最初の利用可能なタスクを取得し、完了次第、その結果を公開します(近隣のワーカーと調整することなく)。したがって、出力結果の順序は、対応する入力タスクの順序と一致しない場合があります。
ステージオブジェクトの作成
次にステージオブジェクトの作成して、ワーカーと紐づける。ワーカーをPython関数で定義した場合は、
OrderedStage
かUnorderedStage
を使う。2番目の引数はそのステージ内で同時に実行されるワーカーの数になる。
stage = mpipe.OrderedStage(increment, 3)
OrderedStage
かUnorderedStage
の違いはワーカーと同じで、実行順序を制御するかどうか。
ただし、ワーカーでも実行順序の違いによって OrderedWorker
と UnorderedWorker
が指定できていたので、ステージでも設定できるとなると矛盾しないのか?というところで、 OrderedWorker
と UnorderedWorker
でワーカーを定義した場合は、mpipe.Stage()
を使う
class IncrementWorker(mpipe.OrderedWorker):
def doTask(value):
ret = value + 1
return ret
stage = mpipe.Stage(IncrementWorker, 3)
内部的には、ステージでOrderedStage
を設定すると内部でOrderedWorker
オブジェクトを生成、UnorderedStage
の場合はUnorderedWorker
オブジェクトを作成しているので、要は実行順序の制御をワーカー側でやるかステージ側でやるか、ということだと思う。
ステージ間の紐づけ
ステージが複数ある場合のステージ間の遷移はlink()
メソッドを使う。例えば、
stage1.link(stage2)
stage2.link(stage3)
と指定すると、ステージ1→ステージ2→ステージ3の順で実行され、それぞれの結果が受け渡されていく。なお、上記は以下のようにも書ける。
stage1.link(stage2.link(stage3))
1つのステージの結果を複数の下流ステージに受け渡したい場合、例えば、
stage1.link(stage2)
stage1.link(stage3)
stage1.link(stage4)
と書くと、ステージ1の結果が、ステージ2・3・4にそれぞれ受け渡されて、分岐して並列処理することができる。
ドキュメントに記載はないのだけど、これを結合することはできるのだろうか???
パイプラインオブジェクトの作成
遷移する複数ステージの、起点となるステージをPipeline()
に渡すことで、パイプラインオブジェクトが作成され、パイプラインを実行することができるようになる。
例えば、以下。
stage1.link(stage2)
pipe = Pipeline(stage1)
以下のように書いても同じだった。
stages = stage1.link(stage2)
pipe = Pipeline(stages)
pipe = Pipeline(stage1.link(stage2))
パイプラインの操作
作成したパイプラインにデータを入力するにはput()
を使う。
pipe.put(1)
結果はget()
を使って1つ取り出すか、results()
を使って全部まとめて取り出すか。
result = pipe.get()
for result in pipe.results():
print(result)
パイプラインにNone
を渡すと入力の終了シグナルとなる。
pipe.put(None)
None
はパイプラインの各ステージに伝播され、各ワーカーは現在処理中のタスクを完了させた上で停止するよう指示を受ける。なお、None
投入後は既存のタスクは最後まで処理されるものの、その後に新たに投入されたタスクは無視され、最終的にパイプラインが空になるまで結果を取得できる仕組みとなっている。
各コンポーネントなどの基本的なコンセプトは以下にまとまっている。
APIリファレンスもとてもコンパクトなので、目を通しておくと良さそう。
あと以下にいろいろな例が載っている。
まとめ
ここしばらくPythonで並列・並行処理を使いやすくするようなライブラリをいくつか見てたけど、実行順序を制御できるという点で、個人的にとても使い勝手が良さそうに感じた。