【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加➀selectExpr("stack()")】
【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加➀selectExpr("stack()")】
↓トランザクションテーブル(transaction_table)のサンプル
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記のような決済データを集約したテーブルが [ 2025/04/01 ] に存在すると仮定します。
◾️要望
2025/04/01 朝会MTGにて、クライアントから次のような要望を頂きました。
『1~3月決済の 継続月数クラス別 の平均決済額を評価したい』
本稿では、クライアントからの要望に答えながら、 データフレームの項目追加 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
- タイムログ
- 現場の締め作業やデータ処理の関係により、本日(2025/04/01)時点で2025/03/30,31のデータが未連携
- 合意『集計実行:4/4。納品:4/5』
- 現場の締め作業やデータ処理の関係により、本日(2025/04/01)時点で2025/03/30,31のデータが未連携
-
- 決済者のナーチャリングを継続月数クラスを用いて評価
-
合意『物理名を"max_cont_months"とし、区分分けを以下の通りとする』
- 1月決済・2月決済・3月決済 → 3
- 1月決済・2月決済・3月未決済 → 2
- 1月決済・2月未決済・3月決済 → 1
- 1月未決済・2月決済・3月決済 → 2
- 1月決済・2月未決済・3月未決済 → 1
- 1月未決済・2月決済・3月未決済 → 1
- 1月未決済・2月未決済・3月決済 → 1
- 1月未決済・2月未決済・3月未決済 → 0
-
合意『物理名を"max_cont_months"とし、区分分けを以下の通りとする』
- 決済者のナーチャリングを継続月数クラスを用いて評価
- 平均額
- 税込と税抜のカラムがあり、平均額の解釈が複数ある。『税込と税抜のそれぞれで平均額を集計』『税込のみ』『税抜のみ』
- 合意『 税込平均額(pay_mean_raw) と 税抜平均額(pay_mean_without_tax) のそれぞれを集計』
- 税込と税抜のカラムがあり、平均額の解釈が複数ある。『税込と税抜のそれぞれで平均額を集計』『税込のみ』『税抜のみ』
◾️タイムラグデータ
2025/04/03の集計前に決済データを確認すると、以下のレコードがあることがわかりました。
これで3月末までのデータがストレージにあることが確認できるため、集計可能です。
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
shop_id | pay_method | max_cont_months | pay_mean_raw | pay_mean_without_tax |
---|---|---|---|---|
shop000001 | cache | 1 | 9900 | 9000 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、データの期間抽出(2025/01 ~ 2025/03)を行います。
tran_sdf = tran_sdf.filter(fn.concat("y","m").between("202501","202503"))
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48:03 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 |
102 | cache | 2025/01/04 22:27:16 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 |
103 | code | 2025/01/08 03:22:07 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 |
次に、"pay_flg"カラムと"year_month"カラムを作成し、それぞれfn.lit("1")
、fn.concat_ws("-","y","m")
とします。
tran_sdf = (
tran_sdf
.withColumn("pay_flg",fn.lit("1"))
.withColumn("year_month",fn.concat_ws("-","y","m"))
)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d | pay_flg | year_month |
---|---|---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48:03 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 | 1 | 2025-01 |
102 | cache | 2025/01/04 22:27:16 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 | 1 | 2025-01 |
103 | code | 2025/01/08 03:22:07 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 | 1 | 2025-01 |
ここで、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を取得するため、pivot()メソッドを使用します。具体的には、"user_id"で集約し、"pay_flg"の最大値(max)を"m"で縦持ちします。
その後、selectExpr()メソッドとstack()メソッドを使用し、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を作成します。
monthly_tran_sdf = (
tran_sdf
.groupBy("user_id")
.pivot("m").max("pay_flg")
.fillna(0)
.selectExpr(
"user_id",
"stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flag)"
)
)
user_id | month | monthly_pay_flg |
---|---|---|
user000016 | 1 | 1 |
user000016 | 2 | 0 |
user000016 | 3 | 0 |
user000017 | 1 | 1 |
user000017 | 2 | 0 |
user000017 | 3 | 1 |
上記の操作で、"user_id"と月単位でユニークな決済フラグが作れたため、"user_id"毎に"monthly_pay_flg"を0区切りの累積和を算出します。
cont_months_sdf = (
monthly_tran_sdf
# "monthly_pay_flg"を0区切りで区分する"group"を作成
.withColumn(
"group",
fn.sum((fn.col("monthly_pay_flag") == 0).cast("int"))
.over(Window.partitionBy("user_id")
.orderBy("month"))
)
# "user_id"と"group"で集約し、
# "monthly_pay_flg"の合計値を算出するcont_monthsを作成
.withColumn(
"cont_months",
fn.sum("monthly_pay_flag")
.over(Window.partitionBy("user_id", "group")
.orderBy("month"))
)
)
user_id | month | monthly_pay_flag | group | cont_months |
---|---|---|---|---|
user000001 | 1 | 0 | 1 | 0 |
user000001 | 2 | 1 | 1 | 1 |
user000001 | 3 | 1 | 1 | 2 |
user000003 | 1 | 0 | 1 | 0 |
user000003 | 2 | 1 | 1 | 1 |
user000003 | 3 | 0 | 2 | 1 |
user000012 | 1 | 1 | 0 | 1 |
user000012 | 2 | 0 | 1 | 1 |
user000012 | 3 | 1 | 1 | 1 |
そして、グループ(group)毎の累積和の最大値("max_cont_months")を"user_id"で集計します。
max_cont_months_sdf = (
cont_months_sdf
.groupBy("user_id")
.agg(fn.max("cont_months").alias("max_cont_months"))
)
user_id | max_cont_months |
---|---|
user000001 | 2 |
user000003 | 1 |
user000012 | 1 |
上記の操作の結果より、user000012のように「1月決済あり・2月決済なし・3月決済あり」の最大連続決済回数が1となることから、意図した通りの値が算出できることが確認できました。
最後の操作として、"transaction_table"と"cont_months_df"を"user_id"をキーに結合します。また、最終的なアウトプットも集計します。
output_sdf = (
tran_sdf
.join(max_cont_months_sdf, on="user_id", how="left")
.groupBy("shop_id","pay_method")
.agg(
fn.max("max_cont_months").alias("max_cont_months"),
fn.mean("pay_amount_raw").alias("pay_gross_raw"),
fn.mean("pay_amount_without_tax").alias("pay_gross_without_tax"),
)
.orderBy("shop_id","pay_method","max_cont_months")
)
output_sdf.show()
shop_id | pay_method | max_cont_months | pay_mean_raw | pay_mean_without_tax |
---|---|---|---|---|
shop0001 | cache | 1 | 1760.0 | 1600.0 |
shop0001 | card | 1 | 4235.0 | 3850.0 |
shop0001 | code | 2 | 8140.0 | 7400.0 |
shop0002 | cache | 1 | 1430.0 | 1300.0 |
shop0002 | card | 1 | 110.0 | 100.0 |
shop0003 | cache | 2 | 2805.0 | 2550.0 |
shop0003 | card | 1 | 1430.0 | 1300.0 |
shop0003 | code | 1 | 8140.0 | 7400.0 |
◾️コードスニペット全量
上記のコードスニペットをまとめたスクリプトの全量を紹介します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブル読み込み
tran_sdf = spark.read.table("transaction_table")
# データの期間抽出(2025/01 ~ 2025/03)およびpay_flgとyear_month作成
# 月単位でユニークな決済フラグを取得
monthly_tran_sdf = (
tran_sdf
.filter(fn.concat("y","m").between("202501","202503"))
.withColumn("pay_flg", fn.lit(1))
.withColumn("year_month", fn.concat_ws("-", fn.col("y"), fn.col("m")))
.groupBy("user_id")
.pivot("m").max("pay_flg")
.fillna(0)
.selectExpr(
"user_id",
"stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flg)"
)
)
# 0区切りの累積和を算出
cont_months_sdf = (
monthly_tran_sdf
# "monthly_pay_flg"を0区切りで区分する"group"を作成
.withColumn(
"group",
fn.sum((fn.col("monthly_pay_flg") == 0).cast("int"))
.over(Window.partitionBy("user_id")
.orderBy("month"))
)
# "user_id"と"group"で集約し、
# "monthly_pay_flg"の合計値を算出するcont_monthsを作成
.withColumn(
"cont_months",
fn.sum("monthly_pay_flg")
.over(Window.partitionBy("user_id", "group")
.orderBy("month"))
)
)
# グループ(group)毎の累積和の最大値("max_cont_months")を"user_id"で集計
max_cont_months_sdf = (
cont_months_sdf
.groupBy("user_id")
.agg(fn.max("cont_months").alias("max_cont_months"))
)
# アウトプット
output_sdf = (
tran_sdf
.join(max_cont_months_sdf, on="user_id", how="left")
.groupBy("shop_id","pay_method")
.agg(
fn.max("max_cont_months").alias("max_cont_months"),
fn.mean("pay_amount_raw").alias("pay_gross_raw"),
fn.mean("pay_amount_without_tax").alias("pay_gross_without_tax"),
)
.orderBy("shop_id","pay_method","max_cont_months")
)
『結論』
◾️決済連続月数クラスのロジック構築の意義(ビジネスサイド)
決済連続月数クラスのロジック構築を行うことの意義は次の通りです。
・ビジネス的には顧客の継続利用パターンや行動の「離脱」と「復帰」を識別しやすくなる。
・マーケティング施策やリテンション戦略の立案に活用できる。
◾️決済連続月数クラスのロジック構築のメリット(エンジニアリングサイド)
決済連続月数クラスのロジック構築を行うことのメリットは以下の通りです。
・『0区切りの累積和を算出』 など、このロジックは類似課題にも応用可能で、異常検知やパフォーマンスモニタリングなど多方面で役立ちます。
・データから価値を引き出す能力を向上させ、ビジネス価値とエンジニアリング力を両立できます。
Discussion
データが遅れて連携される想定が面白いです!
バッチ処理の関係で、実行日の2日前が最新日として連携されることが多いですね