🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【テーブル正規化①split()】
【Python・Pysparkで学ぶ!】データ分析の基礎【テーブル正規化①split()】
↓トランザクションテーブル(transaction_table)のサンプル
tran_id | pay_method | tran_dt | user_id | brand_shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | brand0002_shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | brand0001_shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
104 | card | 2025/01/09 09:10 | user000011 | brand0002_shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記のような決済データを集約したSQLテーブルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『決済データを第一正規形にしたい』
本稿では、クライアントからの要望に答えながら、 データの正規化 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録(データソース)
-
-
SQLテーブル名称
- 合意『テーブル名称:transaction_table』
- SQLテーブルの格納先
- 合意『格納先:s3://data/warehouse/transaction_table/』
-
項目名称・項目型
-
合意『項目名称と項目型は以下の通りとする。』
- tran_id : INT型
- pay_method : STRING型
- tran_dt : TIMESTAMP型
- user_id : STRING型
- brand_shop_id : STRING型
- pay_amount : INT型
- y : STRING型
- m : STRING型
- d : STRING型
-
合意『項目名称と項目型は以下の通りとする。』
- パーティション
- 合意『SQLテーブルのパーティションはy,m,dで分割される』
- 保存形式
- 合意『SQLテーブルの保存形式:parquet形式』
- Parquetファイルの圧縮アルゴリズム
- 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
-
SQLテーブル名称
-
-
- テクニカルに述べると、brand_shop_id列は非原子値を含んでいて、原子性を満たさない。
- 『合意:"brand_shop_id"列からブランド列(brand_id)と店舗ID列(shop_id)を作成する』
-
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
tran_id | pay_method | tran_dt | user_id | brand_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | brand0002 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | brand0001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | brand0005 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | brand0002 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
はじめに、SQLテーブルを読み込みます。
SQLテーブルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id INT, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
brand_shop_id STRING, --ブランド_店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
読み込んだデータを加工し、"brand_id"列と"shop_id"列を作成します。
"brand_shop_id"列から"brand_id"列と"shop_id"列を分解するには、split()メソッドを使用すると便利です。
このメソッドを使って、brand_shop_idを区切り文字(_)で分割し、それぞれの値を抽出します。
"brand_id"列と"shop_id"列を作成
transaction_df = (
spark.read.table("transaction_table") # SQLテーブルの読み込み
.select(
"tran_id",
"pay_method",
"tran_dt",
"user_id",
fn.split(fn.col("brand_shop_id"), "_")[0].alias("brand_id"), # brand_id を抽出
fn.split(fn.col("brand_shop_id"), "_")[1].alias("shop_id"), # shop_id を抽出
"pay_amount_raw",
"pay_amount_without_tax",
"y",
"m",
"d"
)
)
tran_id | pay_method | tran_dt | user_id | brand_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | brand0002 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | brand0001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | brand0005 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | brand0002 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記の操作により、合意に沿ったアウトプットを取得することができました。
最後に、スクリプトの全量を紹介します。
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id INT, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
brand_shop_id STRING, --ブランド_店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
# データ加工
transaction_df = (
spark.read.table("transaction_table") # SQLテーブルの読み込み
.select(
"tran_id",
"pay_method",
"tran_dt",
"user_id",
fn.split(fn.col("brand_shop_id"), "_")[0].alias("brand_id"), # brand_id を抽出
fn.split(fn.col("brand_shop_id"), "_")[1].alias("shop_id"), # shop_id を抽出
"pay_amount_raw",
"pay_amount_without_tax",
"y",
"m",
"d"
)
)
『結論』
◾️brand_shop_idからbrand_idとshop_idを分けるメリット(ビジネスサイド)
-
ブランドごとの分析が容易
- 売上や取引データをブランド単位で集計しやすくなるため、ブランドのパフォーマンスを比較できます。
-
データ品質の向上
- ブランドと店舗を明確に分けることで、分析時のミスや誤解を減らすことができます。
◾️brand_shop_idからbrand_idとshop_idを分けるメリット(エンジニアサイド)
-
データの独立性
- ブランドやショップの情報を個別に管理できるため、例えば新しいブランドやショップが追加されても、既存のトランザクションデータに影響を与えません。
-
JOINや集計が容易
- 例えば、brand_id をキーに他のテーブルと結合する処理や、shop_id を利用した詳細な店舗分析が簡単に行えるようになります。
Discussion
split関数とかの知識ってどこで入手するんですか?