😀

mlflowとluigiによるML実験管理例

2022/12/07に公開約11,600字

はじめに

この記事はBrainPad Advent Calendar 2021の10日目の記事です。

本記事ではMLの実験を行うときの、コード、パラメータ、モデル、評価結果を管理するための構成例を紹介します。
サンプルコードはこちら

前提知識

  • Must
    • python
    • docker
  • Want
    • mlflow
    • luigi

思想

  • 前処理を加えたデータや学習したモデルなどプログラムで出力されるファイルは全てmlflowの管理下におく。
  • コードはgitで管理し、実験結果とcommit hashを紐づける。
  • 前処理、学習、推論などタスク同士の依存関係を管理して、依存しているタスクを自動で実行できるようにする。また、既に実行されたタスクは実行しないようにする。

構成概要

  • titanicのdataに対して、前処理、学習、推論を行う例を紹介する。
  • ディレクトリ構成は以下のような感じ。
  • src/tasks/下に前処理などの具体的なタスクを行うファイルを作成する。
  • tomlファイルで実行するタスクを指定してsrc/runner.pyを実行する。
  • 各タスクの出力は、mlruns下のmlflowの出力ファイルの管理先のデフォルト位置に実験ごとに出力する。
ディレクトリ構成
.
├── Dockerfile
├── LICENSE
├── README.md
├── docker-compose.yml
├── input
│   ├── gender_submission.csv
│   ├── test.csv
│   ├── titanic.zip
│   └── train.csv
├── luigi.toml
├── mlruns/
├── requirements.txt
└── src
    ├── runner.py
    └── tasks
        ├── prediction.py
        ├── preprocessor.py
        ├── run_task.py
        └── trainer.py

環境構築

  • mlflowなど必要なライブラリを入れるDockerfileを適当に作成して、下記のようなdocker-compose.ymlを用意して、プロジェクト直下でdocker-compose up dなどでmlflow trackingサーバを立ち上げる。
  • mlflow trackingでは、サーバを立ち上げるときにmetricなどを管理するTracking URIと、出力されるファイルを管理するArtifact URIという2つのURIを指定することができる。デフォルトではどちらも./mlruns/下に保存される。
  • サーバを立ち上げた後{サーバのIP}:{指定したポート番号}をブラウザで開くことでアクセスできる。
version: "3"
services:
  mlflow:
    image: ml_experiment_management_demo
    container_name: ml_experiment_management_demo_mlflow
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - $PWD:$PWD
    working_dir: $PWD
    ports:
      - "5000:5000"
    command: mlflow ui -h 0.0.0.0
    restart: always

コードの構成

.envに、プロジェクトのパス、利用するconfigのパス、configの種類を指定する。

.env
PROJECT_ROOT={プロジェクトのパスを指定}
LUIGI_CONFIG_PATH=luigi.toml
LUIGI_CONFIG_PARSER=toml
  • luigiの機能で環境変数LUIGI_CONFIG_PATHに指定したファイルをconfigとして読み込むことができる。ここではluigi.tomlとしている。
  • RunConfigはrunner.pyで利用するconfigで、実験名や概要、実行するtaskを指定する設計になっている。
  • PathConfigでは各タスクの入出力先ディレクトリのpathを定義する。
  • 前処理、学習、推論の各タスクのconfigはPreprocessor、Trainer、Predictionに定義する。
[RunConfig]
#実験名:mlflowの実験単位
experiment_name="demo"
description="概要"
# 実行するTask
task="Prediction"

[PathConfig]
# input
input="./input/"

# Preprocessorのoutput、Trainer、Evaluatorのinput
# デフォルトはartifact_uri下、"./mlruns/{experiment_id}/{run_id}/artifacts/preprocessor/"となる。
#preprocessor="./mlruns/"

# Trainerのoutput、Evaluatorのinput
# デフォルはトartifact_uri下"./mlruns/{experiment_id}/{run_id}/artifacts/trainer/"となる。
#trainer="./mlruns/"

# Predictionのoutput、
# デフォルトはartifact_uri下"./mlruns/{experiment_id}/{run_id}/artifacts/prediction/"となる。
#trainer="./mlruns/"

[Preprocessor]
train_columns=['PassengerId','Survived', 'Pclass', 'SibSp', 'Parch']
test_columns=['PassengerId','Pclass', 'SibSp', 'Parch']

[Trainer]
seed=0

[Prediction]

  • runner.pyが実際に実行するスクリプトになる。
  • .envとRunConfigを読み込んで指定したtaskを実行する。
  • experiment_nameを実験名として設定して、利用したconfigファイルや実行するtaskをlogとして記録している。詳しくはmlflow trackingのドキュメント参照。
runner.py
import os
import sys

import luigi
import mlflow
from dotenv import load_dotenv
from luigi.configuration import add_config_path, get_config

from tasks.prediction import Prediction
from tasks.preprocessor import Preprocessor
from tasks.trainer import Trainer


def main():
    load_dotenv()
    sys.path.append(f"{os.getenv('PROJECT_ROOT')}src/")
    add_config_path(os.getenv("LUIGI_CONFIG_PATH"))
    run_config = get_config(os.getenv("LUIGI_CONFIG_PARSER"))["RunConfig"]
    mlflow.set_experiment(run_config["experiment_name"])
    mlflow.start_run()
    mlflow.set_tag('mlflow.note.content', run_config["description"])
    mlflow.log_param("description", run_config["description"])
    mlflow.log_param("task", run_config["task"])
    mlflow.set_tag('mlflow.runName', mlflow.active_run().info.run_id)
    mlflow.log_artifact(os.getenv("LUIGI_CONFIG_PATH"))
    luigi.run([run_config["task"], "--local-scheduler"])
    mlflow.end_run()


if __name__ == '__main__':
    main()

タスクの定義

  • runner.pyで実行するタスクはRunTaskクラスを継承して作成する。
  • RunTaskクラスでは、PathConfigで各タスクの出力先ディレクトリのpathが指定されているかを確認し、指定されていない場合./mlruns/{experiment_id}/{run_id}/artifacts/{task_name}/に設定する。
  • build_input_output_path_dictsはサブクラスで入出力pathの一覧を返すように実装することを想定しており、サブクラスでは設定されたpath_configを用いて、入出力のプロジェクトルートからの相対パスか絶対パスを返すように実装する。
  • RunTaskクラスはluigiのTaskを継承しており、outputメソッドでoutput_paths_dictの値をluigiのLocalTargetとして指定する。
  • luigi.Taskの機能でコンストラクタの実行後outputメソッドで指定したファイルが既にある場合、実行済のタスクと見なされる。これにより、RunTaskを継承した各タスクはoutput_paths_dictに設定したファイルが既にあれば、実行しなくて済むようになっている。
  • また、PathConfigを指定しなかった場合、run_idは毎回変わるのでそのタスクは必ず実行される。
run_task.py
import os
from abc import abstractmethod
from typing import Tuple

import luigi
import mlflow
from luigi.configuration import get_config


class RunTask(luigi.Task):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.config = get_config(
            os.getenv("LUIGI_CONFIG_PARSER")
        )[self.__class__.__name__]

        experiment_id = mlflow.active_run().info.experiment_id
        run_id = mlflow.active_run().info.run_id

        self.path_config = get_config(
            os.getenv("LUIGI_CONFIG_PARSER"))["PathConfig"]
        self.artifacts_uri_path = f"./mlruns/{experiment_id}/{run_id}/artifacts/"

        # タスクごとにPathConfigが指定されていなければデフォルト値設定
        # デフォルトパス ./mlruns/{experiment_id}/{run_id}/artifacts/{task_name}/
        configs_keys = get_config(os.getenv("LUIGI_CONFIG_PARSER")).data.keys()
        task_list = [i for i in configs_keys if i not in [
            "RunConfig", 'PathConfig']]
        for task_name in task_list:
            self.path_config.setdefault(
                task_name,
                f"{self.artifacts_uri_path}{task_name}/")
        # 入出力パス output_paths_dictのvaluesのパスにファイルが既にある場合、そのタスクは行われない
        self.input_paths_dict, self.output_paths_dict = self.build_input_output_path_dicts()
        for output_path in self.output_paths_dict.values():
            os.makedirs(os.path.dirname(output_path), exist_ok=True)

    @abstractmethod
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        """
        クラスの入出力パスをそれぞれdictで返す

        Returns:
            input_paths_dict,output_paths_dict
        """
        pass

    def output(self):
        return list(map(lambda x: luigi.LocalTarget(x),
                        self.output_paths_dict.values()))

前処理

  • RunTaskの説明で述べたように、前処理のタスクPreprocessorクラスはRunTaskを継承して作成する。
  • build_input_output_path_dictsでpath_configの値を利用して、入出力pathを設定している。
  • runメソッドがtaskの実行する処理で、ここでは前処理の内容を実装している。
  • ここで行っている前処理は、シンプルにtrain、testをconfigで指定したcolumnsのみにしているだけである。
preprocessor.py
from typing import Tuple

import pandas as pd

from tasks.run_task import RunTask


class Preprocessor(RunTask):
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        input_paths_dict = {
            "train": f"{self.path_config['input']}train.csv",
            "test": f"{self.path_config['input']}test.csv",
        }
        output_paths_dict = {
            "train": f"{self.path_config['Preprocessor']}train.csv",
            "test": f"{self.path_config['Preprocessor']}test.csv",
        }
        return input_paths_dict, output_paths_dict

    def run(self):
        train = pd.read_csv(self.input_paths_dict["train"])
        test = pd.read_csv(self.input_paths_dict["test"])
        train = train[self.config["train_columns"]]
        test = test[self.config["test_columns"]]
        train.to_csv(self.output_paths_dict["train"], index=False)
        test.to_csv(self.output_paths_dict["test"], index=False)

学習

  • 前処理と同様にRunTaskを継承して作成している。
  • luigiの機能の@requiresへルパを用いて、Preprocessorに依存していることを定義している。これによりTrainerを実行する前にPreprocessorが先に実行される。詳しくはluigiのdocument参照。
  • また、Trainerではsklearnのモデルで学習しているが、mlflowのmlflow.sklearn.autologを用いることで、Metricsなどがいくつか自動で保存することができる。自分で指標を設定したいときはlog_metricなどを用いる。詳しくはmlflowのLogging Functionsのdocument参照。
trainer.py
import pickle
from typing import Tuple

import mlflow.sklearn
import pandas as pd
from luigi.util import requires
from sklearn import linear_model

from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask


@requires(Preprocessor)
import pickle
from typing import Tuple

import mlflow.sklearn
import pandas as pd
from luigi.util import requires
from sklearn import linear_model

from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask


@requires(Preprocessor)
class Trainer(RunTask):
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        input_paths_dict = {
            "train": f"{self.path_config['Preprocessor']}train.csv",
            "test": f"{self.path_config['Preprocessor']}test.csv",
        }
        output_paths_dict = {
            "model": f"{self.path_config['Trainer']}model",
        }
        return input_paths_dict, output_paths_dict

    def run(self):
        train = pd.read_csv(
            self.input_paths_dict["train"],
            index_col='PassengerId')
        X = train.drop("Survived", axis=1)
        y = train["Survived"]
        mlflow.sklearn.autolog()
        reg = linear_model.RidgeClassifier(random_state=self.config["seed"])
        reg.fit(X, y)
        pickle.dump(reg, open(self.output_paths_dict["model"], "wb"))

推論

  • 前処理・学習と同様にRunTaskを継承して作成する。
  • 実行に前処理済みのtestと学習したmodelが必要になるため、Preprocessor, Trainerを@requiresに指定している。
prediction.py
import pickle
from typing import Tuple

import pandas as pd
from luigi.util import requires

from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask
from tasks.trainer import Trainer


@requires(Preprocessor, Trainer)
class Prediction(RunTask):
    def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
        input_paths_dict = {
            "test": f"{self.path_config['Preprocessor']}test.csv",
            "model": f"{self.path_config['Trainer']}model",
        }
        output_paths_dict = {
            "test_prediction": f"{self.path_config['Prediction']}test_prediction.csv", }
        return input_paths_dict, output_paths_dict

    def run(self):
        test = pd.read_csv(
            self.input_paths_dict["test"],
            index_col="PassengerId")
        model = pickle.load(open(self.input_paths_dict["model"], "rb"))
        test_prediction = pd.DataFrame(
            model.predict(test),
            columns=["Survived"],
            index=test.index
        )
        test_prediction.to_csv(self.output_paths_dict["test_prediction"])

実行例

  • プロジェクトルートでpython src/runnr.pyを実行することで、configのtaskで指定したタスクを実行する。例えばtask="Prediction"としていれば、Predictionのrunを実行する。ただし、PredictionはPreprocessor,Trainerに依存しているため、先に前処理、学習が実行される。
  • 上記の実行後、下記のようにPreprocessor,Trainerのpathを各タスクの実行結果のディレクトリに指定して、再びtask="Prediction"として実行した場合、Predictionの入力として先程の実行結果が用いられる。この場合、前処理、学習の処理は省略できる。
[PathConfig]
Preprocessor="./mlruns/1/ba79349b64ac4493ac1f6c6eac306e68/artifacts/Preprocessor/"
Trainer="./mlruns/1/ba79349b64ac4493ac1f6c6eac306e68/artifacts/Trainer/"

課題

  • mlflowのartifact URIがデフォルトの前提で、コード内でパスを指定している。つまり、実行しているプロジェクトルート直下のmlruns/ディレクトリ下で管理する前提になっている。環境変数やconfigファイルなどで指定できるようにしたほうが望ましい。
  • 1つのファイルに全てのconfigをまとめているので、設定項目が多くなると頻雑になる。ただし、半端に分けても扱いづらくなると思うので難しいところ。
GitHubで編集を提案

Discussion

ログインするとコメントできます