😊

前処理大全をpysparkで試みる(1〜3章)

2022/05/28に公開約5,100字

モチベーション

前処理大全のpythonのサンプルがpandasベースのため、データエンジニア的な大量データにも対応できるようにPysparkによる記述を試みる。(なお、not awesomeのものもあるかもしれない。。。。よりawesomeのものがあれば、コメントいただけると幸いです。)

https://www.amazon.co.jp/dp/B07C3JFK3V

更新履歴

  • 2022/5/30:1章のデータ読み込みにて、.option("inferSchema",True)を付加する。付加する前は、すべてのカラムがString型となる。そのため、数値計算などではIntegerTypeへの変換『.withColumn('total_price', df_reserve.total_price.cast(IntegerType()))』などが必要となっていた。本修正に伴い、不要なcast()を削除した。
  • 2022/6/2:inferSchemaの記事へのリンクを1章に追加した。

前提

import
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

1章:データ読み込み

  • pathは各環境ごとに要修正
df_reserve = spark.read.option("header", True).option("inferSchema",True).csv('../../../data/reserve.csv')
df_hotel = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/hotel.csv')
df_customer = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/customer.csv')
df_production = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/production.csv')

inferSchema optionについてはこちらも参考。なお、timestamp型は推論しない設定で進める。

https://zenn.dev/articles/a93293960a495a/edit

2章:抽出

2-1:カラム抽出

2-1
df_reserve_2_1 = df_reserve.select('reserve_id','hotel_id','customer_id','reserve_datetime','checkin_date')

2-2:条件抽出

  • 前処理大全のawesomeのような記載がpysparkの場合できない。
  • whereは下記のようにSQL式の文字による表記とcoloumnベースでの表記が可能である。
2-2
df_reserve_2_2 = df_reserve.where("'2016-10-12' <= checkin_date and checkin_date <= '2016-10-13'")

2-3:サンプリング抽出(レコード)

2-3
df_reserve_2_3 = df_reserve.sample(0.5)

2-4:サンプリング抽出(customer_id)

  • joinとisinの2パターンが考えられる。パフォーマンスは、サンプリングcustomer_idのレコード数(データ量)によると思われる。ただ、可読性的にはjoinの方が高い気がする。
2-4(joinパターン)
df_reserve_customer_id_sampling =  df_reserve.select('customer_id').distinct().sample(0.5)

df_reserve_2_4 =  df_reserve.join(df_reserve_customer_id_sampling,'customer_id')
2-4(isinパターン)
df_reserve_customer_id_sampling =  df_reserve.select('customer_id').distinct().sample(0.5)

sample_cusotmer_id_list = df_reserve_customer_id_sampling.select('customer_id').rdd.flatMap(lambda x: x).collect()
df_reserve_2_4 = df_reserve.filter(F.col('customer_id').isin(sample_cusotmer_id_list))

3章:集約

3-1:カウント

3-1
df_3_1 = df_reserve.groupby('hotel_id').agg(F.count('reserve_id'),F.countDistinct('customer_id'))
参考(not awesome)
df_count = df_reserve.groupBy('hotel_id').count().withColumnRenamed('count','count(reserve_id)')
df_count_customer_id = df_reserve.dropDuplicates(['hotel_id','customer_id']).groupBy('hotel_id').count().withColumnRenamed('count','count(customer_id)')
df_3_1_not_awesome = df_count.join(df_count_customer_id,'hotel_id')

3-2:合計

3-2
from pyspark.sql.types import IntegerType
df_3_2 = df_reserve.groupBy(['hotel_id', 'people_num']).sum('total_price').withColumnRenamed(
    'sum(total_price)', 'price_sum').orderBy(['hotel_id', 'people_num'])

3-3:極値、代表値

  • Spark SQLの関数には組み込み含めて、medianを出力する関数がない。そのため、percentileを用いて行う。また、percentileにおいてもorg.apache.spark.sql.functionsにはpercentile_approxのパーセンタイル近似値のみがあるので、正確なパーセンタイルを出したい場合には、exprを利用し、built-in functionsのpercentileを用いる。
    以下では、中央値には、percentileを、0.2パーセンタイルにはpercentile_approxを用いた。
3-3
df_3_3 = df_reserve.groupBy('hotel_id').agg(F.max(
    'total_price'), F.min('total_price'), F.avg('total_price'), F.expr('percentile(total_price,0.5)'), F.percentile_approx('total_price', 0.2))
  
# 前処理大全の出力に合わせて、column名のリネイムも実施。(withColumnRenamedを利用せずに)
new_names = ['hotel_id','price_max','price_min','price_avg','price_med','price_20per']
df_3_3 = df_3_3.toDF(*new_names)

3-4:ばらつき(母分散と母標準偏差)

  • 問とは関係ないが、hotel_idの並び順を変更
  • withColumnにおけるカラム指定を前項と違うパターンで指定
3-4
df_3_4 = df_reserve.groupBy('hotel_id').agg(
    F.var_pop('total_price'), F.stddev_pop('total_price')).orderBy(F.regexp_replace('hotel_id', 'h_', '').cast(IntegerType()))
  • var_samp,stddev_sampの場合には、標本不偏分散、標本不偏標準偏差になるため、レコード数が1の場合にはnullになる。
参考
l = [('Alice', 3)]
schema = ['name','age']

df = spark.createDataFrame(l,schema=schema)
df_unbiased = df.select(F.var_samp('age'),F.stddev_samp('age'))
df_unbiased.show()

+-------------+----------------+
|var_samp(age)|stddev_samp(age)|
+-------------+----------------+
|         null|            null|
+-------------+----------------+

3-5:最頻値

  • pysparkでは、modeがない。(pyspark.pandasは除く)
  • pysparkのbroundはHALF_EVENで、roundはHALF_UPであり、pandasのroundはHALF_EVENのため注意が必要。前処理大全の結果に合わせて、broundを使用。(Roundingについてはこちらがわかりやすい[1])
3-5
df_3_5 = df_reserve.select(F.bround('total_price', -3).alias('total_price_bround')).groupBy('total_price_bround').count(
).withColumn('max_count', F.max('count').over(W.partitionBy())).filter('count == max_count').drop('max_count')

3-6:集約関数(順位/ランキング)

3-6-1
df_3_6_1 = df_reserve.withColumn('log_no', F.row_number().over(
    W.partitionBy('customer_id').orderBy('reserve_datetime')))
3-6-2
df_3_6_2 = df_reserve.groupBy('hotel_id').count().withColumn(
    'rsv_cnt_rank', F.rank().over(W.partitionBy().orderBy(F.desc('count'))))

注釈

脚注
  1. https://qiita.com/tobira-code/items/d70093c4ad6c3e3e214f ↩︎

Discussion

ログインするとコメントできます