🆗

【Python・PySparkで学ぶ!】broadcast()でデータ結合を最適化しよう!処理効率向上

2025/03/09に公開

はじめに、結合するデータを確認する

決済データ(transaction_data)

tran_id pay_method tran_dt user_id brand_id pay_amount_raw pay_amount_without_tax
101 code 2025/01/03 06:48:03 user000007 brand0003 1540 1400
102 cache 2025/01/04 22:27:16 user000001 brand0001 6050 5500
103 code 2025/01/08 03:22:07 user000001 brand0009 8140 7400
104 card 2025/01/09 09:10:45 user000011 brand0010 770 700
105 card 2025/01/20 20:52:03 user000009 brand0001 1760 1600

ブランドマスタ(brand_mst)

上記のような決済データブランドマスタが存在すると仮定します。

改修前スクリプトを確認する

改修前スクリプト
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import time

# セッション作成
spark = SparkSession.builder.getOrCreate()

# スキーマ定義
tran_schema = StructType([
    StructField("tran_id", StringType(), False),
    StructField("pay_method", StringType(), False),
    StructField("tran_dt", StringType(), False),
    StructField("user_id", StringType(), False),
    StructField("brand_id", StringType(), False),
    StructField("pay_amount_raw", IntegerType(), False),
    StructField("pay_amount_without_tax", IntegerType(), False),
])
						
# スキーマ定義
brand_schema = StructType([
    StructField("brand_id", StringType(), False),
    StructField("brand_name", StringType(), False)
])

# データ格納先
tran_path = "/content/sample_data/sample_transaction_table.csv"
brand_path = "/content/sample_data/sample_brand_list.csv"

# データフレーム作成
tran_sdf = spark.read.csv(path=tran_path, schema=tran_schema ,header=True)
brand_sdf = spark.read.csv(path=brand_path, schema=brand_schema ,header=True)

# データ結合
union_sdf = tran_sdf.join(brand_sdf,on="brand_id",how="left")
union_sdf.show(4, truncate=False)
brand_id brand_name tran_id pay_method tran_dt user_id pay_amount_raw pay_amount_without_tax
brand0003 高橋カフェ 101 code 2025/01/03 06:48:03 user000007 1540 1400
brand0001 佐藤レストラン 102 cache 2025/01/04 22:27:16 user000001 6050 5500
brand0009 小林ショップ 103 code 2025/01/08 03:22:07 user000001 8140 7400
brand0010 加藤雑貨店 104 card 2025/01/09 09:10:45 user000011 770 700

broadcast()を使用したスクリプトと実行時間を比較する。

実行時間の比較
# broadcast()なしのデータ結合
start_time = time.time()
tran_sdf.join(brand_sdf,on="brand_id",how="left").explain()
union_sdf = tran_sdf.join(brand_sdf,on="brand_id",how="left")
print(f"レコード数: {union_sdf.count()})
end_time = time.time()
no_broadcast_time = end_time - start_time
print(f"Broadcastなしの実行時間: {no_broadcast_time}秒")

# broadcast()ありのデータ結合
start_time = time.time()
tran_sdf.join(fn.broadcast(brand_sdf),on="brand_id",how="left").explain(True)
union_sdf = tran_sdf.join(fn.broadcast(brand_sdf),on="brand_id",how="left")
print(f"レコード数: {union_sdf.count()})
end_time = time.time()
with_broadcast_time = end_time - start_time
print(f"Broadcastありの実行時間: {with_broadcast_time}秒")

print(f"実行時間差: {no_broadcast_time - with_broadcast_time}秒")
レコード数: 10860
Broadcastなしの実行時間: 0.8570866584777832秒
レコード数: 10860
Broadcastありの実行時間: 0.7788143157958984秒
実行時間差: 0.07827234268188477秒

上記の結果から、broadcast()を使用した方が実行時間が短いことがわかりました。

Shuffle JoinとBroadcast Joinの比較する

それぞれの特性を一覧で確認する

項目 Shuffle Join (Sort-Merge Join) Broadcast Join
シャッフル発生 あり(ネットワーク負荷大) なし(各ノードがローカル処理)
ソート処理 必要(CPU 負荷大) 不要
適用できるデータサイズ 大規模データ同士 小さいテーブル(数MB以下) × 大きいテーブル
処理速度 遅い(シャッフルがボトルネック) 速い(ノード間通信なし)
リソース消費 少なめ(シャッフルで分配) 多め(各ノードにコピー)
適用方法 自動(autoBroadcastJoinThreshold)または broadcast(df) 自動(デフォルト JOIN)

押さえておくべきポイント

  1. シャッフルの有無:
    • Shuffle Join ではネットワークを通じてデータをシャッフルするため、ネットワーク負荷が大きくなります。一方、Broadcast Join ではデータのシャッフルは発生せず、ノード間の通信がないため、負荷が軽減されます。
  2. ソート処理の必要性:
    • Shuffle Join はデータのソートが必要で、CPUの負荷が大きくなります。これは、データを特定のキーで整列するために多くのリソースを消費するからです。Broadcast Join ではソート処理が不要です。
  3. データサイズの適用範囲:
    • Shuffle Join は、大規模データ同士に適用されます。データサイズが非常に大きい場合、シャッフルジョインが適切です。Broadcast Join は、小さいテーブル(数MB以下)と大きいテーブルを結合する場合に適用されます。
  4. 処理速度:
    • Shuffle Join はシャッフルがボトルネックとなるため、遅くなることが多いです。対して、Broadcast Join はノード間のデータ転送がないため、非常に速く処理が完了します。
  5. リソース消費:
    • Shuffle Join は、データのシャッフルを行うため、リソースの消費が少なめですが、その分ネットワークやディスクのI/Oを多く消費します。Broadcast Join は、各ノードにデータをコピーするため、多くのメモリを消費しますが、ネットワークやI/O負荷は軽減されます。

結論

これらの特徴から、データサイズが小さい場合や、パフォーマンス重視の場面ではBroadcast Joinが有利であり、大規模なデータセットの結合にはShuffle Joinが適切だということがわかります。

それぞれの処理手順を確認する。

Shuffle Join
【元データ】(JOIN前)
--------------------------------------------------
 ノード1        ノード2        ノード3        ノード4
+---------+    +---------+    +---------+    +---------+
| LargeDF |    | LargeDF |    | LargeDF |    | LargeDF |
|  (GB)   |    |  (GB)   |    |  (GB)   |    |  (GB)   |
+---------+    +---------+    +---------+    +---------+

  ↓ **小さいテーブル(数MB)を全ノードにコピー!**

--------------------------------------------------
 ノード1        ノード2        ノード3        ノード4
+---------+    +---------+    +---------+    +---------+
| LargeDF |    | LargeDF |    | LargeDF |    | LargeDF |
|  (GB)   |    |  (GB)   |    |  (GB)   |    |  (GB)   |
| SmallDF |    | SmallDF |    | SmallDF |    | SmallDF |
|  (MB)   |    |  (MB)   |    |  (MB)   |    |  (MB)   |
+---------+    +---------+    +---------+    +---------+

  ↓ **各ノードで直接 JOIN(シャッフルなし!)**

--------------------------------------------------
 ノード1        ノード2        ノード3        ノード4
+---------------------+    +---------------------+
| LargeDF ⨝ SmallDF  |    | LargeDF ⨝ SmallDF  |
+---------------------+    +---------------------+
+---------------------+    +---------------------+
| LargeDF ⨝ SmallDF  |    | LargeDF ⨝ SmallDF  |
+---------------------+    +---------------------+

**各ノードが個別に処理 → ネットワーク負荷なし!**

Broadcast Join
【元データ】(JOIN前)
--------------------------------------------------
 ノード1        ノード2        ノード3        ノード4
+---------+    +---------+    +---------+    +---------+
| LargeDF |    | LargeDF |    | LargeDF |    | LargeDF |
|  (GB)   |    |  (GB)   |    |  (GB)   |    |  (GB)   |
+---------+    +---------+    +---------+    +---------+

  ↓ **キーごとにデータをシャッフル(ノード間で通信発生!)**

--------------------------------------------------
 ノード1        ノード2        ノード3        ノード4
+---------+    +---------+    +---------+    +---------+
| Key=1   |    | Key=2   |    | Key=3   |    | Key=4   |
| から収集 |    | から収集 |    | から収集 |    | から収集 |
+---------+    +---------+    +---------+    +---------+

  ↓ **各ノードでソート&結合(Sort-Merge Join)**

--------------------------------------------------
 ノード1                      ノード2
+----------------------+    +----------------------+
| Key=1                |    | Key=2                |
| LargeDF ⨝ SmallDF   |    | LargeDF ⨝ SmallDF   |
+----------------------+    +----------------------+
ノード3                       ノード4
+----------------------+    +----------------------+
| Key=3                |    | Key=4                |
| LargeDF ⨝ SmallDF   |    | LargeDF ⨝ SmallDF   |
+----------------------+    +----------------------+

**ノード間の通信が発生 → ネットワーク負荷大 → 遅い**

Discussion