Databricks上でscikit-learnを高速化する ~ハイパーパラメーターチューニング編~
Databricks上でscikit-learnを高速化する ~ハイパーパラメーターチューニング編~
前回はDatabricks上にシングルノードを立ち上げ、Intel(R) Extension for Scikit-learn*を用いて、scikit-learnベースのモデル学習処理の高速化を検証しました。
今回はマルチノード環境、つまり、Sparkクラスター上で、同様に性能が上がるかをみて行こうと思います。かつ、今回はHyperopt
とSparkTrials
を使ったハイパーパラメーターチューニングの処理を対象として、その性能向上を試みます。
Hyperopt と SparkTrials について
HyperoptはOSSの最適化エンジンで、主にハイパーパラメーターチューニングなどに使用されます。現在は下記3つのアルゴリズムが実装されています。
- Random Search
- Tree of Parzen Estimators (TPE)
- Adaptive TPE
TPEというのはベイズ最適化の一種であり、従来のガウス過程ベースのものよりもさらに良い結果が期待できる手法です。TPEを実装している同様のOSSエンジンとしてOptunaなどがあります。他にも2023年4月にOSS化されたSigOptなどもあり、面白い領域なので、改めて記事にしたいと思います。
TPEに関してはこちらが参考になります。
なお、上記3つのアルゴリズムは以下2つの方法で並列化することが可能です。
- Apache Spark
- MongoDB
というわけでSparkクラスター上で並列化させたいのですが、その際に用いられるのがSparkTrials
です。
SparkTrialsはDatabricksが開発したAPIで、Sparkクラスター上でHyperoptのチューニング処理を並列実行するためのAPIで、ワーカーノード上で並列にパラメーターチューニングを実行します。
Hyperopt と SparkTrials についてはこちらもぜひ参照ください。
Intel(R) Extension for Scikit-learn* とは
その名の通り、インテルがインテル CPUとインテル GPU上でのアルゴリズム実行の高速化のために最適化を行なったScikit-learnモジュールです。これ単体で動くわけではなく、オリジナルのscikit-learnのアドオンモジュールという形で動作します。したがって、前提として、オリジナルがインストールされている必要があります。
環境
本ブログではAzure Databricksを用いていますが、AWS版、GCP版のいずれでも再現可能かと思います。
- Databricks Runtime: 13.3 LTS ML (includes Apache Spark 3.4.1, Scala 2.12, scikit-learn 1.1.1 pre-installed)
- ノードタイプ: Azure Standard Ds_V5シリーズ(ドライバー 1ノード + ワーカー 4ノード)
- 主要ライブラリ: Intel(R) Extension for Scikit-learn*
制約
Intel(R) Extension for Scikit-learn*
ではscikit-learnのすべてのアルゴリズムが最適化されているわけではありません。こちらのURL に記載されているアルゴリズムのみ、性能向上が期待できます。今回は「LogisticRegression」を試していきます。
また、原則インテル社のCPUおよびGPUベースのインスタンスのみサポートされます。(2023年10月18日現在、AWS/Azure/GCPではインテルGPUインスタンスというのは存在しないので、実質CPUインスタンスのみが¥対象)。AMD CPU、または、NVIDIA GPUのインスタンスでは動作が保証されていませんのでご注意を。
コード
ベースとしてこちらの公式サンプルを利用しました。
その上で、以下の点を追加・変更しました。
-
Intel(R) Extension for Scikit-learn*
をインストール
%pip install scikit-learn-intelex
- Pythonランタイムの再起動
dbutils.library.restartPython()
- データ拡張
元のノートブックで使用しているデータのレコード数が6,497だけなので、最適化前でも一瞬で学習が終了してしまい、性能差が出づらいと考え、レコード数を100倍に増幅しました。
def data_augmentation(df, scale : int):
new_df = None
for i in range(scale):
new_df = pd.concat([new_df, df], axis=0)
print(f"The data is successfully augumented from {df.shape} to {new_df.shape}")
return new_df
#レコード数に100倍の増幅
data_df = data_augmentation(data_df, 100)
- 使用するアルゴリズムを元の「GradientBoostingClassifier」から「LogisticRegression」に変更
with mlflow.start_run(nested=True):
# model_hp = sklearn.ensemble.GradientBoostingClassifier(
model_hp = sklearn.linear_model.LogisticRegression(
random_state=0,
max_iter=10000,
n_jobs=-1,
**params
)
- 当然探索スペースもアルゴリズムに合わせて変更。
# Define the search space to explore
search_space = {
# 'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
# 'learning_rate': hp.loguniform('learning_rate', -3, 0),
# 'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
'C': hp.loguniform('x_C',-10,1),
'tol': hp.loguniform('x_tol',-13,-1),
'l1_ratio': hp.uniform('x_l1',0,1)
}
-
Intel(R) Extension for Scikit-learn*
のインポートおよびアクティベーション追加
各ワーカーノードで実行されるtrain_model関数の先頭でアクティベーションを実施
def train_model(params):
from sklearnex import patch_sklearn
patch_sklearn()
・・・
というわけで、以下のようなコードが出来上がります。SparkTrialsのparallelism
パラメーターはワーカーノード数にあわせて4に設定。(parallelism
とHyperoptのmax_evals
の設定に関してはそれなりに奥深いので、こちらも改めて記事化しようと思います。)
# Define the search space to explore
search_space = {
# 'n_estimators': scope.int(hp.quniform('n_estimators', 20, 1000, 1)),
# 'learning_rate': hp.loguniform('learning_rate', -3, 0),
# 'max_depth': scope.int(hp.quniform('max_depth', 2, 5, 1)),
'C': hp.loguniform('x_C',-10,1),
'tol': hp.loguniform('x_tol',-13,-1),
'l1_ratio': hp.uniform('x_l1',0,1)
}
def train_model(params):
# 各ワーカー上でIntel scikit-learnをインポートして、アクティベート
from sklearnex import patch_sklearn #追加
patch_sklearn() #追加
# 各ワーカー上でautologgingをイネーブル
mlflow.autolog()
with mlflow.start_run(nested=True):
# GradientBoostingClassifierは最適化されていないため、LogisticRegressionに変更
# model_hp = sklearn.ensemble.GradientBoostingClassifier(
model_hp = sklearn.linear_model.LogisticRegression(
random_state=0,
max_iter=10000,
n_jobs=-1,
**params
)
model_hp.fit(X_train, y_train)
predicted_probs = model_hp.predict_proba(X_test)
# テストAUCに基づいてチューニングする
# 本番環境では、代わりに別の検証セットを使うこともできる
roc_auc = sklearn.metrics.roc_auc_score(y_test, predicted_probs[:,1])
mlflow.log_metric('test_auc', roc_auc)
# 損失を-1*auc_scoreに設定し、fminがauc_scoreを最大化するようにする
return {'status': STATUS_OK, 'loss': -1*roc_auc}
# SparkTrialsはSparkワーカーを使ってチューニングを分散する。
# 並列性を高めると処理が高速化するが、各ハイパーパラメータのトライアルは他のトライアルからの情報が少なくなる
# 小規模なクラスタやDatabricks Community Editionでは、以下のように設定してみてください。parallelism=2
spark_trials = SparkTrials(
parallelism=4
)
with mlflow.start_run(run_name='lr_hyperopt') as run:
# Hyperoptを使用して、ベストなAUCをもたらすパラメーターを見つける。
best_params = fmin(
fn=train_model,
space=search_space,
algo=tpe.suggest,
max_evals=32,
trials=spark_trials)
結果
以下が結果です。Standard_D16s_v5を用いて検証しています。
ノードタイプ | vCPU (物理コア数) | ワーカーノード数 | scikit-learnのみ (分) | scikit-learn + Intel scikit-learn (分) |
---|---|---|---|---|
Standard_D16s_v5 | 16 (8) | 4 | 5.11 | 1.81 |
https://learn.microsoft.com/ja-jp/azure/virtual-machines/dv5-dsv5-series
前回と同様にIntel scikit-learnを使用した方が、処理時間が大幅に短くなっています。一回あたりの試行(モデル学習)時間が短くなっているので当然と言えば当然ですね。
今回は時間の都合で1種類のノードタイプしか試してませんが、ノードをスケールアップして、コア数を増やすとさらに性能向上の割合が高くなることが期待できます。
シングルノード編と同様に、VMサイズを変えずとも、ライブラリを一つ追加して、コードをわずかに修正するだけで、大きな性能向上が実現できたので、TCO削減に向けた有効なオプションの一つとして活用できるかと思います。
ぜひ皆様のDatabricks環境でもお試しください。
フルのコードはこちら
ただ、何度も言いますが、性能向上が期待できるアルゴリズムは限定的なので、公式ドキュメントを参照し、自身が使用しているアルゴリズムがリストに含まれているかをご確認ください。
次は、AutoMLでも試してみようと思います。
BFN!
参考
Use scikit-learn on Databricks
Intel(R) Extension for Scikit-learn*
Databricks無料トライアル
Discussion