🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【データ結合①join(how="left")】

2025/01/22に公開
1

【Python・Pysparkで学ぶ!】データ分析の基礎【データ結合①join(how="left")】

↓トランザクションテーブル(transaction_table)のサンプル

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

↓エントリーリスト(entry_list.csv)のサンプル

上記のような決済データを集約したSQLテーブルと、エントリーユーザーのCSVファイルが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『エントリー情報を付加した決済データを作成したい』

本稿では、クライアントからの要望に答えながら、 データ結合 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録(分析の概要)

  • エントリーに関する情報

    • 施策の参加ユーザーと、そのエントリー日時情報
  • トランザクションテーブルの情報

    • トランザクションテーブルは2025年1月~3月の決済情報を集約している。
  • 結合に関する合意

    • 合意『主テーブル(Main Table)をトランザクションテーブル。副テーブル(Lookup TableまたはSecondary Table)をエントリーリストとする』
    • 合意『結合キーは"user_id"』

◾️タッチポイント議事録(データソース)

  • SQLテーブルの情報

    • SQLテーブル名称
      • 合意『テーブル名称:transaction_table』
    • SQLテーブルの格納先
      • 合意『格納先:s3://data/warehouse/transaction_table/』
    • 項目名称・項目型
      • 合意『項目名称と項目型は以下の通りとする。』
        • tran_id : INT型
        • pay_method : STRING型
        • tran_dt : TIMESTAMP型
        • user_id : STRING型
        • shop_id : STRING型
        • pay_amount : INT型
        • y : STRING型
        • m : STRING型
        • d : STRING型
    • パーティション
      • 合意『SQLテーブルのパーティションはy,m,dで分割される』
    • 保存形式
      • 合意『SQLテーブルの保存形式:parquet形式』
    • Parquetファイルの圧縮アルゴリズム
      • 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
    • セルの区切り
      • Parquet形式では、区切り文字の概念はありません。※Parquetが列指向のバイナリフォーマットであるためです。
  • CSVファイルの情報

    • 合意『データソースの特性を以下の通りとする』
      • エントリーリストCSVファイル(entry_list.csv): 静的データ
    • 合意『データソースの格納先を以下の通りとする』
      • エントリーリストCSVファイル(entry_list.csv)
        • 格納先:s3://data/content/pj01/static/entry_list.csv
    • 合意『データソースのオプション情報を以下の通りとする』
      • エントリーリストCSVファイル(entry_list.csv)
        • ヘッダー名称とデータ型
          • user_id : StringType : nullを許可しない
          • entry_date : StringType : nullを許可しない
        • データソースのヘッダー行の有無
          • ヘッダー行:無し
        • 空文字の対応
          • 空文字はnullに置換。その後、0埋めする。
        • 複数行が1セル内に存在する場合の対応
          • 複数行が1セル内に存在する場合がそもそもない。
        • 文字コード
          • データソースの文字コードがUTF-8なので、変換する必要なし
        • Indexの必要性
          • 必要なし

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

tran_id pay_method tran_dt user_id entry_flg shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 0 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 1 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 1 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 0 shop0001 770 700 2025 01 09

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。
 はじめに、SQLテーブルとCSVファイルを読み込みます。

SQLテーブルとCSVファイルを読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   INT,        --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        shop_id                   STRING,     --店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

from pyspark.sql.types import StructType, StructField, StringType

# スキーマ定義
entry_list_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("entry_date", StringType(), False),
])

# データの読み込み
spark.read.csv(
    "s3://data/content/pj01/static/entry_list.csv",
    schema=entry_list_schema,
    header=True,
)

次に、PySparkデータフレームを作成し、show()メソッドで外観を確認します。

データフレームを作成
transaction_df = spark.read.table("transaction_table")
entry_lsit_df = spark.read.csv(
    "s3://data/content/pj01/static/entry_list.csv",
    schema=entry_list_schema,
    header=True,
)

transaction_df.show(4, truncate=False)
entry_lsit_df.show(3, truncate=False)
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09
user_id entry_date
user000001 2024/12/02 09:08:15
user000002 2024/12/07 21:53:01
user000003 2024/12/11 19:20:40

データソースを問題なく読み込めたことが確認できました。
それでは、データフレーム同士をjoin()メソッドを使用して結合(left)します。主テーブルと副テーブル、および結合キーはタッチポイントの合意に従います。

データフレーム同士をjoin()メソッドを使用して結合(left)
union_df = transaction_df.join(entry_lsit_df, on="user_id", how="left")
tran_id pay_method tran_dt user_id entry_flg shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 0 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 1 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 1 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 0 shop0001 770 700 2025 01 09

上記の操作の結果から、transaction_tableとentry_listを結合したデータ が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# セッション作成
spark = SparkSession.builder.getOrCreate()

# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   INT,        --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        shop_id                   STRING,     --店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

# スキーマ定義
entry_list_schema = StructType([
    StructField("user_id", StringType(), False),
    StructField("entry_date", StringType(), False),
])

# SQLテーブルからPySparkデータフレームを作成
transaction_df = spark.read.table("transaction_table")

# CSVファイルからPySparkデータフレームを作成
entry_lsit_df = spark.read.csv(
    "s3://data/content/pj01/static/entry_list.csv",
    schema=entry_list_schema,
    header=True,
)

# データフレームをleft結合
union_df = transaction_df.join(entry_lsit_df, on="user_id", how="left")

『結論』

◾️データフレームのleft結合を学ぶメリット(ビジネスサイド)

データフレームのleft結合を学ぶメリットは以下の通りです。

  • 決済履歴データと顧客情報を結合することでマーケティング施策の分析が実現できる。
  • データのマッチングを通じて欠損や不整合を明らかにし、クリーニングの手助けができる。

◾️データフレームのleft結合を学ぶメリット(エンジニアリングサイド)

データフレームのleft結合を学ぶメリットは以下の通りです。

  • 結合操作は処理コストが高いため、実装を通じてパフォーマンス改善(パーティションの適切な設定やブロードキャスト結合の活用)を学べる。
  • SQL的な操作を実践しながら学べるため、SQLの理解も深まる。

Discussion

fact601fact601

データフレームを主テーブルと副テーブルに言い分けてあるの理解しやすくて良かったです!