【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加②over(Window.partitionBy(""))】
【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加②over(Window.partitionBy(""))】
↓トランザクションテーブル(transaction_table)のサンプル
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記のような決済データを集約したテーブルが [ 2025/04/04 ] に存在すると仮定します。
◾️要望
2025/04/04 朝会MTGにて、クライアントから次のような要望を頂きました。
『1~3月決済の 休眠復帰クラス別 の合計金額を評価したい』
本稿では、クライアントからの要望に答えながら、 データフレームの項目追加 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
- 決済者のナーチャリングをステータスを用いて評価する。
-
合意『物理名を"pay_status"とし、区分分けを以下の通りとする』
-
過去決済なし・当月決済なし → 未利用
- 例) 1月に決済なしの人は"未利用"。1月決済なしかつ2月決済なしの人は"未利用"。
-
過去決済なし・当月決済あり → 新規
- 例) 1月に決済ありの人は"新規"。1月決済なしかつ2月決済ありの人は"新規"。
-
前月決済あり・当月決済あり → 継続
- 例) 1月決済ありかつ2月決済ありの人は"継続"。
-
過去決済あり・前月決済なし・当月決済あり → 復帰
- 例) 1月決済ありかつ2月決済なしかつ3月決済なしの人は"復帰"。
-
過去決済あり・当月決済なし → 休眠
- 例) 1月決済ありかつ2月決済なしの人は"休眠"。
-
過去決済なし・当月決済なし → 未利用
- 注意:決済ステータスが"未利用"や"休眠"の時、合計金額が0となるため、アウトプットに反映されません。
-
合意『物理名を"pay_status"とし、区分分けを以下の通りとする』
- 決済者のナーチャリングをステータスを用いて評価する。
- 合計金額
- 税込と税抜のカラムがあり、合計金額の解釈が複数ある。『税込と税抜のそれぞれで合計金額を集計』『税込のみ』『税抜のみ』
- 合意『 税込合計金額(pay_gross_raw) と 税抜合計金額(pay_gross_without_tax) のそれぞれを集計』
- 税込と税抜のカラムがあり、合計金額の解釈が複数ある。『税込と税抜のそれぞれで合計金額を集計』『税込のみ』『税抜のみ』
- 人数の集計
- "user_id"のカウント方法が複数考えられる。
- 合意『 各カテゴリ毎にuser_idのユニーク数をカウントする。 』
- "user_id"のカウント方法が複数考えられる。
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
shop_id | pay_method | pay_status | user_cnt | pay_gross_raw | pay_gross_without_tax |
---|---|---|---|---|---|
shop000001 | cache | 継続 | 2 | 9900 | 9000 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、データの期間抽出(2025/01 ~ 2025/03)を行います。
tran_sdf = tran_sdf.filter(fn.concat("y","m").between("202501","202503"))
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48:03 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 |
102 | cache | 2025/01/04 22:27:16 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 |
103 | code | 2025/01/08 03:22:07 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 |
次に、"pay_flg"カラムと"year_month"カラムを作成し、それぞれfn.lit("1")
、fn.concat_ws("-","y","m")
とします。
tran_sdf = (
tran_sdf
.withColumn("pay_flg",fn.lit("1"))
.withColumn("year_month",fn.concat_ws("-","y","m"))
)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d | pay_flg | year_month |
---|---|---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48:03 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 | 1 | 2025-01 |
102 | cache | 2025/01/04 22:27:16 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 | 1 | 2025-01 |
103 | code | 2025/01/08 03:22:07 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 | 1 | 2025-01 |
ここで、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を取得するため、pivot()メソッドを使用します。具体的には、"user_id"で集約し、"pay_flg"の最大値(max)を"m"で縦持ちします。
その後、selectExpr()メソッドとstack()メソッドを使用し、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を作成します。
monthly_tran_sdf = (
tran_sdf
.groupBy("user_id")
.pivot("m").max("pay_flg")
.fillna(0)
.selectExpr("user_id", "stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flg)")
)
user_id | month | monthly_pay_flg |
---|---|---|
user000016 | 1 | 1 |
user000016 | 2 | 0 |
user000016 | 3 | 0 |
user000017 | 1 | 1 |
user000017 | 2 | 0 |
user000017 | 3 | 1 |
上記の操作で、"user_id"と月単位でユニークな決済フラグが作れたため、"user_id"毎に"monthly_pay_flg"を0区切りの累積和を算出します。
cont_months_sdf = (
monthly_tran_sdf
# "monthly_pay_flg"を0区切りで区分する"group"を作成
.withColumn(
"group",
fn.sum((fn.col("monthly_pay_flg") == 0).cast("int")).over(Window.partitionBy("user_id").orderBy("month"))
)
# "user_id"と"group"で集約し、"monthly_pay_flg"の合計値を算出するcont_monthsを作成
.withColumn(
"cont_months",
fn.sum("monthly_pay_flg").over(Window.partitionBy("user_id", "group").orderBy("month"))
)
)
user_id | month | monthly_pay_flg | group | cont_months |
---|---|---|---|---|
user000001 | 1 | 0 | 1 | 0 |
user000001 | 2 | 1 | 1 | 1 |
user000001 | 3 | 1 | 1 | 2 |
user000003 | 1 | 0 | 1 | 0 |
user000003 | 2 | 1 | 1 | 1 |
user000003 | 3 | 0 | 2 | 1 |
user000012 | 1 | 1 | 0 | 1 |
user000012 | 2 | 0 | 1 | 1 |
user000012 | 3 | 1 | 1 | 1 |
また、"monthly_pay_flg"を"user_id"毎に参照し、前月決済回数"bef_pay_cnt" を作成します。
cont_months_sdf = (
cont_months_sdf
.withColumn(
"bef_pay_cnt",
fn.sum("monthly_pay_flg")
.over(Window.partitionBy("user_id").orderBy("month"))
)
)
user_id | month | monthly_pay_flg | group | cont_months | bef_pay_cnt |
---|---|---|---|---|---|
user000001 | 1 | 0 | 1 | 0 | 0 |
user000001 | 2 | 1 | 1 | 1 | 1 |
user000001 | 3 | 1 | 1 | 2 | 2 |
user000003 | 1 | 0 | 1 | 0 | 0 |
user000003 | 2 | 1 | 1 | 1 | 1 |
user000003 | 3 | 0 | 2 | 0 | 1 |
user000012 | 1 | 1 | 0 | 1 | 1 |
user000012 | 2 | 0 | 1 | 0 | 1 |
user000012 | 3 | 1 | 1 | 1 | 2 |
そして、条件分岐when()メソッドで"group","bef_pay_cnt","monthly_pay_flg","cont_months"の値を使用し、決済ステータス"pay_status"を作成 します。
pay_status_sdf = (
cont_months_sdf
.withColumn(
"pay_status",
fn.when(
(fn.col("group")>="1") & (fn.col("bef_pay_cnt")=="0"),
"未利用"
)
.when(
(fn.col("group")>="1") & (fn.col("cont_months")=="0") & (fn.col("bef_pay_cnt")>="1"),
"休眠"
)
.when(
(fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")=="1"),
"新規"
)
.when(
(fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")>="2"),
"復帰"
)
.when(
(fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")>="2") & (fn.col("bef_pay_cnt")>="2"),
"継続"
)
.otherwise("区分エラー")
)
)
user_id | month | monthly_pay_flg | group | cont_months | bef_pay_cnt | pay_status |
---|---|---|---|---|---|---|
user000001 | 1 | 0 | 1 | 0 | 0 | 未利用 |
user000001 | 2 | 1 | 1 | 1 | 1 | 新規 |
user000001 | 3 | 1 | 1 | 2 | 2 | 継続 |
user000003 | 1 | 0 | 1 | 0 | 0 | 未利用 |
user000003 | 2 | 1 | 1 | 1 | 1 | 新規 |
user000003 | 3 | 0 | 2 | 0 | 1 | 休眠 |
user000012 | 1 | 1 | 0 | 1 | 1 | 新規 |
user000012 | 2 | 0 | 1 | 0 | 1 | 休眠 |
user000012 | 3 | 1 | 1 | 1 | 2 | 復帰 |
上記の操作の結果から、決済ステータスが意図した通りのロジックになっていることが確認できました。
最後の操作として、"transaction_table"と"pay_status_sdf"を"user_id"をキーに結合します。また、最終的なアウトプットを作成します。
output_sdf = (
tran_sdf
.join(pay_status_sdf, on=["user_id","m"], how="left")
.groupBy("shop_id","pay_method","pay_status")
.agg(
fn.countDistinct("user_id").alias("user_cnt"),
fn.sum("pay_amount_raw").alias("pay_gross_raw"),
fn.sum("pay_amount_without_tax").alias("pay_gross_without_tax")
)
.orderBy("shop_id","pay_method","pay_status")
)
shop_id | pay_method | pay_status | user_cnt | pay_gross_raw | pay_gross_without_tax |
---|---|---|---|---|---|
shop0001 | cache | 新規 | 1 | 1760.0 | 1600.0 |
shop0001 | card | 復帰 | 1 | 9900.0 | 9000.0 |
shop0001 | card | 新規 | 2 | 5280.0 | 4800.0 |
shop0001 | code | 新規 | 1 | 8140.0 | 7400.0 |
shop0002 | cache | 復帰 | 1 | 1430.0 | 1300.0 |
shop0002 | card | 新規 | 1 | 110.0 | 100.0 |
shop0003 | cache | 新規 | 2 | 5610.0 | 5100.0 |
shop0003 | card | 復帰 | 1 | 1100.0 | 1000.0 |
shop0003 | card | 新規 | 1 | 1760.0 | 1600.0 |
shop0003 | code | 新規 | 1 | 8140.0 | 7400.0 |
◾️コードスニペット全量
上記のコードスニペットをまとめたスクリプトの全量を紹介します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブル読み込み
# データの期間抽出(2025/01 ~ 2025/03)およびpay_flgとyear_month作成
# 月単位でユニークな決済フラグを取得
monthly_tran_sdf = (
tran_sdf
.filter(fn.concat("y","m").between("202501","202503"))
.withColumn("pay_flg", fn.lit(1))
.withColumn("year_month", fn.concat_ws("-", fn.col("y"), fn.col("m")))
.groupBy("user_id")
.pivot("m").max("pay_flg")
.fillna(0)
.selectExpr(
"user_id",
"stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flg)"
)
)
cont_months_sdf = (
monthly_tran_sdf
# "monthly_pay_flg"を0区切りで区分する"group"を作成
.withColumn(
"group",
fn.sum((fn.col("monthly_pay_flg") == 0).cast("int"))
.over(Window.partitionBy("user_id")
.orderBy("m"))
)
# "user_id"と"group"で集約し、
# "monthly_pay_flg"の合計値を算出するcont_monthsを作成
.withColumn(
"cont_months",
fn.sum("monthly_pay_flg")
.over(Window.partitionBy("user_id", "group")
.orderBy("m"))
)
# "user_id"で集約し、
# "monthly_pay_flg"の合計値を算出する"bef_pay_cnt"を作成
.withColumn(
"bef_pay_cnt",
fn.sum("monthly_pay_flg")
.over(Window.partitionBy("user_id")
.orderBy("m"))
)
)
# 決済ステータス"pay_status"を作成
pay_status_sdf = (
cont_months_sdf
.withColumn(
"pay_status",
fn.when(
(fn.col("group")>="1") & (fn.col("bef_pay_cnt")=="0"),
"未利用"
)
.when(
(fn.col("group")>="1") & (fn.col("cont_months")=="0") & (fn.col("bef_pay_cnt")>="1"),
"休眠"
)
.when(
(fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")=="1"),
"新規"
)
.when(
(fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")>="2"),
"復帰"
)
.when(
(fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")>="2") & (fn.col("bef_pay_cnt")>="2"),
"継続"
)
.otherwise("区分エラー")
)
)
# アウトプットを作成
output_sdf = (
tran_sdf
.join(pay_status_sdf, on=["user_id","m"], how="left")
.groupBy("shop_id","pay_method","pay_status")
.agg(
fn.countDistinct("user_id").alias("user_cnt"),
fn.sum("pay_amount_raw").alias("pay_gross_raw"),
fn.sum("pay_amount_without_tax").alias("pay_gross_without_tax")
)
.orderBy("shop_id","pay_method","pay_status")
)
『結論』
◾️決済ステータスのロジック構築の意義(ビジネスサイド)
決済ステータスのロジック構築を行うことの意義は以下の通りです。
- 「新規」「継続」「復帰」「休眠」など、顧客の決済行動を時系列で細分化でき、顧客ライフサイクルの変化を追跡できる。
- マーケティングや営業施策を顧客セグメントに応じて最適化できる。
◾️決済ステータスのロジック構築のメリット(エンジニアリングサイド)
決済ステータスのロジック構築を行うことのメリットは以下の通りです。
- 『条件分岐やウィンドウ関数を活用する経験』 は、データ処理の最適化や再利用性の高いコード設計能力を育てます。
- "pay_status" のような条件分岐が多いロジックでは、単純なスクリプトでは処理が遅くなるケースが多いですが、PySparkならデータの並列処理を活かして高速化が可能です。
Discussion
「0区切りの累積和」の理解がちょっと難しいですが、頑張ります!
"連続" というシンプルな概念がアルゴリズムだと複雑になるの面白いですよね!