MLOps ZoomCamp Week2:実験管理の極意 - MLflowで機械学習モデルの構築を体系化する
# はじめに
Week1では、MLOpsの基礎とローカル環境でのベースラインモデルの構築を学びました。
しかし、実際の機械学習プロジェクトでは、数十〜数百のモデルの構築(ハイパーパラメータの調整を含む)を行うことは珍しくありません。
「あのパラメータで学習したモデル、どこに保存したんだっけ?」
「先週作成したモデルの方が良かった気がするけど、設定は何だったんだろう?」
「このモデルの学習に使ったデータセットのバージョンは?」
こうした悩みを解決するのが Experiment Tracking(構築履歴の追跡) です。
Week2では、MLflowを使って機械学習を効率的に管理する方法を学びます。
# 構築履歴を追跡することの重要性
## 料理に例えて
従来の機械学習モデルの構築は、メモを取らずに料理を作るシェフのようなもの:
- 「今日の料理はいい味付けだった」
- 「でも、調味料の分量を覚えていない」
- 「同じ味は再現できないかもしれない」
ここで、作るたびにメモをとる(構築履歴を追跡する)ようにすると?:
- 全ての材料(データ)と分量(パラメータ)を記録
- 調理過程(学習過程)を詳細に記録
- 結果(性能評価)を客観的に評価
- いつでも同じ味(モデル)を再現可能
## 構築時に管理すべき要素
機械学習モデルを構築する時に管理する要素としては、以下が挙げられます:
- パラメータ(Parameters):ハイパーパラメータ、設定値
- メトリクス(Metrics):精度、損失、実行時間
- アーティファクト(Artifacts):モデル、図表、ログ
- ソースコード(Code)):実験に使用したコードバージョン
- 環境情報(Environment):ライブラリバージョン、システム情報
# MLflow入門
## MLflowとは
Mlflowは、機械学習ライフサイクル全体を管理するオープンソースプラットフォームです。
4つの主要なコンポーネントから構成されています。
1. MLflow Tracking
- 実験の記録と追跡
- パラメータ、メトリクス、アーティファクトの保存
- Web UIでの可視化
2. MLflow Projects
- コードの再現可能な実行
- 環境とエントリーポイントの定義
3. MLflow Models
- モデルのパッケージング
- 複数形式の保存・読み込み
4. MLflow Registry
- モデルの中央管理
- ステージング、本番環境への展開管理
# 実践:MLflowセットアップ
Week1で作成したプロジェクトにMLflowを統合していきます。
## 環境の準備
# プロジェクトディレクトリに移動
cd mlops-taxi-project
# 仮想環境のアクティベート
conda activate mlops-zoomcamp
# MLflow UIの起動テスト
mlflow ui --port 5000
# リンクにアクセスできるか確認してください。
## プロジェクト構造の拡張
MLflow用にディレクトリ構造を拡張します:
# MLflow関連ディレクトリの作成
mkdir -p mlruns
mldir -p experiments
mkdir -p src/tarcking
# MLflow設定ファイル
touch .mlflow_tracking_uri
echo "sqlite:///mlruns/mlflow.db" > .mlflow_tracking_uri
.
├── app.py
├── data
│ ├── external
│ ├── processed
│ └── raw
│ └── yellow_tripdata_2021-01.parquet
├── environment.yml
├── experiments
├── mlruns
│ ├── 0
│ │ └── meta.yaml
│ └── models
├── models
├── notebooks
│ └── 01_data_exploration.ipynb
├── README.md
├── reports
│ ├── img
│ └── img.png
├── src
│ ├── data
│ ├── models
│ ├── tracking
│ └── utils
├── test_environment.py
└── tests
# MLflowで実際に使ってみる
## 基本的な使い方
ファイル:src/tracking/basic_experiment.py
##############################
# ライブラリ
##############################
# 標準ライブラリ
import os
# 外部ライブラリ
import joblib
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
##############################
# MLflowの設定
##############################
mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
mlflow.set_experiment("taxi-duration-prediction")
##############################
# Main関数の定義
##############################
def main():
print("=== MLflowによる構築履歴の追跡デモ ===")
# データの準備
Xtrain,Xtest,ytrain,ytest,features = load_and_prepare_data()
# 構築するモデル
models = {
"LinearRegression": LinearRegression(),
"RandomForest": RandomForestRegressor(
n_estimators=100,
max_depth=10,
random_state=0,
n_jobs=-1
),
"RandomForest_Deep": RandomForestRegressor(
n_estimators=200,
max_depth=20,
random_state=0,
n_jobs=-1
)
}
results = {}
# 各モデルで実験
for model_name,model in models.items():
result = train_and_log_model(
model, model_name, Xtrain, Xtest, ytrain, ytest, features
)
results[model_name] = result
# 結果の比較
print(f"\n=== 構築結果の比較 ===")
for name,result in results.items():
print(f"{name}:")
print(f" RMSE: {result['test_rmse']:.2f}")
print(f" MAE: {result['test_mae']:.2f}")
print(f" R2: {result['test_r2']:.3f}")
# ベストモデルの特定
best_model_name = min(results.keys(), key=lambda x: results[x]["test_rmse"])
print(f"\nベストモデル: {best_model_name}")
print(f"\nMLflow UIで結果を確認してください:")
##############################
# ヘルパー関数の定義 - データの読み込みと前処理
##############################
def load_and_prepare_data():
print("データを読み込み中...")
# データ読み込み
data_path = "../../data/raw/yellow_tripdata_2021-01.parquet"
df = pd.read_parquet(data_path)
# 特徴量エンジニアリング
df["trip_duration"] = (
df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
).dt.total_seconds() / 60
# 時間特徴量の追加
df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
df["pickup_day"] = df["tpep_pickup_datetime"].dt.day_of_week
# 特徴量の選択
features = [
"trip_distance", "passenger_count",
"PULocationID", "DOLocationID",
"pickup_hour", "pickup_day"
]
X = df[features].fillna(0)
y = df["trip_duration"].fillna(0)
# データクリーニング
mask = (
(y>1) & (y<120) &
(X["trip_distance"]>0) & (X["trip_distance"]<50) &
(X["passenger_count"]>0) & (X["passenger_count"]<=6)
)
Xclean,yclean = X[mask],y[mask]
# 訓練・テストデータの分割
Xtrain,Xtest,ytrain,ytest = train_test_split(
Xclean, yclean, test_size=0.2, random_state=0
)
print(f"訓練データ: {len(Xtrain):,}")
print(f"テストデータ: {len(Xtest):,}")
return Xtrain,Xtest,ytrain,ytest,features
##############################
# ヘルパー関数の定義 - モデルの学習とMLflowへの登録
##############################
def train_and_log_model(model, model_name, Xtrain, Xtest, ytrain, ytest, features):
with mlflow.start_run(run_name=f"{model_name}_experiment"):
# パラメータログ
if hasattr(model, "get_params"):
params = model.get_params()
for param,value in params.items():
mlflow.log_param(param, value)
# データ情報のログ
mlflow.log_param("train_size", len(Xtrain))
mlflow.log_param("test_size", len(Xtest))
mlflow.log_param("features", features)
# モデルの学習
print(f"\n{model_name} を学習中...")
model.fit(Xtrain, ytrain)
# 予測
ypred_train = model.predict(Xtrain)
ypred_test = model.predict(Xtest)
# メトリクスの計算のログ
train_rmse = root_mean_squared_error(ytrain, ypred_train)
test_rmse = root_mean_squared_error(ytest, ypred_test)
train_mae = mean_absolute_error(ytrain, ypred_train)
test_mae = mean_absolute_error(ytest, ypred_test)
test_r2 = r2_score(ytest, ypred_test)
# MLflowにメトリクスを登録する
mlflow.log_metric("train_rmse", train_rmse)
mlflow.log_metric("test_rmse", test_rmse)
mlflow.log_metric("train_mae", train_mae)
mlflow.log_metric("test_mae", test_mae)
mlflow.log_metric("test_r2", test_r2)
# モデルの保存
mlflow.sklearn.log_model(
model,
f"{model_name.lower()}_model",
registered_model_name=f"taxi_duration_{model_name.lower()}",
input_example=Xtrain.head()
)
# 予測結果の可視化(保存)
# 予測結果の可視化とログ
import matplotlib.pyplot as plt
plt.figure(figsize=(12, 4))
# 1.実際 vs 予測の散布図
plt.subplot(1, 3, 1)
plt.scatter(ytest, ypred_test, alpha=0.3, s=1)
plt.plot([0, 120], [0, 120], 'r--', linewidth=2)
plt.xlabel('Actual Duration (min)')
plt.ylabel('Predicted Duration (min)')
plt.title(f'{model_name}: Actual vs Predicted')
plt.grid(True, alpha=0.3)
# 2.残差プロット
plt.subplot(1, 3, 2)
residuals = ytest - ypred_test
plt.scatter(ypred_test, residuals, alpha=0.3, s=1)
plt.axhline(y=0, color='r', linestyle='--')
plt.xlabel('Predicted Duration (min)')
plt.ylabel('Residuals (min)')
plt.title(f'{model_name}: Residuals Plot')
plt.grid(True, alpha=0.3)
# 3.残差の分布
plt.subplot(1, 3, 3)
plt.hist(residuals, bins=50, alpha=0.7, edgecolor='black')
plt.xlabel('Residuals (min)')
plt.ylabel('Frequency')
plt.title(f'{model_name}: Residuals Distribution')
plt.grid(True, alpha=0.3)
plt.tight_layout()
# 図の保存とMLflowへの登録
plot_path = f"../../reports/{model_name.lower()}_results.png"
plt.savefig(plot_path, dpi=300, bbox_inches="tight")
mlflow.log_artifact(plot_path)
plt.close()
print(f"RMSE(Test): {test_rmse:.2f} minutes")
print(f"MAE(Test): {test_mae:.2f} minutes")
print(f"R2: {test_r2:.3f}")
return {
"model": model,
"test_rmse": test_rmse,
"test_mae": test_mae,
"test_r2": test_r2
}
##############################
# 実行
##############################
if __name__ == "__main__":
main()
## スクリプトの実行
# MLflow UIの起動(ターミナル: MLflow用)
cd mlops-taxi-project
conda activate mlops-zoomcamp
mlflow ui --backend-store-uri sqlite:///mlruns/mlflow.db
# スクリプトの実行
cd src/tracking
python basic_experiment.py
# ハイパーパラメータ調整とMLflow
## グリッドサーチとMLflow
ファイル:src/tracking/hyperparameter_tuning.py
##############################
# ライブラリ
##############################
# 標準ライブラリ
import itertools
import os
import time
# 外部ライブラリ
import joblib
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
##############################
# MLflowの設定
##############################
mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
mlflow.set_experiment("taxi-hyperparameter-tuning")
##############################
# Main関数の定義
##############################
def main():
print("=== ハイパーパラメータチューニング ===")
start_time = time.time()
best_params,best_score = hyperparameter_search()
total_time = time.time() - start_time
print(f"\n実行完了!総実行時間: {total_time/60:.1f} 分")
print(f"\nMLflow UIで詳細な結果を確認してください。")
##############################
# ヘルパー関数の定義 - ハイパーパラメータチューニング
##############################
def hyperparameter_search():
"""ハイパーパラメータサーチの実行"""
# データの準備
Xtrain,Xtest,ytrain,ytest = load_data()
# ハイパーパラメータの範囲
param_grid = {
"n_estimators": [50,100,200],
"max_depth": [10,15,20],
"min_samples_split": [2,5],
"min_samples_leaf": [1,2]
}
# 全組み合わせの数
total_combinations = np.prod([len(values) for values in param_grid.values()])
print(f"ハイパーパラメータの組み合わせの数: {total_combinations}")
best_score = float("inf")
best_params = None
# 各組み合わせをMLflowで追跡
for i,params in enumerate(itertools.product(*param_grid.values())):
param_dict = dict(zip(param_grid.keys(), params))
with mlflow.start_run(run_name=f"rf_tuning_{i+1:03d}"):
print(f"\n[{i+1}/{total_combinations}] パラメータ: {param_dict}")
# パラメータのログ
for key,value in param_dict.items():
mlflow.log_param(key,value)
# モデルの学習
start_time = time.time()
model = RandomForestRegressor(
random_state=0,
n_jobs=-1,
**param_dict
)
model.fit(Xtrain,ytrain)
training_time = time.time() - start_time
# 予測と評価
ypred_test = model.predict(Xtest)
rmse = root_mean_squared_error(ytest, ypred_test)
mae = mean_absolute_error(ytest, ypred_test)
r2 = r2_score(ytest, ypred_test)
# メトリクスのログ
mlflow.log_metric("rmse", rmse)
mlflow.log_metric("mae", mae)
mlflow.log_metric("r2", r2)
mlflow.log_metric("training_time", training_time)
# データサイエンスのログ
mlflow.log_param("train_size", len(Xtrain))
mlflow.log_param("test_size", len(Xtest))
print(f"RMSE: {rmse:.3f}, MAE: {mae:.3f}, R2: {r2:.3f}, 時間: {training_time:.1f}s")
# ベストスコアの更新
if rmse<best_score:
best_score = rmse
best_params = param_dict.copy()
# ベストモデルを登録
mlflow.sklearn.log_model(
model,
"best_model",
registered_model_name="taxi_duration_best_rf",
input_example=Xtrain.head()
)
# ベストモデルのタグ付け
mlflow.set_tag("is_best_model", "true")
# その他の情報
mlflow.set_tag("model_type", "RandomForest")
mlflow.set_tag("experiment_type", "hyperparameter_tuning")
print(f"\n=== ベストの結果 ===")
print(f"ベストパラメータ: {best_params}")
print(f"ベストRMSE: {best_score:.3f}")
return best_params, best_score
##############################
# ヘルパー関数の定義 - データの読み込み
##############################
def load_data():
# データ読み込み
data_path = "../../data/raw/yellow_tripdata_2021-01.parquet"
df = pd.read_parquet(data_path)
# 特徴量エンジニアリング
df["trip_duration"] = (
df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
).dt.total_seconds() / 60
# 時間特徴量の追加
df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
df["pickup_day"] = df["tpep_pickup_datetime"].dt.day_of_week
# 特徴量の選択
features = [
"trip_distance", "passenger_count",
"PULocationID", "DOLocationID",
"pickup_hour", "pickup_day"
]
X = df[features].fillna(0)
y = df["trip_duration"].fillna(0)
# データクリーニング
mask = (
(y>1) & (y<120) &
(X["trip_distance"]>0) & (X["trip_distance"]<50) &
(X["passenger_count"]>0) & (X["passenger_count"]<=6)
)
Xclean,yclean = X[mask],y[mask]
# データのサンプリング
if len(Xclean)>100000:
sample_size = 100000
indices = np.random.choice(len(Xclean), sample_size, replace=False)
Xclean = Xclean.iloc[indices]
yclean = yclean.iloc[indices]
print(f"高速化のため {sample_size:,} 件にサンプリング")
# 訓練・テストデータの分割
Xtrain,Xtest,ytrain,ytest = train_test_split(
Xclean, yclean, test_size=0.2, random_state=0
)
print(f"訓練データ: {len(Xtrain):,}")
print(f"テストデータ: {len(Xtest):,}")
return Xtrain,Xtest,ytrain,ytest
##############################
# 実行
##############################
if __name__ == "__main__":
main()
## スクリプトの実行
# スクリプトの実行
cd src/tracking
python hyperparameter_tuning.py
# MLflow Model Registry
## モデルのステージ管理
各種パターンで構築したモデルの一覧から、本番環境に用いるモデルを選択する上で、
効率的にステージ管理を行うことができます。
MLflowでは、モデルを以下の段階で管理します:
- None: 開発中
- Staging: テスト環境用
- Production: 本番環境用
- Archived: 古いバージョン
ステージが"Production"になっているモデルをMLflowから取得する用に設定すれば、
その度にモデルを構築する必要がなくなります。
ファイル:src/tracking/model_registry_demo.py
##############################
# ライブラリ
##############################
# 標準ライブラリ
# 外部ライブラリ
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient
##############################
# MLflowの設定
##############################
mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
client = MlflowClient(tracking_uri="sqlite:///../../mlruns/mlflow.db")
##############################
# Main関数の定義
##############################
def main():
print("=== MLflow Model Registry デモ ===")
# 1. ベストモデルの登録
model_name,version = register_best_model()
# 2. ステージングに移行
transition_model_stage(model_name, version, "Staging")
# 3. 検証後、本番に移行
print("\n検証が完了したと仮定して、本番環境に移行...")
transition_model_stage(model_name, version, "Production")
# 4. 本番モデルの読み込みテスト
model = load_model_from_registry(model_name, "Production")
print(f"\n本番環境のモデルタイプ: {type(model)}")
print("モデルの読み込みが成功しました!")
##############################
# ヘルパー関数の定義 - ベストモデルをレジストリに登録
##############################
def register_best_model():
# 実行履歴からベストランを検索
experiment = mlflow.get_experiment_by_name("taxi-hyperparameter-tuning")
runs = mlflow.search_runs(
experiment_ids=[experiment.experiment_id],
order_by=["metrics.rmse ASC"],
max_results=1
)
if len(runs)==0:
print("実験が見つかりません。まずハイパーパラメータチューニングを実行してください。")
return
best_run = runs.iloc[0]
run_id = best_run["run_id"]
print(f"ベストランID: {run_id}")
print(f"RMSE: {best_run['metrics.rmse']:.3f}")
# モデルをレジストリに登録
model_name = "taxi-duration-model"
model_uri = f"runs:/{run_id}/best_model"
# モデルバージョンの作成
model_version = mlflow.register_model(
model_uri=model_uri,
name=model_name
)
print(f"モデル '{model_name}' のバージョン {model_version.version} を登録しました")
return model_name, model_version.version
##############################
# ヘルパー関数の定義 - モデルステージの変更
##############################
def transition_model_stage(model_name, version, stage):
client.transition_model_version_stage(
name=model_name,
version=version,
stage=stage
)
print(f"モデル {model_name} v{version} を {stage} ステージに移行しました")
##############################
# ヘルパー関数の定義 - レジストリからモデルの読み込み
##############################
def load_model_from_registry(model_name, stage="Production"):
model_uri = f"models:/{model_name}/{stage}"
model = mlflow.sklearn.load_model(model_uri)
print(f"モデル {model_name} ({stage}) を読み込みました")
return model
##############################
# 実行
##############################
if __name__ == "__main__":
main()
## スクリプトの実行
# スクリプトの実行
cd src/tracking
python model_registry_demo.py
# 構築したモデルの可視化と比較
## MLflow UIの活用
MLflow UI では以下の機能を活用できます:
1. 構築した各モデルの比較
- 複数のRunを並べて比較
- metricsとパラメータの相関分析
2. 可視化
- parameterとパフォーマンスの関係
3. 検索とフィルタリング
- 特定の条件で構築されたモデルの検索
- タグによる分類
4. アーティファクトの管理
- モデルファイル(.pkl)の読み込み
- 図表の表示
## 以上の項目について実装
ファイル:src/tracking/experiment_analysis.py
##############################
# ライブラリ
##############################
# 標準ライブラリ
# 外部ライブラリ
import mlflow
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
##############################
# Main関数の定義
##############################
def main():
analyze_experiments()
##############################
# ヘルパー関数の定義 - 実験結果の分析と可視化
##############################
def analyze_experiments():
mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
# ハイパーパラメータチューニングの結果を取得
experiment = mlflow.get_experiment_by_name("taxi-hyperparameter-tuning")
runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
print(f"構築パターン数: {len(runs)}")
print(f"カラム: {runs.columns.tolist()}")
# metrics と parameter の相関分析
metric_cols = [col for col in runs.columns if col.startswith("metrics.")]
param_cols = [col for col in runs.columns if col.startswith("params.")]
# データの準備
analysis_data = runs[metric_cols + param_cols].copy()
# parameter列の型変換
for col in param_cols:
if analysis_data[col].dtype=="object":
analysis_data[col] = pd.to_numeric(analysis_data[col], errors="coerce")
# 可視化
plt.figure(figsize=(15,10))
# 1. RMSEの分布
plt.subplot(2,3,1)
plt.hist(runs["metrics.rmse"], bins=20, alpha=0.7, edgecolor="black")
plt.xlabel("RMSE")
plt.ylabel("Frequency")
plt.title("RMSE Distribution")
plt.grid(True, alpha=0.3)
# 2. parameter と RMSE の関係
params_to_plot = ["params.n_estimators", "params.max_depth"]
for i,param in enumerate(params_to_plot, 2):
plt.subplot(2,3,i)
plt.scatter(pd.to_numeric(runs[param]), runs["metrics.rmse"], alpha=0.6)
plt.xlabel(param.replace('params.',''))
plt.ylabel("RMSE")
plt.title(f'RMSE vs {param.replace("params.","")}')
plt.grid(True, alpha=0.3)
# 4. 訓練時間 vs 性能
plt.subplot(2,3,4)
plt.scatter(runs["metrics.training_time"], runs["metrics.rmse"], alpha=0.6)
plt.xlabel("Training Time (seconds)")
plt.ylabel("RMSE")
plt.title("Training Time vs RMSE")
plt.grid(True, alpha=0.3)
# 5. R2 vs RMSE
plt.subplot(2,3,5)
plt.scatter(runs["metrics.r2"], runs["metrics.rmse"], alpha=0.6)
plt.xlabel("R2")
plt.ylabel("RMSE")
plt.title("R2 vs RMSE")
plt.grid(True, alpha=0.3)
# 6. パフォーマンス vs parameter のヒートマップ
plt.subplot(2,3,6)
## parameter の組み合わせごとの平均RMSE
pivot_data = runs.pivot_table(
values="metrics.rmse",
index="params.max_depth",
columns="params.n_estimators",
aggfunc="mean"
)
sns.heatmap(pivot_data, annot=True, fmt=".3f", cmap="viridis_r")
plt.title("Average RMSE by Parameters")
# 全体のレイアウト
plt.tight_layout()
plt.savefig("../../reports/experiment_analysis.png", dpi=300, bbox_inches="tight")
# 統計サマリー
print("\n=== Summary ===")
print(f"最小RMSE: {runs['metrics.rmse'].min():.3f}")
print(f"最大RMSE: {runs['metrics.rmse'].max():.3f}")
print(f"平均RMSE: {runs['metrics.rmse'].mean():.3f}")
print(f"RMSE標準偏差: {runs['metrics.rmse'].std():.3f}")
# ベストパラメータの分析
best_run = runs.loc[runs["metrics.rmse"].idxmin()]
print("\n=== ベストモデルの詳細 ===")
for col in param_cols:
print(f"{col.replace('params.','')}: {best_run[col]}")
for col in metric_cols:
print(f"{col.replace('metrics.','')}: {best_run[col]:.3f}")
##############################
# 実行
##############################
if __name__ == "__main__":
main()
## スクリプトの実行
# スクリプトの実行
cd src/tracking
python experiment_analysis.py
experiment_analysis.pyの実行結果(src/reports/experiment_analysis.png)
# 自動化された構築パイプライン
## 構築時の設定ファイルの活用
ファイル:experiments/experiment_config.yaml
experiment_name: "taxi-automated-experiments"
random_seed: 0
data:
source: "../../data/raw/yellow_tripdata_2021-01.parquet"
sample_size: 100000
test_size: 0.2
features:
base_features:
- trip_distance
- passenger_count
- PULocationID
- DOLocationID
time_features:
- pickup_hour
- pickup_day
- pickup_month
engineered_features:
- trip_distance_per_passenger
- is_weekend
preprocessing:
duration_range:
min: 1
max: 120
distance_range:
min: 0
max: 50
passenger_range:
min: 1
max: 6
models:
linear_regression:
type: "sklearn.linear_model.LinearRegression"
params: {}
random_forest:
type: "sklearn.ensemble.RandomForestRegressor"
params:
n_estimators: [50,100,200]
max_depth: [10,15,20]
min_samples_split: [2,5]
random_state: 0
n_jobs: -1
gradient_boosting:
type: "sklearn.ensemble.GradientBoostingRegressor"
params:
n_estimators: [100,200]
learning_rate: [0.1,0.05]
max_depth: [3,5]
random_state: 0
tracking:
log_models: true
log_artifacts: true
auto_register_best: true
## モデル構築自動化スクリプト
ファイル:src/tracking/automated_experiments.py
##############################
# ライブラリ
##############################
# 標準ライブラリ
import importlib
from pathlib import Path
import time
import yaml
# 外部ライブラリ
import joblib
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor,GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split,ParameterGrid
##############################
# Main関数の定義
##############################
def main():
config_path = "../../experiments/experiment_config.yaml"
if not Path(config_path).exists():
print(f"設定ファイルが見つかりません: {config_path}")
print("experiments/experiment_config.yaml を作成してください")
return
runner = AutomatedExperimentRunner(config_path)
best_model = runner.run_all_experiments()
print("\nMLflow UI で詳細を確認")
##############################
# Class: 自動化された機械学習モデルの実行クラス
##############################
class AutomatedExperimentRunner:
def __init__(self, config_path):
with open(config_path,"r") as f:
self.config = yaml.safe_load(f)
# MLflowの設定
mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
mlflow.set_experiment(self.config["experiment_name"])
self.best_score = float("inf")
self.best_model_info = None
def load_and_prepare_data(self):
"""設定に基づいたデータの読み込みと前処理"""
print("データを読み込み中...")
# データの読み込み
df = pd.read_parquet(self.config['data']['source'])
# 基本的な特徴量エンジニアリング
df['trip_duration'] = (
df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
).dt.total_seconds() / 60
# 時間特徴量
if 'pickup_hour' in self.config['features']['time_features']:
df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
if 'pickup_day' in self.config['features']['time_features']:
df['pickup_day'] = df['tpep_pickup_datetime'].dt.dayofweek
if 'pickup_month' in self.config['features']['time_features']:
df['pickup_month'] = df['tpep_pickup_datetime'].dt.month
# エンジニアリング特徴量
if 'trip_distance_per_passenger' in self.config['features']['engineered_features']:
df['trip_distance_per_passenger'] = df['trip_distance'] / df['passenger_count'].clip(lower=1)
if 'is_weekend' in self.config['features']['engineered_features']:
df['is_weekend'] = (df['tpep_pickup_datetime'].dt.dayofweek >= 5).astype(int)
# 特徴量の選択
all_features = (
self.config['features']['base_features'] +
self.config['features']['time_features'] +
self.config['features']['engineered_features']
)
available_features = [f for f in all_features if f in df.columns]
X = df[available_features].fillna(0)
y = df['trip_duration'].fillna(0)
# データクリーニング
duration_range = self.config['preprocessing']['duration_range']
distance_range = self.config['preprocessing']['distance_range']
passenger_range = self.config['preprocessing']['passenger_range']
mask = (
(y >= duration_range['min']) & (y <= duration_range['max']) &
(df['trip_distance'] >= distance_range['min']) &
(df['trip_distance'] <= distance_range['max']) &
(df['passenger_count'] >= passenger_range['min']) &
(df['passenger_count'] <= passenger_range['max'])
)
Xclean, yclean = X[mask], y[mask]
# サンプリング
sample_size = self.config['data'].get('sample_size')
if sample_size and len(Xclean) > sample_size:
indices = np.random.choice(len(Xclean), sample_size, replace=False)
Xclean = Xclean.iloc[indices]
yclean = yclean.iloc[indices]
print(f"データを {sample_size:,} 件にサンプリング")
# 訓練・テストデータの分割
Xtrain,Xtest,ytrain,ytest = train_test_split(
Xclean, yclean,
test_size=self.config['data']['test_size'],
random_state=self.config['random_seed']
)
print(f"訓練データ: {len(Xtrain):,}")
print(f"テストデータ: {len(Xtest):,}")
print(f"特徴量: {available_features}")
return Xtrain, Xtest, ytrain, ytest, available_features
def get_model_class(self, model_type_str):
"""文字列からモデルクラスを取得"""
module_name,class_name = model_type_str.rsplit('.',1)
module = importlib.import_module(module_name)
return getattr(module,class_name)
def run_model_experiments(self, model_name, model_config, Xtrain, Xtest, ytrain, ytest, features):
"""特定のモデルに対する実行"""
model_class = self.get_model_class(model_config["type"])
params = model_config["params"]
# パラメータグリッドの生成
param_combinations = []
grid_params = {}
fixed_params = {}
for key,value in params.items():
if isinstance(value,list):
grid_params[key] = value
else:
fixed_params[key] = value
if grid_params:
# グリッドサーチ
param_grid = ParameterGrid(grid_params)
for grid_param in param_grid:
combined_params = {**fixed_params, **grid_param}
param_combinations.append(combined_params)
else:
# 単一パラメータの場合
param_combinations.append(fixed_params)
print(f"\n{model_name}: {len(param_combinations)} パターンの実験")
for i,param_set in enumerate(param_combinations):
with mlflow.start_run(run_name=f"{model_name}_{i+1:03d}"):
self._run_single_experiment(
model_name, model_class, param_set,
Xtrain, Xtest, ytrain, ytest, features
)
def _run_single_experiment(self, model_name, model_class, params, Xtrain, Xtest, ytrain, ytest, features):
"""単一のモデルを構築"""
# parameter のログ
for key,value in params.items():
mlflow.log_param(key,value)
mlflow.log_param("model_type", model_name)
mlflow.log_param("train_size", len(Xtrain))
mlflow.log_param("test_size", len(Xtest))
mlflow.log_param("feature_count", len(features))
# モデルの学習
start_time = time.time()
model = model_class(**params)
model.fit(Xtrain,ytrain)
training_time = time.time() - start_time
# 予測
ypred_train = model.predict(Xtrain)
ypred_test = model.predict(Xtest)
# Metricsの計算
train_rmse = root_mean_squared_error(ytrain, ypred_train)
test_rmse = root_mean_squared_error(ytest, ypred_test)
train_mae = mean_absolute_error(ytrain, ypred_train)
test_mae = mean_absolute_error(ytest, ypred_test)
test_r2 = r2_score(ytest, ypred_test)
# Overfittingの指標
overfitting_ratio = test_rmse/train_rmse
# MLflowにLog
mlflow.log_metric("train_rmse", train_rmse)
mlflow.log_metric("test_rmse", test_rmse)
mlflow.log_metric("train_mae", train_mae)
mlflow.log_metric("test_mae", test_mae)
mlflow.log_metric("test_r2", test_r2)
mlflow.log_metric("training_time", training_time)
mlflow.log_metric("overfitting_ratio", overfitting_ratio)
# モデルの保存
if self.config["tracking"]["log_models"]:
mlflow.sklearn.log_model(
model,
f"{model_name}_model",
input_example=Xtrain.head()
)
# ベストモデルの更新
if test_rmse<self.best_score:
self.best_score = test_rmse
self.best_model_info = {
"model_name": model_name,
"params": params,
"rmse": test_rmse,
"run_id": mlflow.active_run().info.run_id
}
# ベストモデルのタグ付け
mlflow.set_tag("is_best_model","true")
if self.config["tracking"]["auto_register_best"]:
mlflow.sklearn.log_model(
model,
"best_model",
registered_model_name=f"taxi_duration_best_{model_name}",
input_example=Xtrain.head()
)
print(f" {model_name} - RMSE: {test_rmse:.3f}, R2: {test_r2:.3f}, Time: {training_time:.1f}s")
def run_all_experiments(self):
"""すべてのモデルに対するparameterごとにモデル構築を実行"""
print(f"=== {self.config['experiment_name']} ===")
# データの準備
Xtrain,Xtest,ytrain,ytest,features = self.load_and_prepare_data()
# 各モデル、parameterごとに構築
total_start_time = time.time()
for model_name,model_config in self.config["models"].items():
self.run_model_experiments(
model_name, model_config,
Xtrain,Xtest,ytrain,ytest,features
)
total_time = time.time() - total_start_time
# Summary
print("\n=== すべての構築完了 ===")
print(f"総実行時間: {total_time/60:.1f} 分")
print(f"ベストモデル: {self.best_model_info['model_name']}")
print(f"ベストRMSE: {self.best_model_info['rmse']:.3f}")
print(f"ベストparameter: {self.best_model_info['params']}")
return self.best_model_info
##############################
# 実行
##############################
if __name__ == "__main__":
main()
## スクリプトの実行
# スクリプトの実行
cd src/tracking
python automated_experiments.py
# まとめ
Week2では、MLflowを使った包括的な構築したモデルの管理について学びました。
今回実装した内容は以下の通りです。
- ✅ 基本的なMLflowの使い方
- ✅ ハイパーパラメータチューニング
- ✅ モデルレジストリの活用
- ✅ 構築した各モデルの可視化・分析
- ✅ 設定ファイルベースの自動化
Week3では、ワークフローオーケストレーションについて学んでいきます。
今回作成した各モデルの構築結果を、"Prefect"を使って、自動化・スケージューリングする方法を習得します。
Discussion