🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み③spark.sql("""""")】

2025/01/21に公開
1

【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み③spark.sql("""""")】

↓トランザクションテーブル(transaction_table)のサンプル

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

上記のような決済データを集約したテーブルが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『SQLテーブルをPysparkデータフレームにしてほしい』

本稿では、クライアントからの要望に答えながら、 SQLテーブルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントの要望との認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • SQLテーブルの情報

    • SQLテーブル名称
      • 合意『テーブル名称:transaction_table』
    • SQLテーブルの格納先
      • 合意『格納先:s3://data/warehouse/transaction_table/』
    • 項目名称・項目型
      • 合意『項目名称と項目型は以下の通りとする。』
        • tran_id : INT型
        • pay_method : STRING型
        • tran_dt : TIMESTAMP型
        • user_id : STRING型
        • shop_id : STRING型
        • pay_amount : INT型
        • y : STRING型
        • m : STRING型
        • d : STRING型
    • パーティション
      • 合意『SQLテーブルのパーティションはy,m,dで分割される』
    • 保存形式
      • 合意『SQLテーブルの保存形式:parquet形式』
    • Parquetファイルの圧縮アルゴリズム
      • 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
    • セルの区切り
      • Parquet形式では、区切り文字の概念はありません。※Parquetが列指向のバイナリフォーマットであるためです。

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、SQLテーブルをDDL(Data Definition Language)を使用して読み込みます。

SQLテーブルをDDLで読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   STRING,     --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        shop_id                   STRING,     --店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

上記のコードで大切なことを3つ挙げます。

  • DROP TABLE IF EXISTS transaction_tableをする

    • 古いテーブルとその関連データを削除することで、クリーンな状態で作業を始めることができる。
  • s3ではなく、s3aを使用する

    • s3://とs3a://のスキーマ(URIスキーム)はパフォーマンスやセキュリティの面で違う。
    • 結論:s3a://のほうが良い。
  • MSCK REPAIR TABLEの使い所に気をつける。今回のケースは使う

    • 次の3つの条件が揃ったとき、MSCK(Metastore Check) REPAIR TABLEを使用する。
      • SQLテーブルが、LOCATIONで指定されるような外部テーブルである。
      • パーティション形式がy=2024/m=01/d=01 のようなディレクトリベースである
      • ディレクトリ構造がシンプルで、パーティション数が多くない。
        • スキャンに時間がかかりすぎないように注意

次に、SQLテーブルが読み込めたかどうかを show()メソッド を使用してデータ確認します。

データ確認
# 読み込み確認
spark.read.table("transaction_table").show(4,truncate=False)
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

上記の操作の結果から、SQLテーブル(transaction_table) が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。

スクリプト全量
from pyspark.sql import SparkSession

# Hiveサポートを有効にしたSparkSessionの作成
spark = (
    SparkSession.builder
    .appName("DDLExample")
    .config("spark.sql.catalogImplementation", "hive")
    .enableHiveSupport()
    .getOrCreate()
)

# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")

# テーブルの作成
spark.sql("""
    CREATE TABLE IF NOT EXISTS transaction_table (
        tran_id                   INT,        --トランザクションID
        pay_method                STRING,     --決済手段
        tran_dt                   TIMESTAMP,  --トランザクション日時
        user_id                   STRING,     --ユーザーID
        shop_id                   STRING,     --店舗ID
        pay_amount_raw            INT,        --決済金額(税込)
        pay_amount_without_tax    INT,        --決済金額(税抜)
        y                         STRING,     --年
        m                         STRING,     --月
        d                         STRING      --日
    )
    STORED AS PARQUET
    PARTITIONED BY (y string,m string,d string)  -- パーティションの定義
    TBLPROPERTIES (
        'parquet.compression' = 'SNAPPY'  -- Parquetファイルの圧縮方式
    )
    LOCATION 's3a://data/warehouse/transaction_table/';  -- データの格納場所
""")

# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")

『結論』

◾️SQLテーブルを読み込み、Pysparkデータフレーム化するメリット(ビジネスサイド)

  • SQLテーブルを直接処理できるため、データを移動したり、別のシステムに変換したりする必要がなく、運用コストや手間を削減できます。
  • SQLデータベースとPySpark間で直接データをやり取りすることで、データの整合性を保ちながら分析が可能となり、信頼性の高い結果を得ることができます。

◾️SQLテーブルを読み込み、Pysparkデータフレーム化するメリット(エンジニアリングサイド)

  • SQLデータをPySparkに取り込む処理をスクリプト化することで、データ処理の自動化と再現性を確保できます。
  • PySparkの分散処理能力を活用し、大規模データセットでも効率的に処理できます。SQLテーブルから直接データを読み込むことで、パフォーマンスの最適化が容易になります。

Discussion

fact601fact601

MSCK REPAIR TABLEを実行する理由はパフォーマンスが良くなるからですか?