NTT DATA TECH
🍨

SparkからIcebergへのMERGE INTO最適化 - ON句でシャッフルを減らす方法

に公開

はじめに

Apache Spark(以下Spark)からApache Iceberg(以下Iceberg)に対して、MERGE INTOを実行するユースケースは、差分更新やCDC(Change Data Capture)の取り込み、冪等性の担保においてよく使われます。実際にupsertを実現したいという要件から、MERGE INTOが利用できるIcebergを選択するケースもあります。

一方で、MERGE INTOは単純なINSERTUPDATEと比較して、処理が複雑です。内部的にはsourceテーブルとtargetテーブルの突合が必要となり、Spark内部でシャッフルが発生することが多いです。このシャッフルが大規模になると、処理時間が増加し、性能問題につながる可能性があります

そこで、本記事ではSparkのシャッフルに着目し、SparkからMERGE INTOを実行するときの最適化方法を2回に分けて整理します。
1本目では、MERGE INTOON句に条件を追加することで、シャッフルに流れ込むデータ量を削減する方法を扱います。
2本目では、SPJ(Storage Partitioned Join)を取り上げ、シャッフルを発生させない方法を扱います。

本記事を書くきっかけの一つは、AWS Big Data Blog の How Ancestry optimizes a 100 billion row Iceberg tableです。是非ご確認ください。
https://aws.amazon.com/jp/blogs/big-data/how-ancestry-optimizes-a-100-billion-row-iceberg-table/

本記事の対象者

  • SparkからIcebergにMERGE INTOを実行している方
  • Sparkの性能改善を実施している方

1. MERGE INTO の概要

IcebergにおけるMERGE INTOは、単純にデータを書き込む処理ではありません。内部的にはsourceテーブルとtargetテーブルをON句の条件で突合し、その結果をもとに更新や挿入等を実施します。そのためMERGE INTOJOINを伴う更新処理と捉えることができます。


MERGE INTO の概要図

例えば、以下のようなMERGE INTOを考えます。

MERGE INTO target t
USING source s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...

このときSparkはsourceとtargetをON句の条件で突合します。一致したデータはUPDATE、sourceのみに存在するデータはINSERTとなるため、内部的にはsource側を残す形の結合、すなわち右外部結合に近い形で実行されます。そして、このJOINを実行する際に、Spark内部でシャッフルが発生します。

MERGE INTOではsourceテーブルのデータ量は更新対象としてあらかじめ決まっています。そのため、シャッフルの規模を抑制するためには、いかにtargetテーブル側の不要なデータをシャッフルに流し込まないかが重要です。今回の例でいうと、sourceテーブル側に存在しないorder_idを持つtargetテーブルのレコードは不要です。しかし、targetテーブルのデータをフルスキャンしてしまうと、不要なデータまでJOINに流れ込み、結果としてシャッフルが大規模になります。

そのため、MERGE INTOの最適化を考えるうえで、JOINの前段でどれだけtarget側のデータ量を減らせるかが重要になります。そのための実践的な方法が、ON句に条件を追加して、target側の読み取り対象を絞ることです。

2. 環境準備

  • クライアント:AWS Glue 5.0(Spark 3.5.2)
  • データカタログ:AWS Glue Data Catalog
  • ストレージ:Amazon S3

事前にIcebergテーブルを作成しデータを投入します。Sparkに設定するパラメータについては、以下の記事を参考にしてください。
https://zenn.dev/nttdata_tech/articles/e429508e0d58b5
検証実施のため以下の方針でテーブルとデータを用意します。

  • targetとsourceのスキーマとパーティション列は同じにする
  • targetは1億件、sourceは更新1000件、追加1000件となるようデータを投入する
  • targetはchannelカラムとして、WEB/APP/STOREの3種を持つ一方、sourceはWEB/STOREの2種のみとする
targetテーブルの作成
CREATE TABLE mycatalog.db.target (
    order_id BIGINT,
    customer_id BIGINT,
    order_ts TIMESTAMP,
    channel STRING,
    product_type STRING,
    amount DECIMAL(12,2),
    status STRING
)
USING iceberg
PARTITIONED BY (bucket(10, order_id), channel)
TBLPROPERTIES (
    'format-version' = '2',
    'write.delete.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.merge.mode'  = 'merge-on-read'
);
sourceテーブルの作成
CREATE TABLE mycatalog.db.source (
    order_id BIGINT,
    customer_id BIGINT,
    order_ts TIMESTAMP,
    channel STRING,
    product_type STRING,
    amount DECIMAL(12,2),
    status STRING
)
USING iceberg
PARTITIONED BY (bucket(10, order_id), channel)
TBLPROPERTIES (
    'format-version' = '2'
);
targetテーブルへのデータ投入
from pyspark.sql import functions as F
target_df = (
    spark.range(1, 100000001, 1, 8)
    .withColumnRenamed("id", "order_id")
    .withColumn("customer_id", (F.col("order_id") % 5000) + F.lit(100000))
    .withColumn(
        "order_ts",
        F.expr("""
            CASE
                WHEN order_id <= 10000 THEN timestamp'2025-01-01 10:00:00'
                WHEN order_id <= 20000 THEN timestamp'2025-01-02 10:00:00'
                ELSE timestamp'2025-01-03 10:00:00'
            END
        """)
    )
    .withColumn(
        "channel",
        F.expr("""
            CASE
                WHEN pmod(order_id, 3) = 0 THEN 'WEB'
                WHEN pmod(order_id, 3) = 1 THEN 'APP'
                ELSE 'STORE'
            END
        """)
    )
    .withColumn(
        "product_type",
        F.expr("""
            CASE
                WHEN pmod(order_id, 4) = 0 THEN 'BOOK'
                WHEN pmod(order_id, 4) = 1 THEN 'FOOD'
                WHEN pmod(order_id, 4) = 2 THEN 'TOY'
                ELSE 'OTHER'
            END
        """)
    )
    .withColumn("amount", (F.col("order_id") * F.lit(1.15)).cast("decimal(12,2)"))
    .withColumn("status", F.lit("ACTIVE"))
)

target_df.writeTo("mycatalog.db.target").append()
sourceテーブルへのデータ投入
updates_df = (
    spark.range(99999001, 100000001)
    .withColumnRenamed("id", "order_id")
    .withColumn("customer_id", (F.col("order_id") % 5000) + F.lit(100000))
    .withColumn("order_ts", F.expr("timestamp'2025-01-03 12:00:00'"))
    .withColumn(
        "channel",
        F.expr("""
            CASE
                WHEN pmod(order_id, 2) = 0 THEN 'WEB'
                ELSE 'STORE'
            END
        """)
    )
    .withColumn("product_type", F.lit("UPDATED_ITEM"))
    .withColumn("amount", (F.col("order_id") * F.lit(2.00)).cast("decimal(12,2)"))
    .withColumn("status", F.lit("UPDATED"))
)

inserts_df = (
    spark.range(100000001, 100001001)
    .withColumnRenamed("id", "order_id")
    .withColumn("customer_id", (F.col("order_id") % 5000) + F.lit(100000))
    .withColumn("order_ts", F.expr("timestamp'2025-01-04 09:00:00'"))
    .withColumn(
        "channel",
        F.expr("""
            CASE
                WHEN pmod(order_id, 2) = 0 THEN 'WEB'
                ELSE 'STORE'
            END
        """)
    )
    .withColumn("product_type", F.lit("NEW_ITEM"))
    .withColumn("amount", (F.col("order_id") * F.lit(3.00)).cast("decimal(12,2)"))
    .withColumn("status", F.lit("NEW"))
)

source_df = updates_df.unionByName(inserts_df)
source_df.writeTo("mycatalog.db.source").append()

3. 検証

MERGE INTOON句に条件を追加し、target側のscan量やシャッフルへ流入するデータ量がどのように変化するかを確認します。

検証するために、ON句の条件を変えて、以下の3パターンで検証します。

  1. order_idのみで結合
  2. order_idで結合し範囲条件を追加
  3. order_idchannelで結合し、範囲条件とchannel条件を追加

まずは最もシンプルなorder_idのみで結合するパターンを検証します。その後ON句に条件を追加することで、target側のscan量とシャッフルに流れ込むデータ量がどのように変化するかを見ていきます。あわせて、パーティション列によって、条件の効き方がどのように異なるかも確認します。

3.1 order_idのみで結合

まずは、最もシンプルなorder_idのみを結合条件にしたパターンです。

MERGE INTO mycatalog.db.target AS tgt
USING mycatalog.db.source AS src
ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET
    tgt.customer_id = src.customer_id,
    tgt.order_ts = src.order_ts,
    tgt.channel = src.channel,
    tgt.product_type = src.product_type,
    tgt.amount = src.amount,
    tgt.status = src.status
WHEN NOT MATCHED THEN INSERT (
    order_id, customer_id, order_ts, channel, product_type, amount, status
) VALUES (
    src.order_id, src.customer_id, src.order_ts, src.channel, src.product_type, src.amount, src.status
)

Spark UIを確認すると、targetテーブルはフルスキャンとなり、1億件すべてのデータが読み込まれていることが確認できます。さらに、その読み込まれた1億件のデータが後続のJOIN処理に流れ込み、そのままシャッフル対象になっています。
つまり、source側の更新対象は少量にもかかわらず、target側をフルスキャンしているため、不要なデータまでシャッフルに流入しています。性能面では不利になりやすい実行計画です。

Spark UIの実行計画

実行計画全体
== Physical Plan ==
WriteDelta
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      AQEShuffleRead coalesced
      +- ShuffleQueryStage 2
         +- Exchange hashpartitioning(_spec_id#38, _partition#39, staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 10, order_id#29L, IntegerType, LongType, false, true, true), channel#32, 4), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=146]
            +- MergeRowsExec[__row_operation#28, order_id#29L, customer_id#30L, order_ts#31, channel#32, product_type#33, amount#34, status#35, _file#36, _pos#37L, _spec_id#38, _partition#39]
               +- *(3) ShuffledHashJoin [order_id#0L], [order_id#7L], RightOuter, BuildRight
                  :- ShuffleQueryStage 0
                  :  +- Exchange hashpartitioning(order_id#0L, 4), ENSURE_REQUIREMENTS, [plan_id=69]
                  :     +- *(1) Filter isnotnull(order_id#0L)
                  :        +- *(1) Project [order_id#0L, _file#16, _pos#17L, _spec_id#14, _partition#15, true AS __row_from_target#24, monotonically_increasing_id() AS __row_id#25L]
                  :           +- *(1) ColumnarToRow
                  :              +- BatchScan mycatalog.db.target[order_id#0L, _file#16, _pos#17L, _spec_id#14, _partition#15] mycatalog.db.target (branch=null) [filters=, groupedBy=, pushedLimit=None] RuntimeFilters: []
                  +- ShuffleQueryStage 1
                     +- Exchange hashpartitioning(order_id#7L, 4), ENSURE_REQUIREMENTS, [plan_id=85]
                        +- *(2) ColumnarToRow
                           +- BatchScan mycatalog.db.source[order_id#7L, customer_id#8L, order_ts#9, channel#10, product_type#11, amount#12, status#13] mycatalog.db.source (branch=null) [filters=, groupedBy=, pushedLimit=None] RuntimeFilters: []

3.2 order_idで結合し、範囲条件を追加

次に、ON句にorder_idの結合条件に加えて、order_idの範囲条件を追加します。具体的にはMERGE INTOの実行前にsourceテーブルからorder_idの最小値と最大値を取得し、その範囲をBETWEENON句に含めます。

stats = (
    spark.read.table("mycatalog.db.source")
    .agg(
        F.min("order_id").alias("min_value"),
        F.max("order_id").alias("max_value")
    )
)
 
stat_row = stats.first()
min_id = stat_row["min_value"]
max_id = stat_row["max_value"]
    
spark.sql(f"""
MERGE INTO mycatalog.db.target AS tgt
USING mycatalog.db.source AS src
ON (tgt.order_id BETWEEN '{min_id}' AND '{max_id}')
    AND (tgt.order_id = src.order_id)
WHEN MATCHED THEN UPDATE SET
    tgt.customer_id = src.customer_id,
    tgt.order_ts = src.order_ts,
    tgt.channel = src.channel,
    tgt.product_type = src.product_type,
    tgt.amount = src.amount,
    tgt.status = src.status
WHEN NOT MATCHED THEN INSERT (
    order_id, customer_id, order_ts, channel, product_type, amount, status
) VALUES (
    src.order_id, src.customer_id, src.order_ts, src.channel, src.product_type, src.amount, src.status
)
""")

Spark UIを確認すると、target側のscanに対して、order_idの範囲条件がpushdownされており、取得件数が約4000万件まで削減されていることが分かります。さらに、その後のFilterでorder_id >= ... , order_id <= ...条件が適用され、最終的にシャッフルへ流入するtarget側のデータ量は2000件まで減少しています。

つまり、このパターンではON句に範囲条件を含めたことで、target側のscan段階から条件を適用できています。ただし、今回のテーブルはorder_idbucket(10, order_id)でパーティション化しているため、範囲条件を与えてもscanだけでは完全に絞り込めません。そのため、後段のFilterで最終的にデータを絞り込んでいます。

その結果、sourceに関係する範囲だけをtarget側から読み出し、シャッフルに流し込むデータ量も大きく削減できています。

Spark UIの実行計画

実行計画全体
== Physical Plan ==
WriteDelta
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      AQEShuffleRead coalesced
      +- ShuffleQueryStage 2
         +- Exchange hashpartitioning(_spec_id#74, _partition#75, staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 10, order_id#65L, IntegerType, LongType, false, true, true), channel#68, 4), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=161]
            +- MergeRowsExec[__row_operation#64, order_id#65L, customer_id#66L, order_ts#67, channel#68, product_type#69, amount#70, status#71, _file#72, _pos#73L, _spec_id#74, _partition#75]
               +- *(5) SortMergeJoin [order_id#36L], [order_id#43L], RightOuter
                  :- *(3) Sort [order_id#36L ASC NULLS FIRST], false, 0
                  :  +- AQEShuffleRead coalesced
                  :     +- ShuffleQueryStage 0
                  :        +- Exchange hashpartitioning(order_id#36L, 4), ENSURE_REQUIREMENTS, [plan_id=70]
                  :           +- *(1) Project [order_id#36L, _file#52, _pos#53L, _spec_id#50, _partition#51, true AS __row_from_target#60, monotonically_increasing_id() AS __row_id#61L]
                  :              +- *(1) Filter ((order_id#36L >= 99999001) AND (order_id#36L <= 100001000))
                  :                 +- *(1) ColumnarToRow
                  :                    +- BatchScan mycatalog.db.target[order_id#36L, _file#52, _pos#53L, _spec_id#50, _partition#51] mycatalog.db.target (branch=null) [filters=order_id IS NOT NULL, order_id >= 99999001, order_id <= 100001000, groupedBy=, pushedLimit=None] RuntimeFilters: []
                  +- *(4) Sort [order_id#43L ASC NULLS FIRST], false, 0
                     +- AQEShuffleRead coalesced
                        +- ShuffleQueryStage 1
                           +- Exchange hashpartitioning(order_id#43L, 4), ENSURE_REQUIREMENTS, [plan_id=86]
                              +- *(2) ColumnarToRow
                                 +- BatchScan mycatalog.db.source[order_id#43L, customer_id#44L, order_ts#45, channel#46, product_type#47, amount#48, status#49] mycatalog.db.source (branch=null) [filters=, groupedBy=, pushedLimit=None] RuntimeFilters: []

3.3 order_idとchannelで結合し、条件を追加

最後に、結合キーをorder_idchannelの両方に変更したうえで、さらにorder_idの範囲条件とchannelの条件を追加します。具体的には、前節と同様にorder_idへ範囲条件を追加します。さらに、source側に存在するchannelWEBSTOREのみであることを利用して、tgt.channelに対する条件もON句へ明示します。

stats = (
    spark.read.table("mycatalog.db.source")
    .agg(
        F.min("order_id").alias("min_value"),
        F.max("order_id").alias("max_value")
    )
)
 
stat_row = stats.first()
min_id = stat_row["min_value"]
max_id = stat_row["max_value"]

spark.sql(f"""
MERGE INTO mycatalog.db.target AS tgt
USING mycatalog.db.source AS src
ON (tgt.order_id BETWEEN '{min_id}' AND '{max_id}')
    AND (tgt.channel = 'WEB' OR tgt.channel = 'STORE')
    AND (tgt.order_id = src.order_id)
    AND (tgt.channel = src.channel)
WHEN MATCHED THEN UPDATE SET
    tgt.customer_id = src.customer_id,
    tgt.order_ts = src.order_ts,
    tgt.channel = src.channel,
    tgt.product_type = src.product_type,
    tgt.amount = src.amount,
    tgt.status = src.status
WHEN NOT MATCHED THEN INSERT (
    order_id, customer_id, order_ts, channel, product_type, amount, status
) VALUES (
    src.order_id, src.customer_id, src.order_ts, src.channel, src.product_type, src.amount, src.status
)
""")

Spark UIを確認すると、target側のscanにてorder_idの範囲条件に加えて、channelの条件もpushdownされていることが分かります。

また、今回のテーブルでは、channelはそのままのパーティション列(identity)です。そのため、tgt.channel = 'WEB' OR tgt.channel = 'STORE' のようにtarget側の条件を明示すると、scan時にその条件で完全に絞り込むことが出来ます。したがって、order_idのようにFilterによる追加の絞り込みは発生していません。

Spark UIの実行計画

実行計画全体
== Physical Plan ==
WriteDelta
+- AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      AQEShuffleRead coalesced
      +- ShuffleQueryStage 2
         +- Exchange hashpartitioning(_spec_id#74, _partition#75, staticinvoke(class org.apache.iceberg.spark.functions.BucketFunction$BucketLong, IntegerType, invoke, 10, order_id#65L, IntegerType, LongType, false, true, true), channel#68, 4), REBALANCE_PARTITIONS_BY_COL, 402653184, [plan_id=221]
            +- MergeRowsExec[__row_operation#64, order_id#65L, customer_id#66L, order_ts#67, channel#68, product_type#69, amount#70, status#71, _file#72, _pos#73L, _spec_id#74, _partition#75]
               +- *(5) Project [order_id#36L, _file#52, _pos#53L, _spec_id#50, _partition#51, __row_from_target#60, __row_id#61L, order_id#43L, customer_id#44L, order_ts#45, channel#46, product_type#47, amount#48, status#49]
                  +- *(5) SortMergeJoin [order_id#36L, channel#39], [order_id#43L, channel#46], RightOuter
                     :- *(3) Sort [order_id#36L ASC NULLS FIRST, channel#39 ASC NULLS FIRST], false, 0
                     :  +- AQEShuffleRead coalesced
                     :     +- ShuffleQueryStage 0
                     :        +- Exchange hashpartitioning(order_id#36L, channel#39, 4), ENSURE_REQUIREMENTS, [plan_id=89]
                     :           +- *(1) Filter isnotnull(channel#39)
                     :              +- *(1) Project [order_id#36L, channel#39, _file#52, _pos#53L, _spec_id#50, _partition#51, true AS __row_from_target#60, monotonically_increasing_id() AS __row_id#61L]
                     :                 +- *(1) Filter ((order_id#36L >= 99999001) AND (order_id#36L <= 100001000))
                     :                    +- *(1) ColumnarToRow
                     :                       +- BatchScan mycatalog.db.target[order_id#36L, channel#39, _file#52, _pos#53L, _spec_id#50, _partition#51] mycatalog.db.target (branch=null) [filters=order_id IS NOT NULL, order_id >= 99999001, order_id <= 100001000, (channel = 'WEB' OR channel = 'STORE'), groupedBy=, pushedLimit=None] RuntimeFilters: []
                     +- *(4) Sort [order_id#43L ASC NULLS FIRST, channel#46 ASC NULLS FIRST], false, 0
                        +- AQEShuffleRead coalesced
                           +- ShuffleQueryStage 1
                              +- Exchange hashpartitioning(order_id#43L, channel#46, 4), ENSURE_REQUIREMENTS, [plan_id=107]
                                 +- *(2) ColumnarToRow
                                    +- BatchScan mycatalog.db.source[order_id#43L, customer_id#44L, order_ts#45, channel#46, product_type#47, amount#48, status#49] mycatalog.db.source (branch=null) [filters=, groupedBy=, pushedLimit=None] 

4. 最後に

本記事では、SparkからIcebergに対してMERGE INTOを実行する際に、ON句の条件によってtarget側のscan量やシャッフル量がどのように変化するかを確認しました。

何も工夫せずにMERGE INTOを実行すると、targetテーブルがフルスキャンされ、不要なデータまでJOINとシャッフルに流れ込んでしまいます。特にtargetテーブルが大きい場合、この無駄なデータ流入が性能低下の大きな要因になります。今回用意したデータでは、1億件から2000件までシャッフルに流入するデータ量を削減できました。

そのため、MERGE INTOを最適化するうえでは、ON句でtarget側をどこまで事前に絞り込めるかが重要です。ON句に条件を追加することで、scan時のpushdownやFilterが働き、JOIN前の段階で不要なデータを減らせます。

次回はSPJによる最適化について説明します。

NTT DATA TECH
NTT DATA TECH
設定によりコメント欄が無効化されています