🗂
Amazon EMR 7.12 で Apache Iceberg v3 の Deletion Vectors を検証
目的
Amazon EMR 7.12 が Iceberg v3 をサポートしたので、EMR Serverless で Deletion Vectors(削除ベクトル) を検証してみました。
Row Lineage(行レベル系譜追跡)検証は ⇩ 参照
Deletion Vectors とは
Deletion Vectors(削除ベクトル)は、Apache Iceberg v3 で導入された削除管理の新しい仕組みです。
仕組み
従来の方式(v2 - Copy-on-Write):
データファイル: [行1, 行2, 行3, 行4, 行5]
↓ 行2を削除
新ファイル作成: [行1, 行3, 行4, 行5] ← ファイル全体を書き換え
Deletion Vectors(v3 - Merge-on-Read):
データファイル: [行1, 行2, 行3, 行4, 行5] ← 物理ファイルは変更なし
削除ベクトル: [0, 1, 0, 0, 0] ← メタデータで削除を記録(1=削除)
↓ クエリ時
読み取り結果: [行1, 行3, 行4, 行5] ← 削除ベクトルを参照してフィルタ
特徴
-
位置ベースの削除記録: 削除された行のファイルパスと位置を記録
(ビットマップのように0/1で管理) - メタデータ管理: 削除情報はテーブルメタデータに格納
- 遅延マージ: 物理的なデータ削除は後続のコンパクション時に実行
- 効率的なストレージ: 大量の削除でも小さなメタデータのみ更新
メリット
- 書き込み性能: データファイルの書き換え不要で削除が高速
- ストレージ効率: 少数の行削除時にファイル全体をコピーしない
環境
EMR Studio を作成する
作成手順はAWS Document参照
EMR Application を作成する
作成手順はAWS Document参照
Script
test_deletion_vectors.py
# test_deletion_vectors.py
from pyspark.sql import SparkSession
import sys
import time
def main():
bucket = sys.argv[1] if len(sys.argv) > 1 else "20251203-shigeruoda-test-deletion-vectors"
spark = SparkSession.builder \
.appName("Iceberg-v2-v3-Performance") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.glue", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.glue.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.glue.warehouse", f"s3://{bucket}/warehouse/") \
.getOrCreate()
print("=" * 80)
print("Iceberg v2 vs v3 Performance Comparison")
print("=" * 80)
print(f"Bucket: s3://{bucket}/warehouse/")
spark.sql("CREATE DATABASE IF NOT EXISTS glue.test_db")
# v2テーブル
print("\n[1] Creating v2 table with 1M rows...")
spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v2")
spark.sql("""
CREATE TABLE glue.test_db.data_v2 (id BIGINT, value STRING)
USING iceberg TBLPROPERTIES ('format-version' = '2')
""")
spark.range(1000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v2").append()
print("Deleting 100k rows in 10 batches...")
start = time.time()
for i in range(10):
spark.sql(f"DELETE FROM glue.test_db.data_v2 WHERE id >= {i * 10000} AND id < {(i + 1) * 10000}")
v2_time = time.time() - start
v2_files = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2.files").collect()[0][0]
print(f"v2: {v2_time:.2f}s, {v2_files} files")
# v3テーブル
print("\n[2] Creating v3 table with 1M rows...")
spark.sql("DROP TABLE IF EXISTS glue.test_db.data_v3")
spark.sql("""
CREATE TABLE glue.test_db.data_v3 (id BIGINT, value STRING)
USING iceberg TBLPROPERTIES (
'format-version' = '3',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read'
)
""")
spark.range(1000000).selectExpr("id", "concat('value_', id) as value").writeTo("glue.test_db.data_v3").append()
print("Deleting 100k rows in 10 batches...")
start = time.time()
for i in range(10):
spark.sql(f"DELETE FROM glue.test_db.data_v3 WHERE id >= {i * 10000} AND id < {(i + 1) * 10000}")
v3_time = time.time() - start
v3_files = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v3.files").collect()[0][0]
print(f"v3: {v3_time:.2f}s, {v3_files} files")
print("\n" + "=" * 80)
print(f"v3 is {((v2_time - v3_time) / v2_time * 100):.1f}% faster")
print("=" * 80)
# 削除後のクエリ検証
print("\n[3] Query verification after deletion...")
start = time.time()
v2_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v2").collect()[0][0]
v2_query_time = time.time() - start
start = time.time()
v3_count = spark.sql("SELECT COUNT(*) FROM glue.test_db.data_v3").collect()[0][0]
v3_query_time = time.time() - start
print(f"v2: {v2_count:,} rows, query time: {v2_query_time:.2f}s")
print(f"v3: {v3_count:,} rows, query time: {v3_query_time:.2f}s")
v2_sample = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v2").collect()[0]
v3_sample = spark.sql("SELECT MIN(id), MAX(id) FROM glue.test_db.data_v3").collect()[0]
print(f"v2 id range: {v2_sample[0]} - {v2_sample[1]}")
print(f"v3 id range: {v3_sample[0]} - {v3_sample[1]}")
if v2_query_time > v3_query_time:
print(f"v3 query is {((v2_query_time - v3_query_time) / v2_query_time * 100):.1f}% faster")
else:
print(f"v2 query is {((v3_query_time - v2_query_time) / v3_query_time * 100):.1f}% faster")
spark.stop()
if __name__ == "__main__":
main()
JOB 実行
aws emr-serverless start-job-run \
--application-id 00g1jlmh4c97192l \
--execution-role-arn arn:aws:iam::123456789012:role/EMRServerlessIcebergRole \
--name "test_deletion_vectors" \
--job-driver '{
"sparkSubmit": {
"entryPoint": "s3://20251203-shigeruoda-test-deletion-vectors/test_deletion_vectors.py"
}
}' \
--region ap-northeast-1
検証結果
テスト概要
Apache Iceberg v2 と v3 の削除操作のパフォーマンス比較を実施しました。
テストシナリオ:
- 各バージョンで 100 万行のテーブルを作成
- 10 バッチに分けて合計 10 万行を削除(各バッチ 1 万行)
- 削除処理時間とクエリ性能を計測
v3 の設定:
TBLPROPERTIES (
'format-version' = '3',
'write.delete.mode' = 'merge-on-read',
'write.update.mode' = 'merge-on-read',
'write.merge.mode' = 'merge-on-read'
)
削除処理のパフォーマンス
| バージョン | 削除時間 | データファイル数 | 削除ファイル数 | パフォーマンス |
|---|---|---|---|---|
| Iceberg v2 (CoW) | 22.53 秒 | 11 ファイル | 0 | ベースライン |
| Iceberg v3 (MoR) | 13.54 秒 | 12 ファイル | 2 | 39.9%高速 |
結果:
- v3 の Deletion Vectors(Merge-on-Read)により、削除処理が約 40%高速化
- v2 はデータファイルを書き換え(Copy-on-Write)
- v3 は削除情報を別ファイル(position delete files)で管理
v3 のメタデータ情報:
total-data-files: 12
total-delete-files: 2
total-position-deletes: 100000
クエリパフォーマンス(削除後)
| バージョン | 残存行数 | COUNT(*)実行時間 | パフォーマンス |
|---|---|---|---|
| Iceberg v2 (CoW) | 900,000 行 | 0.17 秒 | ベースライン |
| Iceberg v3 (MoR) | 900,000 行 | 0.67 秒 | 74.7%低速 |
ID レンジ検証:
- v2: 100000 - 999999
- v3: 100000 - 999999
- 両バージョンとも削除対象(0-99999)が正しく除外されている
考察
Deletion Vectors の特性
書き込み性能(削除処理):
- 約 40%高速: データファイルの書き換えが不要
- ストレージ効率: 小さな削除ファイルのみ追加(39 バイト/ファイル)
読み取り性能(クエリ):
- 約 75%低速: 削除ファイルとのマージ処理が必要
- 削除率が高い場合、読み取りオーバーヘッドが顕著
Copy-on-Write vs Merge-on-Read
v2 (Copy-on-Write):
- 削除時にデータファイル全体を書き換え
- 書き込みは遅いが、読み取りは高速
- 削除のたびに新しいデータファイルを生成
v3 (Merge-on-Read):
- 削除情報を別ファイル(position delete files)で管理
- 書き込みは高速だが、読み取り時にマージ処理が必要
- 物理的なデータファイルは変更せず、削除情報を別管理
適用シーン
Deletion Vectors が有効なケース:
- 頻繁な削除・更新操作が発生するテーブル
- リアルタイム性が求められるデータ更新
- 大きなデータファイルから少数の行を削除する場合
- 書き込みスループットを優先する場合
Copy-on-Write が有効なケース:
- 読み取りクエリが多く、書き込みが少ない
- 削除率が高く、コンパクションの頻度を減らしたい
- クエリレイテンシを最小化したい場合
Discussion