Zenn
🆗

【Python・PySparkで学ぶ!】PandasでExcelを読み込んでからPySparkデータフレーム化

に公開
1
2

↓トランザクションエクセルファイル(transaction_table.xlsx)のサンプル

上記のようなデータがエクセルファイル形式で存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『エクセルファイルをPysparkデータフレームにしてほしい』

本稿では、クライアントからの要望に答えながら、 エクセルファイルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • データソースの特性(静的か動的か)

    • 合意『データソースの特性を以下の通りとする』
      • トランザクションエクセルファイル(transaction_table.xlsx): 静的データ
  • データソースの格納先

    • 合意『データソースの格納先を以下の通りとする』
      • トランザクションエクセルファイル(transaction_table.xlsx)
        • 格納先:s3://data/content/pj01/static/transaction.xlsx
  • データソースのオプション情報
    • 合意『データソースのオプション情報を以下の通りとする』
      • トランザクションエクセルファイル(transaction_table.xlsx)
        • ヘッダー名称とデータ型
          • tran_id : StringType : nullを許可しない
          • pay_method : StringType : nullを許可しない
          • tran_dt : TimestampType : nullを許可しない
          • user_id : StringType : nullを許可しない
          • shop_id : StringType : nullを許可しない
          • pay_amount_raw : IntegerType : nullを許可しない
          • pay_amount_without_tax : IntegerType : nullを許可しない
          • y : StringType : nullを許可しない
          • m : StringType : nullを許可しない
          • d : StringType : nullを許可しない
        • データソースのヘッダー行の有無
          • ヘッダー行:有り
        • 空文字の対応
          • nullは想定していないため、空文字の場合はエラー情報を上げる
        • 複数行が1セル内に存在する場合の対応
          • 複数行が1セル内に存在する場合がそもそもない。
        • 文字コード
          • データソースの文字コードがUTF-8なので変換なし。
        • Indexの必要性
          • 必要なし
        • シートの名称
          • "Sheet1"

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025/1/3 6:48 user000005 shop0004 1540 1400 2025 1 3
102 cache 2025/1/4 22:27 user000027 shop0006 6050 5500 2025 1 4
103 code 2025/1/8 3:22 user000020 shop0003 8140 7400 2025 1 8

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、データソースを読み込みます。ここで、データソースはExcelファイルです。Pysparkでは読み込みに直接対応していないため、Pandasライブラリを使用してExcelファイルを読み込みます。

Pandasライブラリを使用してExcelを読み込み
import pandas as pd
tran_pd = pd.read_excel("s3://data/content/pj01/static/transaction.xlsx", sheet_name='Sheet1')

次にタッチポイントで合意した「ヘッダー名称とデータ型」に従い、読み込むデータソースの『スキーマ定義』 を行います。

読み込むデータソースの『スキーマ定義』
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False), # nullを許可しない
    StructField("pay_method", StringType(), False), # nullを許可しない
    StructField("tran_dt", TimestampType(), True), # nullを許可しない
    StructField("user_id", StringType(), True), # nullを許可しない
    StructField("shop_id", StringType(), True), # nullを許可しない
    StructField("pay_amount_raw", IntegerType(), False), # nullを許可しない
    StructField("pay_amount_without_tax", IntegerType(), False), # nullを許可しない
    StructField("y", StringType(), True), # nullを許可しない
    StructField("m", StringType(), True), # nullを許可しない
    StructField("d", StringType(), True) # nullを許可しない
])

そして、createDataFrame()メソッドを使用して、PandasデータフレームからPySparkデータフレームを作成します。
createDataFrame():さまざまなデータソース(例えば、Pandas DataFrame、RDD、リスト、辞書など)から PySpark の DataFrame を作成するための関数

PySparkデータフレーム化
spark.createDataFrame(tran_pd,schema=transaction_table_schema)
tran_id pay_method tran_dt user_id shop_id pay_amount_raw pay_amount_without_tax y m d
101 code 2025-01-03 06:48:03 user000007 shop0004 1540 1400 2025 1 3
102 cache 2025-01-04 22:27:16 user000001 shop0006 6050 5500 2025 1 4
103 code 2025-01-08 03:22:07 user000001 shop0003 8140 7400 2025 1 8
104 card 2025-01-09 09:10:45 user000011 shop0001 770 700 2025 1 9

上記の操作の結果から、エクセルファイル(transaction_table.xlsx) が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# セッション作成
spark = SparkSession.builder.getOrCreate()

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False),
    StructField("pay_method", StringType(), False),
    StructField("tran_dt", TimestampType(), True),
    StructField("user_id", StringType(), True),
    StructField("shop_id", StringType(), True),
    StructField("pay_amount_raw", IntegerType(), False),
    StructField("pay_amount_without_tax", IntegerType(), False),
    StructField("y", StringType(), True),
    StructField("m", StringType(), True),
    StructField("d", StringType(), True)
])

# エクセルファイルをPandasデータフレーム読み込み
tran_pd = pd.read_excel("s3://data/content/pj01/static/transaction.xlsx", sheet_name='Sheet1')

# PandasデータフレームからPySparkデータフレームを作成
spark.createDataFrame(tran_pd,schema=transaction_table_schema)

『結論』

◾️エクセルファイルをPySparkデータフレーム化する意義(ビジネスサイド)

  • PySparkは分散処理に対応しており、大量のExcelデータを効率的に処理できます。ビジネス規模が拡大しても、パフォーマンスを維持したまま処理が可能です。
  • Excelファイルは異なる部署やシステムから収集されることが多いため、PySparkで複数のExcelファイルやデータソースを統合して、一元的な分析基盤を作成できます

◾️CSVファイルを読み込み方を学ぶメリット(エンジニアリングサイド)

  • ExcelファイルをPySparkのデータフレームに変換することで、Sparkの並列処理機能を活用し、パフォーマンスを向上させます。
  • PySparkではデータを処理する前にスキーマを定義できるため、データ型に対する型安全性が担保され、不正なデータ型や不整合が検出されやすくなります。
2

Discussion

fact601fact601

例えば、Excelの文字コードがUTF-16のとき、データフレームをUTF-8に変換するにはどうしますか?

ログインするとコメントできます