🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加③複雑なwhen()分岐】

2025/02/03に公開

【Python・Pysparkで学ぶ!】データ分析の基礎【項目追加③複雑なwhen()分岐】

↓前日情報つき料金プラン顧客情報(user_plan_table_v002)のサンプル

acq_dt user_id cur_price_plan_cd cur_price_plan_name new_price_plan_cd new_price_plan_name bef_cur_price_plan_cd bef_cur_price_plan_name bef_new_price_plan_cd bef_new_price_plan_name
20250502 user000001 P001 ベーシックプラン NULL NULL P001 ベーシックプラン NULL NULL
20250502 user000002 P001 ベーシックプラン NULL NULL P002 プレミアムプラン P001 ベーシックプラン
20250502 user000004 NULL NULL NULL NULL P001 ベーシックプラン NULL NULL
20250502 user000005 NULL NULL P001 ベーシックプラン NULL NULL NULL NULL
20250502 user000006 P001 ベーシックプラン NULL NULL NULL NULL P001 ベーシックプラン

上記のような決済データを集約したSQLテーブルが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『"料金プラン変更ステータス"項目を作成したい』

本稿では、クライアントからの要望に答えながら、 複雑なwhen()分岐 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • 料金プラン

    • 料金プランは『ベーシックプラン・プレミアムプラン』の2種類
  • 料金プラン変更ステータス

    • 合意『物理名を"plan_change_status"とし、区分分けを以下の通りとする』
      • 前日無加入・当日加入 → 新規
        • 例) 20250501に料金プランに加入せず、20250502に料金プランに加入した人は"新規"。
      • 当日プラン変更なし → 継続
        • 例) 20250501に同じ料金プランに加入した人は"継続"
      • 当日プラン変更あり → 変更・
        • 例) 20250501に別の料金プランに加入した人は"変更"
      • 前日加入・当日無加入 → 解約
        • 例) 20250501に料金プランに加入していて、20250502に料金プランに加入していない人は"解約"。
  • "ユーザーID"項目

    • 当日料金プラン加入者のユーザーID

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

acq_date user_id plan_change_status
20250501 user000001 継続
20250501 user000002 変更
20250502 user000001 継続
20250502 user000005 新規

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、spark.sql()メソッドでDDLを書き、SQLテーブルを読み込みます。

SQLテーブルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS user_plan_table_v002")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS user_plan_table_v002 (
        acq_date                 STRING,     --取得日
        user_id                  STRING,     --ユーザーID
        cur_price_plan_cd        STRING,     --現加入料金プランコード
        cur_price_plan_name      STRING,     --現加入料金プラン名称
        new_price_plan_cd        STRING,     --新加入料金プランコード
        new_price_plan_name      STRING,     --新加入料金プラン名称
        bef_cur_price_plan_cd    STRING,     --前日現加入料金プランコード
        bef_cur_price_plan_name  STRING,     --前日現加入料金プラン名称
        bef_new_price_plan_cd    STRING,     --前日新加入料金プランコード
        bef_new_price_plan_name  STRING,     --前日現加入料金プラン名称
        y                        STRING,     --年
        m                        STRING,     --月
        d                        STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/user_plan_table_v002/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE user_plan_table_v002;""")

読み込んだデータから、"plan_change_status(料金ステータス変更)"項目を作成します。
作成ロジックは以下の通りです。

  • "新規"
    • "bef_cur_price_plan_cd"と"bef_new_price_plan_cd"が共にNullかつ、
  • "変更"
    • "bef_cur_price_plan_cd"と"bef_new_price_plan_cd"のいずれか1つがNullでないかつ、"new_price_plan_cd"がNullでなく"cur_price_plan_cd"と"new_price_plan_cd"の値が異なる
  • "継続"
    • "cur_price_plan_cd"と"new_price_plan_cd"のいずれか1つがNullでないかつ、cur_price_plan_cdがNullでなく"new_price_plan_cd"がNull
  • "解約"
    • "bef_cur_price_plan_cd"と"bef_new_price_plan_cd"のいずれか1つがNullでないかつ、"cur_price_plan_cd"がNull

複雑なwhen()分岐を捌くコツとしてはMECE、ベン図、フローチャートのようなツールを頭で想像することです。

"plan_change_status"項目を作成
output_sdf = (
    spark.read.table("user_plan_table_v002")
    .withColumn(
        "plan_change_status",
        fn.when(
            (fn.col("bef_cur_price_plan_cd").isNull()) & (fn.col("bef_new_price_plan_cd").isNull()),
            "新規"
        ).when(
            (
                (fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
            ) & (fn.col("new_price_plan_cd").isNotNull()) & (fn.col("cur_price_plan_cd") != fn.col("new_price_plan_cd")),
            "変更"
        ).when(
            (
                (fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
            ) & (fn.col("cur_price_plan_cd").isNotNull() & fn.col("new_price_plan_cd").isNull()),
            "継続"
        ).when(
            (
                (fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
            ) & (fn.col("cur_price_plan_cd").isNull()),
            "解約"
        ).otherwise("不明")
    )
)
output_sdf.show(7)
acq_dt user_id plan_change_status
20250502 user000001 継続
20250502 user000002 継続
20250502 user000004 解約
20250502 user000005 新規
20250502 user000006 継続
20250502 user000007 継続
20250502 user000009 変更

上記の結果から、操作が意図した通りであることが確認できました。
最後にスクリプト全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd

# セッション作成
spark = SparkSession.builder.getOrCreate()

# テーブルの削除
spark.sql("DROP TABLE IF EXISTS user_plan_table_v002")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS user_plan_table_v002 (
        acq_date                 STRING,     --取得日
        user_id                  STRING,     --ユーザーID
        cur_price_plan_cd        STRING,     --現加入料金プランコード
        cur_price_plan_name      STRING,     --現加入料金プラン名称
        new_price_plan_cd        STRING,     --新加入料金プランコード
        new_price_plan_name      STRING,     --新加入料金プラン名称
        bef_cur_price_plan_cd    STRING,     --前日現加入料金プランコード
        bef_cur_price_plan_name  STRING,     --前日現加入料金プラン名称
        bef_new_price_plan_cd    STRING,     --前日新加入料金プランコード
        bef_new_price_plan_name  STRING,     --前日現加入料金プラン名称
        y                        STRING,     --年
        m                        STRING,     --月
        d                        STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/user_plan_table_v002/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE user_plan_table_v002;""")

# "plan_change_status(料金ステータス変更)"項目作成
output_sdf = (
    spark.read.table("user_plan_table_v002")
    .withColumn(
        "plan_change_status",
        fn.when(
            (fn.col("bef_cur_price_plan_cd").isNull()) & (fn.col("bef_new_price_plan_cd").isNull()),
            "新規"
        ).when(
            (
                (fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
            ) & (fn.col("new_price_plan_cd").isNotNull()) & (fn.col("cur_price_plan_cd") != fn.col("new_price_plan_cd")),
            "変更"
        ).when(
            (
                (fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
            ) & (fn.col("cur_price_plan_cd").isNotNull() & fn.col("new_price_plan_cd").isNull()),
            "継続"
        ).when(
            (
                (fn.col("bef_cur_price_plan_cd").isNotNull()) | (fn.col("bef_new_price_plan_cd").isNotNull())
            ) & (fn.col("cur_price_plan_cd").isNull()),
            "解約"
        ).otherwise("不明")
    )
)

『結論』

◾️複雑なwhen()分岐を習得するメリット(ビジネスサイド)

複雑なwhen()分岐を習得するメリットは以下の通りです。

  • 柔軟なビジネスルールの反映
    • 顧客行動の分類など、複雑なビジネス要件をデータ分析ロジックに忠実に反映可能です。
  • データ品質向上
    • データの特性に応じた複雑な条件分岐ができるため、データクレンジングや異常データの早期発見が容易になります。

◾️複雑なwhen()分岐を習得するメリット(エンジニアリングサイド)

Fullouter(完全外部結合)を習得するメリットは以下の通りです。

  • 柔軟なデータ処理能力の習得
    • when()を使った条件分岐によって、データフローの柔軟性が向上します。
  • エラーの予防とメンテナンス性向上
    • 複雑な条件をコードで明確に定義することで、誤判定を防止し、保守性の高いコードを作成できます。

Discussion