🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加②over(Window.partitionBy(""))】

2025/01/13に公開
2

【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加②over(Window.partitionBy(""))】

↓トランザクションテーブル(transaction_table)のサンプル

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

上記のような決済データを集約したテーブルが [ 2025/04/04 ] に存在すると仮定します。

◾️要望

2025/04/04 朝会MTGにて、クライアントから次のような要望を頂きました。

『1~3月決済の 休眠復帰クラス別 の合計金額を評価したい』

本稿では、クライアントからの要望に答えながら、 データフレームの項目追加 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • 決済ステータス

    • 決済者のナーチャリングをステータスを用いて評価する。
      • 合意『物理名を"pay_status"とし、区分分けを以下の通りとする』
        • 過去決済なし・当月決済なし → 未利用
          • 例) 1月に決済なしの人は"未利用"。1月決済なしかつ2月決済なしの人は"未利用"。
        • 過去決済なし・当月決済あり → 新規
          • 例) 1月に決済ありの人は"新規"。1月決済なしかつ2月決済ありの人は"新規"。
        • 前月決済あり・当月決済あり → 継続
          • 例) 1月決済ありかつ2月決済ありの人は"継続"。
        • 過去決済あり・前月決済なし・当月決済あり → 復帰
          • 例) 1月決済ありかつ2月決済なしかつ3月決済なしの人は"復帰"。
        • 過去決済あり・当月決済なし → 休眠
          • 例) 1月決済ありかつ2月決済なしの人は"休眠"。
      • 注意:決済ステータスが"未利用"や"休眠"の時、合計金額が0となるため、アウトプットに反映されません。
  • 合計金額
    • 税込と税抜のカラムがあり、合計金額の解釈が複数ある。『税込と税抜のそれぞれで合計金額を集計』『税込のみ』『税抜のみ』
      • 合意『 税込合計金額(pay_gross_raw)税抜合計金額(pay_gross_without_tax) のそれぞれを集計』
  • 人数の集計
    • "user_id"のカウント方法が複数考えられる。
      • 合意『 各カテゴリ毎にuser_idのユニーク数をカウントする。

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

shop_id pay_method pay_status user_cnt pay_gross_raw pay_gross_without_tax
shop000001 cache 継続 2 9900 9000

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、データの期間抽出(2025/01 ~ 2025/03)を行います。

期間抽出(2025/01 ~ 2025/03)
tran_sdf = tran_sdf.filter(fn.concat("y","m").between("202501","202503"))
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48:03 user000005 shop0004 1540 1400 2025 1 3
102 cache 2025/01/04 22:27:16 user000027 shop0006 6050 5500 2025 1 4
103 code 2025/01/08 03:22:07 user000020 shop0003 8140 7400 2025 1 8

次に、"pay_flg"カラムと"year_month"カラムを作成し、それぞれfn.lit("1")fn.concat_ws("-","y","m")とします。

pay_flgとyear_month作成
tran_sdf = (
    tran_sdf
    .withColumn("pay_flg",fn.lit("1"))
    .withColumn("year_month",fn.concat_ws("-","y","m"))
)
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d pay_flg year_month
101 code 2025/01/03 06:48:03 user000005 shop0004 1540 1400 2025 1 3 1 2025-01
102 cache 2025/01/04 22:27:16 user000027 shop0006 6050 5500 2025 1 4 1 2025-01
103 code 2025/01/08 03:22:07 user000020 shop0003 8140 7400 2025 1 8 1 2025-01

ここで、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を取得するため、pivot()メソッドを使用します。具体的には、"user_id"で集約し、"pay_flg"の最大値(max)を"m"で縦持ちします。
その後、selectExpr()メソッドstack()メソッドを使用し、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を作成します。

月単位でユニークな決済フラグを取得
monthly_tran_sdf = (
    tran_sdf
    .groupBy("user_id")
    .pivot("m").max("pay_flg")
    .fillna(0)
    .selectExpr("user_id", "stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flg)")
)
user_id month monthly_pay_flg
user000016 1 1
user000016 2 0
user000016 3 0
user000017 1 1
user000017 2 0
user000017 3 1

上記の操作で、"user_id"と月単位でユニークな決済フラグが作れたため、"user_id"毎に"monthly_pay_flg"を0区切りの累積和を算出します。

0区切りの累積和を算出
cont_months_sdf = (
    monthly_tran_sdf
    # "monthly_pay_flg"を0区切りで区分する"group"を作成
    .withColumn(
        "group",
        fn.sum((fn.col("monthly_pay_flg") == 0).cast("int")).over(Window.partitionBy("user_id").orderBy("month"))
    )
    # "user_id"と"group"で集約し、"monthly_pay_flg"の合計値を算出するcont_monthsを作成
    .withColumn(
        "cont_months",
        fn.sum("monthly_pay_flg").over(Window.partitionBy("user_id", "group").orderBy("month"))
    )
)
user_id month monthly_pay_flg group cont_months
user000001 1 0 1 0
user000001 2 1 1 1
user000001 3 1 1 2
user000003 1 0 1 0
user000003 2 1 1 1
user000003 3 0 2 1
user000012 1 1 0 1
user000012 2 0 1 1
user000012 3 1 1 1

また、"monthly_pay_flg"を"user_id"毎に参照し、前月決済回数"bef_pay_cnt" を作成します。

前月決済回数"bef_pay_cnt"を作成
cont_months_sdf = (
    cont_months_sdf
    .withColumn(
        "bef_pay_cnt",
        fn.sum("monthly_pay_flg")
        .over(Window.partitionBy("user_id").orderBy("month"))
    )
)
user_id month monthly_pay_flg group cont_months bef_pay_cnt
user000001 1 0 1 0 0
user000001 2 1 1 1 1
user000001 3 1 1 2 2
user000003 1 0 1 0 0
user000003 2 1 1 1 1
user000003 3 0 2 0 1
user000012 1 1 0 1 1
user000012 2 0 1 0 1
user000012 3 1 1 1 2

そして、条件分岐when()メソッドで"group","bef_pay_cnt","monthly_pay_flg","cont_months"の値を使用し、決済ステータス"pay_status"を作成 します。

決済ステータス"pay_status"を作成
pay_status_sdf = (
    cont_months_sdf
    .withColumn(
        "pay_status",
        fn.when(
            (fn.col("group")>="1") & (fn.col("bef_pay_cnt")=="0"),
            "未利用"
        )
        .when(
            (fn.col("group")>="1") & (fn.col("cont_months")=="0") & (fn.col("bef_pay_cnt")>="1"),
            "休眠"
        )
        .when(
            (fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")=="1"),
            "新規"
        )
        .when(
            (fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")>="2"),
            "復帰"
        )
        .when(
            (fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")>="2") & (fn.col("bef_pay_cnt")>="2"),
            "継続"
        )
        .otherwise("区分エラー")
    )
)
user_id month monthly_pay_flg group cont_months bef_pay_cnt pay_status
user000001 1 0 1 0 0 未利用
user000001 2 1 1 1 1 新規
user000001 3 1 1 2 2 継続
user000003 1 0 1 0 0 未利用
user000003 2 1 1 1 1 新規
user000003 3 0 2 0 1 休眠
user000012 1 1 0 1 1 新規
user000012 2 0 1 0 1 休眠
user000012 3 1 1 1 2 復帰

上記の操作の結果から、決済ステータスが意図した通りのロジックになっていることが確認できました。
最後の操作として、"transaction_table"と"pay_status_sdf"を"user_id"をキーに結合します。また、最終的なアウトプットを作成します。

アウトプットを作成
output_sdf = (
    tran_sdf
    .join(pay_status_sdf, on=["user_id","m"], how="left")
    .groupBy("shop_id","pay_method","pay_status")
    .agg(
        fn.countDistinct("user_id").alias("user_cnt"),
        fn.sum("pay_amount_raw").alias("pay_gross_raw"),
        fn.sum("pay_amount_without_tax").alias("pay_gross_without_tax")
    )
    .orderBy("shop_id","pay_method","pay_status")
)
shop_id pay_method pay_status user_cnt pay_gross_raw pay_gross_without_tax
shop0001 cache 新規 1 1760.0 1600.0
shop0001 card 復帰 1 9900.0 9000.0
shop0001 card 新規 2 5280.0 4800.0
shop0001 code 新規 1 8140.0 7400.0
shop0002 cache 復帰 1 1430.0 1300.0
shop0002 card 新規 1 110.0 100.0
shop0003 cache 新規 2 5610.0 5100.0
shop0003 card 復帰 1 1100.0 1000.0
shop0003 card 新規 1 1760.0 1600.0
shop0003 code 新規 1 8140.0 7400.0

◾️コードスニペット全量

上記のコードスニペットをまとめたスクリプトの全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window

# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブル読み込み
# データの期間抽出(2025/01 ~ 2025/03)およびpay_flgとyear_month作成
# 月単位でユニークな決済フラグを取得
monthly_tran_sdf = (
    tran_sdf
    .filter(fn.concat("y","m").between("202501","202503"))
    .withColumn("pay_flg", fn.lit(1))
    .withColumn("year_month", fn.concat_ws("-", fn.col("y"), fn.col("m")))
    .groupBy("user_id")
    .pivot("m").max("pay_flg")
    .fillna(0)
    .selectExpr(
        "user_id",
        "stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flg)"
    )
)

cont_months_sdf = (
    monthly_tran_sdf
    # "monthly_pay_flg"を0区切りで区分する"group"を作成
    .withColumn(
        "group",
        fn.sum((fn.col("monthly_pay_flg") == 0).cast("int"))
        .over(Window.partitionBy("user_id")
        .orderBy("m"))
    )
    # "user_id"と"group"で集約し、
    # "monthly_pay_flg"の合計値を算出するcont_monthsを作成
    .withColumn(
        "cont_months",
        fn.sum("monthly_pay_flg")
        .over(Window.partitionBy("user_id", "group")
        .orderBy("m"))
    )
    # "user_id"で集約し、
    # "monthly_pay_flg"の合計値を算出する"bef_pay_cnt"を作成
    .withColumn(
        "bef_pay_cnt",
        fn.sum("monthly_pay_flg")
        .over(Window.partitionBy("user_id")
        .orderBy("m"))
    )
)

# 決済ステータス"pay_status"を作成
pay_status_sdf = (
    cont_months_sdf
    .withColumn(
        "pay_status",
        fn.when(
            (fn.col("group")>="1") & (fn.col("bef_pay_cnt")=="0"),
            "未利用"
        )
        .when(
            (fn.col("group")>="1") & (fn.col("cont_months")=="0") & (fn.col("bef_pay_cnt")>="1"),
            "休眠"
        )
        .when(
            (fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")=="1"),
            "新規"
        )
        .when(
            (fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")=="1") & (fn.col("bef_pay_cnt")>="2"),
            "復帰"
        )
        .when(
            (fn.col("monthly_pay_flg")=="1") & (fn.col("cont_months")>="2") & (fn.col("bef_pay_cnt")>="2"),
            "継続"
        )
        .otherwise("区分エラー")
    )
)

# アウトプットを作成
output_sdf = (
    tran_sdf
    .join(pay_status_sdf, on=["user_id","m"], how="left")
    .groupBy("shop_id","pay_method","pay_status")
    .agg(
        fn.countDistinct("user_id").alias("user_cnt"),
        fn.sum("pay_amount_raw").alias("pay_gross_raw"),
        fn.sum("pay_amount_without_tax").alias("pay_gross_without_tax")
    )
    .orderBy("shop_id","pay_method","pay_status")
)

『結論』

◾️決済ステータスのロジック構築の意義(ビジネスサイド)

決済ステータスのロジック構築を行うことの意義は以下の通りです。

  • 「新規」「継続」「復帰」「休眠」など、顧客の決済行動を時系列で細分化でき、顧客ライフサイクルの変化を追跡できる。
  • マーケティングや営業施策を顧客セグメントに応じて最適化できる。

◾️決済ステータスのロジック構築のメリット(エンジニアリングサイド)

決済ステータスのロジック構築を行うことのメリットは以下の通りです。

  • 『条件分岐やウィンドウ関数を活用する経験』 は、データ処理の最適化や再利用性の高いコード設計能力を育てます。
  • "pay_status" のような条件分岐が多いロジックでは、単純なスクリプトでは処理が遅くなるケースが多いですが、PySparkならデータの並列処理を活かして高速化が可能です。

Discussion

fact601fact601

「0区切りの累積和」の理解がちょっと難しいですが、頑張ります!

たいきたいき

"連続" というシンプルな概念がアルゴリズムだと複雑になるの面白いですよね!