🐥
前処理大全をPysparkで試みる(9章)
はじめに
こちらの記事のつづき
前提
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
9章:カテゴリ型
9-1:カテゴリ型への変換
- Sparkではカテゴリ型が存在しない。そのため、SQLと同様マスターテーブルを作成することにより行う。
- ブール型は存在するため、ブール型を追加する。
マスターテーブル作成
df_mst = df_customer.select('sex').dropDuplicates(['sex'])
df_mst = df_mst.rdd.zipWithIndex().toDF()
df_mst = df_mst.select(F.col("_1.*"), F.col("_2").alias('sex_mst_id'))
df_9_1_1 = df_customer.join(df_mst,on='sex')
ブール型の追加
df_9_1_2 = df_customer.withColumn('sex_is_man', F.when(
F.col("sex") == 'man', True).otherwise(False))
9-2:ダミー変数化
- 9-1で数値化されているsex_mst_idカラムを用いる。
- OneHotEncoderについては、こちらも参照。
- 今回は前処理大全と同様、カテゴリを落とさないようにするため、dropLast=False(defalut:True)を設定する。
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol="sex_mst_id",
outputCol="sexindexVec", dropLast=False)
model_ohe = encoder.fit(df_customer)
df_9_2 = model_ohe.transform(df_customer)
9-3:カテゴリ値の集約
df_9_3 = df_customer.withColumn("age_rank", F.when(F.floor(F.col(
'age')/10) * 10 < 60, F.floor(F.col('age')/10) * 10).otherwise('60以上'))
9-4:カテゴリ値の組み合わせ
- 結合をconcatとconcat_wsのそれぞれで記載する。
df_9_4_1 = df_customer.withColumn("sex_and_age", F.concat(F.floor(F.col(
'age')/10) * 10, F.lit("_"), F.col("sex")))
df_9_4_2 = df_customer.withColumn("sex_and_age", F.concat_ws('_', F.floor(F.col(
'age')/10) * 10, F.col("sex")))
9-5:カテゴリ値の数値化
- 障害件数のPartitionbyのsumで自身のレコードをうまく除外できる記載ができないので、製品種別数、と製品種別ごとの障害数を事前に計算したうえで行う。
df_9_5 = df_production.withColumn('fault_count', (F.sum(
F.col('fault_flg').cast(IntegerType())).over(W.partitionBy('type'))))
df_9_5 = df_9_5.withColumn(
'type_count', F.count('type').over(W.partitionBy('type')))
df_9_5 = df_9_5.withColumn(
'type_fault_rate', (F.col('fault_count') - F.col('fault_flg').cast(IntegerType()))/F.col('type_count'))
9-6:カテゴリ型の補完
- 前処理大全にあるKNNはSparkでは実装されていない。KNNのアルゴリズム上、分散処理に適さない。
- つまりはKNNによるカテゴリ補完はPysparkで行うこと自体がnot awesomeとなる。
Discussion