🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【データ結合①join(how="left")】
【Python・Pysparkで学ぶ!】データ分析の基礎【データ結合①join(how="left")】
↓トランザクションテーブル(transaction_table)のサンプル
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
↓エントリーリスト(entry_list.csv)のサンプル
上記のような決済データを集約したSQLテーブルと、エントリーユーザーのCSVファイルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『エントリー情報を付加した決済データを作成したい』
本稿では、クライアントからの要望に答えながら、 データ結合 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録(分析の概要)
-
- 施策の参加ユーザーと、そのエントリー日時情報
-
- トランザクションテーブルは2025年1月~3月の決済情報を集約している。
-
- 合意『主テーブル(Main Table)をトランザクションテーブル。副テーブル(Lookup TableまたはSecondary Table)をエントリーリストとする』
- 合意『結合キーは"user_id"』
◾️タッチポイント議事録(データソース)
-
-
SQLテーブル名称
- 合意『テーブル名称:transaction_table』
- SQLテーブルの格納先
- 合意『格納先:s3://data/warehouse/transaction_table/』
-
項目名称・項目型
-
合意『項目名称と項目型は以下の通りとする。』
- tran_id : INT型
- pay_method : STRING型
- tran_dt : TIMESTAMP型
- user_id : STRING型
- shop_id : STRING型
- pay_amount : INT型
- y : STRING型
- m : STRING型
- d : STRING型
-
合意『項目名称と項目型は以下の通りとする。』
- パーティション
- 合意『SQLテーブルのパーティションはy,m,dで分割される』
- 保存形式
- 合意『SQLテーブルの保存形式:parquet形式』
- Parquetファイルの圧縮アルゴリズム
- 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
-
セルの区切り
- Parquet形式では、区切り文字の概念はありません。※Parquetが列指向のバイナリフォーマットであるためです。
-
SQLテーブル名称
-
-
合意『データソースの特性を以下の通りとする』
- エントリーリストCSVファイル(entry_list.csv): 静的データ
-
合意『データソースの格納先を以下の通りとする』
-
エントリーリストCSVファイル(entry_list.csv)
- 格納先:s3://data/content/pj01/static/entry_list.csv
-
エントリーリストCSVファイル(entry_list.csv)
-
合意『データソースのオプション情報を以下の通りとする』
-
エントリーリストCSVファイル(entry_list.csv)
- ヘッダー名称とデータ型
- user_id : StringType : nullを許可しない
- entry_date : StringType : nullを許可しない
- データソースのヘッダー行の有無
- ヘッダー行:無し
- 空文字の対応
- 空文字はnullに置換。その後、0埋めする。
- 複数行が1セル内に存在する場合の対応
- 複数行が1セル内に存在する場合がそもそもない。
- 文字コード
- データソースの文字コードがUTF-8なので、変換する必要なし
- Indexの必要性
- 必要なし
- ヘッダー名称とデータ型
-
エントリーリストCSVファイル(entry_list.csv)
-
合意『データソースの特性を以下の通りとする』
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
tran_id | pay_method | tran_dt | user_id | entry_flg | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | 0 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | 1 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | 1 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | 0 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
はじめに、SQLテーブルとCSVファイルを読み込みます。
SQLテーブルとCSVファイルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id INT, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
shop_id STRING, --店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
from pyspark.sql.types import StructType, StructField, StringType
# スキーマ定義
entry_list_schema = StructType([
StructField("user_id", StringType(), False),
StructField("entry_date", StringType(), False),
])
# データの読み込み
spark.read.csv(
"s3://data/content/pj01/static/entry_list.csv",
schema=entry_list_schema,
header=True,
)
次に、PySparkデータフレームを作成し、show()メソッドで外観を確認します。
データフレームを作成
transaction_df = spark.read.table("transaction_table")
entry_lsit_df = spark.read.csv(
"s3://data/content/pj01/static/entry_list.csv",
schema=entry_list_schema,
header=True,
)
transaction_df.show(4, truncate=False)
entry_lsit_df.show(3, truncate=False)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
user_id | entry_date |
---|---|
user000001 | 2024/12/02 09:08:15 |
user000002 | 2024/12/07 21:53:01 |
user000003 | 2024/12/11 19:20:40 |
データソースを問題なく読み込めたことが確認できました。
それでは、データフレーム同士をjoin()メソッドを使用して結合(left)します。主テーブルと副テーブル、および結合キーはタッチポイントの合意に従います。
データフレーム同士をjoin()メソッドを使用して結合(left)
union_df = transaction_df.join(entry_lsit_df, on="user_id", how="left")
tran_id | pay_method | tran_dt | user_id | entry_flg | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | 0 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | 1 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | 1 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | 0 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記の操作の結果から、transaction_tableとentry_listを結合したデータ が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# セッション作成
spark = SparkSession.builder.getOrCreate()
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id INT, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
shop_id STRING, --店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
# スキーマ定義
entry_list_schema = StructType([
StructField("user_id", StringType(), False),
StructField("entry_date", StringType(), False),
])
# SQLテーブルからPySparkデータフレームを作成
transaction_df = spark.read.table("transaction_table")
# CSVファイルからPySparkデータフレームを作成
entry_lsit_df = spark.read.csv(
"s3://data/content/pj01/static/entry_list.csv",
schema=entry_list_schema,
header=True,
)
# データフレームをleft結合
union_df = transaction_df.join(entry_lsit_df, on="user_id", how="left")
『結論』
◾️データフレームのleft結合を学ぶメリット(ビジネスサイド)
データフレームのleft結合を学ぶメリットは以下の通りです。
- 決済履歴データと顧客情報を結合することでマーケティング施策の分析が実現できる。
- データのマッチングを通じて欠損や不整合を明らかにし、クリーニングの手助けができる。
◾️データフレームのleft結合を学ぶメリット(エンジニアリングサイド)
データフレームのleft結合を学ぶメリットは以下の通りです。
- 結合操作は処理コストが高いため、実装を通じてパフォーマンス改善(パーティションの適切な設定やブロードキャスト結合の活用)を学べる。
- SQL的な操作を実践しながら学べるため、SQLの理解も深まる。
Discussion
データフレームを主テーブルと副テーブルに言い分けてあるの理解しやすくて良かったです!