SparkからIcebergへのMERGE INTO最適化 - ON句でシャッフルを減らす方法
はじめに
Apache Spark(以下Spark)からApache Iceberg(以下Iceberg)に対して、MERGE INTOを実行するユースケースは、差分更新やCDC(Change Data Capture)の取り込み、冪等性の担保においてよく使われます。実際にupsertを実現したいという要件から、MERGE INTOが利用できるIcebergを選択するケースもあります。
一方で、MERGE INTOは単純なINSERTやUPDATEと比較して、処理が複雑です。内部的にはsourceテーブルとtargetテーブルの突合が必要となり、Spark内部でシャッフルが発生することが多いです。このシャッフルが大規模になると、処理時間が増加し、性能問題につながる可能性があります。
そこで、本記事ではSparkのシャッフルに着目し、SparkからMERGE INTOを実行するときの最適化方法を2回に分けて整理します。
1本目では、MERGE INTOのON句に条件を追加することで、シャッフルに流れ込むデータ量を削減する方法を扱います。
2本目では、SPJ(Storage Partitioned Join)を取り上げ、シャッフルを発生させない方法を扱います。
本記事を書くきっかけの一つは、AWS Big Data Blog の How Ancestry optimizes a 100 billion row Iceberg tableです。是非ご確認ください。
本記事の対象者
- SparkからIcebergに
MERGE INTOを実行している方 - Sparkの性能改善を実施している方
1. MERGE INTO の概要
IcebergにおけるMERGE INTOは、単純にデータを書き込む処理ではありません。内部的にはsourceテーブルとtargetテーブルをON句の条件で突合し、その結果をもとに更新や挿入等を実施します。そのためMERGE INTOはJOINを伴う更新処理と捉えることができます。

MERGE INTO の概要図
例えば、以下のようなMERGE INTOを考えます。
MERGE INTO target t
USING source s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...
このときSparkはsourceとtargetをON句の条件で突合します。一致したデータはUPDATE、sourceのみに存在するデータはINSERTとなるため、内部的にはsource側を残す形の結合、すなわち右外部結合に近い形で実行されます。そして、このJOINを実行する際に、Spark内部でシャッフルが発生します。
MERGE INTOではsourceテーブルのデータ量は更新対象としてあらかじめ決まっています。そのため、シャッフルの規模を抑制するためには、いかにtargetテーブル側の不要なデータをシャッフルに流し込まないかが重要です。今回の例でいうと、sourceテーブル側に存在しないorder_idを持つtargetテーブルのレコードは不要です。しかし、targetテーブルのデータをフルスキャンしてしまうと、不要なデータまでJOINに流れ込み、結果としてシャッフルが大規模になります。
そのため、MERGE INTOの最適化を考えるうえで、JOINの前段でどれだけtarget側のデータ量を減らせるかが重要になります。そのための実践的な方法が、ON句に条件を追加して、target側の読み取り対象を絞ることです。
2. 環境準備
- クライアント:AWS Glue 5.0(Spark 3.5.2)
- データカタログ:AWS Glue Data Catalog
- ストレージ:Amazon S3
事前にIcebergテーブルを作成しデータを投入します。Sparkに設定するパラメータについては、以下の記事を参考にしてください。 検証実施のため以下の方針でテーブルとデータを用意します。
- targetとsourceのスキーマとパーティション列は同じにする
- targetは1億件、sourceは更新1000件、追加1000件となるようデータを投入する
- targetは
channelカラムとして、WEB/APP/STOREの3種を持つ一方、sourceはWEB/STOREの2種のみとする
targetテーブルの作成
CREATE TABLE mycatalog.db.target (
order_id BIGINT,
customer_id BIGINT,
order_ts TIMESTAMP,
channel STRING,
product_type STRING,
amount DECIMAL(12,2),
status STRING
)
USING iceberg
PARTITIONED BY (bucket(10, order_id), channel)
TBLPROPERTIES (
'format-version' = '2',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read'
);
sourceテーブルの作成
CREATE TABLE mycatalog.db.source (
order_id BIGINT,
customer_id BIGINT,
order_ts TIMESTAMP,
channel STRING,
product_type STRING,
amount DECIMAL(12,2),
status STRING
)
USING iceberg
PARTITIONED BY (bucket(10, order_id), channel)
TBLPROPERTIES (
'format-version' = '2'
);
targetテーブルへのデータ投入
from pyspark.sql import functions as F
target_df = (
spark.range(1, 100000001, 1, 8)
.withColumnRenamed("id", "order_id")
.withColumn("customer_id", (F.col("order_id") % 5000) + F.lit(100000))
.withColumn(
"order_ts",
F.expr("""
CASE
WHEN order_id <= 10000 THEN timestamp'2025-01-01 10:00:00'
WHEN order_id <= 20000 THEN timestamp'2025-01-02 10:00:00'
ELSE timestamp'2025-01-03 10:00:00'
END
""")
)
.withColumn(
"channel",
F.expr("""
CASE
WHEN pmod(order_id, 3) = 0 THEN 'WEB'
WHEN pmod(order_id, 3) = 1 THEN 'APP'
ELSE 'STORE'
END
""")
)
.withColumn(
"product_type",
F.expr("""
CASE
WHEN pmod(order_id, 4) = 0 THEN 'BOOK'
WHEN pmod(order_id, 4) = 1 THEN 'FOOD'
WHEN pmod(order_id, 4) = 2 THEN 'TOY'
ELSE 'OTHER'
END
""")
)
.withColumn("amount", (F.col("order_id") * F.lit(1.15)).cast("decimal(12,2)"))
.withColumn("status", F.lit("ACTIVE"))
)
target_df.writeTo("mycatalog.db.target").append()
sourceテーブルへのデータ投入
updates_df = (
spark.range(99999001, 100000001)
.withColumnRenamed("id", "order_id")
.withColumn("customer_id", (F.col("order_id") % 5000) + F.lit(100000))
.withColumn("order_ts", F.expr("timestamp'2025-01-03 12:00:00'"))
.withColumn(
"channel",
F.expr("""
CASE
WHEN pmod(order_id, 2) = 0 THEN 'WEB'
ELSE 'STORE'
END
""")
)
.withColumn("product_type", F.lit("UPDATED_ITEM"))
.withColumn("amount", (F.col("order_id") * F.lit(2.00)).cast("decimal(12,2)"))
.withColumn("status", F.lit("UPDATED"))
)
inserts_df = (
spark.range(100000001, 100001001)
.withColumnRenamed("id", "order_id")
.withColumn("customer_id", (F.col("order_id") % 5000) + F.lit(100000))
.withColumn("order_ts", F.expr("timestamp'2025-01-04 09:00:00'"))
.withColumn(
"channel",
F.expr("""
CASE
WHEN pmod(order_id, 2) = 0 THEN 'WEB'
ELSE 'STORE'
END
""")
)
.withColumn("product_type", F.lit("NEW_ITEM"))
.withColumn("amount", (F.col("order_id") * F.lit(3.00)).cast("decimal(12,2)"))
.withColumn("status", F.lit("NEW"))
)
source_df = updates_df.unionByName(inserts_df)
source_df.writeTo("mycatalog.db.source").append()
3. 検証
MERGE INTOのON句に条件を追加し、target側のscan量やシャッフルへ流入するデータ量がどのように変化するかを確認します。
検証するために、ON句の条件を変えて、以下の3パターンで検証します。
-
order_idのみで結合 -
order_idで結合し範囲条件を追加 -
order_idとchannelで結合し、範囲条件とchannel条件を追加
まずは最もシンプルなorder_idのみで結合するパターンを検証します。その後ON句に条件を追加することで、target側のscan量とシャッフルに流れ込むデータ量がどのように変化するかを見ていきます。あわせて、パーティション列によって、条件の効き方がどのように異なるかも確認します。
3.1 order_idのみで結合
まずは、最もシンプルなorder_idのみを結合条件にしたパターンです。
MERGE INTO mycatalog.db.target AS tgt
USING mycatalog.db.source AS src
ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET
tgt.customer_id = src.customer_id,
tgt.order_ts = src.order_ts,
tgt.channel = src.channel,
tgt.product_type = src.product_type,
tgt.amount = src.amount,
tgt.status = src.status
WHEN NOT MATCHED THEN INSERT (
order_id, customer_id, order_ts, channel, product_type, amount, status
) VALUES (
src.order_id, src.customer_id, src.order_ts, src.channel, src.product_type, src.amount, src.status
)
Spark UIを確認すると、targetテーブルはフルスキャンとなり、1億件すべてのデータが読み込まれていることが確認できます。さらに、その読み込まれた1億件のデータが後続のJOIN処理に流れ込み、そのままシャッフル対象になっています。
つまり、source側の更新対象は少量にもかかわらず、target側をフルスキャンしているため、不要なデータまでシャッフルに流入しています。性能面では不利になりやすい実行計画です。

Spark UIの実行計画
実行計画全体
== Physical Plan ==
WriteDelta
+- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- Exchange hashpartitioning(_spec_id#38, _partition#39, staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 10, order_id#29L, IntegerType, LongType, false, true, true), channel#32, 4), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=146]
+- MergeRowsExec[__row_operation#28, order_id#29L, customer_id#30L, order_ts#31, channel#32, product_type#33, amount#34, status#35, _file#36, _pos#37L, _spec_id#38, _partition#39]
+- *(3) ShuffledHashJoin [order_id#0L], [order_id#7L], RightOuter, BuildRight
:- ShuffleQueryStage 0
: +- Exchange hashpartitioning(order_id#0L, 4), ENSURE_REQUIREMENTS, [plan_id=69]
: +- *(1) Filter isnotnull(order_id#0L)
: +- *(1) Project [order_id#0L, _file#16, _pos#17L, _spec_id#14, _partition#15, true AS __row_from_target#24, monotonically_increasing_id() AS __row_id#25L]
: +- *(1) ColumnarToRow
: +- BatchScan mycatalog.db.target[order_id#0L, _file#16, _pos#17L, _spec_id#14, _partition#15] mycatalog.db.target (branch=null) [filters=, groupedBy=, pushedLimit=None] RuntimeFilters: []
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(order_id#7L, 4), ENSURE_REQUIREMENTS, [plan_id=85]
+- *(2) ColumnarToRow
+- BatchScan mycatalog.db.source[order_id#7L, customer_id#8L, order_ts#9, channel#10, product_type#11, amount#12, status#13] mycatalog.db.source (branch=null) [filters=, groupedBy=, pushedLimit=None] RuntimeFilters: []
3.2 order_idで結合し、範囲条件を追加
次に、ON句にorder_idの結合条件に加えて、order_idの範囲条件を追加します。具体的にはMERGE INTOの実行前にsourceテーブルからorder_idの最小値と最大値を取得し、その範囲をBETWEENでON句に含めます。
stats = (
spark.read.table("mycatalog.db.source")
.agg(
F.min("order_id").alias("min_value"),
F.max("order_id").alias("max_value")
)
)
stat_row = stats.first()
min_id = stat_row["min_value"]
max_id = stat_row["max_value"]
spark.sql(f"""
MERGE INTO mycatalog.db.target AS tgt
USING mycatalog.db.source AS src
ON (tgt.order_id BETWEEN '{min_id}' AND '{max_id}')
AND (tgt.order_id = src.order_id)
WHEN MATCHED THEN UPDATE SET
tgt.customer_id = src.customer_id,
tgt.order_ts = src.order_ts,
tgt.channel = src.channel,
tgt.product_type = src.product_type,
tgt.amount = src.amount,
tgt.status = src.status
WHEN NOT MATCHED THEN INSERT (
order_id, customer_id, order_ts, channel, product_type, amount, status
) VALUES (
src.order_id, src.customer_id, src.order_ts, src.channel, src.product_type, src.amount, src.status
)
""")
Spark UIを確認すると、target側のscanに対して、order_idの範囲条件がpushdownされており、取得件数が約4000万件まで削減されていることが分かります。さらに、その後のFilterでorder_id >= ... , order_id <= ...条件が適用され、最終的にシャッフルへ流入するtarget側のデータ量は2000件まで減少しています。
つまり、このパターンではON句に範囲条件を含めたことで、target側のscan段階から条件を適用できています。ただし、今回のテーブルはorder_idをbucket(10, order_id)でパーティション化しているため、範囲条件を与えてもscanだけでは完全に絞り込めません。そのため、後段のFilterで最終的にデータを絞り込んでいます。
その結果、sourceに関係する範囲だけをtarget側から読み出し、シャッフルに流し込むデータ量も大きく削減できています。
Spark UIの実行計画
実行計画全体
== Physical Plan ==
WriteDelta
+- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- Exchange hashpartitioning(_spec_id#74, _partition#75, staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 10, order_id#65L, IntegerType, LongType, false, true, true), channel#68, 4), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=161]
+- MergeRowsExec[__row_operation#64, order_id#65L, customer_id#66L, order_ts#67, channel#68, product_type#69, amount#70, status#71, _file#72, _pos#73L, _spec_id#74, _partition#75]
+- *(5) SortMergeJoin [order_id#36L], [order_id#43L], RightOuter
:- *(3) Sort [order_id#36L ASC NULLS FIRST], false, 0
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(order_id#36L, 4), ENSURE_REQUIREMENTS, [plan_id=70]
: +- *(1) Project [order_id#36L, _file#52, _pos#53L, _spec_id#50, _partition#51, true AS __row_from_target#60, monotonically_increasing_id() AS __row_id#61L]
: +- *(1) Filter ((order_id#36L >= 99999001) AND (order_id#36L <= 100001000))
: +- *(1) ColumnarToRow
: +- BatchScan mycatalog.db.target[order_id#36L, _file#52, _pos#53L, _spec_id#50, _partition#51] mycatalog.db.target (branch=null) [filters=order_id IS NOT NULL, order_id >= 99999001, order_id <= 100001000, groupedBy=, pushedLimit=None] RuntimeFilters: []
+- *(4) Sort [order_id#43L ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(order_id#43L, 4), ENSURE_REQUIREMENTS, [plan_id=86]
+- *(2) ColumnarToRow
+- BatchScan mycatalog.db.source[order_id#43L, customer_id#44L, order_ts#45, channel#46, product_type#47, amount#48, status#49] mycatalog.db.source (branch=null) [filters=, groupedBy=, pushedLimit=None] RuntimeFilters: []
3.3 order_idとchannelで結合し、条件を追加
最後に、結合キーをorder_idとchannelの両方に変更したうえで、さらにorder_idの範囲条件とchannelの条件を追加します。具体的には、前節と同様にorder_idへ範囲条件を追加します。さらに、source側に存在するchannelがWEBとSTOREのみであることを利用して、tgt.channelに対する条件もON句へ明示します。
stats = (
spark.read.table("mycatalog.db.source")
.agg(
F.min("order_id").alias("min_value"),
F.max("order_id").alias("max_value")
)
)
stat_row = stats.first()
min_id = stat_row["min_value"]
max_id = stat_row["max_value"]
spark.sql(f"""
MERGE INTO mycatalog.db.target AS tgt
USING mycatalog.db.source AS src
ON (tgt.order_id BETWEEN '{min_id}' AND '{max_id}')
AND (tgt.channel = 'WEB' OR tgt.channel = 'STORE')
AND (tgt.order_id = src.order_id)
AND (tgt.channel = src.channel)
WHEN MATCHED THEN UPDATE SET
tgt.customer_id = src.customer_id,
tgt.order_ts = src.order_ts,
tgt.channel = src.channel,
tgt.product_type = src.product_type,
tgt.amount = src.amount,
tgt.status = src.status
WHEN NOT MATCHED THEN INSERT (
order_id, customer_id, order_ts, channel, product_type, amount, status
) VALUES (
src.order_id, src.customer_id, src.order_ts, src.channel, src.product_type, src.amount, src.status
)
""")
Spark UIを確認すると、target側のscanにてorder_idの範囲条件に加えて、channelの条件もpushdownされていることが分かります。
また、今回のテーブルでは、channelはそのままのパーティション列(identity)です。そのため、tgt.channel = 'WEB' OR tgt.channel = 'STORE' のようにtarget側の条件を明示すると、scan時にその条件で完全に絞り込むことが出来ます。したがって、order_idのようにFilterによる追加の絞り込みは発生していません。

Spark UIの実行計画
実行計画全体
== Physical Plan ==
WriteDelta
+- AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
AQEShuffleRead coalesced
+- ShuffleQueryStage 2
+- Exchange hashpartitioning(_spec_id#74, _partition#75, staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 10, order_id#65L, IntegerType, LongType, false, true, true), channel#68, 4), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=221]
+- MergeRowsExec[__row_operation#64, order_id#65L, customer_id#66L, order_ts#67, channel#68, product_type#69, amount#70, status#71, _file#72, _pos#73L, _spec_id#74, _partition#75]
+- *(5) Project [order_id#36L, _file#52, _pos#53L, _spec_id#50, _partition#51, __row_from_target#60, __row_id#61L, order_id#43L, customer_id#44L, order_ts#45, channel#46, product_type#47, amount#48, status#49]
+- *(5) SortMergeJoin [order_id#36L, channel#39], [order_id#43L, channel#46], RightOuter
:- *(3) Sort [order_id#36L ASC NULLS FIRST, channel#39 ASC NULLS FIRST], false, 0
: +- AQEShuffleRead coalesced
: +- ShuffleQueryStage 0
: +- Exchange hashpartitioning(order_id#36L, channel#39, 4), ENSURE_REQUIREMENTS, [plan_id=89]
: +- *(1) Filter isnotnull(channel#39)
: +- *(1) Project [order_id#36L, channel#39, _file#52, _pos#53L, _spec_id#50, _partition#51, true AS __row_from_target#60, monotonically_increasing_id() AS __row_id#61L]
: +- *(1) Filter ((order_id#36L >= 99999001) AND (order_id#36L <= 100001000))
: +- *(1) ColumnarToRow
: +- BatchScan mycatalog.db.target[order_id#36L, channel#39, _file#52, _pos#53L, _spec_id#50, _partition#51] mycatalog.db.target (branch=null) [filters=order_id IS NOT NULL, order_id >= 99999001, order_id <= 100001000, (channel = 'WEB' OR channel = 'STORE'), groupedBy=, pushedLimit=None] RuntimeFilters: []
+- *(4) Sort [order_id#43L ASC NULLS FIRST, channel#46 ASC NULLS FIRST], false, 0
+- AQEShuffleRead coalesced
+- ShuffleQueryStage 1
+- Exchange hashpartitioning(order_id#43L, channel#46, 4), ENSURE_REQUIREMENTS, [plan_id=107]
+- *(2) ColumnarToRow
+- BatchScan mycatalog.db.source[order_id#43L, customer_id#44L, order_ts#45, channel#46, product_type#47, amount#48, status#49] mycatalog.db.source (branch=null) [filters=, groupedBy=, pushedLimit=None]
4. 最後に
本記事では、SparkからIcebergに対してMERGE INTOを実行する際に、ON句の条件によってtarget側のscan量やシャッフル量がどのように変化するかを確認しました。
何も工夫せずにMERGE INTOを実行すると、targetテーブルがフルスキャンされ、不要なデータまでJOINとシャッフルに流れ込んでしまいます。特にtargetテーブルが大きい場合、この無駄なデータ流入が性能低下の大きな要因になります。今回用意したデータでは、1億件から2000件までシャッフルに流入するデータ量を削減できました。
そのため、MERGE INTOを最適化するうえでは、ON句でtarget側をどこまで事前に絞り込めるかが重要です。ON句に条件を追加することで、scan時のpushdownやFilterが働き、JOIN前の段階で不要なデータを減らせます。
次回はSPJによる最適化について説明します。
NTT DATA公式アカウントです。 技術を愛するNTT DATAの技術者が、気軽に楽しく発信していきます。 当社のサービスなどについてのお問い合わせは、 お問い合わせフォーム nttdata.com/jp/ja/contact-us/ へお願いします。