🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加➀selectExpr("stack()")】

2025/01/13に公開
2

【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加➀selectExpr("stack()")】

↓トランザクションテーブル(transaction_table)のサンプル

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

上記のような決済データを集約したテーブルが [ 2025/04/01 ] に存在すると仮定します。

◾️要望

2025/04/01 朝会MTGにて、クライアントから次のような要望を頂きました。

『1~3月決済の 継続月数クラス別 の平均決済額を評価したい』

本稿では、クライアントからの要望に答えながら、 データフレームの項目追加 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • タイムログ
    • 現場の締め作業やデータ処理の関係により、本日(2025/04/01)時点で2025/03/30,31のデータが未連携
      • 合意『集計実行:4/4。納品:4/5』
  • 継続月数クラス

    • 決済者のナーチャリングを継続月数クラスを用いて評価
      • 合意『物理名を"max_cont_months"とし、区分分けを以下の通りとする』
        • 1月決済・2月決済・3月決済 → 3
        • 1月決済・2月決済・3月未決済 → 2
        • 1月決済・2月未決済・3月決済 → 1
        • 1月未決済・2月決済・3月決済 → 2
        • 1月決済・2月未決済・3月未決済 → 1
        • 1月未決済・2月決済・3月未決済 → 1
        • 1月未決済・2月未決済・3月決済 → 1
        • 1月未決済・2月未決済・3月未決済 → 0
  • 平均額
    • 税込と税抜のカラムがあり、平均額の解釈が複数ある。『税込と税抜のそれぞれで平均額を集計』『税込のみ』『税抜のみ』
      • 合意『 税込平均額(pay_mean_raw)税抜平均額(pay_mean_without_tax) のそれぞれを集計』

◾️タイムラグデータ
 2025/04/03の集計前に決済データを確認すると、以下のレコードがあることがわかりました。

これで3月末までのデータがストレージにあることが確認できるため、集計可能です。

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

shop_id pay_method max_cont_months pay_mean_raw pay_mean_without_tax
shop000001 cache 1 9900 9000

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、データの期間抽出(2025/01 ~ 2025/03)を行います。

期間抽出(2025/01 ~ 2025/03)
tran_sdf = tran_sdf.filter(fn.concat("y","m").between("202501","202503"))
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48:03 user000005 shop0004 1540 1400 2025 1 3
102 cache 2025/01/04 22:27:16 user000027 shop0006 6050 5500 2025 1 4
103 code 2025/01/08 03:22:07 user000020 shop0003 8140 7400 2025 1 8

次に、"pay_flg"カラムと"year_month"カラムを作成し、それぞれfn.lit("1")fn.concat_ws("-","y","m")とします。

pay_flgとyear_month作成
tran_sdf = (
    tran_sdf
    .withColumn("pay_flg",fn.lit("1"))
    .withColumn("year_month",fn.concat_ws("-","y","m"))
)
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d pay_flg year_month
101 code 2025/01/03 06:48:03 user000005 shop0004 1540 1400 2025 1 3 1 2025-01
102 cache 2025/01/04 22:27:16 user000027 shop0006 6050 5500 2025 1 4 1 2025-01
103 code 2025/01/08 03:22:07 user000020 shop0003 8140 7400 2025 1 8 1 2025-01

ここで、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を取得するため、pivot()メソッドを使用します。具体的には、"user_id"で集約し、"pay_flg"の最大値(max)を"m"で縦持ちします。
その後、selectExpr()メソッドstack()メソッドを使用し、 ユーザー毎に月単位でユニークな決済フラグ(monthly_pay_flg) を作成します。

月単位でユニークな決済フラグを取得
monthly_tran_sdf = (
    tran_sdf
    .groupBy("user_id")
    .pivot("m").max("pay_flg")
    .fillna(0)
    .selectExpr(
        "user_id",
        "stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flag)"
    )
)
user_id month monthly_pay_flg
user000016 1 1
user000016 2 0
user000016 3 0
user000017 1 1
user000017 2 0
user000017 3 1

上記の操作で、"user_id"と月単位でユニークな決済フラグが作れたため、"user_id"毎に"monthly_pay_flg"を0区切りの累積和を算出します。

0区切りの累積和を算出
cont_months_sdf = (
    monthly_tran_sdf
    # "monthly_pay_flg"を0区切りで区分する"group"を作成
    .withColumn(
        "group",
        fn.sum((fn.col("monthly_pay_flag") == 0).cast("int"))
        .over(Window.partitionBy("user_id")
        .orderBy("month"))
    )
    # "user_id"と"group"で集約し、
    # "monthly_pay_flg"の合計値を算出するcont_monthsを作成
    .withColumn(
        "cont_months",
        fn.sum("monthly_pay_flag")
        .over(Window.partitionBy("user_id", "group")
        .orderBy("month"))
    )
)
user_id month monthly_pay_flag group cont_months
user000001 1 0 1 0
user000001 2 1 1 1
user000001 3 1 1 2
user000003 1 0 1 0
user000003 2 1 1 1
user000003 3 0 2 1
user000012 1 1 0 1
user000012 2 0 1 1
user000012 3 1 1 1

そして、グループ(group)毎の累積和の最大値("max_cont_months")を"user_id"で集計します。

グループ(group)毎の累積和の最大値("max_cont_months")を"user_id"で集計
max_cont_months_sdf = (
    cont_months_sdf
    .groupBy("user_id")
    .agg(fn.max("cont_months").alias("max_cont_months"))
)
user_id max_cont_months
user000001 2
user000003 1
user000012 1

上記の操作の結果より、user000012のように「1月決済あり・2月決済なし・3月決済あり」の最大連続決済回数が1となることから、意図した通りの値が算出できることが確認できました。
最後の操作として、"transaction_table"と"cont_months_df"を"user_id"をキーに結合します。また、最終的なアウトプットも集計します。

アウトプット
output_sdf = (
    tran_sdf
    .join(max_cont_months_sdf, on="user_id", how="left")
    .groupBy("shop_id","pay_method")
    .agg(
        fn.max("max_cont_months").alias("max_cont_months"),
        fn.mean("pay_amount_raw").alias("pay_gross_raw"),
        fn.mean("pay_amount_without_tax").alias("pay_gross_without_tax"),
    )
    .orderBy("shop_id","pay_method","max_cont_months")
)
output_sdf.show()
shop_id pay_method max_cont_months pay_mean_raw pay_mean_without_tax
shop0001 cache 1 1760.0 1600.0
shop0001 card 1 4235.0 3850.0
shop0001 code 2 8140.0 7400.0
shop0002 cache 1 1430.0 1300.0
shop0002 card 1 110.0 100.0
shop0003 cache 2 2805.0 2550.0
shop0003 card 1 1430.0 1300.0
shop0003 code 1 8140.0 7400.0

◾️コードスニペット全量

上記のコードスニペットをまとめたスクリプトの全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window

# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブル読み込み
tran_sdf = spark.read.table("transaction_table")
# データの期間抽出(2025/01 ~ 2025/03)およびpay_flgとyear_month作成
# 月単位でユニークな決済フラグを取得
monthly_tran_sdf = (
    tran_sdf
    .filter(fn.concat("y","m").between("202501","202503"))
    .withColumn("pay_flg", fn.lit(1))
    .withColumn("year_month", fn.concat_ws("-", fn.col("y"), fn.col("m")))
    .groupBy("user_id")
    .pivot("m").max("pay_flg")
    .fillna(0)
    .selectExpr(
        "user_id",
        "stack(3, '1', `1`, '2', `2`, '3', `3`) as (month, monthly_pay_flg)"
    )
)
# 0区切りの累積和を算出
cont_months_sdf = (
    monthly_tran_sdf
    # "monthly_pay_flg"を0区切りで区分する"group"を作成
    .withColumn(
        "group",
        fn.sum((fn.col("monthly_pay_flg") == 0).cast("int"))
        .over(Window.partitionBy("user_id")
        .orderBy("month"))
    )
    # "user_id"と"group"で集約し、
    # "monthly_pay_flg"の合計値を算出するcont_monthsを作成
    .withColumn(
        "cont_months",
        fn.sum("monthly_pay_flg")
        .over(Window.partitionBy("user_id", "group")
        .orderBy("month"))
    )
)

# グループ(group)毎の累積和の最大値("max_cont_months")を"user_id"で集計
max_cont_months_sdf = (
    cont_months_sdf
    .groupBy("user_id")
    .agg(fn.max("cont_months").alias("max_cont_months"))
)

# アウトプット
output_sdf = (
    tran_sdf
    .join(max_cont_months_sdf, on="user_id", how="left")
    .groupBy("shop_id","pay_method")
    .agg(
        fn.max("max_cont_months").alias("max_cont_months"),
        fn.mean("pay_amount_raw").alias("pay_gross_raw"),
        fn.mean("pay_amount_without_tax").alias("pay_gross_without_tax"),
    )
    .orderBy("shop_id","pay_method","max_cont_months")
)

『結論』

◾️決済連続月数クラスのロジック構築の意義(ビジネスサイド)

決済連続月数クラスのロジック構築を行うことの意義は次の通りです。
・ビジネス的には顧客の継続利用パターンや行動の「離脱」と「復帰」を識別しやすくなる。
・マーケティング施策やリテンション戦略の立案に活用できる。

◾️決済連続月数クラスのロジック構築のメリット(エンジニアリングサイド)

決済連続月数クラスのロジック構築を行うことのメリットは以下の通りです。
『0区切りの累積和を算出』 など、このロジックは類似課題にも応用可能で、異常検知やパフォーマンスモニタリングなど多方面で役立ちます。
・データから価値を引き出す能力を向上させ、ビジネス価値とエンジニアリング力を両立できます。

Discussion

fact601fact601

データが遅れて連携される想定が面白いです!

たいきたいき

バッチ処理の関係で、実行日の2日前が最新日として連携されることが多いですね