🍣

小さくMLOpsを始めるための前処理順序付けライブラリdagstreamの紹介

2023/12/17に公開

はじめに

この記事は MLOps Advent Calendar 2023 の 17 日目の記事になります。

機械学習システムの中で、複雑化しやすい前処理の順序関係を管理するシンプルなPythonライブラリ dagstream を作成したので、その紹介をしたいと思います。

ただし、後述するように、機械学習PoC (Proof Of Concept) プロジェクトで使うような比較的規模の小さなコードを想定しています。そのため、大規模システムや既存のフレームワークへの適用方法は対象としない点にご注意ください。

dagstream の作成意図

Apache Airflow などに代表されるように、前処理の依存関係を管理するフレームワークは存在します。そうしたフレームワークは前処理だけでなく学習時や推論時、デプロイ時など機械学習ライフサイクル全体を統一的に管理することができ、大規模な機械学習システムに適しています。一方で、機械学習PoC (Proof Of Concept) プロジェクトのような小規模のコードでは小回りが効きにくく、オーバースペックである場面があります。

複数人で管理・開発していると規模の小さい機械学習システムでも小さくMLOpsを始めたいと思う場面は多々あり、今回は前処理の順序関係を管理するという点にのみ特化したライブラリ dagstream を作成しました。

同様のライブラリにC++ライブラリの taskflow があります。taskflowは Apache Airflow の内部でも使用されているような優れたライブラリで、今回 dagstream のインターフェースを作成する上でも大いに参考にしました。

コード例(Bad Sample)

例えば以下のような前処理コードがあったとします。個人で使うコードであれば十分だと思いますが、顧客へ納品したり、複数人で継続的にメンテナンスしていく上では不十分です。具体的にはどんな課題がありそうでしょうか。

def my_preprocess(apply_D: bool):
    funcA(data)
    funcB(data)

    # NOTE: funcA のあとに呼ぶこと
    funcC(data)

    if apply_D:
        funcD(data)
      
    funcE()

    # NOTE: funcF は最後によぶこと
    funcF()
    return

例えば、以下の問題を抱えています。(当然、変数名や関数名にも問題はありますが、ここでは無視します。)

  • 依存関係が明確でなくコメントに頼っているため、変更時に既存の依存関係を破壊する可能性があります。

    • # NOTE: funcA のあとに呼ぶこと とコメントされていますが、「funcB の後」という情報は考慮しなくてよいのかなど、コメントから実装の意図を完全に把握することは困難であることが多いです。
  • 各処理を並列に実行することができません。

    • 特に推論時などは、1つのデータに対する前処理を並列化することで全体のスループットを上げたいという要求は多々あります。
  • 特定の前処理の有効化・無効化にフラグ(例えば、 apply_D )を使用しており、コードが複雑化する可能性があります。

    • 例えば、funcDが事前に必要な関数はすべて if apply_D 文のネスト配下に書くことになり、ネストが深くなります。

dagstream でできること

上記のような課題を解決するために、以下が行うことができるライブラリdagstream を開発しました。

  • 前処理の順序関係を定義する
  • 依存関係の出力(mermaid 形式)
  • 順序関係に沿って直列・並列実行する
  • 部分的に依存関係を抽出する

以降では、各項項目について実際に実装例を確認したいと思います。

PyPIに登録済みですので、pipでインストールできます。

pip install dagstream

前処理の依存関係を定義する

DagStream.emplaceを用いると、Callable なオブジェクトをIFunctionalNodeというインターフェースをもったオブジェクトに変換します。以下の関数で順序関係を定義できます。

  • precede: 引数として渡ったIFunctionalNodeよりも前に実行する
  • succeed: 引数として渡ったIFunctionalNodeよりも後に実行する
import dagstream

def funcA():
    print("funcA")

def funcB():
    print("funcB")

def funcC():
    print("funcC")

def funcD():
    print("funcD")

def funcE():
    print("funcE")

def funcF():
    print("funcF")


stream = dagstream.DagStream()
A, B, C, D, E, F = stream.emplace(funcA, funcB, funcC, funcD, funcE, funcF)

A.precede(B, C)
E.succeed(B, C, D)
D.succeed(C)
F.succeed(E)

関数の順序関係を可視化する

dagstream では関数の順序関係を、 mermaid で描画できるファイルとして出力します。

from dagstream.viewers import MermaidDrawer

functional_dag = stream.construct()
drawer = MermaidDrawer()
drawer.output(functional_dag, "path/to/output.mmd")

以下のように、可視化できます。

順序関係に沿って直列・並列実行する

シングルコアで直列に実行する場合は以下のように StreamExecutor を使います。


from dagstream.executor import StreamExecutor

# construct functional dag
functional_dag = stream.construct()
executor = StreamExecutor(functional_dag)
executor.run()

並列実行する場合はStreamParallelExecutor を利用します。


from dagstream.executor import StreamParallelExecutor

# construct functional dag
functional_dag = stream.construct()
# Run in parallel by using 4 processes
executor = StreamParallelExecutor(functional_dag, n_processes=4)
executor.run()

部分的に順序関係を抽出する

定義された順序関係の中から、必要な順序関係のみを取り出すことができます。
これにより、深いネストのif文を避けることができます。

例えば、funcBfuncDが必要だとすると、以下のように指定できます。

# construct functional dag
# Extract minimum sub dag graph which is necessary for executing B and D
functional_dag = stream.construct(mandatory_nodes=[B, D])

# execute as same
executor = StreamExecutor(functional_dag)
executor.run()

# output as same
drawer.output(functional_dag, "path/to/subgraph.mmd")

出力すると依存関係は以下のようになります。

最後まで読んでいただき、ありがとうございました。まだまだできることは限られているので、今後も引き続き改良していきたいと思います。

Discussion