NTT DATA TECH
🧊

AWS Glue 5.1でIceberg Table Spec 3を試してみる

に公開

はじめに

先月AWS(Amazon Web Services)でGlue 5.1がGAされました。私がAWS Glue(以下Glueと表記)に関わり始めてから3年経過しますが、初めてのマイナーバージョンアップです。今回のリリースではApache Spark 3.5.4 → Apache Spark 3.5.6のようなエンジンのバージョンアップもありましたが、個人的に最も大きなアップデートはApache Iceberg(以下Icebergと表記)のTable Spec Version 3のサポートが導入されたことだと感じています。

Iceberg Table Spec 3 では様々な新機能が追加されていますが、本記事ではその中でも特にインパクトが大きいと考えている Deletion Vector に焦点を当てます。そこで、今回はGlue 5.1を使用して、実際に Iceberg Table Spec 3 のテーブルを作成し、DELETE 操作がどのように扱われるのかを確認していきます。

本記事の対象者

  • Iceberg をすでに利用している方
  • Glueを使って、最新バージョンの Iceberg を手軽に試したい方

https://gihyo.jp/book/2025/978-4-297-15074-7

記事の構成

記事の構成は以下の通りです。

  1. Iceberg Table Specとは
  2. 環境準備
  3. 検証
  4. 最後に

1. Iceberg Table Specとは

まず、Icebergとはテーブルのフォーマットです。データファイルやメタデータをどのような構造で管理し、どのような手順で読み書きするかを決めたルール・規則です。

そのため、Iceberg自体は何かしらのソフトウェアやエンジンを指しません。このルール・規則のことをIceberg Table Specと呼びます。Table Specにはバージョンがあり、現在はバージョン3まで存在し、バージョン4が開発中となります。
詳細は公式ドキュメントを参照してください。
https://iceberg.apache.org/spec/#format-versioning

Table Specは上記で述べた通り、あくまでもルールです。このルールに従って、読み書きするのはSparkやTrinoのようなクライアントです。そのため、どの Table Spec の機能が使えるかはクライアントによって様々です。

現時点では、Table Spec 3に対応しているクライアントはまだ多くないですが、今回Glue 5.1がTable Spec 3に対応したことで、Glue上で最新仕様のIcebergを使えるようになりました。

1.1 Table Spec 3で追加された新機能

追加された機能の代表例を記載します。詳細は、Icebergの公式ドキュメントまたは、各機能に記載のプルリクエスト(PR)を確認してください。

  • Deletion Vector
    • レコード削除時に「どの行が削除されたか」をビットマップ形式でPuffinファイルとして管理。
    • Table Spec 2 までは、削除のたびに削除対象の位置が記載された Delete 用 Parquet ファイル が作成されていた。
    • Table Spec 3 では、ビットマップ形式で管理することで、ファイルのサイズ削減と読み取り性能の向上が期待できる。
    • PR:#11240
  • Default Values
    • カラムにデフォルト値を設定可能。
    • 処理時間を記録するカラムに対し、デフォルト値に現在時刻を設定するような使い方となる。
    • PR:#10761
  • Row Lineage

本記事では、この中から Deletion Vector を実際に動かして確認します。

2. 環境準備

今回は、AWS上でIcebergを試す際によく使われるシンプルな構成で検証します。

  • クライアント:AWS Glue Studio Notebook
  • データカタログ:AWS Glue Data Catalog
  • ストレージ:Amazon S3

各リソースの準備は以下の通り、最小限です。

  1. Glue Notebook
    • S3にアクセスできるIAMを設定し、Notebookを作成
  2. Glue DataCatalog
    • 任意の名称でデータベースを作成
    • 今回はdbという名前で作成
  3. S3
    • Iceberg のデータ・メタデータ保存用のバケットを用意

3. 検証

ここからは、Glue Notebookを使用して、以下の流れで検証します。

  1. Table Spec 3のIcebergテーブルを作成
  2. データをINSERT
  3. DELETEを実行
  4. Deletion Vector がどのように扱われるかを確認

3.1 Notebook上での検証

  1. Glue バージョンを 5.1 に指定
    Notebookではマジックコマンドを使用して、Glueのバージョンを指定できます。
    ここではGlue 5.1を指定します。
%idle_timeout 2880
%glue_version 5.1
%worker_type G.1X
%number_of_workers 2
  1. Spark Session の初期化と Iceberg 設定
  • Icebergカタログ名:mycatalog
  • データとメタデータの保存先:s3://<バケット名>/iceberg
  • Icebergカタログタイプ:glue
  • Iceberg 独自の SQL(DELETE など)を有効化
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .config("spark.sql.catalog.mycatalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.mycatalog.warehouse", "s3://<バケット名>/iceberg") \
    .config("spark.sql.catalog.mycatalog.type", "glue") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .getOrCreate()
  1. Icebergテーブルの作成
  • <カタログ>.<データベース名>.<テーブル名>という形式で指定する。データベース名はGlue DataCatalogで作成したものを使用します。
  • デフォルトではTable Spec 2となるため、format-versionで3を指定
  • Merge On Read(MOR)を指定
%%sql
CREATE TABLE mycatalog.db.tbl_mor (
    product_name string,
    price decimal(10, 2),
    customer_id bigint,
    order_id string,
    datetime timestamp,
    category string
) USING iceberg
TBLPROPERTIES (
    'format-version'='3',
    'write.delete.mode'='merge-on-read',
    'write.write.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
)
  1. 挿入データの作成
  • DataFrameを作成
  • temp viewに登録し、後続のSparkSQLで参照できるようにしています。
from pyspark.sql.types import *
from datetime import datetime

# スキーマ定義
schema = StructType([
    StructField("product_name", StringType(), True),
    StructField("price", DecimalType(10, 2), True),
    StructField("customer_id", LongType(), True),
    StructField("order_id", StringType(), True),
    StructField("datetime", TimestampType(), True),
    StructField("category", StringType(), True)
])
 
# 10 件分のデータ
data = [
    ("tomato juice", 2.00, 1698, "DRE8DLTFNX0MLCE8DLTFNX0MLC", datetime.fromisoformat("2023-07-18 02:20:58"), "drink"),
    ("cocoa",        2.00, 1652, "DR1UNFHET81UNFHET8",         datetime.fromisoformat("2024-08-26 11:36:48"), "drink"),
    ("espresso",     2.00, 1037, "DRBFZUJWPZ9SRABFZUJWPZ9SRA", datetime.fromisoformat("2024-04-19 12:17:22"), "drink"),
    ("broccoli",     1.00, 3092, "GRK0L8ZQK0L8ZQ",             datetime.fromisoformat("2023-03-22 18:48:04"), "grocery"),
    ("nutmeg",       1.00, 3512, "GR15U0LKA15U0LKA",           datetime.fromisoformat("2024-02-27 15:13:31"), "grocery"),
    ("orange juice", 3.50, 1200, "DR8XYZTFNX0MLCXYZTFNX0MLC",  datetime.fromisoformat("2024-01-10 09:15:00"), "drink"),
    ("milk",         1.20, 2200, "DR12MILKET81UNFHET8",        datetime.fromisoformat("2023-11-03 07:25:10"), "drink"),
    ("spinach",      0.80, 1800, "GRSPNQK0L8ZQK0L8ZQ",         datetime.fromisoformat("2023-05-30 14:55:44"), "grocery"),
    ("pepper",       0.50, 2600, "GRPPR15U0LKA15U0LKA",        datetime.fromisoformat("2024-03-12 16:05:21"), "grocery"),
    ("cinnamon",     1.50, 900,  "GR5CINLKA15U0LKACIN",        datetime.fromisoformat("2024-04-01 10:45:33"), "grocery")
]
 
# DataFrame を作成
df = spark.createDataFrame(data, schema)

# temp viewに登録
df.createOrReplaceTempView("tmp_df")
  1. INSERTの実行
  • temp viewの中身をINSERTする
%%sql
INSERT INTO mycatalog.db.tbl_mor
SELECT * FROM tmp_df
  1. INSERT結果の確認
%%sql
SELECT * FROM mycatalog.db.tbl_mor

実行結果:

+------------+-----+-----------+--------------------+-------------------+--------+
|product_name|price|customer_id|            order_id|           datetime|category|
+------------+-----+-----------+--------------------+-------------------+--------+
|tomato juice| 2.00|       1698|DRE8DLTFNX0MLCE8D...|2023-07-18 02:20:58|   drink|
|       cocoa| 2.00|       1652|  DR1UNFHET81UNFHET8|2024-08-26 11:36:48|   drink|
|    espresso| 2.00|       1037|DRBFZUJWPZ9SRABFZ...|2024-04-19 12:17:22|   drink|
|    broccoli| 1.00|       3092|      GRK0L8ZQK0L8ZQ|2023-03-22 18:48:04| grocery|
|      nutmeg| 1.00|       3512|    GR15U0LKA15U0LKA|2024-02-27 15:13:31| grocery|
|orange juice| 3.50|       1200|DR8XYZTFNX0MLCXYZ...|2024-01-10 09:15:00|   drink|
|        milk| 1.20|       2200| DR12MILKET81UNFHET8|2023-11-03 07:25:10|   drink|
|     spinach| 0.80|       1800|  GRSPNQK0L8ZQK0L8ZQ|2023-05-30 14:55:44| grocery|
|      pepper| 0.50|       2600| GRPPR15U0LKA15U0LKA|2024-03-12 16:05:21| grocery|
|    cinnamon| 1.50|        900| GR5CINLKA15U0LKACIN|2024-04-01 10:45:33| grocery|
+------------+-----+-----------+--------------------+-------------------+--------+
  1. DELETEの実行
%%sql
DELETE FROM mycatalog.db.tbl_mor
WHERE customer_id = '1698'
  1. DELETE結果の確認
%%sql
SELECT * FROM mycatalog.db.tbl_mor

確認結果:

+------------+-----+-----------+--------------------+-------------------+--------+
|product_name|price|customer_id|            order_id|           datetime|category|
+------------+-----+-----------+--------------------+-------------------+--------+
|    espresso| 2.00|       1037|DRBFZUJWPZ9SRABFZ...|2024-04-19 12:17:22|   drink|
|    broccoli| 1.00|       3092|      GRK0L8ZQK0L8ZQ|2023-03-22 18:48:04| grocery|
|      nutmeg| 1.00|       3512|    GR15U0LKA15U0LKA|2024-02-27 15:13:31| grocery|
|orange juice| 3.50|       1200|DR8XYZTFNX0MLCXYZ...|2024-01-10 09:15:00|   drink|
|       cocoa| 2.00|       1652|  DR1UNFHET81UNFHET8|2024-08-26 11:36:48|   drink|
|        milk| 1.20|       2200| DR12MILKET81UNFHET8|2023-11-03 07:25:10|   drink|
|     spinach| 0.80|       1800|  GRSPNQK0L8ZQK0L8ZQ|2023-05-30 14:55:44| grocery|
|      pepper| 0.50|       2600| GRPPR15U0LKA15U0LKA|2024-03-12 16:05:21| grocery|
|    cinnamon| 1.50|        900| GR5CINLKA15U0LKACIN|2024-04-01 10:45:33| grocery|
+------------+-----+-----------+--------------------+-------------------+--------+

削除した行が結果から除外されていることが確認できます。しかし、Deletion Vector が使われたかどうかはSQLの実行結果だけでは判断できません
そこで、S3上のデータを確認し、Deletion Vector(Puffinファイル)が実際に作成されたかを確認します。

3.2 S3のデータを確認

S3上に作成されたファイルを見て、Deletion Vectorが実際に作成されたかを確認します。
Icebergのディレクトリ構造や各ファイルの役割については、以下の記事が非常に分かりやので参考にしてください。
https://bering.hatenadiary.com/entry/2023/09/24/175953

メタデータ

以下のmetadata.jsonを確認します。

s3://<バケット名>/iceberg/db.db/tbl_mor/metadata/00000-6c6ef04f-0b20-4e1d-b7ce-6451162a832b.metadata.json

メタデータには様々な情報が記載されています。その中で、自身が設定した以下の2点を確認できます。

  • format-versionが 3 であること
  • merge-on-readが指定されていること
//metadata.jsonの一部抜粋
{
    "current-schema-id": 0,
    "current-snapshot-id": null,
    "default-sort-order-id": 0,
    "default-spec-id": 0,
    "format-version": 3,   // フォーマットバージョンの指定 
    "last-column-id": 6,
    "last-partition-id": 999,
    "last-sequence-number": 0,
    "last-updated-ms": 1766407192594,
    "location": "s3://<バケット名>/iceberg/db.db/tbl_mor",
    "metadata-log": [],
    "next-row-id": 0,
    "partition-specs": [
        {
            "fields": [],
            "spec-id": 0
        }
    ],
    "partition-statistics": [],
    "properties": {
        "owner": "hadoop",
        "write.delete.mode": "merge-on-read",  // MORの設定内容
        "write.merge.mode": "merge-on-read",  // MORの設定内容
        "write.parquet.compression-codec": "zstd",
        "write.write.mode": "merge-on-read"  // MORの設定内容
    },

データファイル

次に、データファイルが格納されているディレクトリを確認します。

s3://<バケット名>/iceberg/db.db/tbl_mor/data/


ここには Parquetに加えて、〇〇-deletes.puffinファイル が作成されていることが確認できます。このPuffinファイルがDeletion Vectorによって作成されるビットマップ形式のファイルです。Puffinファイルのサイズは453 Byteと非常に小さく、従来の Table Spec 2 までで使用されていたParquetと比較して、削除情報をよりコンパクトに管理できていることが分かります。

4. 最後に

本記事では、Glue 5.1を使用してIceberg Table Spec 3を試してみました。今回は基本的な挙動の確認に留めましたが、Icebergの面白い点は、OSSとしてソースコードが公開されていることに加え、データファイルやメタデータを実際に確認できるため、内部仕様を調査しやすい点だと思います。
今後は、実際の挙動やソースコードを読み解きながら、Iceberg のさまざまな機能を紹介していければと思います。本記事が、Glue で最新の Iceberg 機能を試す際の参考になれば幸いです。

NTT DATA TECH
NTT DATA TECH
設定によりコメント欄が無効化されています