🗂

Amazon EMR 7.12 で Apache Iceberg v3 の Deletion Vectors を検証

に公開

目的

Amazon EMR 7.12 が Iceberg v3 をサポートしたので、EMR Serverless で Deletion Vectors(削除ベクトル) を検証してみました。
Row Lineage(行レベル系譜追跡)検証は ⇩ 参照
https://zenn.dev/shigeru_oda/articles/0f9431d9029a54

Deletion Vectors とは

Deletion Vectors(削除ベクトル)は、Apache Iceberg v3 で導入された削除管理の新しい仕組みです。

仕組み

従来の方式(v2 - Copy-on-Write):

データファイル: [行1, 行2, 行3, 行4, 行5]
↓ 行2を削除
新ファイル作成: [行1, 行3, 行4, 行5]  ← ファイル全体を書き換え

Deletion Vectors(v3 - Merge-on-Read):

データファイル: [行1, 行2, 行3, 行4, 行5]  ← 物理ファイルは変更なし
削除ベクトル: [0, 1, 0, 0, 0]  ← メタデータで削除を記録(1=削除)
↓ クエリ時
読み取り結果: [行1, 行3, 行4, 行5]  ← 削除ベクトルを参照してフィルタ

特徴

  1. 位置ベースの削除記録: 削除された行のファイルパスと位置を記録
    (ビットマップのように0/1で管理)
  2. メタデータ管理: 削除情報はテーブルメタデータに格納
  3. 遅延マージ: 物理的なデータ削除は後続のコンパクション時に実行
  4. 効率的なストレージ: 大量の削除でも小さなメタデータのみ更新

メリット

  • 書き込み性能: データファイルの書き換え不要で削除が高速
  • ストレージ効率: 少数の行削除時にファイル全体をコピーしない

環境

EMR Studio を作成する

作成手順はAWS Document参照

EMR Application を作成する

作成手順はAWS Document参照

Script

test_deletion_vectors.py
# test_deletion_vectors.py
from pyspark.sql import SparkSession
import sys
import time

def main():
    bucket = sys.argv[1] if len(sys.argv) > 1 else "20251203-shigeruoda-test-deletion-vectors"

    spark = SparkSession.builder \
        .appName("Iceberg-v2-v3-Performance") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
        .config("spark.sql.catalog.glue.warehouse", f"s3://{bucket}/warehouse/") \
        .getOrCreate()

    print("=" * 80)
    print("Iceberg v2 vs v3 Performance Comparison")
    print("=" * 80)
    print(f"Bucket: s3://{bucket}/warehouse/")

    spark.sql("CREATE DATABASE IF NOT EXISTS glue.test_db")

    # v2テーブル
    print("\n[1] Creating v2 table with 1M rows...")
    spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v2")
    spark.sql("""
        CREATE TABLE glue.test_db.data_v2 (id BIGINT, value STRING)
        USING iceberg TBLPROPERTIES ('format-version' = '2')
    """)
    spark.range(1000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v2").append()

    print("Deleting 100k rows in 10 batches...")
    start = time.time()
    for i in range(10):
        spark.sql(f"DELETE FROM glue.test_db.data_v2 WHERE id >= {i * 10000} AND id < {(i + 1) * 10000}")
    v2_time = time.time() - start

    v2_files = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2.files").collect()[0][0]

    print(f"v2: {v2_time:.2f}s, {v2_files} files")

    # v3テーブル
    print("\n[2] Creating v3 table with 1M rows...")
    spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v3")
    spark.sql("""
        CREATE TABLE glue.test_db.data_v3 (id BIGINT, value STRING)
        USING iceberg TBLPROPERTIES (
            'format-version' = '3',
            'write.delete.mode' = 'merge-on-read',
            'write.update.mode' = 'merge-on-read',
            'write.merge.mode' = 'merge-on-read'
        )
    """)
    spark.range(1000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v3").append()

    print("Deleting 100k rows in 10 batches...")
    start = time.time()
    for i in range(10):
        spark.sql(f"DELETE FROM glue.test_db.data_v3 WHERE id >= {i * 10000} AND id < {(i + 1) * 10000}")
    v3_time = time.time() - start

    v3_files = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v3.files").collect()[0][0]

    print(f"v3: {v3_time:.2f}s, {v3_files} files")

    print("\n" + "=" * 80)
    print(f"v3 is {((v2_time - v3_time) / v2_time * 100):.1f}% faster")
    print("=" * 80)

    # 削除後のクエリ検証
    print("\n[3] Query verification after deletion...")

    start = time.time()
    v2_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2").collect()[0][0]
    v2_query_time = time.time() - start

    start = time.time()
    v3_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v3").collect()[0][0]
    v3_query_time = time.time() - start

    print(f"v2: {v2_count:,} rows, query time: {v2_query_time:.2f}s")
    print(f"v3: {v3_count:,} rows, query time: {v3_query_time:.2f}s")

    v2_sample = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v2").collect()[0]
    v3_sample = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v3").collect()[0]

    print(f"v2 id range: {v2_sample[0]} - {v2_sample[1]}")
    print(f"v3 id range: {v3_sample[0]} - {v3_sample[1]}")

    if v2_query_time > v3_query_time:
        print(f"v3 query is {((v2_query_time - v3_query_time) / v2_query_time * 100):.1f}% faster")
    else:
        print(f"v2 query is {((v3_query_time - v2_query_time) / v3_query_time * 100):.1f}% faster")

    spark.stop()

if __name__ == "__main__":
    main()

JOB 実行

aws emr-serverless start-job-run \
  --application-id 00g1jlmh4c97192l \
  --execution-role-arn arn:aws:iam::123456789012:role/EMRServerlessIcebergRole \
  --name "test_deletion_vectors" \
  --job-driver '{
    "sparkSubmit": {
      "entryPoint": "s3://20251203-shigeruoda-test-deletion-vectors/test_deletion_vectors.py"
    }
  }' \
  --region ap-northeast-1

検証結果

テスト概要

Apache Iceberg v2 と v3 の削除操作のパフォーマンス比較を実施しました。

テストシナリオ:

  1. 各バージョンで 100 万行のテーブルを作成
  2. 10 バッチに分けて合計 10 万行を削除(各バッチ 1 万行)
  3. 削除処理時間とクエリ性能を計測

v3 の設定:

TBLPROPERTIES (
    'format-version' = '3',
    'write.delete.mode' = 'merge-on-read',
    'write.update.mode' = 'merge-on-read',
    'write.merge.mode' = 'merge-on-read'
)

削除処理のパフォーマンス

バージョン 削除時間 データファイル数 削除ファイル数 パフォーマンス
Iceberg v2 (CoW) 22.53 秒 11 ファイル 0 ベースライン
Iceberg v3 (MoR) 13.54 秒 12 ファイル 2 39.9%高速

結果:

  • v3 の Deletion Vectors(Merge-on-Read)により、削除処理が約 40%高速化
  • v2 はデータファイルを書き換え(Copy-on-Write)
  • v3 は削除情報を別ファイル(position delete files)で管理

v3 のメタデータ情報:

total-data-files: 12
total-delete-files: 2
total-position-deletes: 100000

クエリパフォーマンス(削除後)

バージョン 残存行数 COUNT(*)実行時間 パフォーマンス
Iceberg v2 (CoW) 900,000 行 0.17 秒 ベースライン
Iceberg v3 (MoR) 900,000 行 0.67 秒 74.7%低速

ID レンジ検証:

  • v2: 100000 - 999999
  • v3: 100000 - 999999
  • 両バージョンとも削除対象(0-99999)が正しく除外されている

考察

Deletion Vectors の特性

書き込み性能(削除処理):

  • 約 40%高速: データファイルの書き換えが不要
  • ストレージ効率: 小さな削除ファイルのみ追加(39 バイト/ファイル)

読み取り性能(クエリ):

  • 約 75%低速: 削除ファイルとのマージ処理が必要
  • 削除率が高い場合、読み取りオーバーヘッドが顕著

Copy-on-Write vs Merge-on-Read

v2 (Copy-on-Write):

  • 削除時にデータファイル全体を書き換え
  • 書き込みは遅いが、読み取りは高速
  • 削除のたびに新しいデータファイルを生成

v3 (Merge-on-Read):

  • 削除情報を別ファイル(position delete files)で管理
  • 書き込みは高速だが、読み取り時にマージ処理が必要
  • 物理的なデータファイルは変更せず、削除情報を別管理

適用シーン

Deletion Vectors が有効なケース:

  • 頻繁な削除・更新操作が発生するテーブル
  • リアルタイム性が求められるデータ更新
  • 大きなデータファイルから少数の行を削除する場合
  • 書き込みスループットを優先する場合

Copy-on-Write が有効なケース:

  • 読み取りクエリが多く、書き込みが少ない
  • 削除率が高く、コンパクションの頻度を減らしたい
  • クエリレイテンシを最小化したい場合

Discussion