😊

前処理大全をPySparkで試みる(1〜3章)

2022/05/29に公開
4

モチベーション

前処理大全のpythonのサンプルがpandasベースのため、データエンジニア的な大量データにも対応できるようにPyspark[1]による記述を試みる。(なお、not awesomeのものもあるかもしれない。。。。よりawesomeのものがあれば、コメントいただけると幸いです。)

https://www.amazon.co.jp/dp/B07C3JFK3V

更新履歴

  • 2022/5/30:1章のデータ読み込みにて、.option("inferSchema",True)を付加する。付加する前は、すべてのカラムがString型となる。そのため、数値計算などではIntegerTypeへの変換『.withColumn('total_price', df_reserve.total_price.cast(IntegerType()))』などが必要となっていた。本修正に伴い、不要なcast()を削除した。
  • 2022/6/2:inferSchemaの記事へのリンクを1章に追加した。
  • 2022/8/6:3-6-2において、不要なpartitionByを削除したものを追記
  • 2022/8/7:集約関数の補足を追記
  • 2022/8/13:3-4において、stddevとvarianceのエイリアスをついてを追記。
  • 2023/6/27:2-2において、whereがfilterのエイリアスである旨を追記。

前提

import
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

1章:データ読み込み

  • pathは各環境ごとに要修正
df_reserve = spark.read.option("header", True).option("inferSchema",True).csv('../../../data/reserve.csv')
df_hotel = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/hotel.csv')
df_customer = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/customer.csv')
df_production = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/production.csv')

inferSchema optionについてはこちらも参考。なお、timestamp型は推論しない設定で進める。
https://zenn.dev/articles/a93293960a495a/edit

2章:抽出

2-1:カラム抽出

2-1
df_reserve_2_1 = df_reserve.select('reserve_id','hotel_id','customer_id','reserve_datetime','checkin_date')

2-2:条件抽出

  • 前処理大全のawesomeのような記載がpysparkの場合できない。
  • whereは下記のようにSQL式の文字による表記とcoloumnベースでの表記が可能である。
  • 補足として、whereはfilterのエイリアスである。
2-2
df_reserve_2_2 = df_reserve.where("'2016-10-12' <= checkin_date and checkin_date <= '2016-10-13'")

2-3:サンプリング抽出(レコード)

2-3
df_reserve_2_3 = df_reserve.sample(0.5)

2-4:サンプリング抽出(customer_id)

  • joinとisinの2パターンが考えられる。パフォーマンスは、サンプリングcustomer_idのレコード数(データ量)によると思われる。ただ、可読性的にはjoinの方が高い気がする。
2-4(joinパターン)
df_reserve_customer_id_sampling =  df_reserve.select('customer_id').distinct().sample(0.5)

df_reserve_2_4 =  df_reserve.join(df_reserve_customer_id_sampling,'customer_id')
2-4(isinパターン)
df_reserve_customer_id_sampling =  df_reserve.select('customer_id').distinct().sample(0.5)

sample_cusotmer_id_list = df_reserve_customer_id_sampling.select('customer_id').rdd.flatMap(lambda x: x).collect()
df_reserve_2_4 = df_reserve.filter(F.col('customer_id').isin(sample_cusotmer_id_list))

3章:集約

3-1:カウント

3-1
df_3_1 = df_reserve.groupby('hotel_id').agg(F.count('reserve_id'),F.countDistinct('customer_id'))
参考(not awesome)
df_count = df_reserve.groupBy('hotel_id').count().withColumnRenamed('count','count(reserve_id)')
df_count_customer_id = df_reserve.dropDuplicates(['hotel_id','customer_id']).groupBy('hotel_id').count().withColumnRenamed('count','count(customer_id)')
df_3_1_not_awesome = df_count.join(df_count_customer_id,'hotel_id')

3-2:合計

3-2
from pyspark.sql.types import IntegerType
df_3_2 = df_reserve.groupBy(['hotel_id', 'people_num']).sum('total_price').withColumnRenamed(
    'sum(total_price)', 'price_sum').orderBy(['hotel_id', 'people_num'])

3-3:極値、代表値

  • Spark SQLの関数には組み込み含めて、medianを出力する関数がない。そのため、percentileを用いて行う。また、percentileにおいてもorg.apache.spark.sql.functionsにはpercentile_approxのパーセンタイル近似値のみがあるので、正確なパーセンタイルを出したい場合には、exprを利用し、built-in functionsのpercentileを用いる。
    以下では、中央値には、percentileを、0.2パーセンタイルにはpercentile_approxを用いた。
3-3
df_3_3 = df_reserve.groupBy('hotel_id').agg(F.max(
    'total_price'), F.min('total_price'), F.avg('total_price'), F.expr('percentile(total_price,0.5)'), F.percentile_approx('total_price', 0.2))
  
# 前処理大全の出力に合わせて、column名のリネイムも実施。(withColumnRenamedを利用せずに)
new_names = ['hotel_id','price_max','price_min','price_avg','price_med','price_20per']
df_3_3 = df_3_3.toDF(*new_names)

3-4:ばらつき(母分散と母標準偏差)

  • 問とは関係ないが、hotel_idの並び順を変更
  • withColumnにおけるカラム指定を前項と違うパターンで指定
3-4
df_3_4 = df_reserve.groupBy('hotel_id').agg(
    F.var_pop('total_price'), F.stddev_pop('total_price')).orderBy(F.regexp_replace('hotel_id', 'h_', '').cast(IntegerType()))
  • var_samp,stddev_sampの場合には、標本不偏分散、標本不偏標準偏差になるため、レコード数が1の場合にはnullになる。なお、stddevはstddev_sampのエイリアス、varianceはvar_sampのエイリアスである。
参考
l = [('Alice', 3)]
schema = ['name','age']

df = spark.createDataFrame(l,schema=schema)
df_unbiased = df.select(F.var_samp('age'),F.stddev_samp('age'))
df_unbiased.show()

+-------------+----------------+
|var_samp(age)|stddev_samp(age)|
+-------------+----------------+
|         null|            null|
+-------------+----------------+

3-5:最頻値

  • pysparkでは、modeがない。(pyspark.pandasは除く)
  • pysparkのbroundはHALF_EVENで、roundはHALF_UPであり、pandasのroundはHALF_EVENのため注意が必要。前処理大全の結果に合わせて、broundを使用。(Roundingについてはこちらがわかりやすい[2])
3-5
df_3_5 = df_reserve.select(F.bround('total_price', -3).alias('total_price_bround')).groupBy('total_price_bround').count(
).withColumn('max_count', F.max('count').over(W.partitionBy())).filter('count == max_count').drop('max_count')

3-6:集約関数(順位/ランキング)

3-6-1
df_3_6_1 = df_reserve.withColumn('log_no', F.row_number().over(
    W.partitionBy('customer_id').orderBy('reserve_datetime')))
3-6-2
df_3_6_2_1 = df_reserve.groupBy('hotel_id').count().withColumn(
    'rsv_cnt_rank', F.rank().over(W.partitionBy().orderBy(F.desc('count'))))

# 不要なpartitionByを削除
df_3_6_2_2 = df_reserve.groupBy('hotel_id').count().withColumn(
    'rsv_cnt_rank', F.rank().over(W.orderBy(F.desc('count'))))

# なお、_1と_2もLogical Planは同様
df_3_6_2_1.sameSemantics(df_3_6_2_2)
>True

# samSematicsが同じため、どちらも当然single partitionの処理による警告はでる。
# 集約関数の補足のとおり、当然な結果である。

集約関数の補足

  • 集約関数では、第4章でも参考にさせていただいている分析関数(ウインドウ関数)をわかりやすく説明してみたに記載があるorderByの『行を順番に並べた上で、最初の行から現在行までのみを集計の対象にする』が大事である。
  • 以下、上記のリンクにあるサンプルデータに近い形をもとになんかこ、上記の言葉における処理内容確認をPySparkで行う。
  • なお、3-6もそうであるが、集約関数でpartitionByがない場合には、single partitionの処理になるため、警告にも出ているとおりパフォーマンス等を考慮する必要がある。要はSparkは分散処理フレームワークであり、処理分散がされないようなものはできるだけ避けたほうがよい。
data = [(1001, 'apple', 4), (1005, 'banana', 8), (1010, 'banana', 2),
        (1021, 'apple', 10), (1025, 'apple', 6), (1026, 'apple', 5)]
schema = ['order_id', 'item', 'qty']
df = spark.createDataFrame(data=data, schema=schema)

df.show()
+--------+------+---+
|order_id|  item|qty|
+--------+------+---+
|    1001| apple|  4|
|    1005|banana|  8|
|    1010|banana|  2|
|    1021| apple| 10|
|    1025| apple|  6|
|    1026| apple|  5|
+--------+------+---+

# orderByの指定がないので、各partitionで処理順番がなく、一括で処理される。
df.withColumn('qty_sum',F.sum('qty').over(W.partitionBy('item'))).show()
+--------+------+---+-------+
|order_id|  item|qty|qty_sum|
+--------+------+---+-------+
|    1001| apple|  4|     25|
|    1021| apple| 10|     25|
|    1025| apple|  6|     25|
|    1026| apple|  5|     25|
|    1005|banana|  8|     10|
|    1010|banana|  2|     10|
+--------+------+---+-------+

# 一方、orderByを指定した場合、各partition内で指定した順に処理がされる。
df.withColumn('qty_sum',F.sum('qty').over(W.partitionBy('item').orderBy('order_id'))).show()
+--------+------+---+-------+
|order_id|  item|qty|qty_sum|
+--------+------+---+-------+
|    1001| apple|  4|      4|
|    1021| apple| 10|     14|
|    1025| apple|  6|     20|
|    1026| apple|  5|     25|
|    1005|banana|  8|      8|
|    1010|banana|  2|     10|
+--------+------+---+-------+

# rank関数のように順番に意味があるウィンドウ関数でorderByをしてしない場合はエラーとなる。
df.withColumn('rank',F.rank().over(W.partitionBy('item'))).show()

AnalysisException: Window function rank() requires window to be ordered, please add ORDER BY clause. For example SELECT rank()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table

# 当然、rank関数で順番が同じとなるカラムをしてした場合には、同順位が付けられる。
df.withColumn('rank',F.rank().over(W.orderBy('item'))).show()
+--------+------+---+----+
|order_id|  item|qty|rank|
+--------+------+---+----+
|    1001| apple|  4|   1|
|    1021| apple| 10|   1|
|    1025| apple|  6|   1|
|    1026| apple|  5|   1|
|    1005|banana|  8|   5|
|    1010|banana|  2|   5|
+--------+------+---+----+

脚注
  1. https://sparkbyexamples.com/
    (PySparkをコーディングするうえで、例を参考にさせてもらった。) ↩︎

  2. https://qiita.com/tobira-code/items/d70093c4ad6c3e3e214f ↩︎

Discussion

YasutoYasuto

度々すみません.

3_6_2 を実行すると, 以下の警告が出ます.

22/08/01 10:22:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

partitionBy() で明示的に全ての行単位で分割 (つまり分割しない) ことを示す方法,もしくは別解等ありますか?

tjjjtjjj

自分もすべてでawesomeコードを書けているわけではないので
ぜひ、別解を考えていただけると幸いです。

3_6_2で、partitionByは不要でしたので削除版を追記しました。これで警告自体はでません。
ただし、現状の処理では全行を集約させてしまっているのでawesomeとは言い難い面があると思います。

YasutoYasuto

別解を少し考えてみます.ありがとうございます.