🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加③複雑なwhen()分岐】
【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加③複雑なwhen()分岐】
↓前日情報つき料金プラン顧客情報(user_plan_table_v002)のサンプル
acq_dt | user_id | cur_price_plan_cd | cur_price_plan_name | new_price_plan_cd | new_price_plan_name | bef_cur_price_plan_cd | bef_cur_price_plan_name | bef_new_price_plan_cd | bef_new_price_plan_name |
---|---|---|---|---|---|---|---|---|---|
20250502 | user000001 | P001 | ベーシックプラン | NULL | NULL | P001 | ベーシックプラン | NULL | NULL |
20250502 | user000002 | P001 | ベーシックプラン | NULL | NULL | P002 | プレミアムプラン | P001 | ベーシックプラン |
20250502 | user000004 | NULL | NULL | NULL | NULL | P001 | ベーシックプラン | NULL | NULL |
20250502 | user000005 | NULL | NULL | P001 | ベーシックプラン | NULL | NULL | NULL | NULL |
20250502 | user000006 | P001 | ベーシックプラン | NULL | NULL | NULL | NULL | P001 | ベーシックプラン |
上記のような決済データを集約したSQLテーブルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『"料金プラン変更ステータス"項目を作成したい』
本稿では、クライアントからの要望に答えながら、 複雑なwhen()分岐 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
- 料金プランは『ベーシックプラン・プレミアムプラン』の2種類
-
-
合意『物理名を"plan_change_status"とし、区分分けを以下の通りとする』
-
前日無加入・当日加入 → 新規
- 例) 20250501に料金プランに加入せず、20250502に料金プランに加入した人は"新規"。
-
当日プラン変更なし → 継続
- 例) 20250501に同じ料金プランに加入した人は"継続"
-
当日プラン変更あり → 変更・
- 例) 20250501に別の料金プランに加入した人は"変更"
-
前日加入・当日無加入 → 解約
- 例) 20250501に料金プランに加入していて、20250502に料金プランに加入していない人は"解約"。
-
前日無加入・当日加入 → 新規
-
合意『物理名を"plan_change_status"とし、区分分けを以下の通りとする』
-
- 当日料金プラン加入者のユーザーID
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
acq_date | user_id | plan_change_status |
---|---|---|
20250501 | user000001 | 継続 |
20250501 | user000002 | 変更 |
20250502 | user000001 | 継続 |
20250502 | user000005 | 新規 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、spark.sql()メソッドでDDLを書き、SQLテーブルを読み込みます。
SQLテーブルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS user_plan_table_v002")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS user_plan_table_v002 (
acq_date STRING, --取得日
user_id STRING, --ユーザーID
cur_price_plan_cd STRING, --現加入料金プランコード
cur_price_plan_name STRING, --現加入料金プラン名称
new_price_plan_cd STRING, --新加入料金プランコード
new_price_plan_name STRING, --新加入料金プラン名称
bef_cur_price_plan_cd STRING, --前日現加入料金プランコード
bef_cur_price_plan_name STRING, --前日現加入料金プラン名称
bef_new_price_plan_cd STRING, --前日新加入料金プランコード
bef_new_price_plan_name STRING, --前日現加入料金プラン名称
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/user_plan_table_v002/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE user_plan_table_v002;""")
読み込んだデータから、"plan_change_status(料金ステータス変更)"項目を作成します。
作成ロジックは以下の通りです。
- "新規"
- "bef_cur_price_plan_cd"と"bef_new_price_plan_cd"が共にNullかつ、
- "変更"
- "bef_cur_price_plan_cd"と"bef_new_price_plan_cd"のいずれか1つがNullでないかつ、"new_price_plan_cd"がNullでなく"cur_price_plan_cd"と"new_price_plan_cd"の値が異なる
- "継続"
- "cur_price_plan_cd"と"new_price_plan_cd"のいずれか1つがNullでないかつ、cur_price_plan_cdがNullでなく"new_price_plan_cd"がNull
- "解約"
- "bef_cur_price_plan_cd"と"bef_new_price_plan_cd"のいずれか1つがNullでないかつ、"cur_price_plan_cd"がNull
複雑なwhen()分岐を捌くコツとしてはMECE、ベン図、フローチャートのようなツールを頭で想像することです。
"plan_change_status"項目を作成
output_sdf = (
spark.read.table("user_plan_table_v002")
.withColumn(
"plan_change_status",
fn.when(
(fn.col("bef_cur_price_plan_cd").isNull()) & (fn.col("bef_new_price_plan_cd").isNull()),
"新規"
).when(
(
(fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
) & (fn.col("new_price_plan_cd").isNotNull()) & (fn.col("cur_price_plan_cd") != fn.col("new_price_plan_cd")),
"変更"
).when(
(
(fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
) & (fn.col("cur_price_plan_cd").isNotNull() & fn.col("new_price_plan_cd").isNull()),
"継続"
).when(
(
(fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
) & (fn.col("cur_price_plan_cd").isNull()),
"解約"
).otherwise("不明")
)
)
output_sdf.show(7)
acq_dt | user_id | plan_change_status |
---|---|---|
20250502 | user000001 | 継続 |
20250502 | user000002 | 継続 |
20250502 | user000004 | 解約 |
20250502 | user000005 | 新規 |
20250502 | user000006 | 継続 |
20250502 | user000007 | 継続 |
20250502 | user000009 | 変更 |
上記の結果から、操作が意図した通りであることが確認できました。
最後にスクリプト全量を紹介します。
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS user_plan_table_v002")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS user_plan_table_v002 (
acq_date STRING, --取得日
user_id STRING, --ユーザーID
cur_price_plan_cd STRING, --現加入料金プランコード
cur_price_plan_name STRING, --現加入料金プラン名称
new_price_plan_cd STRING, --新加入料金プランコード
new_price_plan_name STRING, --新加入料金プラン名称
bef_cur_price_plan_cd STRING, --前日現加入料金プランコード
bef_cur_price_plan_name STRING, --前日現加入料金プラン名称
bef_new_price_plan_cd STRING, --前日新加入料金プランコード
bef_new_price_plan_name STRING, --前日現加入料金プラン名称
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/user_plan_table_v002/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE user_plan_table_v002;""")
# "plan_change_status(料金ステータス変更)"項目作成
output_sdf = (
spark.read.table("user_plan_table_v002")
.withColumn(
"plan_change_status",
fn.when(
(fn.col("bef_cur_price_plan_cd").isNull()) & (fn.col("bef_new_price_plan_cd").isNull()),
"新規"
).when(
(
(fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
) & (fn.col("new_price_plan_cd").isNotNull()) & (fn.col("cur_price_plan_cd") != fn.col("new_price_plan_cd")),
"変更"
).when(
(
(fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
) & (fn.col("cur_price_plan_cd").isNotNull() & fn.col("new_price_plan_cd").isNull()),
"継続"
).when(
(
(fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
) & (fn.col("cur_price_plan_cd").isNull()),
"解約"
).otherwise("不明")
)
)
『結論』
◾️複雑なwhen()分岐を習得するメリット(ビジネスサイド)
複雑なwhen()分岐を習得するメリットは以下の通りです。
-
柔軟なビジネスルールの反映
- 顧客行動の分類など、複雑なビジネス要件をデータ分析ロジックに忠実に反映可能です。
-
データ品質向上
- データの特性に応じた複雑な条件分岐ができるため、データクレンジングや異常データの早期発見が容易になります。
◾️複雑なwhen()分岐を習得するメリット(エンジニアリングサイド)
Fullouter(完全外部結合)を習得するメリットは以下の通りです。
-
柔軟なデータ処理能力の習得
- when()を使った条件分岐によって、データフローの柔軟性が向上します。
-
エラーの予防とメンテナンス性向上
- 複雑な条件をコードで明確に定義することで、誤判定を防止し、保守性の高いコードを作成できます。
Discussion