🆗
【Python・PySparkで学ぶ!】withColumn()とselect()を使い分けて最適化しよう!処理効率向上
df = (
spark.read.parquet("s3://sample/df.csv")
.withColumn("y",fn.date_format(fn.col("date"), "yyyy"))
.withColumn("m",fn.date_format(fn.col("date"), "mm"))
.withColumn("d",fn.date_format(fn.col("date"), "dd"))
.withColumn(
"pay_code_status_cls",
fn.when(fn.col("pay_code_paycnt_monthly") >= 7, fn.lit("00_heavy"))
.when(fn.col("pay_code_paycnt_monthly") >= 1, fn.lit("01_light"))
.otherwise(fn.lit("04_new"))
)
)
df.show(truncate=False)
上記のようなコードが存在すると仮定します。
このコードは以下の加工を行います。
- 日付カラムを年・月・日で分離させる。
- 月にコード決済した回数からステータスクラスを作成する
◾️要望
とある日の朝会MTGにて、クライアントから次のような要望を頂きました。
『処理時間を短くしたい』
本稿では、クライアントからの要望に答えながら、 withColumn()とselect()の使い分け について学びます。
よろしくお願いいたします。
◾️AsIs(現状把握)
エンジニアとクライアント間の認識に相違があるとアウトプットのイメージに相違が発生します。
はじめに、 データアセスメントの観点から論点を提示し、クライアントと集計ロジックの認識を擦り合わせるタッチポイント を設けましょう。
◾️タッチポイント議事録
-
-
合意①『処理時間の始点はDataFrameReadハンドルが実行される直前』
- データソースが静的データなので、DataFrameReadハンドルはread()メソッドとなる。
-
合意②『処理時間の終点はアクション操作が実行された直後』
- アクション操作:show()を採用。
- 小数点第2位まで計測。
-
合意①『処理時間の始点はDataFrameReadハンドルが実行される直前』
-
- 合意『Pysparkの設定は変更しない』
◾️アウトプットイメージ
タッチポイントより、クライアントとアウトプットイメージを次の通り合意いたしました。
例)
スクリプト全量
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
# セッション作成
spark = SparkSession.builder.getOrCreate()
# 改修スクリプト
~~~
~~~
df.show(truncate=False)
◾️計測
現行スクリプト:秒
改修スクリプト:秒
◾️ToBe(スクリプト作成)
タッチポイント議事録をもとに、スクリプトを作成します。
◾️作成手順
はじめに、現行スクリプトの処理時間を確認します。PySpark で処理時間を計測するには、Python の time モジュールを使うのが簡単です。
改修前スクリプト処理時間確認
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
# セッション作成
spark = SparkSession.builder.getOrCreate()
# 計測開始
start_time = time.time()
# 改修前スクリプト
df = (
spark.read.csv("s3://sample/df.csv")
.withColumn("y", fn.date_format(fn.col("date"), "yyyy"))
.withColumn("m", fn.date_format(fn.col("date"), "MM")) # mm → MM に修正
.withColumn("d", fn.date_format(fn.col("date"), "dd"))
.withColumn(
"pay_code_status_cls",
fn.when(fn.col("pay_code_paycnt_monthly") >= 7, fn.lit("00_heavy"))
.when(fn.col("pay_code_paycnt_monthly") >= 1, fn.lit("01_light"))
.otherwise(fn.lit("04_new")) # qrapp_useflg の条件を削除
)
)
# アクション操作
df.show(truncate=False)
# 計測終了
end_time = time.time()
# 処理時間の出力
print(f"現行スクリプト:{end_time - start_time:.2f}秒")
アウトプット
現行スクリプト:1.29秒
次に、処理時間を短縮する方法を実践します。短縮する方法はいくつかあります。
- Pysparkの設定をチューニングする方法
- withColumn()の代わりにselect()を使用する。
今回は、クライアントとPysparkの設定は変更しないことを合意したため『スキーマを事前に定義する方法』を実践します。
改修後スクリプト処理時間確認
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
# セッション作成
spark = SparkSession.builder.getOrCreate()
# 計測開始
start_time = time.time()
# 改修後スクリプト
df = (
spark.read.csv("s3://sample/df.csv")
.select(
fn.col("user_id"),
fn.col("date"),
fn.col("pay_code_paycnt_monthly"),
fn.col("subs_cls"),
fn.col("invalid_flg"),
fn.date_format(fn.col("date"), "yyyy").alias("y"),
fn.date_format(fn.col("date"), "mm").alias("m"),
fn.date_format(fn.col("date"), "dd").alias("d"),
fn.when(fn.col("pay_code_paycnt_monthly") >= 7, fn.lit("00_heavy"))
.when(fn.col("pay_code_paycnt_monthly") >= 1, fn.lit("01_light"))
.otherwise(fn.lit("04_new")).alias("pay_code_status_cls"),
)
)
# アクション操作
df.show(truncate=False)
# 計測終了
end_time = time.time()
# 処理時間の出力
print(f"現行スクリプト:{end_time - start_time:.2f}秒")
アウトプット
改修スクリプト:0.57秒
# 改修前と比較して55%短縮
オーバーヘッド処理
項目追加の方法は、withColumnと()とselect()の2通りあります。
両者の比較をしましょう。
方法 | メリット | デメリット |
---|---|---|
withColumn | 直感的で分かりやすい。カラムを1つずつ追加できる | 何度も呼び出すと計算ツリーが複雑になり、最適化が難しくなる |
select | 一度に複数のカラムを処理でき、計算ツリーが単純になるため効率的 | カラム名をすべて指定しなければならない |
これはCPUとGPUの処理スタイルの比較と似ています。
比較項目 | withColumn | select | CPU | GPU |
---|---|---|---|---|
処理スタイル | 逐次処理(カラムごとに個別処理) | 並列処理(一括で処理) | 逐次処理 | 並列処理 |
中間計算の発生 | あり(カラム追加ごとに新しい DataFrame が生成) | なし(1回の処理で全カラムを追加) | 計算ごとに中間データが増える | 計算を一括実行し、不要なオーバーヘッドを削減 |
最適化のしやすさ | Catalyst Optimizer の恩恵を受けにくい | Catalyst Optimizer による最適化が効きやすい | 命令の依存関係が多く、最適化が難しい | 同時処理が可能で、パフォーマンスが向上 |
パフォーマンス | 低い(複数回の処理が必要) | 高い(一括で処理) | 高クロックだが並列性が低い | 一度に多くの処理を実行できる |
視覚的に処理を解説いたします。
withColumn
加工前のデータフレーム
│
├── withColumn("y") ← 処理1 dateカラムに対する変換
│ │
│ └── 加工後データフレーム ("y"追加)
│
└── withColumn("m") ← 処理2 dateカラムに対する変換
│ │
│ └── 加工後データフレーム ("y","m"追加)
│
└── withColumn("d") ← 処理3 dateカラムに対する変換
│ │
│ └── 加工後データフレーム ("y","m","d"追加)
│
└── withColumn("pay_code_status_cls") ← 処理4 pay_code_paycnt_monthlyカラムに対する変換
│
└── 加工後データフレーム ("y","m","d","pay_code_status_cls"追加)
select
加工前のデータフレーム
│
└── select("*", "y","m","d","pay_code_status_cls") ← "date","pay_code_paycnt_monthly" を同時に変換
│
└── 加工後データフレーム ("y","m","d","pay_code_status_cls"追加)
前者と後者のオーバーヘッド数を比較すると、どちらのアルゴリズムが優れているか一目瞭然ですね。
上記の結果から、改修後のスクリプトの処理時間が意図した通り短縮できていることが確認できました。
最終的なスクリプトを紹介します。
改修後スクリプト
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
# セッション作成
spark = SparkSession.builder.getOrCreate()
# 改修後スクリプト
df = (
spark.read.csv("s3://sample/df.csv")
.select(
fn.col("user_id"),
fn.col("date"),
fn.col("pay_code_paycnt_monthly"),
fn.col("subs_cls"),
fn.col("invalid_flg"),
fn.date_format(fn.col("date"), "yyyy").alias("y"),
fn.date_format(fn.col("date"), "mm").alias("m"),
fn.date_format(fn.col("date"), "dd").alias("d"),
fn.when(fn.col("pay_code_paycnt_monthly") >= 7, fn.lit("00_heavy"))
.when(fn.col("pay_code_paycnt_monthly") >= 1, fn.lit("01_light"))
.otherwise(fn.lit("04_new")).alias("pay_code_status_cls"),
)
)
# アクション操作
df.show(truncate=False)
『結論』
◾️withColumnとselectの使い分けるメリット(ビジネスサイド)
- 『データ品質の確保』
- withColumn を使えば、データを段階的に変換・検証できるため、途中のデータチェックがしやすい。select で最終的な形にすることで、データの整合性を保ちつつ提供できる。
- 『可読性とメンテナンス性』
- withColumn は既存のデータに対してカラムを追加・更新するため、データ変換の流れが明確で後から修正しやすい。select は最終的に必要なカラムのみを取得するため、不要なデータを省略できる。
◾️withColumnとselectの使い分けるメリット(エンジニアサイド)
- 『処理の最適化とパフォーマンス』
- withColumn はカラムを1つずつ処理するため、大量のカラム追加を繰り返すと非効率になりがち。select で不要なカラムを削ると、Sparkのオプティマイザー(Catalyst)が最適な実行計画を立てやすくなる。
- 『データのトレースとデバッグ』
- withColumn を使えば、途中のデータを show() で確認しやすく、デバッグが容易。select は最終的な出力を明確にできるため、データの仕様変更に強い。
Discussion