【Python・PySparkで学ぶ!】動的CSVファイルを読み込もう
↓トランザクションエクセルファイル(transaction_table.csv)のサンプル
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48 | user000007 | shop0004 | 1540 | 1400 | 2025 | 01 | 03 |
102 | cache | 2025/01/04 22:27 | user000001 | shop0006 | 6050 | 5500 | 2025 | 01 | 04 |
103 | code | 2025/01/08 03:22 | user000001 | shop0003 | 8140 | 7400 | 2025 | 01 | 08 |
104 | card | 2025/01/09 09:10 | user000011 | shop0001 | 770 | 700 | 2025 | 01 | 09 |
上記のようなデータがCSVファイル形式で存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『CSVファイルをテーブル化してほしい』
本稿では、クライアントからの要望に答えながら、 CSVファイルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
合意『データソースの特性を以下の通りとする』
- トランザクションエクセルファイル(transaction_table.csv): 動的データ
-
合意『データソースの特性を以下の通りとする』
-
-
合意『データソースの格納先を以下の通りとする』
-
トランザクションエクセルファイル(transaction_table.csv)
- 格納先:s3://data/content/pj01/dynamic/transaction_table.csv
-
トランザクションエクセルファイル(transaction_table.csv)
-
合意『データソースの格納先を以下の通りとする』
- データソースのオプション情報
-
合意『データソースのオプション情報を以下の通りとする』
-
トランザクションエクセルファイル(transaction.csv)
- ヘッダー名称とデータ型
- tran_id : StringType : nullを許可しない
- pay_method : StringType : nullを許可しない
- tran_dt : TimestampType : nullを許可しない
- user_id : StringType : nullを許可しない
- shop_id : StringType : nullを許可しない
- pay_amount_raw : IntegerType : nullを許可しない
- pay_amount_without_tax : IntegerType : nullを許可しない
- y : StringType : nullを許可しない
- m : StringType : nullを許可しない
- d : StringType : nullを許可しない
- データソースのヘッダー行の有無
- ヘッダー行:有り
- 空文字の対応
- nullは想定していないため、空文字の場合はエラー情報を上げる
- 複数行が1セル内に存在する場合の対応
- 複数行が1セル内に存在する場合がそもそもない。
- 文字コード
- データソースの文字コードがUTF-8なので、変換する必要なし
- Indexの必要性
- 必要なし
- ヘッダー名称とデータ型
-
トランザクションエクセルファイル(transaction.csv)
-
合意『データソースのオプション情報を以下の通りとする』
- アウトプットテーブル情報
- 合意『テーブル名称は"transaction_table"』
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/1/3 6:48 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 |
102 | cache | 2025/1/4 22:27 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 |
103 | code | 2025/1/8 3:22 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、タッチポイントで合意した「ヘッダー名称とデータ型」に従い、読み込むデータソースの『スキーマ定義』 を行います。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# スキーマ定義
transaction_table_schema = StructType([
StructField("tran_id", StringType(), False), # nullを許可しない
StructField("pay_method", StringType(), False), # nullを許可しない
StructField("tran_dt", TimestampType(), True), # nullを許可しない
StructField("user_id", StringType(), True), # nullを許可しない
StructField("shop_id", StringType(), True), # nullを許可しない
StructField("pay_amount_raw", IntegerType(), False), # nullを許可しない
StructField("pay_amount_without_tax", IntegerType(), False), # nullを許可しない
StructField("y", StringType(), True), # nullを許可しない
StructField("m", StringType(), True), # nullを許可しない
StructField("d", StringType(), True) # nullを許可しない
])
次に、データソースを読み込みます。ここで、データソースの特性は動的データなので、DataFrameReaderのハンドル readStream()メソッド を使用します。
また、ストリーミング処理では、ファイルを個別に指定することはできません。代わりに、ディレクトリを指定します。
spark.readStream.format("csv") \ # readStream()メソッドを使用
.option("header", "true") \ # ヘッダー指定
.schema(transaction_table_schema) \ # スキーマ定義(構造化ストリームには必須)
.load("s3://data/content/pj01/dynamic/") \ # ディレクトリ指定
ここで、データソースが正確に読み込むことができたかを確認します。
readStream()を使用して読み込んだデータフレーム(df)は、ストリーミングデータとして扱われます。通常のバッチ処理用のshow()メソッドではデータを直接確認することはできません。 ストリーミングデータを確認するには、Sink(出力先) を設定して処理を開始する必要があります。
Sink 例① ) コンソール出力
query = df.writeStream \
.outputMode("append") \ # 出力モード
.format("console") \ # Sinkの指定
.start()
Sink 例② ) ファイル出力(CSV)
query = df.writeStream \
.outputMode("append") \
.format("csv") \ # 出力形式
.option("path", "output_directory") \ # 出力先ディレクトリ
.option("checkpointLocation", "checkpoint_directory") \ # チェックポイントディレクトリ(必須)
.start()
Sink 例③ ) メモリ出力(SQLテーブル)
query = df.writeStream \
.outputMode("overwrite") \
.format("memory") \
.queryName("streaming_table") \ # テーブル名
.start()
Sinkは他にもKafka出力などにも対応しています。一番手軽なのはコンソール出力ですが、今回は要望に対応するため、メモリ出力(SQLテーブル)します。
最後に、上記のコードスニペットをまとめたスクリプトの全量を紹介します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# セッション作成
spark = SparkSession.builder.getOrCreate()
# スキーマ定義
transaction_table_schema = StructType([
StructField("tran_id", StringType(), False), # nullを許可しない
StructField("pay_method", StringType(), False), # nullを許可しない
StructField("tran_dt", TimestampType(), True), # nullを許可しない
StructField("user_id", StringType(), True), # nullを許可しない
StructField("shop_id", StringType(), True), # nullを許可しない
StructField("pay_amount_raw", IntegerType(), False), # nullを許可しない
StructField("pay_amount_without_tax", IntegerType(), False), # nullを許可しない
StructField("y", StringType(), True), # nullを許可しない
StructField("m", StringType(), True), # nullを許可しない
StructField("d", StringType(), True) # nullを許可しない
])
# データソースを読み込み
df = (
spark.readStream.format("csv") \ # readStream()メソッドを使用
.option("header", "true") \ # ヘッダー指定
.schema(transaction_table_schema) \ # スキーマ定義(構造化ストリームには必須)
.load("s3://data/content/pj01/dynamic/") \ # ディレクトリ指定
)
query = df.writeStream \
.outputMode("overwrite") \
.format("memory") \
.queryName("streaming_table") \ # テーブル名
.start()
『結論』
◾️構造化ストリーミングを使用するメリット(ビジネス)
-
『迅速な意思決定』
リアルタイムでデータを分析することで、トレンドや異常を素早く検知し、迅速な意思決定が可能になります(例: 在庫管理、マーケティングキャンペーンの最適化)。 -
『顧客体験の向上』
リアルタイムデータを利用して、パーソナライズされたサービス(例: リアルタイム推奨エンジン、顧客サポートの迅速化)を提供することで、顧客満足度を向上させることができます。
◾️構造化ストリーミングを使用するメリット(エンジニア)
-
『リアルタイム処理が可能』
データが到着した瞬間に処理を行えるため、即時性が求められるシステム(例: IoTデバイス、監視システム、金融取引)の実装が容易になります。 -
『スケーラブルで耐障害性の高い設計』
PySparkの構造化ストリーミングはマイクロバッチ処理を採用しており、大量のデータを効率的に処理しつつ、障害時にもチェックポイントを利用して再開可能です。
Discussion