🎰
PySparkによる機械学習の実装
はじめに
Pyspark(Spark MLlib)を用いた機械学習の一連の流れに関する実装を整理する。(scikit-learnはよく見かけるけどPysparkはあんまり見かけない。。。。)
そのため、機械学習自体の中身については触れないし、自身の能力としても触れられない。
概要
SparkのMLlibにおいて機械学習の一連のワークフローを構成する要素は次の3個になる。これらの構成要素を用いて、前処理や学習を実装する。
Transformers
- Dataframeを入力とし、1個以上のカラムを追加したDataframeを出力する。(メソッドはtransform())
- 入出力の処理は変換処理として定義されたもの(つまりはルールベース)が行わる。
- 例えば
- 複数カラムの特徴量を1カラムのベクトル化する(VectorAssemler)
- 学習済みモデルのTransformerでテスト用データを入力として、予測結果を出力する。
Estimators
- fitメソッドにより、引数に指定されたデータフレームに適合したTransformerを生成する。
- 例えばDataframeのデータによりパラメータを学習したモデルをTransformerとして出力する。
- 機械学習のアルゴリズムだけでなく、前処理のOneHotEncoderなどもEstimatorの一つである。
Pipeline
- TransformersとEstimatorsを組み合わせた一連の処理をEstimatorとして出力する。(つまりは前処理から学習までの一連の流を一つのEstimatorとして生成する)
- 生成されたものはEstimatorなのでfitメソッドにより、TransfomerであるPipelineModelを生成する。
実装
- 概要で出てきた構成要素の具体的な実装を見ていく。なお、一連の流れを整理するためであり、すべての実装/機能を網羅的に取り扱うものではない。
- サンプルコードにおけるデータにはこちらの前処理大全でもお世話になったデータ(df_production)を利用する。
- fault_flgを目的変数として、その他3個を説明変数として利用する。
df_production.printSchema()
root
|-- type: string (nullable = true)
|-- length: double (nullable = true)
|-- thickness: double (nullable = true)
|-- fault_flg: boolean (nullable = true)
df_production.show(3)
+----+------------------+------------------+---------+
|type| length| thickness|fault_flg|
+----+------------------+------------------+---------+
| E| 274.0273827080609| 40.24113135955541| false|
| D| 86.31926860506081|16.906714630016268| false|
| E|123.94038830419984|1.0184619943950775| false|
+----+------------------+------------------+---------+
前処理/特徴量エンジニアリング
- カテゴリカル変数の数値化及びOneHotEncoding
- MLlibではOneHotEncoderをはじめ基本、数値データを取り扱うために、カテゴリカル変数は数値化する。
- カテゴリカル変数であるtypeをStringIndexerを用いて数値化する。
- OneHotEncoderを用いてOneHotEncodingを行う。StringIndexerのoutputColがOneHotEncoderのinputColとなる。
- OneHotEncoderの出力はsparseベクトル[1]となる。
- 特徴量(説明変数)の単一ベクトル化(VectorAssembler)
- Pysparkでは学習データの特徴量を単一ベクトル化した状態で渡す必要がある。
- MLlibでは変数名のデフォルトで特徴量/目的変数をfeatures/labelにしているため、基本合わせる。
- その他
- 基本数値データを扱うため、説明変数のfault_flgの数値化する。
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler
indexer = StringIndexer(inputCol="type", outputCol="typeindex")
model = indexer.fit(df_production)
df_production_index = model.transform(df_production)
encoder = OneHotEncoder(inputCol="typeindex", outputCol="typeindexVec")
model_ohe = encoder.fit(df_production_index)
df_production_ohe = model_ohe.transform(df_production_index)
vecAssembler = VectorAssembler(inputCols=['length','thickness','typeindexVec'],outputCol='features')
df_production_pre = vecAssembler.transform(df_production_ohe)
df_production_pre = df_production_pre.withColumn('label',F.col('fault_flg').cast('integer'))
df_production_pre.show(3,truncate=False)
+----+------------------+------------------+---------+---------+-------------+-------------------------------------------------------+-----+
|type|length |thickness |fault_flg|typeindex|typeindexVec |features |label|
+----+------------------+------------------+---------+---------+-------------+-------------------------------------------------------+-----+
|E |274.0273827080609 |40.24113135955541 |false |3.0 |(4,[3],[1.0])|[274.0273827080609,40.24113135955541,0.0,0.0,0.0,1.0] |0 |
|D |86.31926860506081 |16.906714630016268|false |0.0 |(4,[0],[1.0])|[86.31926860506081,16.906714630016268,1.0,0.0,0.0,0.0] |0 |
|E |123.94038830419984|1.0184619943950775|false |3.0 |(4,[3],[1.0])|[123.94038830419984,1.0184619943950775,0.0,0.0,0.0,1.0]|0 |
+----+------------------+------------------+---------+---------+-------------+-------------------------------------------------------+-----+
データ分割
- Pysparkで単純にデータ分割のみを実装するものはrandomSplitのみである。こちらを用いて、開発データとテストデータに分割する。
- 開発データを訓練デートと検証データに分割して学習をするのはCrossValidatorやTrainValidationSplitを利用する想定。(ハイパーパラメータを参照。)
- sklearnのようにk分割のみを行うクラスはなく、交差検証(CrossValidator)のなかでk分割を分割数を指定することにより行う。
df_production_train,df_production_test = df_production_pre.randomSplit([0.8,0.2],seed=42)
学習/予測
- ロジスティック回帰のアルゴリズムを用いて学習/予測を行う。
- 実装の流れをまとめているので、ハイパーパラメータは適当である。
- 概要で記載しように、ロジスティック回帰のアルゴリズムをインスタンス化(lr)し、fitメソッドにより学習したTransformerを生成し、そのTransformerのtransformメソッドにより予測を出力。
- Spark MLlibの分類アルゴリズムでは、予測結果(prediction)だけでなく、rawPrediction(信頼度)とprobability(予測された確率)も合わせて出力する。[2]
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,
featuresCol="features", labelCol="label")
lrmodel = lr.fit(df_production_train)
predictions = lrmodel.transform(df_production_test)
predictions.show(3)
+----+-----------------+------------------+---------+---------+-------------+--------------------+-----+--------------------+--------------------+----------+
|type| length| thickness|fault_flg|typeindex| typeindexVec| features|label| rawPrediction| probability|prediction|
+----+-----------------+------------------+---------+---------+-------------+--------------------+-----+--------------------+--------------------+----------+
| A|76.05711471764573|3.2319566258540813| true| 2.0|(4,[2],[1.0])|[76.0571147176457...| 1|[2.86916073186452...|[0.94630071599045...| 0.0|
| A|77.54570778375123|12.672581204146455| false| 2.0|(4,[2],[1.0])|[77.5457077837512...| 0|[2.86916073186452...|[0.94630071599045...| 0.0|
| A|77.77992746267778| 7.479241990310379| false| 2.0|(4,[2],[1.0])|[77.7799274626777...| 0|[2.86916073186452...|[0.94630071599045...| 0.0|
+----+-----------------+------------------+---------+---------+-------------+--------------------+-----+--------------------+--------------------+----------+
only showing top 3 rows
評価
- 2値分類のため、評価にはBinaryClassificationEvaluatorを用いる。(評価指標は、areaUnderROC(デフォルト)とareaUnderPRの2つがある。)
- areaUnderPR:Computes the area under the precision-recall curve.
- areaUnderROC:Computes the area under the receiver operating characteristic (ROC) curve.
- 評価のためのinputカラムは、デフォルトでrawPredictionCol:'rawPrediction', labelCol:'label'となる。
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()
print('areaUnderROC', evaluator.evaluate(predictions))
print('areaUnderPR', evaluator.evaluate(
predictions, {evaluator.metricName: "areaUnderPR"}))
パイプライン
- 上記の流れをパイプライン化する。それにより処理順序を覚える必要も各インスタンスがtransformerかestimatorであるかも意識する必要がなくなる。
- Pipelineの一連の処理は引数のstagesに指定する。指定できるのものは、TransformersとEstimatorsである。
- Pipelineのインスタンス化でPipelineのEstimatorが生成される。そのEstimatorのfitメソッドでは、各ステージのfitメソッドが実行され(後続にEstimatorを持つstageはtransformしたデータフレームを後続のstageに連携する)、PipelineModelとしてのTransfomerが生成される。
- データ分割前にwithClumnでcastしているものをTransfomerとして実装して、Pipelineに組み込もことも技術上可能ではあるが、正直コスパや保守性含めて現実的な気がしない。
from pyspark.ml.pipeline import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
indexer = StringIndexer(inputCol="type", outputCol="typeindex")
encoder = OneHotEncoder(inputCol="typeindex", outputCol="typeindexVec")
vecAssembler = VectorAssembler(
inputCols=['length', 'thickness', 'typeindexVec'], outputCol='features')
lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8,
featuresCol="features", labelCol="label")
df_production_pre = df_production.withColumn(
'label', F.col('fault_flg').cast('integer'))
df_production_train, df_production_test = df_production_pre.randomSplit([
0.8, 0.2], seed=42)
pipeline = Pipeline(stages=[indexer, encoder, vecAssembler, lr])
pipelineModel = pipeline.fit(df_production_train)
df_production_prediction = pipelineModel.transform(df_production_test)
ハイパーパラメータチューニング
- Spark MLlibにはハイパーパラメータチューニングのライブラリとして、次の2個がある。
- Cross-Validation(交差検証):CrossValidator
- Train-Validation Split(学習データと検証データに分割):TrainValidationSplit
- 上記ライブラリはデータ分割による検証だけでなく、グリッドサーチによるパラメータ探索を合わせて指定できる。
- コードは上段のロジスティック回帰インスタンスとかを用いた形で記載する。なお、グリッドサーチのパラメータは適当である。
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
.addGrid(lr.regParam, [0.01, 0.5, 2.0])
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
.addGrid(lr.maxIter, [1, 5, 10])
.build())
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid,
evaluator=evaluator, numFolds=5)
cvModel = cv.fit(df_production_train)
- CrossValidatorのestimatorにパイプラインの項で生成した前処理まで含めたestimatorであるpipelineを指定することは可能である。
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid,
evaluator=evaluator, numFolds=5)
- 上記の方法の場合、CrossValidatorが評価のたびに結果が変わらない前処理を再度評価することになりパフォーマンスがよくない。そのため、パイプラインの中にCrossValidatorを組み込むことにより改善することができる。
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid,
evaluator=evaluator, numFolds=5)
pipeline = Pipeline(stages=[indexer, encoder, vecAssembler, cv])
Discussion
投稿ありがとうございます.
上記のコードを上から順に実行していくと以下のようなエラー文が出ます.
IllegalArgumentException: features does not exist. Available: type, length, thickness, fault_flg, label, CrossValidator_862d06ef8b5e_rand
理由がわかりますか? Pyspark はバージョン 3.3.0 を使用しました.
投稿のお試しありがとうございます。
本投稿は完全にそのままのコードを上から順に流す想定ではないものもあります。
基本は各実装を機械学習の流れに沿ってスニペット的に記載しているので、本記事を参考にする場合には、それらを利用しつつご自身で一連の流れをコーディングしていただけると幸いです。
また、上記のエラーはどこで発生したものかわかりませんが、内容はエラー文のとおりではと思いました。
cvModel = cv.fit(df_production_train)
の実行時の,エラー文でした.
了解しました.ありがとうございます.