🌊

データ処理用の軽量パイプラインライブラリを作った

2021/08/07に公開

要約

既存のパイプラインライブラリ(luigiKedro)ではオーバーキルな用途(研究開発用途)のため、軽量なパイプラインライブラリ「lwpipe」を作った。ちょっとしたパイプラインが欲しいが、使い方を把握するために時間を割くのがもったいないと感じる人は是非このライブラリを使ってみてほしい。

成果物

https://github.com/estshorter/lwpipe

本文

機械学習系の仕事をしている中で、データの前処理で不満があった。前処理は複数の処理(破損データの削除、特徴量抽出など)から構成されており、その中には処理が重く、完了に数分かかるものもある。重い処理の後段をデバッグ・変更するたびに毎回数分待つのはアホらしいので、このような処理は中間結果を吐き出しておき、後段では結果を読むだけというのが普通だ。そうなると、前処理をすべてやり直したいときのために、全処理を一つのパイプラインとしてまとめたくなるが、自分にとってちょうどよいパイプラインライブラリがないというのが不満だった。

既存ライブラリとしては例えばluigiKedroがあるが、自分の用途(本番環境でなく、研究開発用のパイプライン)にはオーバーキルという印象だった。そこで、軽量なパイプラインを提供するPythonライブラリ「lwpipe」を自作した。

pip install lwpipeでインストールできる。

このライブラリを使うと、パイプライン処理は以下のようになる。

import logging

from lwpipe import Pipeline
from lwpipe.io import dump_pickle, load_pickle

def proc1():
    in_ = "INPUT"
    # 本当はもっと重い処理をする
    out_ = in_ + "_proc1"  # INPUT_proc1
    dump_pickle(data=out_, filepath="proc1.pickle")

def proc2():
    in_ = load_pickle(filepath="proc1.pickle")
    # 本当はもっと重い処理をする
    out_ = in_ + "_proc2"  # INPUT_proc1_proc2
    dump_pickle(data=out_, filepath="proc2.pickle")

logging.basicConfig(level="INFO")

funcs = [proc1, proc2]
pipe = Pipeline(funcs)
pipe.run()

# 本質的には以下と同じ
# for func in funcs:
#   func()

出力はこんな感じ。loggingのレベルをINFO以上にしないと出力されないので注意。

Scheduled 2 tasks (from: proc1, to: proc2)
Running 1/2 tasks (proc1)
Dumped to 'proc1.pickle'
Running 2/2 tasks (proc2)
Loaded from 'proc1.pickle'
Dumped to 'proc2.pickle'
Completed all tasks (from: proc1, to: proc2)

proc2から実行したい場合には、pipe.run(1)pipe.run("proc2")とすればよい。

研究開発用のコードだと、上記のようにdump/load処理をデータ処理関数内に書いてしまうことが多いが、データ処理とdump/loadは分けた方が再利用性が高まる。本ライブラリにはKedroと同様なNodeというクラスがあり、関数の入出力・dump/load設定を、Nodeクラスのコンストラクタで定義できるようになっている。使い方は以下。

import logging
from lwpipe import Node, Pipeline
from lwpipe.io import dump_pickle, load_pickle

logging.basicConfig(level="INFO")

nodes = [
    Node(
        func=lambda in_: in_+"_proc1",
        inputs="INPUT",
        name="proc1",
        outputs_dumper=dump_pickle,
        outputs_path="proc1.pickle",
        outputs_loader=load_pickle, # proc1.pickle を読み込むloader
    ),
    Node( # inputsを指定しない場合は、前段の出力が入力になる
        func=lambda in_: in_+"_proc2",
        name="proc2",
        outputs_dumper=dump_pickle,
        outputs_path="proc2.pickle",
        outputs_loader=load_pickle, # proc2.pickle を読み込むloader
    ),
]
pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == "INPUT_proc1_proc2"

この場合、proc1の出力がproc2に渡されるため、proc1.pickleはロードされない。
もちろん、pipe.run(1)pipe.run("proc2")として、途中からの実行も可能だ。

また、Nodeの出力をメモリに保存しておけば、前段より前の結果にもアクセス可能となっている。

import logging
from lwpipe import Node, Pipeline
from lwpipe.io import dump_pickle, load_pickle

logging.basicConfig(level="INFO")

nodes = [
    Node(
        func=lambda in_: in_ + "_proc1",
        inputs="INPUT",
        outputs="proc1", # 出力名を与えるとメモリに保存される
        name="proc1",
        outputs_dumper=dump_pickle,
        outputs_path="proc1.pickle",
        outputs_loader=load_pickle,
    ),
    Node(
        func=lambda in_: in_ + "_proc2",
        outputs="proc2",
        name="proc2",
        outputs_dumper=dump_pickle,
        outputs_path="proc2.pickle",
        outputs_loader=load_pickle,
    ),
    Node(
        func=lambda in_: in_ + "_proc3",
        inputs="proc1", # 最初のノードの結果を読んでいる
        name="proc3",
        outputs_dumper=dump_pickle,
        outputs_path="proc3.pickle",
        outputs_loader=load_pickle,
    ),
]
pipe = Pipeline(nodes)
outputs = pipe.run()
assert outputs[0] == "INPUT_proc1_proc3"
assert pipe.results["proc2"] == "INPUT_proc1_proc2"

出力ログはこう変わる。

Scheduled 3 tasks (from: proc1, to: proc3)
Running 1/3 tasks (proc1)
Dumped to 'proc1.pickle'
Running 2/3 tasks (proc2)
Dumped to 'proc2.pickle'
Running 3/3 tasks (proc3)
Dumped to 'proc3.pickle'
Completed all tasks (from: proc1, to: proc3)

その他のコード例は、リポジトリのREADMEテストケースに置いてある。

ちょっとしたパイプラインが欲しいが、使い方を把握するために時間を割くのがもったいないと感じる人は是非このライブラリを使ってみてほしい。

Discussion