🆗
【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み③spark.sql("""""")】
【Python・Pysparkで学ぶ!】データ分析の基礎【データの読み込み③spark.sql("""""")】
↓トランザクションテーブル(transaction_table)のサンプル
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記のような決済データを集約したテーブルが存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『SQLテーブルをPysparkデータフレームにしてほしい』
本稿では、クライアントからの要望に答えながら、 SQLテーブルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントの要望との認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
SQLテーブル名称
- 合意『テーブル名称:transaction_table』
- SQLテーブルの格納先
- 合意『格納先:s3://data/warehouse/transaction_table/』
-
項目名称・項目型
-
合意『項目名称と項目型は以下の通りとする。』
- tran_id : INT型
- pay_method : STRING型
- tran_dt : TIMESTAMP型
- user_id : STRING型
- shop_id : STRING型
- pay_amount : INT型
- y : STRING型
- m : STRING型
- d : STRING型
-
合意『項目名称と項目型は以下の通りとする。』
- パーティション
- 合意『SQLテーブルのパーティションはy,m,dで分割される』
- 保存形式
- 合意『SQLテーブルの保存形式:parquet形式』
- Parquetファイルの圧縮アルゴリズム
- 合意『SQLテーブルの圧縮アルゴリズムはSNAPPYとする』
-
セルの区切り
- Parquet形式では、区切り文字の概念はありません。※Parquetが列指向のバイナリフォーマットであるためです。
-
SQLテーブル名称
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、SQLテーブルをDDL(Data Definition Language)を使用して読み込みます。
SQLテーブルをDDLで読み込み
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id STRING, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
shop_id STRING, --店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
上記のコードで大切なことを3つ挙げます。
-
- 古いテーブルとその関連データを削除することで、クリーンな状態で作業を始めることができる。
-
- s3://とs3a://のスキーマ(URIスキーム)はパフォーマンスやセキュリティの面で違う。
- 結論:s3a://のほうが良い。
-
- 次の3つの条件が揃ったとき、MSCK(Metastore Check) REPAIR TABLEを使用する。
- SQLテーブルが、LOCATIONで指定されるような外部テーブルである。
- パーティション形式がy=2024/m=01/d=01 のようなディレクトリベースである
- ディレクトリ構造がシンプルで、パーティション数が多くない。
- スキャンに時間がかかりすぎないように注意
- 次の3つの条件が揃ったとき、MSCK(Metastore Check) REPAIR TABLEを使用する。
次に、SQLテーブルが読み込めたかどうかを show()メソッド を使用してデータ確認します。
データ確認
# 読み込み確認
spark.read.table("transaction_table").show(4,truncate=False)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記の操作の結果から、SQLテーブル(transaction_table) が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。
スクリプト全量
from pyspark.sql import SparkSession
# Hiveサポートを有効にしたSparkSessionの作成
spark = (
SparkSession.builder
.appName("DDLExample")
.config("spark.sql.catalogImplementation", "hive")
.enableHiveSupport()
.getOrCreate()
)
# テーブルの削除
spark.sql("DROP TABLE IF EXISTS transaction_table")
# テーブルの作成
spark.sql("""
CREATE TABLE IF NOT EXISTS transaction_table (
tran_id INT, --トランザクションID
pay_method STRING, --決済手段
tran_dt TIMESTAMP, --トランザクション日時
user_id STRING, --ユーザーID
shop_id STRING, --店舗ID
pay_amount_raw INT, --決済金額(税込)
pay_amount_without_tax INT, --決済金額(税抜)
y STRING, --年
m STRING, --月
d STRING --日
)
STORED AS PARQUET
PARTITIONED BY (y string,m string,d string) -- パーティションの定義
TBLPROPERTIES (
'parquet.compression' = 'SNAPPY' -- Parquetファイルの圧縮方式
)
LOCATION 's3a://data/warehouse/transaction_table/'; -- データの格納場所
""")
# MSCK REPAIRでパーティションの修復
spark.sql("""MSCK REPAIR TABLE transaction_table;""")
『結論』
◾️SQLテーブルを読み込み、Pysparkデータフレーム化するメリット(ビジネスサイド)
- SQLテーブルを直接処理できるため、データを移動したり、別のシステムに変換したりする必要がなく、運用コストや手間を削減できます。
- SQLデータベースとPySpark間で直接データをやり取りすることで、データの整合性を保ちながら分析が可能となり、信頼性の高い結果を得ることができます。
◾️SQLテーブルを読み込み、Pysparkデータフレーム化するメリット(エンジニアリングサイド)
- SQLデータをPySparkに取り込む処理をスクリプト化することで、データ処理の自動化と再現性を確保できます。
- PySparkの分散処理能力を活用し、大規模データセットでも効率的に処理できます。SQLテーブルから直接データを読み込むことで、パフォーマンスの最適化が容易になります。
Discussion
MSCK REPAIR TABLEを実行する理由はパフォーマンスが良くなるからですか?