🆗

【Python・PySparkで学ぶ!】動的CSVファイルを読み込もう

に公開

↓トランザクションエクセルファイル(transaction_table.csv)のサンプル

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/01/03 06:48 user000007 shop0004 1540 1400 2025 01 03
102 cache 2025/01/04 22:27 user000001 shop0006 6050 5500 2025 01 04
103 code 2025/01/08 03:22 user000001 shop0003 8140 7400 2025 01 08
104 card 2025/01/09 09:10 user000011 shop0001 770 700 2025 01 09

上記のようなデータがCSVファイル形式で存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『CSVファイルをテーブル化してほしい』

本稿では、クライアントからの要望に答えながら、 CSVファイルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • データソースの特性(静的か動的か)

    • 合意『データソースの特性を以下の通りとする』
      • トランザクションエクセルファイル(transaction_table.csv): 動的データ
  • データソースの格納先

    • 合意『データソースの格納先を以下の通りとする』
      • トランザクションエクセルファイル(transaction_table.csv)
        • 格納先:s3://data/content/pj01/dynamic/transaction_table.csv
  • データソースのオプション情報
    • 合意『データソースのオプション情報を以下の通りとする』
      • トランザクションエクセルファイル(transaction.csv)
        • ヘッダー名称とデータ型
          • tran_id : StringType : nullを許可しない
          • pay_method : StringType : nullを許可しない
          • tran_dt : TimestampType : nullを許可しない
          • user_id : StringType : nullを許可しない
          • shop_id : StringType : nullを許可しない
          • pay_amount_raw : IntegerType : nullを許可しない
          • pay_amount_without_tax : IntegerType : nullを許可しない
          • y : StringType : nullを許可しない
          • m : StringType : nullを許可しない
          • d : StringType : nullを許可しない
        • データソースのヘッダー行の有無
          • ヘッダー行:有り
        • 空文字の対応
          • nullは想定していないため、空文字の場合はエラー情報を上げる
        • 複数行が1セル内に存在する場合の対応
          • 複数行が1セル内に存在する場合がそもそもない。
        • 文字コード
          • データソースの文字コードがUTF-8なので、変換する必要なし
        • Indexの必要性
          • 必要なし
  • アウトプットテーブル情報
    • 合意『テーブル名称は"transaction_table"』

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/1/3 6:48 user000005 shop0004 1540 1400 2025 1 3
102 cache 2025/1/4 22:27 user000027 shop0006 6050 5500 2025 1 4
103 code 2025/1/8 3:22 user000020 shop0003 8140 7400 2025 1 8

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、タッチポイントで合意した「ヘッダー名称とデータ型」に従い、読み込むデータソースの『スキーマ定義』 を行います。

読み込むデータソースの『スキーマ定義』
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False), # nullを許可しない
    StructField("pay_method", StringType(), False), # nullを許可しない
    StructField("tran_dt", TimestampType(), True), # nullを許可しない
    StructField("user_id", StringType(), True), # nullを許可しない
    StructField("shop_id", StringType(), True), # nullを許可しない
    StructField("pay_amount_raw", IntegerType(), False), # nullを許可しない
    StructField("pay_amount_without_tax", IntegerType(), False), # nullを許可しない
    StructField("y", StringType(), True), # nullを許可しない
    StructField("m", StringType(), True), # nullを許可しない
    StructField("d", StringType(), True) # nullを許可しない
])

次に、データソースを読み込みます。ここで、データソースの特性は動的データなので、DataFrameReaderのハンドル readStream()メソッド を使用します。
また、ストリーミング処理では、ファイルを個別に指定することはできません。代わりに、ディレクトリを指定します。

データソースを読み込み
spark.readStream.format("csv") \ # readStream()メソッドを使用
.option("header", "true") \ # ヘッダー指定
.schema(transaction_table_schema) \ # スキーマ定義(構造化ストリームには必須) 
.load("s3://data/content/pj01/dynamic/") \ # ディレクトリ指定

ここで、データソースが正確に読み込むことができたかを確認します。
readStream()を使用して読み込んだデータフレーム(df)は、ストリーミングデータとして扱われます。通常のバッチ処理用のshow()メソッドではデータを直接確認することはできません。 ストリーミングデータを確認するには、Sink(出力先) を設定して処理を開始する必要があります。

Sink 例① ) コンソール出力

コンソール出力
query = df.writeStream \
    .outputMode("append") \  # 出力モード
    .format("console") \  # Sinkの指定
    .start()

Sink 例② ) ファイル出力(CSV)

ファイル出力(CSV)
query = df.writeStream \
    .outputMode("append") \
    .format("csv") \  # 出力形式
    .option("path", "output_directory") \  # 出力先ディレクトリ
    .option("checkpointLocation", "checkpoint_directory") \  # チェックポイントディレクトリ(必須)
    .start()

Sink 例③ ) メモリ出力(SQLテーブル)

メモリ出力(SQLテーブル)
query = df.writeStream \
    .outputMode("overwrite") \
    .format("memory") \
    .queryName("streaming_table") \  # テーブル名
    .start()

Sinkは他にもKafka出力などにも対応しています。一番手軽なのはコンソール出力ですが、今回は要望に対応するため、メモリ出力(SQLテーブル)します。
最後に、上記のコードスニペットをまとめたスクリプトの全量を紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# セッション作成
spark = SparkSession.builder.getOrCreate()

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False), # nullを許可しない
    StructField("pay_method", StringType(), False), # nullを許可しない
    StructField("tran_dt", TimestampType(), True), # nullを許可しない
    StructField("user_id", StringType(), True), # nullを許可しない
    StructField("shop_id", StringType(), True), # nullを許可しない
    StructField("pay_amount_raw", IntegerType(), False), # nullを許可しない
    StructField("pay_amount_without_tax", IntegerType(), False), # nullを許可しない
    StructField("y", StringType(), True), # nullを許可しない
    StructField("m", StringType(), True), # nullを許可しない
    StructField("d", StringType(), True) # nullを許可しない
])

# データソースを読み込み
df = (
    spark.readStream.format("csv") \ # readStream()メソッドを使用
    .option("header", "true") \ # ヘッダー指定
    .schema(transaction_table_schema) \ # スキーマ定義(構造化ストリームには必須) 
    .load("s3://data/content/pj01/dynamic/") \ # ディレクトリ指定
)

query = df.writeStream \
    .outputMode("overwrite") \
    .format("memory") \
    .queryName("streaming_table") \  # テーブル名
    .start()

『結論』

◾️構造化ストリーミングを使用するメリット(ビジネス)

  • 『迅速な意思決定』
    リアルタイムでデータを分析することで、トレンドや異常を素早く検知し、迅速な意思決定が可能になります(例: 在庫管理、マーケティングキャンペーンの最適化)。

  • 『顧客体験の向上』
    リアルタイムデータを利用して、パーソナライズされたサービス(例: リアルタイム推奨エンジン、顧客サポートの迅速化)を提供することで、顧客満足度を向上させることができます。

◾️構造化ストリーミングを使用するメリット(エンジニア)

  • 『リアルタイム処理が可能』
    データが到着した瞬間に処理を行えるため、即時性が求められるシステム(例: IoTデバイス、監視システム、金融取引)の実装が容易になります。

  • 『スケーラブルで耐障害性の高い設計』
    PySparkの構造化ストリーミングはマイクロバッチ処理を採用しており、大量のデータを効率的に処理しつつ、障害時にもチェックポイントを利用して再開可能です。

Discussion