身近なデータで試すPythonの機械学習! その2 お住まいの地域の不動産取引価格 続編 2 -ハイパーパラメータ自動最適化!
こんにちは!これまでの記事では、不動産取引価格のデータを用いて、
1. 複数の機械学習モデルを試し、性能を比較する基本フロー
2. 特定のモデル(LightGBM)に焦点を当て、学習・評価・保存する詳細手順
をご紹介しました。
前回の記事の最後では、モデル性能をさらに向上させるための一手として「ハイパーパラメータチューニング」の重要性に触れました。モデルの性能は、その内部設定であるハイパーパラメータに大きく左右されます。しかし、これらのパラメータを手動で調整するのは非常に手間がかかり、最適な組み合わせを見つけるのは至難の業です。
そこで今回は、自動ハイパーパラメータ最適化フレームワークであるOptunaを使って、LightGBMモデルの性能を最大限に引き出す方法を解説します。Optunaを利用することで、効率的に最適なハイパーパラメータの組み合わせを探索できます。
今回のプロセスの流れ
1. データ読み込みと前処理: 前回と同様の処理です。
2. 特徴量とターゲットの指定、データ分割、標準化: これも前回と同じです。
3. Optunaによるハイパーパラメータチューニング:
目的関数の定義: Optunaが試行錯誤するための評価関数を定義します。
探索の実行: Optunaに目的関数と試行回数を指定し、ハイパーパラメータの探索を開始させます。
4. 最適化されたパラメータでのLightGBMモデルの学習と評価:
Optunaが見つけた最も良いハイパーパラメータの組み合わせを使って、最終的なモデルを学習させ、テストデータで性能を評価します。
5. モデルとスケーラーの保存: 最適化されたモデルとスケーラーをファイルに保存します。
コード紹介
それでは、Optunaを組み込んだコードを見ていきましょう。
1. 設定とライブラリのインポート(Optuna追加)
まず、optuna をインポートし、Optunaの試行回数などの設定を追加します。
import pandas as pd
import numpy as np
# ... (他のインポートは前回と同じ) ...
import optuna # Optunaのインポート
from sklearn.model_selection import train_test_split, cross_val_score # クロスバリデーション用
# --- 設定 ---
# ... (FILE_PATH, TARGET_COLUMN などは前回と同じ) ...
N_OPTUNA_TRIALS = 50 # Optunaの試行回数 (環境に応じて調整)
CV_FOLDS = 3 # Optunaの目的関数内で使用するクロスバリデーションの分割数
RANDOM_STATE_SPLIT = 34 # データ分割を固定
N_OPTUNA_TRIALS でOptunaが試行する回数を指定します。回数が多いほど良いパラメータが見つかる可能性が高まりますが、時間もかかります。CV_FOLDS は、Optunaの各試行内でモデルの性能を評価する際のクロスバリデーションの分割数です。
2. データ読み込みと前処理関数 (load_and_preprocess_data)
この関数は前回と同じです。和暦変換、築年数計算、欠損値処理などを行います。
(コードは下記の修正コード全体を参照してください)
3. Optunaの目的関数 (objective)
ここがOptuna導入の核となる部分です。objective という名前の関数を定義し、引数に trial を取ります。この trial オブジェクトを通じて、Optunaは様々なハイパーパラメータの値を提案します。
# Optunaの目的関数
def objective(trial: optuna.trial.Trial, X_train_std: np.ndarray, y_train: pd.Series) -> float:
"""Optunaの目的関数。指定されたハイパーパラメータでLGBMを学習し、評価値を返す。"""
# 探索するハイパーパラメータの範囲を定義
params = {
'objective': 'regression_l1', # 目的関数の一つ
'metric': 'rmse', # 評価指標
'random_state': 42,
'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'num_leaves': trial.suggest_int('num_leaves', 20, 300),
'max_depth': trial.suggest_int('max_depth', 3, 12),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'subsample': trial.suggest_float('subsample', 0.4, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.4, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-3, 10.0, log=True), # L1
'reg_lambda': trial.suggest_float('reg_lambda', 1e-3, 10.0, log=True), # L2
'boosting_type': 'gbdt',
'verbose': -1, # LightGBMのログ出力を抑制
}
model = LGBMRegressor(**params)
# クロスバリデーションで評価 (R2スコアを最大化する)
scores = cross_val_score(model, X_train_std, y_train, cv=CV_FOLDS, scoring='r2', n_jobs=-1)
r2_mean = np.mean(scores)
return r2_mean # R2スコアを返す
trial.suggest_int, trial.suggest_float を使って、各ハイパーパラメータの探索範囲(最小値、最大値、ステップ、対数スケールなど)を指定します。Optunaはこの範囲内で値をサンプリングして試行します。今回は、学習データ (X_train_std, y_train) を使ってクロスバリデーションを行い、その平均R2スコアを関数の返り値(Optunaが最大化または最小化を目指す値)としています。scoring='r2' なので、R2スコアが高いほど良いと評価されます。
4. メイン処理:Optunaの実行と最終モデルの学習
main 関数内で、データ準備後にOptunaの最適化プロセスを実行します。
def main():
try:
# 1. データ読み込みと前処理
df_processed = load_and_preprocess_data(FILE_PATH)
X = df_processed[FEATURE_COLUMNS]
y = df_processed[TARGET_COLUMN]
# 2. データ分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE_SPLIT
)
# 3. データ標準化
scaler = StandardScaler()
X_train_std = scaler.fit_transform(X_train)
X_test_std = scaler.transform(X_test)
# 4. Optunaによるハイパーパラメータチューニング
print(f"\nOptunaによるハイパーパラメータチューニングを開始します (試行回数: {N_OPTUNA_TRIALS})...")
# 目的関数に学習データとターゲット変数を渡せるようにlambda式でラップ
objective_with_data = lambda trial: objective(trial, X_train_std, y_train)
study = optuna.create_study(direction="maximize") # R2スコアを最大化するよう設定
study.optimize(objective_with_data, n_trials=N_OPTUNA_TRIALS) # 最適化実行
print("\nハイパーパラメータチューニング完了。")
print(f"最良試行 Trial {study.best_trial.number}:")
print(f" Value (Mean CV R2 score): {study.best_trial.value:.4f}") # 最良スコア
print(" Best Parameters: ") # 見つかった最良のパラメータ
for key, value in study.best_params.items():
print(f" {key}: {value}")
# 5. 最適なハイパーパラメータで最終モデルを学習・評価・保存
train_final_model_and_save(
X_train_std, y_train, X_test_std, y_test, study.best_params, scaler
)
# ... (エラーハンドリング) ...
if __name__ == "__main__":
optuna.logging.set_verbosity(optuna.logging.INFO) # Optunaのログを見やすく設定
main()
optuna.create_study(direction="maximize") で、目的関数の返り値(今回はR2スコア)を最大化するスタディを作成します。
study.optimize(objective_with_data, n_trials=N_OPTUNA_TRIALS) で、指定した回数だけ目的関数を実行し、ハイパーパラメータを探索します。objective_with_data は、objective 関数に学習データ (X_train_std, y_train) を渡すためのものです。
最適化後、study.best_trial.value で最も良かったスコア、study.best_params でその時のハイパーパラメータの組み合わせを取得できます。
5. 最適パラメータでの最終モデル学習・保存関数 (train_final_model_and_save)
Optunaが見つけた最適なパラメータを使って、改めてモデルを学習し、テストデータで評価後、保存します。
def train_final_model_and_save(
X_train_std: np.ndarray,
y_train: pd.Series,
X_test_std: np.ndarray,
y_test: pd.Series,
best_params: Dict[str, Any],
scaler: StandardScaler
) -> None:
"""最適なハイパーパラメータで最終モデルを学習し、評価・保存する。"""
print("\n最適なハイパーパラメータで最終モデルを学習中...")
# Optunaが見つけたパラメータに固定値を追加
final_params = {**best_params}
# objective, metric など、Optunaの探索対象外だがモデルに必要なパラメータを補完
final_params.setdefault('objective', 'regression_l1')
final_params.setdefault('metric', 'rmse')
final_params.setdefault('random_state', 42)
final_params.setdefault('boosting_type', 'gbdt')
final_params['verbose'] = -1
final_model = LGBMRegressor(**final_params)
final_model.fit(X_train_std, y_train) # 全学習データで再学習
print("最終モデル学習完了。")
# ... (評価と保存のロジックは前回とほぼ同様、ファイル名が _optimized に変わる程度) ...
# (上記コード全体を参照してください)
y_pred_test = final_model.predict(X_test_std)
r2_test_final = r2_score(y_test, y_pred_test)
rmse_test_final = np.sqrt(mean_squared_error(y_test, y_pred_test))
# ... (学習データでの評価も同様) ...
print("\n--- 最終モデル評価結果 (最適化後) ---")
print(f"テストデータ: R2スコア = {r2_test_final:.4f}, RMSE = {rmse_test_final:.2f}")
# モデルとスケーラーの保存 (ファイル名を変更)
model_filename_joblib = 'lgbm_optimized_model.joblib'
# ... (他も同様) ...
joblib.dump(final_model, model_filename_joblib, compress=3)
# ...
print("最終モデルとスケーラーの保存が完了しました。")
study.best_params にはOptunaが探索したパラメータのみが含まれるため、objective や metric など、モデルの動作に必要な固定のパラメータをここで追加しています。そして、学習データ全体 (X_train_std, y_train) を使って最終モデルを学習し、テストデータ (X_test_std, y_test) で性能を評価、最後にモデルとスケーラーを保存します。
実行結果のイメージ
コードを実行すると、Optunaが各試行の結果をログに出力します。たとえば今回の検討では下記のような出力がえられました。
前処理後のデータ行数: 432
学習データ数: 302, テストデータ数: 130
Optunaによるハイパーパラメータチューニングを開始します (試行回数: 100)...
[I 2025-##-## 14:07:28,329] Trial 0 finished with value: 0.7406282360071401 and parameters: {'n_estimators': 200, 'learning_rate': 0.2985925634955298, 'num_leaves': 231, 'max_depth': 9, 'min_child_samples': 96, 'subsample': 0.7942615755172218, 'colsample_bytree': 0.7665204834287692, 'reg_alpha': 1.0087250580639948, 'reg_lambda': 0.6562990330966009}. Best is trial 0 with value: 0.7406282360071401.
... (指定したN_OPTUNA_TRIALS回繰り返される) ...
ハイパーパラメータチューニング完了。
最良試行 Trial ##:
Value (Mean CV R2 score): 0.7937
Best Parameters:
n_estimators: 800
learning_rate: 0.040601983708259536
num_leaves: 173
max_depth: 3
min_child_samples: 5
subsample: 0.8820410196652197
colsample_bytree: 0.46936779751384383
reg_alpha: 0.005893088554330475
reg_lambda: 0.1348334963768214
最適なハイパーパラメータで最終モデルを学習中...
最終モデル学習完了。
--- 最終モデル評価結果 (最適化後) ---
学習データ: R2スコア = 0.8896, RMSE = 3282142.68
テストデータ: R2スコア = 0.8023, RMSE = 4529278.58
最終モデルを 'lgbm_optimized_model.joblib' (joblib) と 'lgbm_optimized_model.pkl' (pickle) に保存中...
スケーラーを 'lgbm_optimized_scaler.joblib' (joblib) と 'lgbm_optimized_scaler.pkl' (pickle) に保存中...
最終モデルとスケーラーの保存が完了しました。
最終モデルとスケーラーの保存が完了しました。
最終的なテストデータのR2スコアが、Optunaを使わなかった前回と比較して改善していれば、ハイパーパラメータチューニングの効果があったと言えますが、今回のデータでは大きな変化はありませんでした。
まとめ
今回は、Optunaを用いてLightGBMモデルのハイパーパラメータを自動で最適化し、モデル性能の向上を目指す方法をご紹介しました。Optunaは非常に強力なツールであり、複雑なパラメータ空間から効率的に良い解を見つけ出してくれます。
不動産価格予測は奥が深く、様々なアプローチが考えられます。Optunaのようなツールを使いこなし、データと向き合い続けることで、きっとより良いモデルにたどり着けるはずです。この記事が、皆さんの挑戦の一助となれば幸いです!
コードの全体の記載
import pandas as pd
import numpy as np
import random
import joblib
import pickle
import os
import optuna # Optunaのインポート
from jeraconv import jeraconv
from IPython.display import display
from lightgbm import LGBMRegressor
from sklearn.model_selection import train_test_split, cross_val_score # クロスバリデーション用
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
from typing import Tuple, Dict, Any
# --- 設定 ---
FILE_PATH = "###############.csv" # ご自身のデータファイルパスに置き換えてください
TARGET_COLUMN = '取引価格(総額)'
FEATURE_COLUMNS = [
'最寄駅:距離(分)', '面積(㎡)', '間口',
'延床面積(㎡)', '前面道路:幅員(m)', '築年数'
]
WAREKI_COLUMN = '建築年'
TRADE_TIMING_COLUMN = '取引時点'
NEW_SEIREKI_COLUMN = '建築年_西暦'
NEW_TRADE_YEAR_COLUMN = '取引年'
NEW_AGE_COLUMN = '築年数'
TEST_SIZE = 0.3
RANDOM_STATE_SPLIT = 99 # データ分割時の乱数シード (Optunaの各試行で共通の分割にするため固定)
N_OPTUNA_TRIALS = 100 # Optunaの試行回数 (時間と精度に応じて調整)
CV_FOLDS = 5 # Optunaの目的関数内で使用するクロスバリデーションの分割数
# --- 関数定義 ---
def load_and_preprocess_data(filepath: str) -> pd.DataFrame:
"""
CSVファイルを読み込み、基本的な前処理を行います。
(前回のコードと同じなので、ここでは省略。上記コードを参照してください)
"""
print(f"'{filepath}' からデータを読み込んでいます...")
if not os.path.exists(filepath):
raise FileNotFoundError(f"エラー: ファイルが見つかりません: {filepath}")
try:
df = pd.read_csv(filepath, encoding='utf-8')
except UnicodeDecodeError:
print("UTF-8での読み込みに失敗。'cp932'で再試行します...")
df = pd.read_csv(filepath, encoding='cp932')
df_processed = df.copy()
j2w = jeraconv.J2W()
def convert_wareki_to_seireki_val(wareki_val):
if pd.isna(wareki_val) or not isinstance(wareki_val, str):
return np.nan
try:
if "元年" in wareki_val:
wareki_val = wareki_val.replace("元年", "1年")
if wareki_val[-1].isdigit() and not wareki_val.endswith("年"):
era_part = ""
year_part_str = ""
for char_idx, char_val in enumerate(wareki_val):
if char_val.isdigit():
era_part = wareki_val[:char_idx]
year_part_str = wareki_val[char_idx:]
break
if era_part and year_part_str:
wareki_val = era_part + year_part_str + "年"
return j2w.convert(wareki_val)
except Exception:
return np.nan
if WAREKI_COLUMN in df_processed.columns:
df_processed[NEW_SEIREKI_COLUMN] = df_processed[WAREKI_COLUMN].apply(convert_wareki_to_seireki_val)
else:
print(f"警告: '{WAREKI_COLUMN}' カラムが見つかりません。")
df_processed[NEW_SEIREKI_COLUMN] = np.nan
if TRADE_TIMING_COLUMN in df_processed.columns:
df_processed[NEW_TRADE_YEAR_COLUMN] = df_processed[TRADE_TIMING_COLUMN].astype(str).str.extract(r'(\d{4})').astype(float)
else:
print(f"警告: '{TRADE_TIMING_COLUMN}' カラムが見つかりません。")
df_processed[NEW_TRADE_YEAR_COLUMN] = np.nan
valid_years_mask = df_processed[NEW_SEIREKI_COLUMN].notna() & df_processed[NEW_TRADE_YEAR_COLUMN].notna()
df_processed[NEW_AGE_COLUMN] = np.nan
df_processed.loc[valid_years_mask, NEW_AGE_COLUMN] = (
df_processed.loc[valid_years_mask, NEW_TRADE_YEAR_COLUMN] -
df_processed.loc[valid_years_mask, NEW_SEIREKI_COLUMN] + 1
)
df_processed.loc[df_processed[NEW_AGE_COLUMN] < 0, NEW_AGE_COLUMN] = 0
cols_to_convert = FEATURE_COLUMNS + [TARGET_COLUMN]
for col in cols_to_convert:
if col in df_processed.columns:
if df_processed[col].dtype == 'object':
df_processed[col] = df_processed[col].astype(str).str.extract(r'(\d+\.?\d*)')[0]
df_processed[col] = pd.to_numeric(df_processed[col], errors='coerce')
df_cleaned = df_processed.dropna(subset=FEATURE_COLUMNS + [TARGET_COLUMN])
print(f"前処理後のデータ行数: {len(df_cleaned)}")
if len(df_cleaned) == 0:
raise ValueError("エラー: 前処理とNaN除去の結果、使用できるデータが残りませんでした。")
return df_cleaned
# Optunaの目的関数
def objective(trial: optuna.trial.Trial, X_train_std: np.ndarray, y_train: pd.Series) -> float:
"""Optunaの目的関数。指定されたハイパーパラメータでLGBMを学習し、評価値を返す。"""
# 探索するハイパーパラメータの範囲を定義
params = {
'objective': 'regression_l1', # MAEを目的関数の一つとしてみる(RMSEに近いが外れ値にやや頑健)
'metric': 'rmse', # 評価指標はRMSE
'random_state': 99,
'n_estimators': trial.suggest_int('n_estimators', 100, 1000, step=100),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'num_leaves': trial.suggest_int('num_leaves', 20, 300),
'max_depth': trial.suggest_int('max_depth', 3, 12),
'min_child_samples': trial.suggest_int('min_child_samples', 5, 100),
'subsample': trial.suggest_float('subsample', 0.4, 1.0), # bagging_fraction と同じ
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.4, 1.0),
'reg_alpha': trial.suggest_float('reg_alpha', 1e-3, 10.0, log=True), # L1正則化
'reg_lambda': trial.suggest_float('reg_lambda', 1e-3, 10.0, log=True), # L2正則化
'boosting_type': 'gbdt',
# verbose=-1 でLightGBMのログ出力を抑制 (Optunaのログが見やすくなる)
'verbose': -1,
}
model = LGBMRegressor(**params)
# クロスバリデーションで評価 (R2スコアを最大化する)
# Optunaはデフォルトで最小化を目指すため、R2スコアのマイナスを返すか、
# もしくは study の direction="maximize" を指定する。今回は後者を採用。
# scoring='r2'でR2スコア、'neg_mean_squared_error'でMSEの負の値
scores = cross_val_score(model, X_train_std, y_train, cv=CV_FOLDS, scoring='r2', n_jobs=-1)
r2_mean = np.mean(scores)
return r2_mean # R2スコアを返す (Optunaのstudyでdirection='maximize'を指定)
def train_final_model_and_save(
X_train_std: np.ndarray,
y_train: pd.Series,
X_test_std: np.ndarray,
y_test: pd.Series,
best_params: Dict[str, Any],
scaler: StandardScaler
) -> None:
"""最適なハイパーパラメータで最終モデルを学習し、評価・保存する。"""
print("\n最適なハイパーパラメータで最終モデルを学習中...")
# Optunaが見つけたパラメータに固定値を追加(objectiveやmetricなど)
final_params = {**best_params} # best_paramsをコピー
if 'objective' not in final_params:
final_params['objective'] = 'regression_l1'
if 'metric' not in final_params:
final_params['metric'] = 'rmse'
if 'random_state' not in final_params:
final_params['random_state'] = 42
if 'boosting_type' not in final_params:
final_params['boosting_type'] = 'gbdt'
final_params['verbose'] = -1 # ログ抑制
final_model = LGBMRegressor(**final_params)
final_model.fit(X_train_std, y_train)
print("最終モデル学習完了。")
# 最終モデルの評価
y_pred_test = final_model.predict(X_test_std)
y_pred_train = final_model.predict(X_train_std)
r2_test_final = r2_score(y_test, y_pred_test)
rmse_test_final = np.sqrt(mean_squared_error(y_test, y_pred_test))
r2_train_final = r2_score(y_train, y_pred_train)
rmse_train_final = np.sqrt(mean_squared_error(y_train, y_pred_train))
print("\n--- 最終モデル評価結果 (最適化後) ---")
print(f"学習データ: R2スコア = {r2_train_final:.4f}, RMSE = {rmse_train_final:.2f}")
print(f"テストデータ: R2スコア = {r2_test_final:.4f}, RMSE = {rmse_test_final:.2f}")
# モデルとスケーラーの保存
model_filename_joblib = 'lgbm_optimized_model.joblib'
scaler_filename_joblib = 'lgbm_optimized_scaler.joblib'
model_filename_pickle = 'lgbm_optimized_model.pkl'
scaler_filename_pickle = 'lgbm_optimized_scaler.pkl'
print(f"\n最終モデルを '{model_filename_joblib}' (joblib) と '{model_filename_pickle}' (pickle) に保存中...")
joblib.dump(final_model, model_filename_joblib, compress=3)
with open(model_filename_pickle, 'wb') as f:
pickle.dump(final_model, f)
print(f"スケーラーを '{scaler_filename_joblib}' (joblib) と '{scaler_filename_pickle}' (pickle) に保存中...")
joblib.dump(scaler, scaler_filename_joblib, compress=3)
with open(scaler_filename_pickle, 'wb') as f:
pickle.dump(scaler, f)
print("最終モデルとスケーラーの保存が完了しました。")
# --- メイン実行部分 ---
def main():
"""全体の処理を実行するメイン関数。"""
try:
# 1. データ読み込みと前処理
df_processed = load_and_preprocess_data(FILE_PATH)
X = df_processed[FEATURE_COLUMNS]
y = df_processed[TARGET_COLUMN]
# 2. データ分割
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=TEST_SIZE, random_state=RANDOM_STATE_SPLIT
)
print(f"学習データ数: {len(X_train)}, テストデータ数: {len(X_test)}")
# 3. データ標準化
scaler = StandardScaler()
X_train_std = scaler.fit_transform(X_train)
X_test_std = scaler.transform(X_test)
# 4. Optunaによるハイパーパラメータチューニング
print(f"\nOptunaによるハイパーパラメータチューニングを開始します (試行回数: {N_OPTUNA_TRIALS})...")
# 目的関数に渡す引数をlambdaで固定
objective_with_data = lambda trial: objective(trial, X_train_std, y_train)
study = optuna.create_study(direction="maximize") # R2スコアを最大化
study.optimize(objective_with_data, n_trials=N_OPTUNA_TRIALS)
print("\nハイパーパラメータチューニング完了。")
print(f"最良試行 Trial {study.best_trial.number}:")
print(f" Value (Mean CV R2 score): {study.best_trial.value:.4f}")
print(" Best Parameters: ")
for key, value in study.best_params.items():
print(f" {key}: {value}")
# 5. 最適なハイパーパラメータで最終モデルを学習・評価・保存
train_final_model_and_save(
X_train_std, y_train, X_test_std, y_test, study.best_params, scaler
)
except FileNotFoundError as e:
print(e)
except ValueError as e:
print(e)
except Exception as e:
print(f"予期せぬエラーが発生しました: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
# Optunaのログレベルを設定して、INFO(各試行の情報)のみ表示する
optuna.logging.set_verbosity(optuna.logging.INFO)
main()
Discussion