🆗

【Python・Pysparkで学ぶ!】データ分析の基礎【データ分析の最適化①StructType, StructField】

2025/01/18に公開
2

【Python・Pysparkで学ぶ!】データ分析の基礎【データ分析の最適化①StructType, StructField】

◾️現行スクリプト

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# セッション作成
spark = SparkSession.builder.getOrCreate()

# データソース読み込み
df = spark.read.csv("s3://data/content/pj01/static/transaction.csv", header=True)

上記のようなスクリプトが存在すると仮定します。

◾️要望

とある日の朝会MTGにて、クライアントから次のような要望を頂きました。

『データソースの読み込みを最適化し、処理時間を短縮したい』

本稿では、クライアントからの要望に答えながら、 データ分析の最適化 について学びます。
よろしくお願いいたします。

◾️AsIs(現状把握)

エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。

◾️タッチポイント議事録

  • 処理時間(始点と終点)

    • 合意①『処理時間の始点はDataFrameReadハンドルが実行される直前』
      • データソースが静的データなので、DataFrameReadハンドルはread()メソッドとなる。
    • 合意②『処理時間の終点はアクション操作が実行された直後』
      • アクション操作:show()を採用。
      • 小数点第2位まで計測。
  • PySpark 設定のチューニング

    • 合意『Pysparkの設定は変更しない』

◾️アウトプットイメージ
 タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)

スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# セッション作成
spark = SparkSession.builder.getOrCreate()

# 改修スクリプト

現行スクリプト:秒
改修スクリプト:秒

◾️ToBe(スクリプト作成)

タッチポイント議事録をもとに、スクリプトを作成します。

◾️作成手順

はじめに、現行スクリプトの処理時間を確認します。PySpark で処理時間を計測するには、Python の time モジュールを使うのが簡単です。

改修前スクリプト処理時間確認
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn

# セッション作成
spark = SparkSession.builder.getOrCreate()

# 計測開始
start_time = time.time()

# データソース読み込み
df = spark.read.csv("s3://data/content/pj01/static/transaction.csv", header=True)
df.show()

# 計測終了
end_time = time.time()

# 処理時間の出力
print(f"現行スクリプト:{end_time - start_time:.2f}秒")
アウトプット
現行スクリプト:1.49

次に、処理時間を短縮する方法を実践します。短縮する方法はいくつかあります。

  • Pysparkの設定をチューニングする方法
  • スキーマを事前に定義する方法
    先にPysaprkの設定をチューニングする方法を紹介します。
    方向性としては、デフォルトの並列度 (spark.default.parallelism) を増やすことで、複数のタスクを同時に実行可能にする、というものです。
並列度を増加
spark.conf.set("spark.default.parallelism", "16")  # 並列度を例として16に設定

今回は、クライアントとPysparkの設定は変更しないことを合意したため『スキーマを事前に定義する方法』を実践します。

スキーマ定義
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False),
    StructField("pay_method", StringType(), False),
    StructField("tran_dt", TimestampType(), True),
    StructField("user_id", StringType(), True),
    StructField("shop_id", StringType(), True),
    StructField("pay_amount_raw", IntegerType(), False),
    StructField("pay_amount_without_tax", IntegerType(), False),
    StructField("y", StringType(), True),
    StructField("m", StringType(), True),
    StructField("d", StringType(), True)
])

次に、改修後スクリプトの処理時間を確認します。

改修前スクリプト処理時間確認
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# セッション作成
spark = SparkSession.builder.getOrCreate()

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False),
    StructField("pay_method", StringType(), False),
    StructField("tran_dt", TimestampType(), True),
    StructField("user_id", StringType(), True),
    StructField("shop_id", StringType(), True),
    StructField("pay_amount_raw", IntegerType(), False),
    StructField("pay_amount_without_tax", IntegerType(), False),
    StructField("y", StringType(), True),
    StructField("m", StringType(), True),
    StructField("d", StringType(), True)
])

# 計測開始
start_time = time.time()

# データソース読み込み
df = spark.read.csv("s3://data/content/pj01/static/transaction.csv", header=True,schema=transaction_table_schema)
df.show()

# 計測終了
end_time = time.time()

# 処理時間の出力
print(f"改修スクリプト:{end_time - start_time:.2f}秒")
アウトプット
改修スクリプト:1.17# 改修前と比較して20%短縮

型推測処理の負荷

Spark はデフォルトで、CSV ファイルや JSON ファイルを読み込む際に、データ型を自動的に推測するために一部または全データをスキャンします。このプロセスには計算時間がかかります。

上記の結果から、改修後のスクリプトの処理時間が意図した通り短縮できていることが確認できました。
最終的なスクリプトを紹介します。

改修前スクリプト処理時間確認
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# セッション作成
spark = SparkSession.builder.getOrCreate()

# スキーマ定義
transaction_table_schema = StructType([
    StructField("tran_id", StringType(), False),
    StructField("pay_method", StringType(), False),
    StructField("tran_dt", TimestampType(), True),
    StructField("user_id", StringType(), True),
    StructField("shop_id", StringType(), True),
    StructField("pay_amount_raw", IntegerType(), False),
    StructField("pay_amount_without_tax", IntegerType(), False),
    StructField("y", StringType(), True),
    StructField("m", StringType(), True),
    StructField("d", StringType(), True)
])

# データソース読み込み
df = spark.read.csv("s3://data/content/pj01/static/transaction.csv", header=True,schema=transaction_table_schema)

『結論』

◾️事前にスキーマを定義する意義(ビジネスサイド)

  • データ処理のスピードアップにより、意思決定のタイミングが早まる。
  • クラウドリソースの使用時間を削減し、計算コストやストレージコストを最適化につながる。

◾️事前にスキーマを定義する意義(エンジニアサイド)

  • PySpark の型推測処理をスキップし、データ読み込みのパフォーマンスを向上につながる。
  • 明示的な型情報に基づき、Spark が効率的な内部データ形式を利用可能になる。

Discussion

fact601fact601

スキーマを定義すると、Spark側が型確認するフローが省略されて、その結果で処理時間が短くなるという認識でよろしいでしょうか?

たいきたいき

その通りです。型推論(Schema Inference)の具体的なステップは以下の通りです。

  1. サンプリング
    最初に、Sparkはデータソースからサンプルを取得して、型推論に使用します。

  2. データ型の推論
    次に、サンプルデータをもとに、各列のデータ型を推論します。Sparkは、データの内容に基づいて最適な型(整数型、文字列型、日付型など)を決定します。

  3. スキーマの構築
    最後に、推論したデータ型を使って、全体のスキーマが構築されます。このスキーマは、後続の処理に使用されます。

特に、SparkはデフォルトでCSVファイルを読み込む際、サンプリングを全行に対して実施するため、型推論に時間がかかることがあります。