📝

MLOps ZoomCamp Week2:実験管理の極意 - MLflowで機械学習モデルの構築を体系化する

に公開

# はじめに

Week1では、MLOpsの基礎とローカル環境でのベースラインモデルの構築を学びました。
しかし、実際の機械学習プロジェクトでは、数十〜数百のモデルの構築(ハイパーパラメータの調整を含む)を行うことは珍しくありません。

「あのパラメータで学習したモデル、どこに保存したんだっけ?」
「先週作成したモデルの方が良かった気がするけど、設定は何だったんだろう?」
「このモデルの学習に使ったデータセットのバージョンは?」

こうした悩みを解決するのが Experiment Tracking(構築履歴の追跡) です。
Week2では、MLflowを使って機械学習を効率的に管理する方法を学びます。

# 構築履歴を追跡することの重要性

## 料理に例えて

従来の機械学習モデルの構築は、メモを取らずに料理を作るシェフのようなもの:

  • 「今日の料理はいい味付けだった」
  • 「でも、調味料の分量を覚えていない」
  • 「同じ味は再現できないかもしれない」

ここで、作るたびにメモをとる(構築履歴を追跡する)ようにすると?:

  • 全ての材料(データ)と分量(パラメータ)を記録
  • 調理過程(学習過程)を詳細に記録
  • 結果(性能評価)を客観的に評価
  • いつでも同じ味(モデル)を再現可能

## 構築時に管理すべき要素

機械学習モデルを構築する時に管理する要素としては、以下が挙げられます:

  1. パラメータ(Parameters):ハイパーパラメータ、設定値
  2. メトリクス(Metrics):精度、損失、実行時間
  3. アーティファクト(Artifacts):モデル、図表、ログ
  4. ソースコード(Code)):実験に使用したコードバージョン
  5. 環境情報(Environment):ライブラリバージョン、システム情報

# MLflow入門

## MLflowとは

Mlflowは、機械学習ライフサイクル全体を管理するオープンソースプラットフォームです。
4つの主要なコンポーネントから構成されています。

1. MLflow Tracking

  • 実験の記録と追跡
  • パラメータ、メトリクス、アーティファクトの保存
  • Web UIでの可視化

2. MLflow Projects

  • コードの再現可能な実行
  • 環境とエントリーポイントの定義

3. MLflow Models

  • モデルのパッケージング
  • 複数形式の保存・読み込み

4. MLflow Registry

  • モデルの中央管理
  • ステージング、本番環境への展開管理

# 実践:MLflowセットアップ

Week1で作成したプロジェクトにMLflowを統合していきます。

## 環境の準備

# プロジェクトディレクトリに移動
cd mlops-taxi-project

# 仮想環境のアクティベート
conda activate mlops-zoomcamp

# MLflow UIの起動テスト
mlflow ui --port 5000
# リンクにアクセスできるか確認してください。

## プロジェクト構造の拡張

MLflow用にディレクトリ構造を拡張します:

# MLflow関連ディレクトリの作成
mkdir -p mlruns
mldir -p experiments
mkdir -p src/tarcking

# MLflow設定ファイル
touch .mlflow_tracking_uri
echo "sqlite:///mlruns/mlflow.db" > .mlflow_tracking_uri
.
├── app.py
├── data
│   ├── external
│   ├── processed
│   └── raw
│       └── yellow_tripdata_2021-01.parquet
├── environment.yml
├── experiments
├── mlruns
│   ├── 0
│   │   └── meta.yaml
│   └── models
├── models
├── notebooks
│   └── 01_data_exploration.ipynb
├── README.md
├── reports
│   ├── img
│   └── img.png
├── src
│   ├── data
│   ├── models
│   ├── tracking
│   └── utils
├── test_environment.py
└── tests

# MLflowで実際に使ってみる

## 基本的な使い方

ファイル:src/tracking/basic_experiment.py
##############################
# ライブラリ
##############################

# 標準ライブラリ
import os

# 外部ライブラリ
import joblib
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

##############################
# MLflowの設定
##############################

mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
mlflow.set_experiment("taxi-duration-prediction")

##############################
# Main関数の定義
##############################

def main():
    print("=== MLflowによる構築履歴の追跡デモ ===")

    # データの準備
    Xtrain,Xtest,ytrain,ytest,features = load_and_prepare_data()

    # 構築するモデル
    models = {
        "LinearRegression": LinearRegression(),
        "RandomForest": RandomForestRegressor(
            n_estimators=100,
            max_depth=10,
            random_state=0,
            n_jobs=-1
        ),
        "RandomForest_Deep": RandomForestRegressor(
            n_estimators=200,
            max_depth=20,
            random_state=0,
            n_jobs=-1
        )
    }

    results = {}

    # 各モデルで実験
    for model_name,model in models.items():
        result = train_and_log_model(
            model, model_name, Xtrain, Xtest, ytrain, ytest, features
        )
        results[model_name] = result

    # 結果の比較
    print(f"\n=== 構築結果の比較 ===")
    for name,result in results.items():
        print(f"{name}:")
        print(f"    RMSE: {result['test_rmse']:.2f}")
        print(f"    MAE: {result['test_mae']:.2f}")
        print(f"    R2: {result['test_r2']:.3f}")
    
    # ベストモデルの特定
    best_model_name = min(results.keys(), key=lambda x: results[x]["test_rmse"])
    print(f"\nベストモデル: {best_model_name}")

    print(f"\nMLflow UIで結果を確認してください:")

##############################
# ヘルパー関数の定義 - データの読み込みと前処理
##############################

def load_and_prepare_data():
    print("データを読み込み中...")

    # データ読み込み
    data_path = "../../data/raw/yellow_tripdata_2021-01.parquet"
    df = pd.read_parquet(data_path)

    # 特徴量エンジニアリング
    df["trip_duration"] = (
        df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
    ).dt.total_seconds() / 60

    # 時間特徴量の追加
    df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
    df["pickup_day"] = df["tpep_pickup_datetime"].dt.day_of_week

    # 特徴量の選択
    features = [
        "trip_distance", "passenger_count",
        "PULocationID", "DOLocationID",
        "pickup_hour", "pickup_day"
    ]

    X = df[features].fillna(0)
    y = df["trip_duration"].fillna(0)

    # データクリーニング
    mask = (
        (y>1) & (y<120) &
        (X["trip_distance"]>0) & (X["trip_distance"]<50) &
        (X["passenger_count"]>0) & (X["passenger_count"]<=6)
    )

    Xclean,yclean = X[mask],y[mask]

    # 訓練・テストデータの分割
    Xtrain,Xtest,ytrain,ytest = train_test_split(
        Xclean, yclean, test_size=0.2, random_state=0
    )

    print(f"訓練データ: {len(Xtrain):,}")
    print(f"テストデータ: {len(Xtest):,}")

    return Xtrain,Xtest,ytrain,ytest,features

##############################
# ヘルパー関数の定義 - モデルの学習とMLflowへの登録
##############################

def train_and_log_model(model, model_name, Xtrain, Xtest, ytrain, ytest, features):
    
    with mlflow.start_run(run_name=f"{model_name}_experiment"):
        # パラメータログ
        if hasattr(model, "get_params"):
            params = model.get_params()
            for param,value in params.items():
                mlflow.log_param(param, value)
        
        # データ情報のログ
        mlflow.log_param("train_size", len(Xtrain))
        mlflow.log_param("test_size", len(Xtest))
        mlflow.log_param("features", features)

        # モデルの学習
        print(f"\n{model_name} を学習中...")
        model.fit(Xtrain, ytrain)
        
        # 予測
        ypred_train = model.predict(Xtrain)
        ypred_test = model.predict(Xtest)

        # メトリクスの計算のログ
        train_rmse = root_mean_squared_error(ytrain, ypred_train)
        test_rmse = root_mean_squared_error(ytest, ypred_test)
        train_mae = mean_absolute_error(ytrain, ypred_train)
        test_mae = mean_absolute_error(ytest, ypred_test)
        test_r2 = r2_score(ytest, ypred_test)

        # MLflowにメトリクスを登録する
        mlflow.log_metric("train_rmse", train_rmse)
        mlflow.log_metric("test_rmse", test_rmse)
        mlflow.log_metric("train_mae", train_mae)
        mlflow.log_metric("test_mae", test_mae)
        mlflow.log_metric("test_r2", test_r2)

        # モデルの保存
        mlflow.sklearn.log_model(
            model,
            f"{model_name.lower()}_model",
            registered_model_name=f"taxi_duration_{model_name.lower()}",
            input_example=Xtrain.head()
        )

        # 予測結果の可視化(保存)
        # 予測結果の可視化とログ
        import matplotlib.pyplot as plt
        
        plt.figure(figsize=(12, 4))
        
        # 1.実際 vs 予測の散布図
        plt.subplot(1, 3, 1)
        plt.scatter(ytest, ypred_test, alpha=0.3, s=1)
        plt.plot([0, 120], [0, 120], 'r--', linewidth=2)
        plt.xlabel('Actual Duration (min)')
        plt.ylabel('Predicted Duration (min)')
        plt.title(f'{model_name}: Actual vs Predicted')
        plt.grid(True, alpha=0.3)
        
        # 2.残差プロット
        plt.subplot(1, 3, 2)
        residuals = ytest - ypred_test
        plt.scatter(ypred_test, residuals, alpha=0.3, s=1)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('Predicted Duration (min)')
        plt.ylabel('Residuals (min)')
        plt.title(f'{model_name}: Residuals Plot')
        plt.grid(True, alpha=0.3)
        
        # 3.残差の分布
        plt.subplot(1, 3, 3)
        plt.hist(residuals, bins=50, alpha=0.7, edgecolor='black')
        plt.xlabel('Residuals (min)')
        plt.ylabel('Frequency')
        plt.title(f'{model_name}: Residuals Distribution')
        plt.grid(True, alpha=0.3)
        
        plt.tight_layout()

        # 図の保存とMLflowへの登録
        plot_path = f"../../reports/{model_name.lower()}_results.png"
        plt.savefig(plot_path, dpi=300, bbox_inches="tight")
        mlflow.log_artifact(plot_path)
        plt.close()

        print(f"RMSE(Test): {test_rmse:.2f} minutes")
        print(f"MAE(Test): {test_mae:.2f} minutes")
        print(f"R2: {test_r2:.3f}")

        return {
            "model": model,
            "test_rmse": test_rmse,
            "test_mae": test_mae,
            "test_r2": test_r2
        }

##############################
# 実行
##############################

if __name__ == "__main__":
    main()

## スクリプトの実行

# MLflow UIの起動(ターミナル: MLflow用)
cd mlops-taxi-project
conda activate mlops-zoomcamp
mlflow ui --backend-store-uri sqlite:///mlruns/mlflow.db

# スクリプトの実行
cd src/tracking
python basic_experiment.py

# ハイパーパラメータ調整とMLflow

## グリッドサーチとMLflow

ファイル:src/tracking/hyperparameter_tuning.py
##############################
# ライブラリ
##############################

# 標準ライブラリ
import itertools
import os
import time

# 外部ライブラリ
import joblib
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split

##############################
# MLflowの設定
##############################

mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
mlflow.set_experiment("taxi-hyperparameter-tuning")

##############################
# Main関数の定義
##############################

def main():
    print("=== ハイパーパラメータチューニング ===")

    start_time = time.time()
    best_params,best_score = hyperparameter_search()
    total_time = time.time() - start_time

    print(f"\n実行完了!総実行時間: {total_time/60:.1f} 分")
    print(f"\nMLflow UIで詳細な結果を確認してください。")


##############################
# ヘルパー関数の定義 - ハイパーパラメータチューニング
##############################
def hyperparameter_search():
    """ハイパーパラメータサーチの実行"""

    # データの準備
    Xtrain,Xtest,ytrain,ytest = load_data()

    # ハイパーパラメータの範囲
    param_grid = {
        "n_estimators": [50,100,200],
        "max_depth": [10,15,20],
        "min_samples_split": [2,5],
        "min_samples_leaf": [1,2]
    }

    # 全組み合わせの数
    total_combinations = np.prod([len(values) for values in param_grid.values()])
    print(f"ハイパーパラメータの組み合わせの数: {total_combinations}")

    best_score = float("inf")
    best_params = None

    # 各組み合わせをMLflowで追跡
    for i,params in enumerate(itertools.product(*param_grid.values())):
        param_dict = dict(zip(param_grid.keys(), params))

        with mlflow.start_run(run_name=f"rf_tuning_{i+1:03d}"):
            print(f"\n[{i+1}/{total_combinations}] パラメータ: {param_dict}")

            # パラメータのログ
            for key,value in param_dict.items():
                mlflow.log_param(key,value)

            # モデルの学習
            start_time = time.time()

            model = RandomForestRegressor(
                random_state=0,
                n_jobs=-1,
                **param_dict
            )

            model.fit(Xtrain,ytrain)
            training_time = time.time() - start_time

            # 予測と評価
            ypred_test = model.predict(Xtest)

            rmse = root_mean_squared_error(ytest, ypred_test)
            mae = mean_absolute_error(ytest, ypred_test)
            r2 = r2_score(ytest, ypred_test)

            # メトリクスのログ
            mlflow.log_metric("rmse", rmse)
            mlflow.log_metric("mae", mae)
            mlflow.log_metric("r2", r2)
            mlflow.log_metric("training_time", training_time)

            # データサイエンスのログ
            mlflow.log_param("train_size", len(Xtrain))
            mlflow.log_param("test_size", len(Xtest))

            print(f"RMSE: {rmse:.3f}, MAE: {mae:.3f}, R2: {r2:.3f}, 時間: {training_time:.1f}s")

            # ベストスコアの更新
            if rmse<best_score:
                best_score = rmse
                best_params = param_dict.copy()

                # ベストモデルを登録
                mlflow.sklearn.log_model(
                    model,
                    "best_model",
                    registered_model_name="taxi_duration_best_rf",
                    input_example=Xtrain.head()
                )
                
                # ベストモデルのタグ付け
                mlflow.set_tag("is_best_model", "true")

            # その他の情報
            mlflow.set_tag("model_type", "RandomForest")
            mlflow.set_tag("experiment_type", "hyperparameter_tuning")
        
        print(f"\n=== ベストの結果 ===")
        print(f"ベストパラメータ: {best_params}")
        print(f"ベストRMSE: {best_score:.3f}")

    return best_params, best_score


##############################
# ヘルパー関数の定義 - データの読み込み
##############################
def load_data():
    # データ読み込み
    data_path = "../../data/raw/yellow_tripdata_2021-01.parquet"
    df = pd.read_parquet(data_path)

    # 特徴量エンジニアリング
    df["trip_duration"] = (
        df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"]
    ).dt.total_seconds() / 60

    # 時間特徴量の追加
    df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour
    df["pickup_day"] = df["tpep_pickup_datetime"].dt.day_of_week

    # 特徴量の選択
    features = [
        "trip_distance", "passenger_count",
        "PULocationID", "DOLocationID",
        "pickup_hour", "pickup_day"
    ]

    X = df[features].fillna(0)
    y = df["trip_duration"].fillna(0)

    # データクリーニング
    mask = (
        (y>1) & (y<120) &
        (X["trip_distance"]>0) & (X["trip_distance"]<50) &
        (X["passenger_count"]>0) & (X["passenger_count"]<=6)
    )

    Xclean,yclean = X[mask],y[mask]

    # データのサンプリング
    if len(Xclean)>100000:
        sample_size = 100000
        indices = np.random.choice(len(Xclean), sample_size, replace=False)
        Xclean = Xclean.iloc[indices]
        yclean = yclean.iloc[indices]
        print(f"高速化のため {sample_size:,} 件にサンプリング")

    # 訓練・テストデータの分割
    Xtrain,Xtest,ytrain,ytest = train_test_split(
        Xclean, yclean, test_size=0.2, random_state=0
    )

    print(f"訓練データ: {len(Xtrain):,}")
    print(f"テストデータ: {len(Xtest):,}")

    return Xtrain,Xtest,ytrain,ytest


##############################
# 実行
##############################
if __name__ == "__main__":
    main()

## スクリプトの実行

# スクリプトの実行
cd src/tracking
python hyperparameter_tuning.py

# MLflow Model Registry

## モデルのステージ管理

各種パターンで構築したモデルの一覧から、本番環境に用いるモデルを選択する上で、
効率的にステージ管理を行うことができます。

MLflowでは、モデルを以下の段階で管理します:

  • None: 開発中
  • Staging: テスト環境用
  • Production: 本番環境用
  • Archived: 古いバージョン

ステージが"Production"になっているモデルをMLflowから取得する用に設定すれば、
その度にモデルを構築する必要がなくなります。

ファイル:src/tracking/model_registry_demo.py
##############################
# ライブラリ
##############################

# 標準ライブラリ

# 外部ライブラリ
import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

##############################
# MLflowの設定
##############################
mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
client = MlflowClient(tracking_uri="sqlite:///../../mlruns/mlflow.db")

##############################
# Main関数の定義
##############################
def main():
    print("=== MLflow Model Registry デモ ===")

    # 1. ベストモデルの登録
    model_name,version = register_best_model()

    # 2. ステージングに移行
    transition_model_stage(model_name, version, "Staging")

    # 3. 検証後、本番に移行
    print("\n検証が完了したと仮定して、本番環境に移行...")
    transition_model_stage(model_name, version, "Production")

    # 4. 本番モデルの読み込みテスト
    model = load_model_from_registry(model_name, "Production")

    print(f"\n本番環境のモデルタイプ: {type(model)}")
    print("モデルの読み込みが成功しました!")

##############################
# ヘルパー関数の定義 - ベストモデルをレジストリに登録
##############################
def register_best_model():
    # 実行履歴からベストランを検索
    experiment = mlflow.get_experiment_by_name("taxi-hyperparameter-tuning")
    runs = mlflow.search_runs(
        experiment_ids=[experiment.experiment_id],
        order_by=["metrics.rmse ASC"],
        max_results=1
    )

    if len(runs)==0:
        print("実験が見つかりません。まずハイパーパラメータチューニングを実行してください。")
        return
    
    best_run = runs.iloc[0]
    run_id = best_run["run_id"]

    print(f"ベストランID: {run_id}")
    print(f"RMSE: {best_run['metrics.rmse']:.3f}")

    # モデルをレジストリに登録
    model_name = "taxi-duration-model"
    model_uri = f"runs:/{run_id}/best_model"

    # モデルバージョンの作成
    model_version = mlflow.register_model(
        model_uri=model_uri,
        name=model_name
    )

    print(f"モデル '{model_name}' のバージョン {model_version.version} を登録しました")

    return model_name, model_version.version

##############################
# ヘルパー関数の定義 - モデルステージの変更
##############################
def transition_model_stage(model_name, version, stage):
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage=stage
    )

    print(f"モデル {model_name} v{version}{stage} ステージに移行しました")

##############################
# ヘルパー関数の定義 - レジストリからモデルの読み込み
##############################
def load_model_from_registry(model_name, stage="Production"):
    model_uri = f"models:/{model_name}/{stage}"
    model = mlflow.sklearn.load_model(model_uri)

    print(f"モデル {model_name} ({stage}) を読み込みました")
    return model

##############################
# 実行
##############################
if __name__ == "__main__":
    main()

## スクリプトの実行

# スクリプトの実行
cd src/tracking
python model_registry_demo.py

# 構築したモデルの可視化と比較

## MLflow UIの活用

MLflow UI では以下の機能を活用できます:

1. 構築した各モデルの比較

  • 複数のRunを並べて比較
  • metricsとパラメータの相関分析

2. 可視化

  • parameterとパフォーマンスの関係

3. 検索とフィルタリング

  • 特定の条件で構築されたモデルの検索
  • タグによる分類

4. アーティファクトの管理

  • モデルファイル(.pkl)の読み込み
  • 図表の表示

## 以上の項目について実装

ファイル:src/tracking/experiment_analysis.py
##############################
# ライブラリ
##############################

# 標準ライブラリ

# 外部ライブラリ
import mlflow
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns

##############################
# Main関数の定義
##############################
def main():
    analyze_experiments()

##############################
# ヘルパー関数の定義 - 実験結果の分析と可視化
##############################
def analyze_experiments():
    mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")

    # ハイパーパラメータチューニングの結果を取得
    experiment = mlflow.get_experiment_by_name("taxi-hyperparameter-tuning")
    runs = mlflow.search_runs(experiment_ids=[experiment.experiment_id])
    print(f"構築パターン数: {len(runs)}")
    print(f"カラム: {runs.columns.tolist()}")

    # metrics と parameter の相関分析
    metric_cols = [col for col in runs.columns if col.startswith("metrics.")]
    param_cols = [col for col in runs.columns if col.startswith("params.")]

    # データの準備
    analysis_data = runs[metric_cols + param_cols].copy()

    # parameter列の型変換
    for col in param_cols:
        if analysis_data[col].dtype=="object":
            analysis_data[col] = pd.to_numeric(analysis_data[col], errors="coerce")
    
    # 可視化
    plt.figure(figsize=(15,10))

    # 1. RMSEの分布
    plt.subplot(2,3,1)
    plt.hist(runs["metrics.rmse"], bins=20, alpha=0.7, edgecolor="black")
    plt.xlabel("RMSE")
    plt.ylabel("Frequency")
    plt.title("RMSE Distribution")
    plt.grid(True, alpha=0.3)

    # 2. parameter と RMSE の関係
    params_to_plot = ["params.n_estimators", "params.max_depth"]
    for i,param in enumerate(params_to_plot, 2):
        plt.subplot(2,3,i)
        plt.scatter(pd.to_numeric(runs[param]), runs["metrics.rmse"], alpha=0.6)
        plt.xlabel(param.replace('params.',''))
        plt.ylabel("RMSE")
        plt.title(f'RMSE vs {param.replace("params.","")}')
        plt.grid(True, alpha=0.3)

    # 4. 訓練時間 vs 性能
    plt.subplot(2,3,4)
    plt.scatter(runs["metrics.training_time"], runs["metrics.rmse"], alpha=0.6)
    plt.xlabel("Training Time (seconds)")
    plt.ylabel("RMSE")
    plt.title("Training Time vs RMSE")
    plt.grid(True, alpha=0.3)

    # 5. R2 vs RMSE
    plt.subplot(2,3,5)
    plt.scatter(runs["metrics.r2"], runs["metrics.rmse"], alpha=0.6)
    plt.xlabel("R2")
    plt.ylabel("RMSE")
    plt.title("R2 vs RMSE")
    plt.grid(True, alpha=0.3)

    # 6. パフォーマンス vs parameter のヒートマップ
    plt.subplot(2,3,6)
    ## parameter の組み合わせごとの平均RMSE
    pivot_data = runs.pivot_table(
        values="metrics.rmse",
        index="params.max_depth",
        columns="params.n_estimators",
        aggfunc="mean"
    )
    sns.heatmap(pivot_data, annot=True, fmt=".3f", cmap="viridis_r")
    plt.title("Average RMSE by Parameters")
    
    # 全体のレイアウト
    plt.tight_layout()
    plt.savefig("../../reports/experiment_analysis.png", dpi=300, bbox_inches="tight")

    # 統計サマリー
    print("\n=== Summary ===")
    print(f"最小RMSE: {runs['metrics.rmse'].min():.3f}")
    print(f"最大RMSE: {runs['metrics.rmse'].max():.3f}")
    print(f"平均RMSE: {runs['metrics.rmse'].mean():.3f}")
    print(f"RMSE標準偏差: {runs['metrics.rmse'].std():.3f}")

    # ベストパラメータの分析
    best_run = runs.loc[runs["metrics.rmse"].idxmin()]
    print("\n=== ベストモデルの詳細 ===")
    for col in param_cols:
        print(f"{col.replace('params.','')}: {best_run[col]}")
    for col in metric_cols:
        print(f"{col.replace('metrics.','')}: {best_run[col]:.3f}")

##############################
# 実行
##############################
if __name__ == "__main__":
    main()

## スクリプトの実行

# スクリプトの実行
cd src/tracking
python experiment_analysis.py

experiment_analysis.pyの実行結果(src/reports/experiment_analysis.png)
experiment_analysis.pyの実行結果

# 自動化された構築パイプライン

## 構築時の設定ファイルの活用

ファイル:experiments/experiment_config.yaml
experiment_name: "taxi-automated-experiments"
random_seed: 0

data:
  source: "../../data/raw/yellow_tripdata_2021-01.parquet"
  sample_size: 100000 
  test_size: 0.2

features:
  base_features:
    - trip_distance
    - passenger_count
    - PULocationID
    - DOLocationID
  
  time_features:
    - pickup_hour
    - pickup_day
    - pickup_month
  
  engineered_features:
    - trip_distance_per_passenger
    - is_weekend

preprocessing:
  duration_range:
    min: 1
    max: 120
  distance_range:
    min: 0
    max: 50
  passenger_range:
    min: 1
    max: 6

models:
  linear_regression:
    type: "sklearn.linear_model.LinearRegression"
    params: {}

  random_forest:
    type: "sklearn.ensemble.RandomForestRegressor"
    params:
      n_estimators: [50,100,200]
      max_depth: [10,15,20]
      min_samples_split: [2,5]
      random_state: 0
      n_jobs: -1
    
  gradient_boosting:
    type: "sklearn.ensemble.GradientBoostingRegressor"
    params:
      n_estimators: [100,200]
      learning_rate: [0.1,0.05]
      max_depth: [3,5]
      random_state: 0

tracking:
  log_models: true
  log_artifacts: true
  auto_register_best: true

## モデル構築自動化スクリプト

ファイル:src/tracking/automated_experiments.py
##############################
# ライブラリ
##############################

# 標準ライブラリ
import importlib
from pathlib import Path
import time
import yaml

# 外部ライブラリ
import joblib
import mlflow
import mlflow.sklearn
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor,GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.metrics import root_mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split,ParameterGrid

##############################
# Main関数の定義
##############################
def main():
    config_path = "../../experiments/experiment_config.yaml"

    if not Path(config_path).exists():
        print(f"設定ファイルが見つかりません: {config_path}")
        print("experiments/experiment_config.yaml を作成してください")
        return
    
    runner = AutomatedExperimentRunner(config_path)
    best_model = runner.run_all_experiments()

    print("\nMLflow UI で詳細を確認")


##############################
# Class: 自動化された機械学習モデルの実行クラス
##############################
class AutomatedExperimentRunner:
    def __init__(self, config_path):
        with open(config_path,"r") as f:
            self.config = yaml.safe_load(f)
        
        # MLflowの設定
        mlflow.set_tracking_uri("sqlite:///../../mlruns/mlflow.db")
        mlflow.set_experiment(self.config["experiment_name"])

        self.best_score = float("inf")
        self.best_model_info = None
    

    def load_and_prepare_data(self):
        """設定に基づいたデータの読み込みと前処理"""
        print("データを読み込み中...")
        
        # データの読み込み
        df = pd.read_parquet(self.config['data']['source'])
        
        # 基本的な特徴量エンジニアリング
        df['trip_duration'] = (
            df['tpep_dropoff_datetime'] - df['tpep_pickup_datetime']
        ).dt.total_seconds() / 60
        
        # 時間特徴量
        if 'pickup_hour' in self.config['features']['time_features']:
            df['pickup_hour'] = df['tpep_pickup_datetime'].dt.hour
        if 'pickup_day' in self.config['features']['time_features']:
            df['pickup_day'] = df['tpep_pickup_datetime'].dt.dayofweek
        if 'pickup_month' in self.config['features']['time_features']:
            df['pickup_month'] = df['tpep_pickup_datetime'].dt.month
        
        # エンジニアリング特徴量
        if 'trip_distance_per_passenger' in self.config['features']['engineered_features']:
            df['trip_distance_per_passenger'] = df['trip_distance'] / df['passenger_count'].clip(lower=1)
        
        if 'is_weekend' in self.config['features']['engineered_features']:
            df['is_weekend'] = (df['tpep_pickup_datetime'].dt.dayofweek >= 5).astype(int)
        
        # 特徴量の選択
        all_features = (
            self.config['features']['base_features'] +
            self.config['features']['time_features'] +
            self.config['features']['engineered_features']
        )
        
        available_features = [f for f in all_features if f in df.columns]
        X = df[available_features].fillna(0)
        y = df['trip_duration'].fillna(0)
        
        # データクリーニング
        duration_range = self.config['preprocessing']['duration_range']
        distance_range = self.config['preprocessing']['distance_range']
        passenger_range = self.config['preprocessing']['passenger_range']
        
        mask = (
            (y >= duration_range['min']) & (y <= duration_range['max']) &
            (df['trip_distance'] >= distance_range['min']) & 
            (df['trip_distance'] <= distance_range['max']) &
            (df['passenger_count'] >= passenger_range['min']) & 
            (df['passenger_count'] <= passenger_range['max'])
        )
        
        Xclean, yclean = X[mask], y[mask]
        
        # サンプリング
        sample_size = self.config['data'].get('sample_size')
        if sample_size and len(Xclean) > sample_size:
            indices = np.random.choice(len(Xclean), sample_size, replace=False)
            Xclean = Xclean.iloc[indices]
            yclean = yclean.iloc[indices]
            print(f"データを {sample_size:,} 件にサンプリング")
        
        # 訓練・テストデータの分割
        Xtrain,Xtest,ytrain,ytest = train_test_split(
            Xclean, yclean, 
            test_size=self.config['data']['test_size'],
            random_state=self.config['random_seed']
        )
        
        print(f"訓練データ: {len(Xtrain):,}")
        print(f"テストデータ: {len(Xtest):,}")
        print(f"特徴量: {available_features}")
        
        return Xtrain, Xtest, ytrain, ytest, available_features
    

    def get_model_class(self, model_type_str):
        """文字列からモデルクラスを取得"""
        module_name,class_name = model_type_str.rsplit('.',1)
        module = importlib.import_module(module_name)
        return getattr(module,class_name)
    

    def run_model_experiments(self, model_name, model_config, Xtrain, Xtest, ytrain, ytest, features):
        """特定のモデルに対する実行"""
        model_class = self.get_model_class(model_config["type"])
        params = model_config["params"]

        # パラメータグリッドの生成
        param_combinations = []
        grid_params = {}
        fixed_params = {}

        for key,value in params.items():
            if isinstance(value,list):
                grid_params[key] = value
            else:
                fixed_params[key] = value
        
        if grid_params:
            # グリッドサーチ
            param_grid = ParameterGrid(grid_params)
            for grid_param in param_grid:
                combined_params = {**fixed_params, **grid_param}
                param_combinations.append(combined_params)
        else:
            # 単一パラメータの場合
            param_combinations.append(fixed_params)
        
        print(f"\n{model_name}: {len(param_combinations)} パターンの実験")

        for i,param_set in enumerate(param_combinations):
            with mlflow.start_run(run_name=f"{model_name}_{i+1:03d}"):
                self._run_single_experiment(
                    model_name, model_class, param_set,
                    Xtrain, Xtest, ytrain, ytest, features
                )
    

    def _run_single_experiment(self, model_name, model_class, params, Xtrain, Xtest, ytrain, ytest, features):
        """単一のモデルを構築"""
        # parameter のログ
        for key,value in params.items():
            mlflow.log_param(key,value)
        mlflow.log_param("model_type", model_name)
        mlflow.log_param("train_size", len(Xtrain))
        mlflow.log_param("test_size", len(Xtest))
        mlflow.log_param("feature_count", len(features))

        # モデルの学習
        start_time = time.time()
        model = model_class(**params)
        model.fit(Xtrain,ytrain)
        training_time = time.time() - start_time

        # 予測
        ypred_train = model.predict(Xtrain)
        ypred_test = model.predict(Xtest)

        # Metricsの計算
        train_rmse = root_mean_squared_error(ytrain, ypred_train)
        test_rmse = root_mean_squared_error(ytest, ypred_test)
        train_mae = mean_absolute_error(ytrain, ypred_train)
        test_mae = mean_absolute_error(ytest, ypred_test)
        test_r2 = r2_score(ytest, ypred_test)

        # Overfittingの指標
        overfitting_ratio = test_rmse/train_rmse

        # MLflowにLog
        mlflow.log_metric("train_rmse", train_rmse)
        mlflow.log_metric("test_rmse", test_rmse)
        mlflow.log_metric("train_mae", train_mae)
        mlflow.log_metric("test_mae", test_mae)
        mlflow.log_metric("test_r2", test_r2)
        mlflow.log_metric("training_time", training_time)
        mlflow.log_metric("overfitting_ratio", overfitting_ratio)

        # モデルの保存
        if self.config["tracking"]["log_models"]:
            mlflow.sklearn.log_model(
                model, 
                f"{model_name}_model",
                input_example=Xtrain.head()
            )
        
        # ベストモデルの更新
        if test_rmse<self.best_score:
            self.best_score = test_rmse
            self.best_model_info = {
                "model_name": model_name,
                "params": params,
                "rmse": test_rmse,
                "run_id": mlflow.active_run().info.run_id
            }
            # ベストモデルのタグ付け
            mlflow.set_tag("is_best_model","true")
            if self.config["tracking"]["auto_register_best"]:
                mlflow.sklearn.log_model(
                    model,
                    "best_model",
                    registered_model_name=f"taxi_duration_best_{model_name}",
                    input_example=Xtrain.head()
                )
            
        print(f" {model_name} - RMSE: {test_rmse:.3f}, R2: {test_r2:.3f}, Time: {training_time:.1f}s")


    def run_all_experiments(self):
        """すべてのモデルに対するparameterごとにモデル構築を実行"""
        print(f"=== {self.config['experiment_name']} ===")
        # データの準備
        Xtrain,Xtest,ytrain,ytest,features = self.load_and_prepare_data()
        # 各モデル、parameterごとに構築
        total_start_time = time.time()
        for model_name,model_config in self.config["models"].items():
            self.run_model_experiments(
                model_name, model_config,
                Xtrain,Xtest,ytrain,ytest,features
            )
        total_time = time.time() - total_start_time
        # Summary
        print("\n=== すべての構築完了 ===")
        print(f"総実行時間: {total_time/60:.1f} 分")
        print(f"ベストモデル: {self.best_model_info['model_name']}")
        print(f"ベストRMSE: {self.best_model_info['rmse']:.3f}")
        print(f"ベストparameter: {self.best_model_info['params']}")

        return self.best_model_info

##############################
# 実行
##############################
if __name__ == "__main__":
    main()

## スクリプトの実行

# スクリプトの実行
cd src/tracking
python automated_experiments.py

# まとめ

Week2では、MLflowを使った包括的な構築したモデルの管理について学びました。

今回実装した内容は以下の通りです。

  • ✅ 基本的なMLflowの使い方
  • ✅ ハイパーパラメータチューニング
  • ✅ モデルレジストリの活用
  • ✅ 構築した各モデルの可視化・分析
  • ✅ 設定ファイルベースの自動化

Week3では、ワークフローオーケストレーションについて学んでいきます。
今回作成した各モデルの構築結果を、"Prefect"を使って、自動化・スケージューリングする方法を習得します。

参考情報

Discussion