💡

前処理大全をPySparkで試みる(4章)

2022/06/12に公開
3

はじめに

こちらの記事のつづき
https://zenn.dev/tjjj/articles/d04237c8ca0441

前提

from pyspark.sql import functions as F
from pyspark.sql.window import Window as W

4章:結合

4-1:結合

4-1
df_4_1 = df_reserve.filter('people_num == 1').join(df_hotel.filter('is_business == True'), on='hotel_id')

4-2:条件付き結合

4-2
df_hotel_count = df_hotel.select('big_area_name', 'small_area_name').groupBy(
    'small_area_name', 'big_area_name').count()
df_hotel_join_id = df_hotel_count.withColumn('join_id', F.when(
    df_hotel_count['count'] >= 20, F.col('small_area_name')).otherwise(F.col('big_area_name')))
df_hotel_join_id = df_hotel.select('hotel_id', 'small_area_name').join(
    df_hotel_join_id, on='small_area_name').select('hotel_id', 'join_id')
df_rec_hotel = df_hotel.select('hotel_id', 'small_area_name').union(df_hotel.select(
    'hotel_id', 'big_area_name')).withColumnRenamed('hotel_id', 'rec_hotel_id').withColumnRenamed('small_area_name', 'join_id')
df_4_2 = df_hotel_join_id.join(
    df_rec_hotel, on='join_id').where('hotel_id <> rec_hotel_id').select('hotel_id', 'rec_hotel_id')

4-3:過去データの結合

  • PysparkにはPandasと比べて、lag関数などが揃っているのでSQLと同じように集約ができ便利である。
  • Pysparkにおける説明ではないが、Window関数はこちら[1]の説明がわかりやすい。
4-3-1
df_4_3_1 = df_reserve.withColumn('before_price', F.lag('total_price', 2).over(
    W.partitionBy('customer_id').orderBy('reserve_datetime')))
  • rowsBetweenはstart/endが前の行から後ろの行であることを注意する。0はcurrent row,-1はcurrentの1つ前の行、1はcurrentの1つ後の行である。そのため、現在の行を含めて前3行を対象とする場合にはrowsBetween(-2, 0)となる。
4-3-2
df_4_3_2 = df_reserve.withColumn('price_sum', F.when(F.count('total_price').over(W.partitionBy('customer_id').orderBy('reserve_datetime').rowsBetween(-2, 0)) > 2, F.sum('total_price').over(
    W.partitionBy('customer_id').orderBy('reserve_datetime').rowsBetween(-2, 0))).otherwise(None))
  • 小数部の桁数はroundかbroundなりで調整をしてもよいかもしれない。
4-3-3
df_4_3_3 = df_reserve.withColumn('price_avg', F.avg('total_price').over(
    W.partitionBy('customer_id').orderBy('reserve_datetime').rowsBetween(-3, -1)))
  • 前処理大全のSQLとほぼ同義な処理である。(joinにおいて条件付可能なため)
  • 自己結合であり、同名カラムを処理するために、aliasで別名を付与する。(join後のdataframeでもこの別名が利用可能である。)
  • 集約関数にrangebetweenがあるので、90日間というrangeによる処理も考えられたが、pysparkで90日というinterval dayを指定できなかったのと、処理行を除くのがrangeでは-1などになり、理屈上なさそうであるが、厳密にはずれるのでやめた。なお、interval dayではなく、lambda i: i * 86400のように[2]
    1日を秒(intに換算して処理することも一つの手段としてある)
  • 2段目のgroupbyで全カラムを指定しているのは、前処理大全と同じく、全カラムを表示させるためで計算だけであれば、reserve_idのみで問題ない。
4-3-4
df_reserve_90d = df_reserve.alias('df_reserve_left').join(df_reserve.alias('df_reserve_right'), [
    F.col('df_reserve_left.customer_id') == F.col('df_reserve_right.customer_id'), F.col('df_reserve_left.reserve_datetime') > F.col('df_reserve_right.reserve_datetime'), F.date_sub(F.col('df_reserve_left.reserve_datetime'), 90) <= F.col('df_reserve_right.reserve_datetime')], how='left_outer')

df_4_3_4 = df_reserve_90d.groupBy(F.col('df_reserve_left.reserve_id'), F.col('df_reserve_left.hotel_id'), F.col('df_reserve_left.customer_id'), F.col('df_reserve_left.reserve_datetime'), F.col('df_reserve_left.checkin_date'), F.col(
    'df_reserve_left.checkin_time'), F.col('df_reserve_left.checkout_date'), F.col('df_reserve_left.people_num'), F.col('df_reserve_left.total_price')).sum('df_reserve_right.total_price').withColumnRenamed('sum(total_price)', 'total_price_90d').fillna(0, 'total_price_90d')
4-3-4
import datetime

data = [(2017,1,201701,datetime.date(2017, 1, 1),datetime.date(2017, 1, 31)),(2017,2,201702,datetime.date(2017, 2, 1),datetime.date(2017, 2, 28)),(2017,3,201703,datetime.date(2017, 3, 1),datetime.date(2017, 3, 31)),(2017,4,201704,datetime.date(2017, 4, 1),datetime.date(2017, 4, 30))]
schema = StructType([
   StructField("year_num", IntegerType(), True),
   StructField("month_num", IntegerType(), True),
   StructField("year_month", IntegerType(), True),
   StructField("month_first_day", DateType(), True),
   StructField("month_last_day", DateType(), True)
   ])
df_mst = spark.createDataFrame(data,schema)

# crossJoinで生成するレコードは、集計に必要な範囲に限定する
df_customer_mst = df_customer.crossJoin(df_mst.where(df_mst.month_first_day >= '2017-01-01').where(
    df_mst.month_first_day < '2017-04-01')).select('customer_id', 'year_month', 'month_first_day', 'month_last_day')
    
df_4_3_4 = df_customer_mst.join(df_reserve, on=[df_customer_mst.customer_id == df_reserve.customer_id, df_customer_mst.month_first_day <= df_reserve.checkin_date.cast(DateType(
)), df_customer_mst.month_last_day >= df_reserve.checkin_date.cast(DateType())], how='left_outer').groupBy(df_customer_mst.customer_id, 'year_month').sum('total_price').orderBy([df_customer_mst.customer_id, df_customer_mst.year_month]).fillna(0)

脚注
  1. https://qiita.com/tlokweng/items/fc13dc30cc1aa28231c5 ↩︎

  2. https://blog.damavis.com/en/the-use-of-window-in-apache-spark/ ↩︎

Discussion

YasutoYasuto

Pyspark の練習問題のような日本語の書籍が見つからなかったので,このような投稿は非常に重宝します.

2-4 の isin パターンと join パターンの処理速度の違いなどは検証しましたか?
手元の 13,000,000 行程度のデータで実行したら前者が 350 秒,後者が 0.06秒で完了しました.

まだ,勘で Pyspark を使っている部分が大きいのでどこに問題があるのかを探りたいです.

tjjjtjjj

コメントありがとうございます。

Yasutoさんほどのデータ量で回していないので、処理速度の違いが出るほどの検証はしていません。
また両パターンの比較ですが、
isinの引数となるリストの値がすでにある状態からのスタートなのか、もしくは今回のコードのようにリストの生成からなのか、またどこからどこまでの処理時間を比較とするかによるところもあると思っています。またjoinにおいて、対象データによりjoinがbroadcastになるのかなどの違いもあると思っています。
そのため、一概にisinとjoinのどちらが良い、悪いというよりデータなどの状況よりけりではと考えています。最終的にはexplainなりで見つつ、改善余地がないか見るとかになるのかなとか思ったりもしています。

YasutoYasuto

ご返答ありがとうございます.

データ読み込みの箇所以外は,記載されている2,3行の実行時間のみ計測しました.
ご指摘いただいたような条件の違いも考慮しなければいけないとなると,もう少し検証が必要そうです.