🆗
【Python・PySparkで学ぶ!】broadcast()でデータ結合を最適化しよう!処理効率向上
はじめに、結合するデータを確認する
↓決済データ(transaction_data)
tran_id | pay_method | tran_dt | user_id | brand_id | pay_amount_raw | pay_amount_without_tax |
---|---|---|---|---|---|---|
101 | code | 2025/01/03 06:48:03 | user000007 | brand0003 | 1540 | 1400 |
102 | cache | 2025/01/04 22:27:16 | user000001 | brand0001 | 6050 | 5500 |
103 | code | 2025/01/08 03:22:07 | user000001 | brand0009 | 8140 | 7400 |
104 | card | 2025/01/09 09:10:45 | user000011 | brand0010 | 770 | 700 |
105 | card | 2025/01/20 20:52:03 | user000009 | brand0001 | 1760 | 1600 |
↓ブランドマスタ(brand_mst)
上記のような決済データとブランドマスタが存在すると仮定します。
改修前スクリプトを確認する
改修前スクリプト
from pyspark.sql import SparkSession
from pyspark.sql import functions as fn
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import time
# セッション作成
spark = SparkSession.builder.getOrCreate()
# スキーマ定義
tran_schema = StructType([
StructField("tran_id", StringType(), False),
StructField("pay_method", StringType(), False),
StructField("tran_dt", StringType(), False),
StructField("user_id", StringType(), False),
StructField("brand_id", StringType(), False),
StructField("pay_amount_raw", IntegerType(), False),
StructField("pay_amount_without_tax", IntegerType(), False),
])
# スキーマ定義
brand_schema = StructType([
StructField("brand_id", StringType(), False),
StructField("brand_name", StringType(), False)
])
# データ格納先
tran_path = "/content/sample_data/sample_transaction_table.csv"
brand_path = "/content/sample_data/sample_brand_list.csv"
# データフレーム作成
tran_sdf = spark.read.csv(path=tran_path, schema=tran_schema ,header=True)
brand_sdf = spark.read.csv(path=brand_path, schema=brand_schema ,header=True)
# データ結合
union_sdf = tran_sdf.join(brand_sdf,on="brand_id",how="left")
union_sdf.show(4, truncate=False)
brand_id | brand_name | tran_id | pay_method | tran_dt | user_id | pay_amount_raw | pay_amount_without_tax |
---|---|---|---|---|---|---|---|
brand0003 | 高橋カフェ | 101 | code | 2025/01/03 06:48:03 | user000007 | 1540 | 1400 |
brand0001 | 佐藤レストラン | 102 | cache | 2025/01/04 22:27:16 | user000001 | 6050 | 5500 |
brand0009 | 小林ショップ | 103 | code | 2025/01/08 03:22:07 | user000001 | 8140 | 7400 |
brand0010 | 加藤雑貨店 | 104 | card | 2025/01/09 09:10:45 | user000011 | 770 | 700 |
broadcast()を使用したスクリプトと実行時間を比較する。
実行時間の比較
# broadcast()なしのデータ結合
start_time = time.time()
tran_sdf.join(brand_sdf,on="brand_id",how="left").explain()
union_sdf = tran_sdf.join(brand_sdf,on="brand_id",how="left")
print(f"レコード数: {union_sdf.count()})
end_time = time.time()
no_broadcast_time = end_time - start_time
print(f"Broadcastなしの実行時間: {no_broadcast_time}秒")
# broadcast()ありのデータ結合
start_time = time.time()
tran_sdf.join(fn.broadcast(brand_sdf),on="brand_id",how="left").explain(True)
union_sdf = tran_sdf.join(fn.broadcast(brand_sdf),on="brand_id",how="left")
print(f"レコード数: {union_sdf.count()})
end_time = time.time()
with_broadcast_time = end_time - start_time
print(f"Broadcastありの実行時間: {with_broadcast_time}秒")
print(f"実行時間差: {no_broadcast_time - with_broadcast_time}秒")
レコード数: 10860
Broadcastなしの実行時間: 0.8570866584777832秒
レコード数: 10860
Broadcastありの実行時間: 0.7788143157958984秒
実行時間差: 0.07827234268188477秒
上記の結果から、broadcast()を使用した方が実行時間が短いことがわかりました。
Shuffle JoinとBroadcast Joinの比較する
それぞれの特性を一覧で確認する
項目 | Shuffle Join (Sort-Merge Join) | Broadcast Join |
---|---|---|
シャッフル発生 | あり(ネットワーク負荷大) | なし(各ノードがローカル処理) |
ソート処理 | 必要(CPU 負荷大) | 不要 |
適用できるデータサイズ | 大規模データ同士 | 小さいテーブル(数MB以下) × 大きいテーブル |
処理速度 | 遅い(シャッフルがボトルネック) | 速い(ノード間通信なし) |
リソース消費 | 少なめ(シャッフルで分配) | 多め(各ノードにコピー) |
適用方法 | 自動(autoBroadcastJoinThreshold)または broadcast(df) | 自動(デフォルト JOIN) |
押さえておくべきポイント
- シャッフルの有無:
- Shuffle Join ではネットワークを通じてデータをシャッフルするため、ネットワーク負荷が大きくなります。一方、Broadcast Join ではデータのシャッフルは発生せず、ノード間の通信がないため、負荷が軽減されます。
- ソート処理の必要性:
- Shuffle Join はデータのソートが必要で、CPUの負荷が大きくなります。これは、データを特定のキーで整列するために多くのリソースを消費するからです。Broadcast Join ではソート処理が不要です。
- データサイズの適用範囲:
- Shuffle Join は、大規模データ同士に適用されます。データサイズが非常に大きい場合、シャッフルジョインが適切です。Broadcast Join は、小さいテーブル(数MB以下)と大きいテーブルを結合する場合に適用されます。
- 処理速度:
- Shuffle Join はシャッフルがボトルネックとなるため、遅くなることが多いです。対して、Broadcast Join はノード間のデータ転送がないため、非常に速く処理が完了します。
- リソース消費:
- Shuffle Join は、データのシャッフルを行うため、リソースの消費が少なめですが、その分ネットワークやディスクのI/Oを多く消費します。Broadcast Join は、各ノードにデータをコピーするため、多くのメモリを消費しますが、ネットワークやI/O負荷は軽減されます。
結論
これらの特徴から、データサイズが小さい場合や、パフォーマンス重視の場面ではBroadcast Joinが有利であり、大規模なデータセットの結合にはShuffle Joinが適切だということがわかります。
それぞれの処理手順を確認する。
Shuffle Join
【元データ】(JOIN前)
--------------------------------------------------
ノード1 ノード2 ノード3 ノード4
+---------+ +---------+ +---------+ +---------+
| LargeDF | | LargeDF | | LargeDF | | LargeDF |
| (GB) | | (GB) | | (GB) | | (GB) |
+---------+ +---------+ +---------+ +---------+
↓ **小さいテーブル(数MB)を全ノードにコピー!**
--------------------------------------------------
ノード1 ノード2 ノード3 ノード4
+---------+ +---------+ +---------+ +---------+
| LargeDF | | LargeDF | | LargeDF | | LargeDF |
| (GB) | | (GB) | | (GB) | | (GB) |
| SmallDF | | SmallDF | | SmallDF | | SmallDF |
| (MB) | | (MB) | | (MB) | | (MB) |
+---------+ +---------+ +---------+ +---------+
↓ **各ノードで直接 JOIN(シャッフルなし!)**
--------------------------------------------------
ノード1 ノード2 ノード3 ノード4
+---------------------+ +---------------------+
| LargeDF ⨝ SmallDF | | LargeDF ⨝ SmallDF |
+---------------------+ +---------------------+
+---------------------+ +---------------------+
| LargeDF ⨝ SmallDF | | LargeDF ⨝ SmallDF |
+---------------------+ +---------------------+
**各ノードが個別に処理 → ネットワーク負荷なし!**
Broadcast Join
【元データ】(JOIN前)
--------------------------------------------------
ノード1 ノード2 ノード3 ノード4
+---------+ +---------+ +---------+ +---------+
| LargeDF | | LargeDF | | LargeDF | | LargeDF |
| (GB) | | (GB) | | (GB) | | (GB) |
+---------+ +---------+ +---------+ +---------+
↓ **キーごとにデータをシャッフル(ノード間で通信発生!)**
--------------------------------------------------
ノード1 ノード2 ノード3 ノード4
+---------+ +---------+ +---------+ +---------+
| Key=1 | | Key=2 | | Key=3 | | Key=4 |
| から収集 | | から収集 | | から収集 | | から収集 |
+---------+ +---------+ +---------+ +---------+
↓ **各ノードでソート&結合(Sort-Merge Join)**
--------------------------------------------------
ノード1 ノード2
+----------------------+ +----------------------+
| Key=1 | | Key=2 |
| LargeDF ⨝ SmallDF | | LargeDF ⨝ SmallDF |
+----------------------+ +----------------------+
ノード3 ノード4
+----------------------+ +----------------------+
| Key=3 | | Key=4 |
| LargeDF ⨝ SmallDF | | LargeDF ⨝ SmallDF |
+----------------------+ +----------------------+
**ノード間の通信が発生 → ネットワーク負荷大 → 遅い**
Discussion