
Trying out 前処理大全 with PySpark (Chapter 10)

Published 2022/10/28

Introduction

This article continues the previous one:
https://zenn.dev/tjjj/articles/b00118391dd854

Prerequisites

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window as W

spark = SparkSession.builder.getOrCreate()  # the `spark` session used throughout

Chapter 10: Datetime types

10-1: Converting to datetime and date types

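  • Spark has a datetime type, TimestampType, and a date type, DateType.[1]
  • For timestamps, setting the "timestampFormat" option when reading (together with inferSchema) lets matching columns be loaded directly as TimestampType.[2]
  • To convert a string to a timestamp or a date, use to_timestamp or to_date. Without a format argument they expect the default formats below, and for strings already in these formats a cast gives the same result.
    • DateType: yyyy-MM-dd
    • TimestampType: yyyy-MM-dd HH:mm:ss, optionally followed by fractional seconds (up to microseconds, .SSSSSS)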
df_reserve = spark.read.option("header", True).option("inferSchema", True).option(
    "timestampFormat", "yyyy-MM-dd HH:mm:ss").csv('../../../data/reserve.csv')

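df_10_1 = (
    df_reserve
    # join date and time with a space, then parse with the default timestamp format
    .withColumn('checkin_timestamp', F.to_timestamp(F.concat_ws(' ', 'checkin_date', 'checkin_time')))
    # then replace the string column with a DateType column (default format yyyy-MM-dd)
    .withColumn('checkin_date', F.to_date('checkin_date'))
)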

df_10_1.printSchema()
df_10_1.show(3)

root
 |-- reserve_id: string (nullable = true)
 |-- hotel_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- reserve_datetime: timestamp (nullable = true)
 |-- checkin_date: date (nullable = true)
 |-- checkin_time: string (nullable = true)
 |-- checkout_date: string (nullable = true)
 |-- people_num: integer (nullable = true)
 |-- total_price: integer (nullable = true)
 |-- checkin_timestamp: timestamp (nullable = true)

+----------+--------+-----------+-------------------+------------+------------+-------------+----------+-----------+-------------------+
|reserve_id|hotel_id|customer_id|   reserve_datetime|checkin_date|checkin_time|checkout_date|people_num|total_price|  checkin_timestamp|
+----------+--------+-----------+-------------------+------------+------------+-------------+----------+-----------+-------------------+
|        r1|    h_75|        c_1|2016-03-06 13:09:42|  2016-03-26|    10:00:00|   2016-03-29|         4|      97200|2016-03-26 10:00:00|
|        r2|   h_219|        c_1|2016-07-16 23:39:55|  2016-07-20|    11:30:00|   2016-07-21|         2|      20600|2016-07-20 11:30:00|
|        r3|   h_179|        c_1|2016-09-24 10:03:17|  2016-10-19|    09:00:00|   2016-10-22|         2|      33600|2016-10-19 09:00:00|
+----------+--------+-----------+-------------------+------------+------------+-------------+----------+-----------+-------------------+
only showing top 3 rows
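For readers more used to plain Python, the concat-then-parse step from the bullets above can be mirrored with the standard library. This is a minimal sketch, not Spark code: the helper names `parse_checkin` and `parse_date` are hypothetical, and the Spark pattern `yyyy-MM-dd HH:mm:ss` is assumed to correspond to the strptime directives `%Y-%m-%d %H:%M:%S`.

```python
from datetime import date, datetime


def parse_checkin(checkin_date: str, checkin_time: str) -> datetime:
    """Join date and time with a space (like F.concat_ws(' ', ...)) and parse the result."""
    # Spark's yyyy-MM-dd HH:mm:ss corresponds to %Y-%m-%d %H:%M:%S here
    return datetime.strptime(" ".join([checkin_date, checkin_time]), "%Y-%m-%d %H:%M:%S")


def parse_date(checkin_date: str) -> date:
    """Parse a yyyy-MM-dd string into a date (like F.to_date with the default format)."""
    return datetime.strptime(checkin_date, "%Y-%m-%d").date()


# Values taken from row r1 of the output above
print(parse_checkin("2016-03-26", "10:00:00"))  # 2016-03-26 10:00:00
print(parse_date("2016-03-26"))                 # 2016-03-26
```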

10-2: Extracting year, month, day, day of week, hour, minute, and second, and converting to a string

  • Only the columns related to reserve_datetime are displayed, selected with colRegex.[3]
df_10_2 = (
    df_reserve
    .withColumn('reserve_datetime_year', F.year('reserve_datetime'))
    .withColumn('reserve_datetime_month', F.month('reserve_datetime'))
    .withColumn('reserve_datetime_day', F.dayofmonth('reserve_datetime'))
    # dayofweek: 1 = Sunday, ..., 7 = Saturday
    .withColumn('reserve_datetime_wday', F.dayofweek('reserve_datetime'))
    .withColumn('reserve_datetime_hour', F.hour('reserve_datetime'))
    .withColumn('reserve_datetime_minute', F.minute('reserve_datetime'))
    .withColumn('reserve_datetime_second', F.second('reserve_datetime'))
    # HH is the 24-hour clock; hh would give the 12-hour clock (13:09:42 -> 01:09:42)
    .withColumn('reserve_datetime_string', F.date_format('reserve_datetime', 'yyyy-MM-dd HH:mm:ss'))
)


df_10_2.select(df_10_2.colRegex("`reserve_datetime.*`")).show(3, truncate=False)

df_10_2.printSchema()

+-------------------+---------------------+----------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-----------------------+
|reserve_datetime   |reserve_datetime_year|reserve_datetime_month|reserve_datetime_day|reserve_datetime_wday|reserve_datetime_hour|reserve_datetime_minute|reserve_datetime_second|reserve_datetime_string|
+-------------------+---------------------+----------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-----------------------+
|2016-03-06 13:09:42|2016                 |3                     |6                   |1                    |13                   |9                      |42                     |2016-03-06 13:09:42    |
|2016-07-16 23:39:55|2016                 |7                     |16                  |7                    |23                   |39                     |55                     |2016-07-16 23:39:55    |
|2016-09-24 10:03:17|2016                 |9                     |24                  |7                    |10                   |3                      |17                     |2016-09-24 10:03:17    |
+-------------------+---------------------+----------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-----------------------+
only showing top 3 rows

root
 |-- reserve_id: string (nullable = true)
 |-- hotel_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- reserve_datetime: timestamp (nullable = true)
 |-- checkin_date: string (nullable = true)
 |-- checkin_time: string (nullable = true)
 |-- checkout_date: string (nullable = true)
 |-- people_num: integer (nullable = true)
 |-- total_price: integer (nullable = true)
 |-- reserve_datetime_year: integer (nullable = true)
 |-- reserve_datetime_month: integer (nullable = true)
 |-- reserve_datetime_day: integer (nullable = true)
 |-- reserve_datetime_wday: integer (nullable = true)
 |-- reserve_datetime_hour: integer (nullable = true)
 |-- reserve_datetime_minute: integer (nullable = true)
 |-- reserve_datetime_second: integer (nullable = true)
 |-- reserve_datetime_string: string (nullable = true)
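Two details in the table above are easy to trip over: Spark's dayofweek numbers Sunday as 1 through Saturday as 7, and in date_format patterns `HH` is the 24-hour clock while `hh` is the 12-hour clock. The plain-Python sketch below reproduces both on the three sample rows. The helper `spark_dayofweek` is a hypothetical name, and `%H`/`%I` are assumed here as Python's analogues of Spark's `HH`/`hh`.

```python
from datetime import datetime


def spark_dayofweek(dt: datetime) -> int:
    """Map Python's isoweekday (Mon=1 .. Sun=7) to Spark's dayofweek (Sun=1 .. Sat=7)."""
    return dt.isoweekday() % 7 + 1


rows = [
    datetime(2016, 3, 6, 13, 9, 42),
    datetime(2016, 7, 16, 23, 39, 55),
    datetime(2016, 9, 24, 10, 3, 17),
]

for dt in rows:
    # %H is the 24-hour clock (like Spark's HH); %I is the 12-hour clock (like Spark's hh)
    print(spark_dayofweek(dt), dt.strftime("%Y-%m-%d %H:%M:%S"), dt.strftime("%Y-%m-%d %I:%M:%S"))
```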

10-3: Computing datetime differences

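  • For years and months, the problem asks for a difference that ignores all smaller units, so, as in 10-2, we extract the year and month and simply subtract them. (months_between returns the difference between two dates as a fractional number of months, so it does not fit this problem.)
  • For days there is the datediff function, but it only compares the yyyy-MM-dd (date) parts, so it diverges once the time of day is taken into account. (For example, for 2016-07-16 23:39:55 and 2016-07-20 11:30:00, datediff returns 4, whereas the day difference including time is about 3.49.) Because this problem does take the time into account, we convert both values to Unix time and take the difference.
  • Hours, minutes, and seconds are computed the same way as days.
  • The data is partially modified so that at least one row differs in year.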
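The datediff example from the bullets above can be checked in plain Python. This sketch is not Spark code: subtracting the `date` parts mimics datediff, and dividing the difference in seconds by 60 * 60 * 24 mimics the unix_timestamp approach. Both helper names are hypothetical, and the sketch assumes naive (time-zone-free) datetimes, so no daylight-saving shifts are involved.

```python
from datetime import datetime


def datediff_like(end: datetime, start: datetime) -> int:
    """Whole-day difference using only the date parts, like Spark's datediff."""
    return (end.date() - start.date()).days


def fractional_days(end: datetime, start: datetime) -> float:
    """Difference in seconds divided by 60*60*24, like the unix_timestamp approach."""
    return (end - start).total_seconds() / (60 * 60 * 24)


reserve = datetime(2016, 7, 16, 23, 39, 55)
checkin = datetime(2016, 7, 20, 11, 30, 0)

print(datediff_like(checkin, reserve))              # 4
print((checkin - reserve).total_seconds())          # 301805.0
print(round(fractional_days(checkin, reserve), 4))  # 3.4931
```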
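# Hypothetical reconstruction: the original modification step is not shown. This moves
# r1's check-in to 2017-02-26 10:00:00 so that it matches the output below.
df_10_3 = df_10_1.withColumn(
    'checkin_timestamp',
    F.when(F.col('reserve_id') == 'r1', F.to_timestamp(F.lit('2017-02-26 10:00:00')))
    .otherwise(F.col('checkin_timestamp')))

df_10_3 = df_10_3.withColumn('diff_year', F.year(
    'checkin_timestamp') - F.year('reserve_datetime'))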

df_10_3 = df_10_3.withColumn('diff_month', (F.year('checkin_timestamp') * F.lit(12) + F.month(
    'checkin_timestamp')) - (F.year('reserve_datetime') * F.lit(12) + F.month('reserve_datetime')))

df_10_3 = df_10_3.withColumn('diff_date', (F.unix_timestamp(
    'checkin_timestamp') - F.unix_timestamp('reserve_datetime')) / F.lit(60) / F.lit(60) / F.lit(24))

df_10_3 = df_10_3.withColumn('diff_hour', (F.unix_timestamp(
    'checkin_timestamp') - F.unix_timestamp('reserve_datetime')) / F.lit(60) / F.lit(60))

df_10_3 = df_10_3.withColumn('diff_minute', (F.unix_timestamp(
    'checkin_timestamp') - F.unix_timestamp('reserve_datetime')) / F.lit(60))

df_10_3 = df_10_3.withColumn('diff_second', (F.unix_timestamp(
    'checkin_timestamp') - F.unix_timestamp('reserve_datetime')))

df_10_3.select('reserve_datetime', 'checkin_timestamp',
               df_10_3.colRegex("`diff.*`")).show(3)

+-------------------+-------------------+---------+----------+------------------+-----------------+-----------------+-----------+
|   reserve_datetime|  checkin_timestamp|diff_year|diff_month|         diff_date|        diff_hour|      diff_minute|diff_second|
+-------------------+-------------------+---------+----------+------------------+-----------------+-----------------+-----------+
|2016-03-06 13:09:42|2017-02-26 10:00:00|        1|        11|356.86826388888886|8564.838333333333|         513890.3|   30833418|
|2016-07-16 23:39:55|2016-07-20 11:30:00|        0|         0|3.4931134259259253|83.83472222222221|5030.083333333333|     301805|
|2016-09-24 10:03:17|2016-10-19 09:00:00|        0|         1| 24.95605324074074|598.9452777777777|35936.71666666667|    2156203|
+-------------------+-------------------+---------+----------+------------------+-----------------+-----------------+-----------+
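The year and month rule (compare only the year, or only year * 12 + month, ignoring smaller units) can be verified against the rows above with a short plain-Python sketch. `diff_year` and `diff_month` here are hypothetical helpers that mirror the column expressions; they are not Spark functions.

```python
from datetime import datetime


def diff_year(end: datetime, start: datetime) -> int:
    """Difference of the year fields only (like F.year(end) - F.year(start))."""
    return end.year - start.year


def diff_month(end: datetime, start: datetime) -> int:
    """Difference of year*12 + month, ignoring day and time."""
    return (end.year * 12 + end.month) - (start.year * 12 + start.month)


# (reserve_datetime, checkin_timestamp) pairs from the output table above
pairs = [
    (datetime(2016, 3, 6, 13, 9, 42), datetime(2017, 2, 26, 10, 0, 0)),
    (datetime(2016, 7, 16, 23, 39, 55), datetime(2016, 7, 20, 11, 30, 0)),
    (datetime(2016, 9, 24, 10, 3, 17), datetime(2016, 10, 19, 9, 0, 0)),
]

for reserve, checkin in pairs:
    print(diff_year(checkin, reserve), diff_month(checkin, reserve),
          int((checkin - reserve).total_seconds()))
```

The third row is under 25 days apart yet has diff_month 1, because it crosses a month boundary; that is the behavior this problem asks for, and the reason months_between is not used.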

10-4: Adding to and subtracting from datetimes

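  • PySpark does provide date_add, date_sub, and add_months for shifting dates by days or months, but (in the version used here) there is no DataFrame function for shifting a timestamp by hours, minutes, or seconds. So we use Spark SQL INTERVAL literals through F.expr instead.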
df_10_4 = df_reserve.withColumn('reserve_date', F.to_date('reserve_datetime'))

df_10_4 = df_10_4.withColumn('reserve_date_add_day', F.col(
    'reserve_date') + F.expr('INTERVAL 1 days'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_day', F.col(
    'reserve_datetime') + F.expr('INTERVAL 1 days'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_hour', F.col(
    'reserve_datetime') + F.expr('INTERVAL 1 HOURS'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_minute', F.col(
    'reserve_datetime') + F.expr('INTERVAL 1 minutes'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_second', F.col(
    'reserve_datetime') + F.expr('INTERVAL 1 seconds'))

df_10_4.select(df_10_4.colRegex("`reserve.*`")
               ).show(3, truncate=False)
	       
+----------+-------------------+------------+--------------------+------------------------+-------------------------+---------------------------+---------------------------+
|reserve_id|reserve_datetime   |reserve_date|reserve_date_add_day|reserve_datetime_add_day|reserve_datetime_add_hour|reserve_datetime_add_minute|reserve_datetime_add_second|
+----------+-------------------+------------+--------------------+------------------------+-------------------------+---------------------------+---------------------------+
|r1        |2016-03-06 13:09:42|2016-03-06  |2016-03-07          |2016-03-07 13:09:42     |2016-03-06 14:09:42      |2016-03-06 13:10:42        |2016-03-06 13:09:43        |
|r2        |2016-07-16 23:39:55|2016-07-16  |2016-07-17          |2016-07-17 23:39:55     |2016-07-17 00:39:55      |2016-07-16 23:40:55        |2016-07-16 23:39:56        |
|r3        |2016-09-24 10:03:17|2016-09-24  |2016-09-25          |2016-09-25 10:03:17     |2016-09-24 11:03:17      |2016-09-24 10:04:17        |2016-09-24 10:03:18        |
+----------+-------------------+------------+--------------------+------------------------+-------------------------+---------------------------+---------------------------+
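Outside Spark, the same shifts are plain timedelta additions. This sketch only mirrors the results for row r2 in the table above, where adding one hour crosses midnight; it is a standard-library analogue, not how Spark evaluates INTERVAL.

```python
from datetime import datetime, timedelta

reserve_datetime = datetime(2016, 7, 16, 23, 39, 55)
reserve_date = reserve_datetime.date()

# Each entry corresponds to one column in the Spark output above
shifted = {
    "reserve_date_add_day": reserve_date + timedelta(days=1),
    "reserve_datetime_add_day": reserve_datetime + timedelta(days=1),
    "reserve_datetime_add_hour": reserve_datetime + timedelta(hours=1),
    "reserve_datetime_add_minute": reserve_datetime + timedelta(minutes=1),
    "reserve_datetime_add_second": reserve_datetime + timedelta(seconds=1),
}

for name, value in shifted.items():
    print(name, value)
```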

10-5: Converting to seasons

df_10_5 = df_reserve.withColumn(
    'reserve_datetime_month', F.month('reserve_datetime'))
df_10_5 = df_10_5.withColumn('reserve_season',
                             F.when((3 <= F.col('reserve_datetime_month')) & (F.col('reserve_datetime_month') <= 5), 'spring')
                             .when((6 <= F.col('reserve_datetime_month')) & (F.col('reserve_datetime_month') <= 8), 'summer')
                             .when((9 <= F.col('reserve_datetime_month')) & (F.col('reserve_datetime_month') <= 11), 'autumn').otherwise('winter'))
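The F.when chain above assigns March to May to spring, June to August to summer, and September to November to autumn, and the otherwise branch catches everything else (December, January, February) as winter. A plain-Python sketch of the same mapping, using the hypothetical helper name `to_season`, makes the month ranges easy to check.

```python
def to_season(month: int) -> str:
    """Map a month number (1-12) to a season, using the same ranges as the F.when chain."""
    if 3 <= month <= 5:
        return "spring"
    if 6 <= month <= 8:
        return "summer"
    if 9 <= month <= 11:
        return "autumn"
    return "winter"  # 12, 1, 2 (the otherwise branch)


print([to_season(m) for m in range(1, 13)])
```

One Spark-specific difference: if reserve_datetime is null, every comparison evaluates to null, which F.when treats as not matching, so the otherwise branch would label that row winter. The Python sketch has no equivalent case.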

10-6: No exercise (Q)

10-7: Converting to weekday/holiday

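  • Because public holidays also have to be taken into account, this section is about building a holiday master table and joining it, so no code is given here.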
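As an illustration of the idea only (no actual holiday master is given here), a weekday/holiday flag can be derived by treating Saturdays, Sundays, and any date listed in a holiday master as holidays. `HOLIDAY_MASTER` below is a hypothetical one-entry stand-in and `is_holiday` is a hypothetical helper; in PySpark the master would presumably be a DataFrame left-joined on the date column.

```python
from datetime import date

# Hypothetical holiday master: in practice this would be a full table of public holidays
HOLIDAY_MASTER = {date(2016, 7, 18)}


def is_holiday(d: date) -> bool:
    """True for Saturdays, Sundays, or dates listed in the holiday master."""
    # date.weekday(): Monday = 0, ..., Saturday = 5, Sunday = 6
    return d.weekday() >= 5 or d in HOLIDAY_MASTER


for d in [date(2016, 7, 16), date(2016, 7, 18), date(2016, 7, 20)]:
    print(d, "holiday" if is_holiday(d) else "weekday")
```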
Footnotes
  1. https://spark.apache.org/docs/latest/sql-ref-datatypes.html

  2. https://zenn.dev/tjjj/articles/a93293960a495a

  3. https://towardsdatascience.com/selecting-multiple-columns-in-pyspark-d1aac072fcc0
