Zenn
🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【テーブル正規化①split()】

2025/01/26に公開
1
2

【Python・Pysparkで学ぶ!】データ分析の基礎【テーブル正規化①split()】

↓トランザクションテーブル(transaction_table)のサンプル

tran_id pay_method tran_dt user_id brand_shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 brand0002_shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 brand0001_shop0006 6050 5500 2025 01 04
104 card 2025/01/09 09:10 user000011 brand0002_shop0001 770 700 2025 01 09

上記のような決済データを集約したSQLテーブルが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『決済データを第一正規形にしたい』

本稿では、クライアントからの要望に答えながら、 データの正規化 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録(データソース)

  • SQLテーブルの情報

    • SQLテーブル名称
      • 合意『テーブル名称:transaction_table』
    • SQLテーブルの格納先
      • 合意『格納先:s3://data/warehouse/transaction_table/』
    • 項目名称・項目型
      • 合意『項目名称と項目型は以下の通りとする。』
        • tran_id : INT型
        • pay_method : STRING型
        • tran_dt : TIMESTAMP型
        • user_id : STRING型
        • brand_shop_id : STRING型
        • pay_amount : INT型
        • y : STRING型
        • m : STRING型
        • d : STRING型
    • パーティション
      • 合意『SQLテーブルのパーティションはy,m,dで分割される』
    • 保存形式
      • 合意『SQLテーブルの保存形式:parquet形式』
    • Parquetファイルの圧縮アルゴリズム
      • 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
  • ◾️タッチポイント議事録(第一正規化)

    • 論点:"brand_shop_id"項目にブランドIDと店舗IDが入っている

      • テクニカルに述べると、brand_shop_id列は非原子値を含んでいて、原子性を満たさない。
      • 『合意:"brand_shop_id"列からブランド列(brand_id)と店舗ID列(shop_id)を作成する』

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

tran_id pay_method tran_dt user_id brand_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 brand0002 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 brand0001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 brand0005 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 brand0002 shop0001 770 700 2025 01 09

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。
 はじめに、SQLテーブルを読み込みます。

SQLテーブルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   INT,        --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        brand_shop_id             STRING,     --ブランド_店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

読み込んだデータを加工し、"brand_id"列と"shop_id"列を作成します。
"brand_shop_id"列から"brand_id"列と"shop_id"列を分解するには、split()メソッドを使用すると便利です。
このメソッドを使って、brand_shop_idを区切り文字(_)で分割し、それぞれの値を抽出します。

"brand_id"列と"shop_id"列を作成
transaction_df = (
    spark.read.table("transaction_table") # SQLテーブルの読み込み
    .select(
        "tran_id",
        "pay_method",
        "tran_dt",
        "user_id",
        fn.split(fn.col("brand_shop_id"), "_")[0].alias("brand_id"),  # brand_id を抽出
        fn.split(fn.col("brand_shop_id"), "_")[1].alias("shop_id"),   # shop_id を抽出
        "pay_amount_raw",
        "pay_amount_without_tax",
        "y",
        "m",
        "d"
    )
)
tran_id pay_method tran_dt user_id brand_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 brand0002 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 brand0001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 brand0005 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 brand0002 shop0001 770 700 2025 01 09

上記の操作により、合意に沿ったアウトプットを取得することができました。
最後に、スクリプトの全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# セッション作成
spark = SparkSession.builder.getOrCreate()

# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   INT,        --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        brand_shop_id             STRING,     --ブランド_店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

# データ加工
transaction_df = (
    spark.read.table("transaction_table") # SQLテーブルの読み込み
    .select(
        "tran_id",
        "pay_method",
        "tran_dt",
        "user_id",
        fn.split(fn.col("brand_shop_id"), "_")[0].alias("brand_id"),  # brand_id を抽出
        fn.split(fn.col("brand_shop_id"), "_")[1].alias("shop_id"),   # shop_id を抽出
        "pay_amount_raw",
        "pay_amount_without_tax",
        "y",
        "m",
        "d"
    )
)

『結論』

◾️brand_shop_idからbrand_idとshop_idを分けるメリット(ビジネスサイド)

  • ブランドごとの分析が容易
    • 売上や取引データをブランド単位で集計しやすくなるため、ブランドのパフォーマンスを比較できます。
  • データ品質の向上
    • ブランドと店舗を明確に分けることで、分析時のミスや誤解を減らすことができます。

◾️brand_shop_idからbrand_idとshop_idを分けるメリット(エンジニアサイド)

  • データの独立性
    • ブランドやショップの情報を個別に管理できるため、例えば新しいブランドやショップが追加されても、既存のトランザクションデータに影響を与えません。
  • JOINや集計が容易
    • 例えば、brand_id をキーに他のテーブルと結合する処理や、shop_id を利用した詳細な店舗分析が簡単に行えるようになります。
2

Discussion

fact601fact601

split関数とかの知識ってどこで入手するんですか?

ログインするとコメントできます