🐥
前処理大全をPysparkで試みる(10章)
はじめに
こちらの記事のつづき
前提
from pyspark.sql import functions as F
from pyspark.sql.window import Window as W
10章:日時型
10-1:日時型、日付型への変換
- Sparkには日時型TimestampTypeと日付型DateTypeがある[1]。
- Timestamp型であれば、読み込み時にoption:"timestampFormat"を指定することにより、Timestamp型での読み込みが可能である。[2]
- stringからTimestamp、Dateへの変換の関数はto_timestamp,to_dateとなる。デフォルトフォーマットは次のとおりであり、デフォルトフォーマットの場合はcastを使用しても同様である。
- DateType: yyyy-MM-dd
- TimestampType: yyyy-MM-dd HH:mm:ss.SSSS
df_reserve = spark.read.option("header", True).option("inferSchema", True).option(
"timestampFormat", "yyyy-MM-dd HH:mm:ss").csv('../../../data/reserve.csv')
df_10_1 = df_reserve.withColumn('checkin_timestamp', F.to_timestamp(F.concat_ws(
' ', 'checkin_date', 'checkin_time'),)).withColumn('checkin_date', F.to_date('checkin_date',))
df_10_1.printSchema()
df_10_1.show(3)
root
|-- reserve_id: string (nullable = true)
|-- hotel_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- reserve_datetime: timestamp (nullable = true)
|-- checkin_date: date (nullable = true)
|-- checkin_time: string (nullable = true)
|-- checkout_date: string (nullable = true)
|-- people_num: integer (nullable = true)
|-- total_price: integer (nullable = true)
|-- checkin_timestamp: timestamp (nullable = true)
+----------+--------+-----------+-------------------+------------+------------+-------------+----------+-----------+-------------------+
|reserve_id|hotel_id|customer_id| reserve_datetime|checkin_date|checkin_time|checkout_date|people_num|total_price| checkin_timestamp|
+----------+--------+-----------+-------------------+------------+------------+-------------+----------+-----------+-------------------+
| r1| h_75| c_1|2016-03-06 13:09:42| 2016-03-26| 10:00:00| 2016-03-29| 4| 97200|2016-03-26 10:00:00|
| r2| h_219| c_1|2016-07-16 23:39:55| 2016-07-20| 11:30:00| 2016-07-21| 2| 20600|2016-07-20 11:30:00|
| r3| h_179| c_1|2016-09-24 10:03:17| 2016-10-19| 09:00:00| 2016-10-22| 2| 33600|2016-10-19 09:00:00|
+----------+--------+-----------+-------------------+------------+------------+-------------+----------+-----------+-------------------+
only showing top 3 rows
10-2:年、月、日、曜日、時、分、秒、文字列の変換
- reserve_datetimeに関連したカラムのみを表示して確認[3]
df_10_2 = df_reserve.withColumn('reserve_datetime_year', F.year(
'reserve_datetime')).withColumn('reserve_datetime_month', F.month(
'reserve_datetime')).withColumn('reserve_datetime_day', F.dayofmonth(
'reserve_datetime')).withColumn('reserve_datetime_wday', F.dayofweek(
'reserve_datetime')).withColumn('reserve_datetime_hour', F.hour(
'reserve_datetime')).withColumn('reserve_datetime_minute', F.minute(
'reserve_datetime')).withColumn('reserve_datetime_second', F.second(
'reserve_datetime')).withColumn('reserve_datetime_string', F.date_format(
'reserve_datetime', 'yyyy-MM-dd hh:mm:ss'))
df_10_2.select(df_10_2.colRegex("`reserve_datetime.*`")
).show(3, truncate=False)
df_10_2.printSchema()
+-------------------+---------------------+----------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-----------------------+
|reserve_datetime |reserve_datetime_year|reserve_datetime_month|reserve_datetime_day|reserve_datetime_wday|reserve_datetime_hour|reserve_datetime_minute|reserve_datetime_second|reserve_datetime_string|
+-------------------+---------------------+----------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-----------------------+
|2016-03-06 13:09:42|2016 |3 |6 |1 |13 |9 |42 |2016-03-06 01:09:42 |
|2016-07-16 23:39:55|2016 |7 |16 |7 |23 |39 |55 |2016-07-16 11:39:55 |
|2016-09-24 10:03:17|2016 |9 |24 |7 |10 |3 |17 |2016-09-24 10:03:17 |
+-------------------+---------------------+----------------------+--------------------+---------------------+---------------------+-----------------------+-----------------------+-----------------------+
only showing top 3 rows
root
|-- reserve_id: string (nullable = true)
|-- hotel_id: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- reserve_datetime: timestamp (nullable = true)
|-- checkin_date: string (nullable = true)
|-- checkin_time: string (nullable = true)
|-- checkout_date: string (nullable = true)
|-- people_num: integer (nullable = true)
|-- total_price: integer (nullable = true)
|-- reserve_datetime_year: integer (nullable = true)
|-- reserve_datetime_month: integer (nullable = true)
|-- reserve_datetime_day: integer (nullable = true)
|-- reserve_datetime_wday: integer (nullable = true)
|-- reserve_datetime_hour: integer (nullable = true)
|-- reserve_datetime_minute: integer (nullable = true)
|-- reserve_datetime_second: integer (nullable = true)
|-- reserve_datetime_string: string (nullable = true)
10-3:日時差への変換
- 年、月はそれ以外の単位を考慮せずに差分を取る問なので、10-2のように年、月を抜き出して単純に差分をとる。(months_betweenは、2つの日付の差を月数で出力するので、本問に対する解法としてはそぐわない)
- 日の差分はdatediff関数があるが、これはyyyy-mm-ddにおける差分のため、時間までを考慮した場合にはズレが生じる。(例えば、2016-07-16 23:39:55と2016-07-20 11:30:00の場合、datediffでは4になるが、時間までを含めた日の差分であれば、3.4…となる)今回の問では時間までを考慮した問のため、unixtimeに変換した上で差分を取る。
- 時、分、秒は日と同様に差分を取る。
- 対象データは年で差がでるように一部加工する。
df_10_3 = df_10_3.withColumn('diff_year', F.year(
'checkin_timestamp') - F.year('reserve_datetime'))
df_10_3 = df_10_3.withColumn('diff_month', (F.year('checkin_timestamp') * F.lit(12) + F.month(
'checkin_timestamp')) - (F.year('reserve_datetime') * F.lit(12) + F.month('reserve_datetime')))
df_10_3 = df_10_3.withColumn('diff_date', (F.unix_timestamp(
'checkin_timestamp') - F.unix_timestamp('reserve_datetime')) / F.lit(60) / F.lit(60) / F.lit(24))
df_10_3 = df_10_3.withColumn('diff_hour', (F.unix_timestamp(
'checkin_timestamp') - F.unix_timestamp('reserve_datetime')) / F.lit(60) / F.lit(60))
df_10_3 = df_10_3.withColumn('diff_minute', (F.unix_timestamp(
'checkin_timestamp') - F.unix_timestamp('reserve_datetime')) / F.lit(60))
df_10_3 = df_10_3.withColumn('diff_second', (F.unix_timestamp(
'checkin_timestamp') - F.unix_timestamp('reserve_datetime')))
df_10_3.select('reserve_datetime', 'checkin_timestamp',
df_10_3.colRegex("`diff.*`")).show(3)
+-------------------+-------------------+---------+----------+------------------+-----------------+-----------------+-----------+
| reserve_datetime| checkin_timestamp|diff_year|diff_month| diff_date| diff_hour| diff_minute|diff_second|
+-------------------+-------------------+---------+----------+------------------+-----------------+-----------------+-----------+
|2016-03-06 13:09:42|2017-02-26 10:00:00| 1| 11|356.86826388888886|8564.838333333333| 513890.3| 30833418|
|2016-07-16 23:39:55|2016-07-20 11:30:00| 0| 0|3.4931134259259253|83.83472222222221|5030.083333333333| 301805|
|2016-09-24 10:03:17|2016-10-19 09:00:00| 0| 1| 24.95605324074074|598.9452777777777|35936.71666666667| 2156203|
+-------------------+-------------------+---------+----------+------------------+-----------------+-----------------+-----------+
10-4:日時型の増減
- Pysparkの関数自体には、日時の増減の関数はない。そのため、Spark SQLのINTERVALを用いて行う。
df_10_4 = df_reserve.withColumn('reserve_date', F.to_date('reserve_datetime'))
df_10_4 = df_10_4.withColumn('reserve_date_add_day', F.col(
'reserve_date') + F.expr('INTERVAL 1 days'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_day', F.col(
'reserve_datetime') + F.expr('INTERVAL 1 days'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_hour', F.col(
'reserve_datetime') + F.expr('INTERVAL 1 HOURS'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_minute', F.col(
'reserve_datetime') + F.expr('INTERVAL 1 minutes'))
df_10_4 = df_10_4.withColumn('reserve_datetime_add_second', F.col(
'reserve_datetime') + F.expr('INTERVAL 1 seconds'))
df_10_4.select(df_10_4.colRegex("`reserve.*`")
).show(3, truncate=False)
+----------+-------------------+------------+--------------------+------------------------+-------------------------+---------------------------+---------------------------+
|reserve_id|reserve_datetime |reserve_date|reserve_date_add_day|reserve_datetime_add_day|reserve_datetime_add_hour|reserve_datetime_add_minute|reserve_datetime_add_second|
+----------+-------------------+------------+--------------------+------------------------+-------------------------+---------------------------+---------------------------+
|r1 |2016-03-06 13:09:42|2016-03-06 |2016-03-07 |2016-03-07 13:09:42 |2016-03-06 14:09:42 |2016-03-06 13:10:42 |2016-03-06 13:09:43 |
|r2 |2016-07-16 23:39:55|2016-07-16 |2016-07-17 |2016-07-17 23:39:55 |2016-07-17 00:39:55 |2016-07-16 23:40:55 |2016-07-16 23:39:56 |
|r3 |2016-09-24 10:03:17|2016-09-24 |2016-09-25 |2016-09-25 10:03:17 |2016-09-24 11:03:17 |2016-09-24 10:04:17 |2016-09-24 10:03:18 |
+----------+-------------------+------------+--------------------+------------------------+-------------------------+---------------------------+---------------------------+
10-5:季節への変換
df_10_5 = df_reserve.withColumn(
'reserve_datetime_month', F.month('reserve_datetime'))
df_10_5 = df_10_5.withColumn('reserve_season',
F.when((3 <= F.col('reserve_datetime_month')) & (F.col('reserve_datetime_month') <= 5), 'spring')
.when((6 <= F.col('reserve_datetime_month')) & (F.col('reserve_datetime_month') <= 8), 'summer')
.when((9 <= F.col('reserve_datetime_month')) & (F.col('reserve_datetime_month') <= 11), 'autumn').otherwise('winter'))
10-6:Qなし
10-7:平日/休日への変換
- 祝日などもあるので、休日マスタを作ってjoinしましょうという項のためコードはなし。
Discussion