🔖

Docker+FastAPI+CatBoostで回帰モデルを手軽に構築して、評価まで行ってみた!CPU過負荷による学習停止にも対処!

2024/09/27に公開

はじめに

こんにちは!CareNetのYTです。
今回はDocker+FastAPI+CatBoostを使用して、手軽に回帰モデルを構築し、評価まで行う方法をご紹介します。
このアーキテクチャはさまざまなデータに対して汎用的に適用可能です。学習や推論を評価するためのコードやCPU過負荷による学習停止への対応策についても内容に含めておりますので、回帰モデルを試す際の初手として、ご参考になりますと幸いです。

なお、以下の点は本記事の対象外としておりますので、ご留意ください。

  • 特徴量を作成する前処理部分
  • 各技術や指標の詳細な解説

なぜDocker + FastAPI + CatBoostなのか?

簡単にこのアーキテクチャを採用した理由をご説明します。

Docker

少ない手順で同じ環境を再現できるため、どの環境でも一貫した動作が可能です。

FastAPI

軽量かつ高速なWebフレームワークであり、学習や推論のエンドポイントを簡単に作成できます。後述するUI画面からテストも行うことができ、リアルタイム推論システムへの導入も行いやすいです。

CatBoost

勾配ブースティングの手法の一つで、カテゴリカルデータを扱いやすく、高速で高精度なモデル構築が可能です。設定もシンプルで、初心者から上級者まで扱いやすいのが特徴です。

環境準備

Dockerコマンドが実行できる環境であれば、どこでも実行可能です。
今回は、MacにDocker Desktopをインストールして実行しました。
https://matsuand.github.io/docs.docker.jp.onthefly/desktop/mac/install/

処理に必要なファイルの準備

以下のようなディレクトリ構成で各ファイルを作成していきます。

ディレクトリ構成

sample_repository
├── app
│   ├── main.py
│   └── processors
│        ├── train.py
│        └── predict.py
├── docker-compose.yaml
├── Dockerfile
└── requirements.txt

Dockerfileの例

Dockerfile
FROM python:3.12.6-slim

ENV PYTHONPATH "/app:$PYTHONPATH"

ENV APP_HOME /app
WORKDIR $APP_HOME

RUN apt-get update && \
    apt-get install -y gcc python3-dev

COPY requirements.txt .
RUN pip install --upgrade pip && \
    pip install -r ./requirements.txt

COPY ./app .

CMD ["python", "/app/main.py"]

docker-compose.yamlの例

docker-compose.yaml
services:
  app:
    build:
      context: .
      dockerfile: Dockerfile
    image: app
    container_name: app
    stdin_open: true
    tty: true
    logging:
      driver: "json-file"
      options:
        max-size: "1g"
        max-file: "3"
    volumes:
      - "./app/:/app"
    ports:
      - 8080:8080
      - 8888:8888

requirements.txtの例

catboost==1.2.7
fastapi==0.115.0
ipython==8.27.0
ipywidgets==8.1.5
matplotlib==3.9.2
numpy==1.26.4
pandas==2.2.2
scikit-learn==1.5.2
shap==0.46.0
traitlets==5.14.3
uvicorn==0.30.6

main.pyの例

main.py
import os

import uvicorn
from fastapi import FastAPI

from processors import train, predict

app = FastAPI()


@app.post("/train",
          summary="CatBoostによる学習")
def run_train():
    return train.post()


@app.post("/predict",
          summary="CatBoostによる推論と評価")
def run_predict():
    return predict.post()


if __name__ == "__main__":
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=int(os.environ.get("PORT", 8080)),
        reload=True,
        log_level="info",
    )

main.pyから呼び出される学習用の処理(train.py)と推論用の処理(predict.py)については後述します。

コンテナの起動

以下のコマンドを実行すると、Dockerコンテナが立ち上がります。

docker compose up --build

コンテナの起動に成功すると、自動的にSwagger UIが生成され、ブラウザから直接処理を呼び出してテストすることができます。こちらも、FastAPIが便利な点の一つです。
https://fastapi.tiangolo.com/ja/features/#_3

UI画面には、 http://0.0.0.0:8080/docs もしくは http://localhost:8080/docs からアクセスできます。

学習の実行

POST /trainエンドポイントを実行するとCatBoostによる学習が実行されます。
FastAPIのUI画面において、「Try it out」→「Execute」を押下すると、実行できます。

train.pyの例をご紹介します。
以下では、train_features.csvという形で特徴量は作成済みであることを前提にしています。
feature_cols, categorical_cols, target_colはデータに応じて適宜変更してください。
学習が問題なく行われているかを確認する学習曲線や、特徴量を評価するための重要度、SHAP値を出力するためのコードも含めています。

train.py
import traceback

import matplotlib.pyplot as plt
import pandas as pd
import shap
from catboost import CatBoostRegressor, Pool
from fastapi.responses import JSONResponse
from sklearn.model_selection import train_test_split


def train(early_stopping_rounds=1000, val_size=0.2, random_state=1):
    # 特徴量と目的変数の準備
    df = pd.read_csv("train_features.csv")

    feature_cols = ["column1", "column2", "column3", "column4", "column5"] # 特徴量として利用するカラム名リスト
    categorical_cols = ["column1", "column2"] # feature_colsのうち、カテゴリカル特徴量のリスト
    target_col = "target_column" # 目的変数のカラム名

    X = df[feature_cols]
    y = df[target_col]
    local_file_name = "model_file.cbm"
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=val_size, random_state=random_state)

    # make catboost datasets
    train_data = Pool(X_train, label=y_train, cat_features=categorical_cols)
    val_data = Pool(X_val, label=y_val, cat_features=categorical_cols)

    # fit
    params = {
        "iterations": 5000,
        "learning_rate": 0.01,
        "loss_function": "RMSE",
        "task_type": "CPU"
    }
    model = CatBoostRegressor(**params)
    model.fit(train_data,
              eval_set=val_data,
              early_stopping_rounds=early_stopping_rounds,
              use_best_model=True,
              plot=False)
    
    # モデルファイルの保存
    model.save_model(local_file_name)

    # SHAP値のプロット
    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X)
    plt.figure()
    shap.summary_plot(shap_values, X, show=False)
    plt.savefig(f"shap_summary_plot.png", format="png")
    plt.close() 

    # 学習曲線のプロット
    train_error = model.get_evals_result()["learn"]["RMSE"]
    valid_error = model.get_evals_result()["validation"]["RMSE"]
    plt.figure(figsize=(10, 6))
    plt.plot(train_error, label="Train RMSE")
    plt.plot(valid_error, label="Validation RMSE")
    plt.xlabel("Iterations")
    plt.ylabel("RMSE")
    plt.title(f"Learning Curve")
    plt.legend()
    plt.grid(True)
    plt.savefig("learning_curve.png")
    plt.close()

    # 特徴量の重要度をcsv形式で出力
    feature_names = model.feature_names_
    categorical_features_indices = model.get_cat_feature_indices()
    categorical_features = [feature_names[i] for i in categorical_features_indices]
    feature_importances = model.get_feature_importance()
    info_list = []
    for feature_name, importance in zip(feature_names, feature_importances):
        #Categorical or Numericalも出力
        feature_type = "Categorical" if feature_name in categorical_features else "Numerical"
        info_list.append([feature_name, feature_type, f"{importance:.6f}"])
    info_df = pd.DataFrame(info_list)
    info_df.columns = ["feature_name", "type", "importance"]# 特徴量の名前、型、重要度を表示
    info_df.to_csv("feature_importance.csv", index=False)
    return


def post():
    try:
        train()
        return JSONResponse(status_code=200, content={"message": "OK"})
    except:
        stack_trace = traceback.format_exc()
        print(stack_trace)
        return JSONResponse(status_code=500, content={"message": "NG"})

処理に成功すると、FastAPIのUI画面にはOKが表示され、エラーが発生すると、NGが表示されます。
ターミナルには学習の進行状況が出力され、学習が完了すると、検証データセットの最良スコア(bestTest)とそのイテレーション数(bestIteration)が出力されます。

出力される学習曲線の例


※SHAPのサマリープロットと特徴量のcsvファイルのサンプルは掲載を割愛します。

main.pyにおいて、POST /trainに対しパラメータを定義することで、パラメータを変更しながら、様々な学習のパターンを検証することも可能です。

推論と評価の実行

POST /predictエンドポイントを実行するとCatBoostによる推論が実行されます。

predict.pyの例をご紹介します。
predict_features.csvという形で特徴量は作成済みであることを前提にしています。
回帰モデルの評価の際には、機械学習により算出された予測値と、機械学習を用いずに算出したベースライン値を比較することがよくありますが、以下の例では、その比較のためのコードも含めています。評価指標としてRMSE・決定係数・相関係数・MSLEを利用しており、実測値vs予測値の散布図も出力しています。

predict.py
import traceback

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor, Pool
from fastapi.responses import JSONResponse
from sklearn.metrics import root_mean_squared_error, r2_score


def predict():
    """
    推論する

    Returns
    ----------
    pandas.DataFrame
    """
    # 特徴量と目的変数の準備
    df = pd.read_csv("predict_features.csv")
    local_file_name = "model_file.cbm"

    frame_cols = ["contents_id", "contents_name"] # 予測値の各レコードを識別するための情報のカラム名リスト
    feature_cols = ["column1", "column2", "column3", "column4", "column5"] # 特徴量として利用するカラム名リスト
    categorical_cols = ["column1", "column2"] # feature_colsのうち、カテゴリカル特徴量のリスト
    target_col = "target_column" # 目的変数のカラム名
    base_col = "base_col" #予測値の比較対象となるベースライン値

    X = df[feature_cols]
    y = df[[target_col]].reset_index(drop=True)
    z = df[[base_col]].reset_index(drop=True)
    frame_df = df[frame_cols].reset_index(drop=True)

    # fit
    model = CatBoostRegressor()
    model.load_model(local_file_name)

    feature_data = Pool(X, cat_features=categorical_cols)
    predicts = model.predict(feature_data)
    predict_df = pd.DataFrame(predicts)

    # 予測値・ベースライン値・実測値を横結合してcsv出力
    result_df = pd.concat([frame_df, y, z, predict_df], axis=1)
    result_df.columns = frame_cols + ["actual", "baseline", "prediction"]
    result_df["base_diff"] = (result_df["actual"] - result_df["baseline"]).abs()
    result_df["pred_diff"] = (result_df["actual"] - result_df["prediction"]).abs()
    result_df.to_csv("predicts.csv", index=False)
    return result_df


def score(result_df):
    """
    精度評価する

    Parameters
    ----------
    result_df: pandas.DataFrame
    """
    # 予測値vs実測値の散布図の作成(データに応じて、プロット範囲は修正してください)
    plt.scatter(result_df["actual"], result_df["prediction"], s=10, alpha=0.5)
    plt.gca().set_aspect("equal", adjustable="box")
    plt.xlabel("actual")
    plt.ylabel("prediction")
    plt.xlim(0, 100)
    plt.ylim(0, 100)
    plt.xticks(range(0, 110, 10))
    plt.yticks(range(0, 110, 10))
    plt.plot([0, 100], [0, 100], color="gray", linestyle="--")
    plt.grid(True)
    plt.title("actual vs prediction")
    plt.savefig("scatter_plot.png")
    plt.close()

    actual_column = "actual"
    base_column = "baseline"
    predict_column = "prediction"

    rmse_p = root_mean_squared_error(result_df[actual_column], result_df[predict_column])
    r2_p = r2_score(result_df[actual_column], result_df[predict_column])
    correlation_coefficient_p = np.corrcoef(result_df[actual_column], result_df[predict_column])[0, 1]
    msle_p = np.mean((np.log1p(result_df[actual_column]) - np.log1p(result_df[predict_column])) ** 2)

    rmse_b = root_mean_squared_error(result_df[actual_column], result_df[base_column])
    r2_b = r2_score(result_df[actual_column], result_df[base_column])
    correlation_coefficient_b = np.corrcoef(result_df[actual_column], result_df[base_column])[0, 1]
    msle_b = np.mean((np.log1p(result_df[actual_column]) - np.log1p(result_df[base_column])) ** 2)

    eval_df = pd.DataFrame({
        "eval_items": ["RMSE", "R^2", "Correlation Coefficient", "MSLE"],
        "actual vs prediction": [f"{rmse_p:.3f}", f"{r2_p:.3f}", f"{correlation_coefficient_p:.3f}", f"{msle_p:.3f}"],
        "actual vs baseline": [f"{rmse_b:.3f}", f"{r2_b:.3f}", f"{correlation_coefficient_b:.3f}", f"{msle_b:.3f}"]
    })
    eval_df.to_csv("eval.csv", index=False)
    return


def post():
    try:
        result_df = predict()
        score(result_df)
        return JSONResponse(status_code=200, content={"message": "OK"})
    except:
        stack_trace = traceback.format_exc()
        print(stack_trace)
        return JSONResponse(status_code=500, content={"message": "NG"})

実測値と予測値の散布図の例


※予測値・ベースライン値・実測値のcsvファイルと、各評価指標のcsvファイルのサンプルは掲載を割愛します

CPU過負荷による学習停止への対処

以下の条件で学習実行時に、処理が途中停止する事象が頻発しました。

  • ホスト環境
    • macOS: Sonoma 14.6.1
    • CPUコア数: 12(パフォーマンス: 6、効率性: 6)
    • メモリ: 36 GB
  • Dockerのversion: 27.2.0
  • 特徴量のデータサイズ: 6526行×111列の2.39MBのpandas.DataFrame

この時、FastAPIのUI上では、実行したエンドポイントのLoadingが続いた状態となり、

ターミナルへの学習進行状況の出力もストップしました。

Docker Desktopにより、コンテナのリソースの使用状況を確認したところ、メモリの使用には余力があったものの、CPUの使用が逼迫していました。
CatBoostの学習のハイパーパラメータの1つとして、使用するスレッド数を設定するthread_countがあります。
指定していない状態(デフォルト)では、thread_count=プロセッサコア数となっています。

https://catboost.ai/en/docs/references/training-parameters/performance

この値をDocker Desktopに割り当てているCPUのコア数に対し、小さめの数値とすることで、コンテナクラッシュは発生しなくなりました。
以下は、thread_countを6に設定した例です。

predict.py
# fit
params = {
    "iterations": 5000,
    "learning_rate": 0.01,
    "loss_function": "RMSE",
    "task_type": "CPU",
    "thread_count": 6
}
model = CatBoostRegressor(**params)
model.fit(train_data,
          eval_set=val_data,
          early_stopping_rounds=early_stopping_rounds,
          use_best_model=True,
          plot=False)

メモリが逼迫しているようであれば、Docker Desktopに割り当てるメモリ数やCatBoostのハイパーパラメータのused_ram_limit等を見直すことが対処法となります。

まとめ

本記事では、Docker、FastAPI、CatBoostを活用して回帰モデルの構築と評価を行う手法を紹介しました。この構成により、様々なデータに対し、手軽かつ効率的にモデルのトレーニングと推論を行うことができます。

We are hiring!

ケアネットでは一緒に働く仲間を募集しています。詳しくは以下をご覧ください。
https://hrmos.co/pages/carenet5800

CareNet Engineers

Discussion