😊
前処理大全をPySparkで試みる(1〜3章)
モチベーション
前処理大全のpythonのサンプルがpandasベースのため、データエンジニア的な大量データにも対応できるようにPyspark[1]による記述を試みる。(なお、not awesomeのものもあるかもしれない。。。。よりawesomeのものがあれば、コメントいただけると幸いです。)
更新履歴
- 2022/5/30:1章のデータ読み込みにて、.option("inferSchema",True)を付加する。付加する前は、すべてのカラムがString型となる。そのため、数値計算などではIntegerTypeへの変換『.withColumn('total_price', df_reserve.total_price.cast(IntegerType()))』などが必要となっていた。本修正に伴い、不要なcast()を削除した。
- 2022/6/2:inferSchemaの記事へのリンクを1章に追加した。
- 2022/8/6:3-6-2において、不要なpartitionByを削除したものを追記
- 2022/8/7:集約関数の補足を追記
- 2022/8/13:3-4において、stddevとvarianceのエイリアスをついてを追記。
- 2023/6/27:2-2において、whereがfilterのエイリアスである旨を追記。
前提
import
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
1章:データ読み込み
- pathは各環境ごとに要修正
df_reserve = spark.read.option("header", True).option("inferSchema",True).csv('../../../data/reserve.csv')
df_hotel = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/hotel.csv')
df_customer = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/customer.csv')
df_production = spark.read.option('header',True).option("inferSchema",True).csv('../../../data/production.csv')
inferSchema optionについてはこちらも参考。なお、timestamp型は推論しない設定で進める。
2章:抽出
2-1:カラム抽出
2-1
df_reserve_2_1 = df_reserve.select('reserve_id','hotel_id','customer_id','reserve_datetime','checkin_date')
2-2:条件抽出
- 前処理大全のawesomeのような記載がpysparkの場合できない。
- whereは下記のようにSQL式の文字による表記とcoloumnベースでの表記が可能である。
- 補足として、whereはfilterのエイリアスである。
2-2
df_reserve_2_2 = df_reserve.where("'2016-10-12' <= checkin_date and checkin_date <= '2016-10-13'")
2-3:サンプリング抽出(レコード)
2-3
df_reserve_2_3 = df_reserve.sample(0.5)
2-4:サンプリング抽出(customer_id)
- joinとisinの2パターンが考えられる。パフォーマンスは、サンプリングcustomer_idのレコード数(データ量)によると思われる。ただ、可読性的にはjoinの方が高い気がする。
2-4(joinパターン)
df_reserve_customer_id_sampling = df_reserve.select('customer_id').distinct().sample(0.5)
df_reserve_2_4 = df_reserve.join(df_reserve_customer_id_sampling,'customer_id')
2-4(isinパターン)
df_reserve_customer_id_sampling = df_reserve.select('customer_id').distinct().sample(0.5)
sample_cusotmer_id_list = df_reserve_customer_id_sampling.select('customer_id').rdd.flatMap(lambda x: x).collect()
df_reserve_2_4 = df_reserve.filter(F.col('customer_id').isin(sample_cusotmer_id_list))
3章:集約
3-1:カウント
3-1
df_3_1 = df_reserve.groupby('hotel_id').agg(F.count('reserve_id'),F.countDistinct('customer_id'))
参考(not awesome)
df_count = df_reserve.groupBy('hotel_id').count().withColumnRenamed('count','count(reserve_id)')
df_count_customer_id = df_reserve.dropDuplicates(['hotel_id','customer_id']).groupBy('hotel_id').count().withColumnRenamed('count','count(customer_id)')
df_3_1_not_awesome = df_count.join(df_count_customer_id,'hotel_id')
3-2:合計
3-2
from pyspark.sql.types import IntegerType
df_3_2 = df_reserve.groupBy(['hotel_id', 'people_num']).sum('total_price').withColumnRenamed(
'sum(total_price)', 'price_sum').orderBy(['hotel_id', 'people_num'])
3-3:極値、代表値
- Spark SQLの関数には組み込み含めて、medianを出力する関数がない。そのため、percentileを用いて行う。また、percentileにおいてもorg.apache.spark.sql.functionsにはpercentile_approxのパーセンタイル近似値のみがあるので、正確なパーセンタイルを出したい場合には、exprを利用し、built-in functionsのpercentileを用いる。
以下では、中央値には、percentileを、0.2パーセンタイルにはpercentile_approxを用いた。
3-3
df_3_3 = df_reserve.groupBy('hotel_id').agg(F.max(
'total_price'), F.min('total_price'), F.avg('total_price'), F.expr('percentile(total_price,0.5)'), F.percentile_approx('total_price', 0.2))
# 前処理大全の出力に合わせて、column名のリネイムも実施。(withColumnRenamedを利用せずに)
new_names = ['hotel_id','price_max','price_min','price_avg','price_med','price_20per']
df_3_3 = df_3_3.toDF(*new_names)
3-4:ばらつき(母分散と母標準偏差)
- 問とは関係ないが、hotel_idの並び順を変更
- withColumnにおけるカラム指定を前項と違うパターンで指定
3-4
df_3_4 = df_reserve.groupBy('hotel_id').agg(
F.var_pop('total_price'), F.stddev_pop('total_price')).orderBy(F.regexp_replace('hotel_id', 'h_', '').cast(IntegerType()))
- var_samp,stddev_sampの場合には、標本不偏分散、標本不偏標準偏差になるため、レコード数が1の場合にはnullになる。なお、stddevはstddev_sampのエイリアス、varianceはvar_sampのエイリアスである。
参考
l = [('Alice', 3)]
schema = ['name','age']
df = spark.createDataFrame(l,schema=schema)
df_unbiased = df.select(F.var_samp('age'),F.stddev_samp('age'))
df_unbiased.show()
+-------------+----------------+
|var_samp(age)|stddev_samp(age)|
+-------------+----------------+
| null| null|
+-------------+----------------+
3-5:最頻値
- pysparkでは、modeがない。(pyspark.pandasは除く)
- pysparkのbroundはHALF_EVENで、roundはHALF_UPであり、pandasのroundはHALF_EVENのため注意が必要。前処理大全の結果に合わせて、broundを使用。(Roundingについてはこちらがわかりやすい[2])
3-5
df_3_5 = df_reserve.select(F.bround('total_price', -3).alias('total_price_bround')).groupBy('total_price_bround').count(
).withColumn('max_count', F.max('count').over(W.partitionBy())).filter('count == max_count').drop('max_count')
3-6:集約関数(順位/ランキング)
3-6-1
df_3_6_1 = df_reserve.withColumn('log_no', F.row_number().over(
W.partitionBy('customer_id').orderBy('reserve_datetime')))
3-6-2
df_3_6_2_1 = df_reserve.groupBy('hotel_id').count().withColumn(
'rsv_cnt_rank', F.rank().over(W.partitionBy().orderBy(F.desc('count'))))
# 不要なpartitionByを削除
df_3_6_2_2 = df_reserve.groupBy('hotel_id').count().withColumn(
'rsv_cnt_rank', F.rank().over(W.orderBy(F.desc('count'))))
# なお、_1と_2もLogical Planは同様
df_3_6_2_1.sameSemantics(df_3_6_2_2)
>True
# samSematicsが同じため、どちらも当然single partitionの処理による警告はでる。
# 集約関数の補足のとおり、当然な結果である。
集約関数の補足
- 集約関数では、第4章でも参考にさせていただいている分析関数(ウインドウ関数)をわかりやすく説明してみたに記載があるorderByの『行を順番に並べた上で、最初の行から現在行までのみを集計の対象にする』が大事である。
- 以下、上記のリンクにあるサンプルデータに近い形をもとになんかこ、上記の言葉における処理内容確認をPySparkで行う。
- なお、3-6もそうであるが、集約関数でpartitionByがない場合には、single partitionの処理になるため、警告にも出ているとおりパフォーマンス等を考慮する必要がある。要はSparkは分散処理フレームワークであり、処理分散がされないようなものはできるだけ避けたほうがよい。
data = [(1001, 'apple', 4), (1005, 'banana', 8), (1010, 'banana', 2),
(1021, 'apple', 10), (1025, 'apple', 6), (1026, 'apple', 5)]
schema = ['order_id', 'item', 'qty']
df = spark.createDataFrame(data=data, schema=schema)
df.show()
+--------+------+---+
|order_id| item|qty|
+--------+------+---+
| 1001| apple| 4|
| 1005|banana| 8|
| 1010|banana| 2|
| 1021| apple| 10|
| 1025| apple| 6|
| 1026| apple| 5|
+--------+------+---+
# orderByの指定がないので、各partitionで処理順番がなく、一括で処理される。
df.withColumn('qty_sum',F.sum('qty').over(W.partitionBy('item'))).show()
+--------+------+---+-------+
|order_id| item|qty|qty_sum|
+--------+------+---+-------+
| 1001| apple| 4| 25|
| 1021| apple| 10| 25|
| 1025| apple| 6| 25|
| 1026| apple| 5| 25|
| 1005|banana| 8| 10|
| 1010|banana| 2| 10|
+--------+------+---+-------+
# 一方、orderByを指定した場合、各partition内で指定した順に処理がされる。
df.withColumn('qty_sum',F.sum('qty').over(W.partitionBy('item').orderBy('order_id'))).show()
+--------+------+---+-------+
|order_id| item|qty|qty_sum|
+--------+------+---+-------+
| 1001| apple| 4| 4|
| 1021| apple| 10| 14|
| 1025| apple| 6| 20|
| 1026| apple| 5| 25|
| 1005|banana| 8| 8|
| 1010|banana| 2| 10|
+--------+------+---+-------+
# rank関数のように順番に意味があるウィンドウ関数でorderByをしてしない場合はエラーとなる。
df.withColumn('rank',F.rank().over(W.partitionBy('item'))).show()
AnalysisException: Window function rank() requires window to be ordered, please add ORDER BY clause. For example SELECT rank()(value_expr) OVER (PARTITION BY window_partition ORDER BY window_ordering) from table
# 当然、rank関数で順番が同じとなるカラムをしてした場合には、同順位が付けられる。
df.withColumn('rank',F.rank().over(W.orderBy('item'))).show()
+--------+------+---+----+
|order_id| item|qty|rank|
+--------+------+---+----+
| 1001| apple| 4| 1|
| 1021| apple| 10| 1|
| 1025| apple| 6| 1|
| 1026| apple| 5| 1|
| 1005|banana| 8| 5|
| 1010|banana| 2| 5|
+--------+------+---+----+
-
https://sparkbyexamples.com/
(PySparkをコーディングするうえで、例を参考にさせてもらった。) ↩︎
Discussion
度々すみません.
3_6_2 を実行すると, 以下の警告が出ます.
22/08/01 10:22:36 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
partitionBy() で明示的に全ての行単位で分割 (つまり分割しない) ことを示す方法,もしくは別解等ありますか?
自分もすべてでawesomeコードを書けているわけではないので
ぜひ、別解を考えていただけると幸いです。
3_6_2で、partitionByは不要でしたので削除版を追記しました。これで警告自体はでません。
ただし、現状の処理では全行を集約させてしまっているのでawesomeとは言い難い面があると思います。
3_6_2は警告も出ます。勘違いでした。
別解は、singleノードでの処理量を削減しているこちらとかも参考になると思いました。
別解を少し考えてみます.ありがとうございます.