【Python・PySparkで学ぶ!】PandasでExcelを読み込んでからPySparkデータフレーム化
↓トランザクションエクセルファイル(transaction_table.xlsx)のサンプル
上記のようなデータがエクセルファイル形式で存在すると仮定します。
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『エクセルファイルをPysparkデータフレームにしてほしい』
本稿では、クライアントからの要望に答えながら、 エクセルファイルのPysparkデータフレーム化 について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
合意『データソースの特性を以下の通りとする』
- トランザクションエクセルファイル(transaction_table.xlsx): 静的データ
-
合意『データソースの特性を以下の通りとする』
-
-
合意『データソースの格納先を以下の通りとする』
-
トランザクションエクセルファイル(transaction_table.xlsx)
- 格納先:s3://data/content/pj01/static/transaction.xlsx
-
トランザクションエクセルファイル(transaction_table.xlsx)
-
合意『データソースの格納先を以下の通りとする』
- データソースのオプション情報
-
合意『データソースのオプション情報を以下の通りとする』
-
トランザクションエクセルファイル(transaction_table.xlsx)
- ヘッダー名称とデータ型
- tran_id : StringType : nullを許可しない
- pay_method : StringType : nullを許可しない
- tran_dt : TimestampType : nullを許可しない
- user_id : StringType : nullを許可しない
- shop_id : StringType : nullを許可しない
- pay_amount_raw : IntegerType : nullを許可しない
- pay_amount_without_tax : IntegerType : nullを許可しない
- y : StringType : nullを許可しない
- m : StringType : nullを許可しない
- d : StringType : nullを許可しない
- データソースのヘッダー行の有無
- ヘッダー行:有り
- 空文字の対応
- nullは想定していないため、空文字の場合はエラー情報を上げる
- 複数行が1セル内に存在する場合の対応
- 複数行が1セル内に存在する場合がそもそもない。
- 文字コード
- データソースの文字コードがUTF-8なので変換なし。
- Indexの必要性
- 必要なし
- シートの名称
- "Sheet1"
- ヘッダー名称とデータ型
-
トランザクションエクセルファイル(transaction_table.xlsx)
-
合意『データソースのオプション情報を以下の通りとする』
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025/1/3 6:48 | user000005 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 |
102 | cache | 2025/1/4 22:27 | user000027 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 |
103 | code | 2025/1/8 3:22 | user000020 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 |
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、データソースを読み込みます。ここで、データソースはExcelファイルです。Pysparkでは読み込みに直接対応していないため、Pandasライブラリを使用してExcelファイルを読み込みます。
import pandas as pd
tran_pd = pd.read_excel("s3://data/content/pj01/static/transaction.xlsx", sheet_name='Sheet1')
次にタッチポイントで合意した「ヘッダー名称とデータ型」に従い、読み込むデータソースの『スキーマ定義』 を行います。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# スキーマ定義
transaction_table_schema = StructType([
StructField("tran_id", StringType(), False), # nullを許可しない
StructField("pay_method", StringType(), False), # nullを許可しない
StructField("tran_dt", TimestampType(), True), # nullを許可しない
StructField("user_id", StringType(), True), # nullを許可しない
StructField("shop_id", StringType(), True), # nullを許可しない
StructField("pay_amount_raw", IntegerType(), False), # nullを許可しない
StructField("pay_amount_without_tax", IntegerType(), False), # nullを許可しない
StructField("y", StringType(), True), # nullを許可しない
StructField("m", StringType(), True), # nullを許可しない
StructField("d", StringType(), True) # nullを許可しない
])
そして、createDataFrame()メソッドを使用して、PandasデータフレームからPySparkデータフレームを作成します。
createDataFrame():さまざまなデータソース(例えば、Pandas DataFrame、RDD、リスト、辞書など)から PySpark の DataFrame を作成するための関数
spark.createDataFrame(tran_pd,schema=transaction_table_schema)
tran_id | pay_method | tran_dt | user_id | shop_id | pay_amount_raw | pay_amount_without_tax | y | m | d |
---|---|---|---|---|---|---|---|---|---|
101 | code | 2025-01-03 06:48:03 | user000007 | shop0004 | 1540 | 1400 | 2025 | 1 | 3 |
102 | cache | 2025-01-04 22:27:16 | user000001 | shop0006 | 6050 | 5500 | 2025 | 1 | 4 |
103 | code | 2025-01-08 03:22:07 | user000001 | shop0003 | 8140 | 7400 | 2025 | 1 | 8 |
104 | card | 2025-01-09 09:10:45 | user000011 | shop0001 | 770 | 700 | 2025 | 1 | 9 |
上記の操作の結果から、エクセルファイル(transaction_table.xlsx) が意図した通りのアウトプットになっていることが確認できました。
最終的なスクリプトを紹介します。
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
# セッション作成
spark = SparkSession.builder.getOrCreate()
# スキーマ定義
transaction_table_schema = StructType([
StructField("tran_id", StringType(), False),
StructField("pay_method", StringType(), False),
StructField("tran_dt", TimestampType(), True),
StructField("user_id", StringType(), True),
StructField("shop_id", StringType(), True),
StructField("pay_amount_raw", IntegerType(), False),
StructField("pay_amount_without_tax", IntegerType(), False),
StructField("y", StringType(), True),
StructField("m", StringType(), True),
StructField("d", StringType(), True)
])
# エクセルファイルをPandasデータフレーム読み込み
tran_pd = pd.read_excel("s3://data/content/pj01/static/transaction.xlsx", sheet_name='Sheet1')
# PandasデータフレームからPySparkデータフレームを作成
spark.createDataFrame(tran_pd,schema=transaction_table_schema)
『結論』
◾️エクセルファイルをPySparkデータフレーム化する意義(ビジネスサイド)
- PySparkは分散処理に対応しており、大量のExcelデータを効率的に処理できます。ビジネス規模が拡大しても、パフォーマンスを維持したまま処理が可能です。
- Excelファイルは異なる部署やシステムから収集されることが多いため、PySparkで複数のExcelファイルやデータソースを統合して、一元的な分析基盤を作成できます
◾️CSVファイルを読み込み方を学ぶメリット(エンジニアリングサイド)
- ExcelファイルをPySparkのデータフレームに変換することで、Sparkの並列処理機能を活用し、パフォーマンスを向上させます。
- PySparkではデータを処理する前にスキーマを定義できるため、データ型に対する型安全性が担保され、不正なデータ型や不整合が検出されやすくなります。
Discussion
例えば、Excelの文字コードがUTF-16のとき、データフレームをUTF-8に変換するにはどうしますか?