😀
mlflowとluigiによるML実験管理例
はじめに
この記事はBrainPad Advent Calendar 2021の10日目の記事です。
本記事ではMLの実験を行うときの、コード、パラメータ、モデル、評価結果を管理するための構成例を紹介します。
サンプルコードはこちら
前提知識
- Must
- python
- docker
- Want
- mlflow
- luigi
思想
- 前処理を加えたデータや学習したモデルなどプログラムで出力されるファイルは全てmlflowの管理下におく。
- コードはgitで管理し、実験結果とcommit hashを紐づける。
- 前処理、学習、推論などタスク同士の依存関係を管理して、依存しているタスクを自動で実行できるようにする。また、既に実行されたタスクは実行しないようにする。
構成概要
- titanicのdataに対して、前処理、学習、推論を行う例を紹介する。
- ディレクトリ構成は以下のような感じ。
-
src/tasks/
下に前処理などの具体的なタスクを行うファイルを作成する。 - tomlファイルで実行するタスクを指定して
src/runner.py
を実行する。 - 各タスクの出力は、mlruns下のmlflowの出力ファイルの管理先のデフォルト位置に実験ごとに出力する。
ディレクトリ構成
.
├── Dockerfile
├── LICENSE
├── README.md
├── docker-compose.yml
├── input
│ ├── gender_submission.csv
│ ├── test.csv
│ ├── titanic.zip
│ └── train.csv
├── luigi.toml
├── mlruns/
├── requirements.txt
└── src
├── runner.py
└── tasks
├── prediction.py
├── preprocessor.py
├── run_task.py
└── trainer.py
環境構築
- mlflowなど必要なライブラリを入れるDockerfileを適当に作成して、下記のようなdocker-compose.ymlを用意して、プロジェクト直下で
docker-compose up d
などでmlflow trackingサーバを立ち上げる。 - mlflow trackingでは、サーバを立ち上げるときにmetricなどを管理するTracking URIと、出力されるファイルを管理するArtifact URIという2つのURIを指定することができる。デフォルトではどちらも./mlruns/下に保存される。
- サーバを立ち上げた後
{サーバのIP}:{指定したポート番号}
をブラウザで開くことでアクセスできる。
version: "3"
services:
mlflow:
image: ml_experiment_management_demo
container_name: ml_experiment_management_demo_mlflow
build:
context: .
dockerfile: Dockerfile
volumes:
- $PWD:$PWD
working_dir: $PWD
ports:
- "5000:5000"
command: mlflow ui -h 0.0.0.0
restart: always
コードの構成
.envに、プロジェクトのパス、利用するconfigのパス、configの種類を指定する。
.env
PROJECT_ROOT={プロジェクトのパスを指定}
LUIGI_CONFIG_PATH=luigi.toml
LUIGI_CONFIG_PARSER=toml
- luigiの機能で環境変数LUIGI_CONFIG_PATHに指定したファイルをconfigとして読み込むことができる。ここではluigi.tomlとしている。
- RunConfigは
runner.py
で利用するconfigで、実験名や概要、実行するtaskを指定する設計になっている。 - PathConfigでは各タスクの入出力先ディレクトリのpathを定義する。
- 前処理、学習、推論の各タスクのconfigはPreprocessor、Trainer、Predictionに定義する。
[RunConfig]
#実験名:mlflowの実験単位
experiment_name="demo"
description="概要"
# 実行するTask
task="Prediction"
[PathConfig]
# input
input="./input/"
# Preprocessorのoutput、Trainer、Evaluatorのinput
# デフォルトはartifact_uri下、"./mlruns/{experiment_id}/{run_id}/artifacts/preprocessor/"となる。
#preprocessor="./mlruns/"
# Trainerのoutput、Evaluatorのinput
# デフォルはトartifact_uri下"./mlruns/{experiment_id}/{run_id}/artifacts/trainer/"となる。
#trainer="./mlruns/"
# Predictionのoutput、
# デフォルトはartifact_uri下"./mlruns/{experiment_id}/{run_id}/artifacts/prediction/"となる。
#trainer="./mlruns/"
[Preprocessor]
train_columns=['PassengerId','Survived', 'Pclass', 'SibSp', 'Parch']
test_columns=['PassengerId','Pclass', 'SibSp', 'Parch']
[Trainer]
seed=0
[Prediction]
-
runner.py
が実際に実行するスクリプトになる。 - .envとRunConfigを読み込んで指定したtaskを実行する。
- experiment_nameを実験名として設定して、利用したconfigファイルや実行するtaskをlogとして記録している。詳しくはmlflow trackingのドキュメント参照。
runner.py
import os
import sys
import luigi
import mlflow
from dotenv import load_dotenv
from luigi.configuration import add_config_path, get_config
from tasks.prediction import Prediction
from tasks.preprocessor import Preprocessor
from tasks.trainer import Trainer
def main():
load_dotenv()
sys.path.append(f"{os.getenv('PROJECT_ROOT')}src/")
add_config_path(os.getenv("LUIGI_CONFIG_PATH"))
run_config = get_config(os.getenv("LUIGI_CONFIG_PARSER"))["RunConfig"]
mlflow.set_experiment(run_config["experiment_name"])
mlflow.start_run()
mlflow.set_tag('mlflow.note.content', run_config["description"])
mlflow.log_param("description", run_config["description"])
mlflow.log_param("task", run_config["task"])
mlflow.set_tag('mlflow.runName', mlflow.active_run().info.run_id)
mlflow.log_artifact(os.getenv("LUIGI_CONFIG_PATH"))
luigi.run([run_config["task"], "--local-scheduler"])
mlflow.end_run()
if __name__ == '__main__':
main()
タスクの定義
-
runner.py
で実行するタスクはRunTaskクラスを継承して作成する。 - RunTaskクラスでは、PathConfigで各タスクの出力先ディレクトリのpathが指定されているかを確認し、指定されていない場合
./mlruns/{experiment_id}/{run_id}/artifacts/{task_name}/
に設定する。 - build_input_output_path_dictsはサブクラスで入出力pathの一覧を返すように実装することを想定しており、サブクラスでは設定されたpath_configを用いて、入出力のプロジェクトルートからの相対パスか絶対パスを返すように実装する。
- RunTaskクラスはluigiのTaskを継承しており、outputメソッドでoutput_paths_dictの値をluigiのLocalTargetとして指定する。
- luigi.Taskの機能でコンストラクタの実行後outputメソッドで指定したファイルが既にある場合、実行済のタスクと見なされる。これにより、RunTaskを継承した各タスクはoutput_paths_dictに設定したファイルが既にあれば、実行しなくて済むようになっている。
- また、PathConfigを指定しなかった場合、run_idは毎回変わるのでそのタスクは必ず実行される。
run_task.py
import os
from abc import abstractmethod
from typing import Tuple
import luigi
import mlflow
from luigi.configuration import get_config
class RunTask(luigi.Task):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.config = get_config(
os.getenv("LUIGI_CONFIG_PARSER")
)[self.__class__.__name__]
experiment_id = mlflow.active_run().info.experiment_id
run_id = mlflow.active_run().info.run_id
self.path_config = get_config(
os.getenv("LUIGI_CONFIG_PARSER"))["PathConfig"]
self.artifacts_uri_path = f"./mlruns/{experiment_id}/{run_id}/artifacts/"
# タスクごとにPathConfigが指定されていなければデフォルト値設定
# デフォルトパス ./mlruns/{experiment_id}/{run_id}/artifacts/{task_name}/
configs_keys = get_config(os.getenv("LUIGI_CONFIG_PARSER")).data.keys()
task_list = [i for i in configs_keys if i not in [
"RunConfig", 'PathConfig']]
for task_name in task_list:
self.path_config.setdefault(
task_name,
f"{self.artifacts_uri_path}{task_name}/")
# 入出力パス output_paths_dictのvaluesのパスにファイルが既にある場合、そのタスクは行われない
self.input_paths_dict, self.output_paths_dict = self.build_input_output_path_dicts()
for output_path in self.output_paths_dict.values():
os.makedirs(os.path.dirname(output_path), exist_ok=True)
@abstractmethod
def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
"""
クラスの入出力パスをそれぞれdictで返す
Returns:
input_paths_dict,output_paths_dict
"""
pass
def output(self):
return list(map(lambda x: luigi.LocalTarget(x),
self.output_paths_dict.values()))
前処理
- RunTaskの説明で述べたように、前処理のタスクPreprocessorクラスはRunTaskを継承して作成する。
- build_input_output_path_dictsでpath_configの値を利用して、入出力pathを設定している。
- runメソッドがtaskの実行する処理で、ここでは前処理の内容を実装している。
- ここで行っている前処理は、シンプルにtrain、testをconfigで指定したcolumnsのみにしているだけである。
preprocessor.py
from typing import Tuple
import pandas as pd
from tasks.run_task import RunTask
class Preprocessor(RunTask):
def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
input_paths_dict = {
"train": f"{self.path_config['input']}train.csv",
"test": f"{self.path_config['input']}test.csv",
}
output_paths_dict = {
"train": f"{self.path_config['Preprocessor']}train.csv",
"test": f"{self.path_config['Preprocessor']}test.csv",
}
return input_paths_dict, output_paths_dict
def run(self):
train = pd.read_csv(self.input_paths_dict["train"])
test = pd.read_csv(self.input_paths_dict["test"])
train = train[self.config["train_columns"]]
test = test[self.config["test_columns"]]
train.to_csv(self.output_paths_dict["train"], index=False)
test.to_csv(self.output_paths_dict["test"], index=False)
学習
- 前処理と同様にRunTaskを継承して作成している。
- luigiの機能の
@requires
へルパを用いて、Preprocessorに依存していることを定義している。これによりTrainerを実行する前にPreprocessorが先に実行される。詳しくはluigiのdocument参照。 - また、Trainerではsklearnのモデルで学習しているが、mlflowの
mlflow.sklearn.autolog
を用いることで、Metricsなどがいくつか自動で保存することができる。自分で指標を設定したいときはlog_metric
などを用いる。詳しくはmlflowのLogging Functionsのdocument参照。
trainer.py
import pickle
from typing import Tuple
import mlflow.sklearn
import pandas as pd
from luigi.util import requires
from sklearn import linear_model
from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask
@requires(Preprocessor)
import pickle
from typing import Tuple
import mlflow.sklearn
import pandas as pd
from luigi.util import requires
from sklearn import linear_model
from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask
@requires(Preprocessor)
class Trainer(RunTask):
def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
input_paths_dict = {
"train": f"{self.path_config['Preprocessor']}train.csv",
"test": f"{self.path_config['Preprocessor']}test.csv",
}
output_paths_dict = {
"model": f"{self.path_config['Trainer']}model",
}
return input_paths_dict, output_paths_dict
def run(self):
train = pd.read_csv(
self.input_paths_dict["train"],
index_col='PassengerId')
X = train.drop("Survived", axis=1)
y = train["Survived"]
mlflow.sklearn.autolog()
reg = linear_model.RidgeClassifier(random_state=self.config["seed"])
reg.fit(X, y)
pickle.dump(reg, open(self.output_paths_dict["model"], "wb"))
推論
- 前処理・学習と同様にRunTaskを継承して作成する。
- 実行に前処理済みのtestと学習したmodelが必要になるため、Preprocessor, Trainerを
@requires
に指定している。
prediction.py
import pickle
from typing import Tuple
import pandas as pd
from luigi.util import requires
from tasks.preprocessor import Preprocessor
from tasks.run_task import RunTask
from tasks.trainer import Trainer
@requires(Preprocessor, Trainer)
class Prediction(RunTask):
def build_input_output_path_dicts(self) -> Tuple[dict, dict]:
input_paths_dict = {
"test": f"{self.path_config['Preprocessor']}test.csv",
"model": f"{self.path_config['Trainer']}model",
}
output_paths_dict = {
"test_prediction": f"{self.path_config['Prediction']}test_prediction.csv", }
return input_paths_dict, output_paths_dict
def run(self):
test = pd.read_csv(
self.input_paths_dict["test"],
index_col="PassengerId")
model = pickle.load(open(self.input_paths_dict["model"], "rb"))
test_prediction = pd.DataFrame(
model.predict(test),
columns=["Survived"],
index=test.index
)
test_prediction.to_csv(self.output_paths_dict["test_prediction"])
実行例
- プロジェクトルートで
python src/runnr.py
を実行することで、configのtaskで指定したタスクを実行する。例えばtask="Prediction"
としていれば、Predictionのrunを実行する。ただし、PredictionはPreprocessor,Trainerに依存しているため、先に前処理、学習が実行される。 - 上記の実行後、下記のようにPreprocessor,Trainerのpathを各タスクの実行結果のディレクトリに指定して、再び
task="Prediction"
として実行した場合、Predictionの入力として先程の実行結果が用いられる。この場合、前処理、学習の処理は省略できる。
[PathConfig]
Preprocessor="./mlruns/1/ba79349b64ac4493ac1f6c6eac306e68/artifacts/Preprocessor/"
Trainer="./mlruns/1/ba79349b64ac4493ac1f6c6eac306e68/artifacts/Trainer/"
課題
- mlflowのartifact URIがデフォルトの前提で、コード内でパスを指定している。つまり、実行しているプロジェクトルート直下のmlruns/ディレクトリ下で管理する前提になっている。環境変数やconfigファイルなどで指定できるようにしたほうが望ましい。
- 1つのファイルに全てのconfigをまとめているので、設定項目が多くなると頻雑になる。ただし、半端に分けても扱いづらくなると思うので難しいところ。
Discussion