AWS Glue 5.1でIceberg Table Spec 3を試してみる
はじめに
先月AWS(Amazon Web Services)でGlue 5.1がGAされました。私がAWS Glue(以下Glueと表記)に関わり始めてから3年経過しますが、初めてのマイナーバージョンアップです。今回のリリースではApache Spark 3.5.4 → Apache Spark 3.5.6のようなエンジンのバージョンアップもありましたが、個人的に最も大きなアップデートはApache Iceberg(以下Icebergと表記)のTable Spec Version 3のサポートが導入されたことだと感じています。
Iceberg Table Spec 3 では様々な新機能が追加されていますが、本記事ではその中でも特にインパクトが大きいと考えている Deletion Vector に焦点を当てます。そこで、今回はGlue 5.1を使用して、実際に Iceberg Table Spec 3 のテーブルを作成し、DELETE 操作がどのように扱われるのかを確認していきます。
本記事の対象者
- Iceberg をすでに利用している方
- Glueを使って、最新バージョンの Iceberg を手軽に試したい方
記事の構成
記事の構成は以下の通りです。
- Iceberg Table Specとは
- 環境準備
- 検証
- 最後に
1. Iceberg Table Specとは
まず、Icebergとはテーブルのフォーマットです。データファイルやメタデータをどのような構造で管理し、どのような手順で読み書きするかを決めたルール・規則です。
そのため、Iceberg自体は何かしらのソフトウェアやエンジンを指しません。このルール・規則のことをIceberg Table Specと呼びます。Table Specにはバージョンがあり、現在はバージョン3まで存在し、バージョン4が開発中となります。
詳細は公式ドキュメントを参照してください。
Table Specは上記で述べた通り、あくまでもルールです。このルールに従って、読み書きするのはSparkやTrinoのようなクライアントです。そのため、どの Table Spec の機能が使えるかはクライアントによって様々です。
現時点では、Table Spec 3に対応しているクライアントはまだ多くないですが、今回Glue 5.1がTable Spec 3に対応したことで、Glue上で最新仕様のIcebergを使えるようになりました。
1.1 Table Spec 3で追加された新機能
追加された機能の代表例を記載します。詳細は、Icebergの公式ドキュメントまたは、各機能に記載のプルリクエスト(PR)を確認してください。
-
Deletion Vector
- レコード削除時に「どの行が削除されたか」をビットマップ形式でPuffinファイルとして管理。
- Table Spec 2 までは、削除のたびに削除対象の位置が記載された Delete 用 Parquet ファイル が作成されていた。
- Table Spec 3 では、ビットマップ形式で管理することで、ファイルのサイズ削減と読み取り性能の向上が期待できる。
- PR:#11240
-
Default Values
- カラムにデフォルト値を設定可能。
- 処理時間を記録するカラムに対し、デフォルト値に現在時刻を設定するような使い方となる。
- PR:#10761
-
Row Lineage
- 行単位で変更を追跡することが可能。
- 以下に詳しい解説があります。
- PR:#11130
本記事では、この中から Deletion Vector を実際に動かして確認します。
2. 環境準備
今回は、AWS上でIcebergを試す際によく使われるシンプルな構成で検証します。
- クライアント:AWS Glue Studio Notebook
- データカタログ:AWS Glue Data Catalog
- ストレージ:Amazon S3
各リソースの準備は以下の通り、最小限です。
- Glue Notebook
- S3にアクセスできるIAMを設定し、Notebookを作成
- Glue DataCatalog
- 任意の名称でデータベースを作成
- 今回は
dbという名前で作成
- S3
- Iceberg のデータ・メタデータ保存用のバケットを用意
3. 検証
ここからは、Glue Notebookを使用して、以下の流れで検証します。
- Table Spec 3のIcebergテーブルを作成
- データをINSERT
- DELETEを実行
- Deletion Vector がどのように扱われるかを確認
3.1 Notebook上での検証
-
Glue バージョンを 5.1 に指定
Notebookではマジックコマンドを使用して、Glueのバージョンを指定できます。
ここではGlue 5.1を指定します。
%idle_timeout 2880
%glue_version 5.1
%worker_type G.1X
%number_of_workers 2
- Spark Session の初期化と Iceberg 設定
- Icebergカタログ名:
mycatalog - データとメタデータの保存先:
s3://<バケット名>/iceberg - Icebergカタログタイプ:
glue - Iceberg 独自の SQL(DELETE など)を有効化
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.config("spark.sql.catalog.mycatalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.mycatalog.warehouse", "s3://<バケット名>/iceberg") \
.config("spark.sql.catalog.mycatalog.type", "glue") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.getOrCreate()
- Icebergテーブルの作成
-
<カタログ>.<データベース名>.<テーブル名>という形式で指定する。データベース名はGlue DataCatalogで作成したものを使用します。 -
デフォルトではTable Spec 2となるため、
format-versionで3を指定 - Merge On Read(MOR)を指定
%%sql
CREATE TABLE mycatalog.db.tbl_mor (
product_name string,
price decimal(10, 2),
customer_id bigint,
order_id string,
datetime timestamp,
category string
) USING iceberg
TBLPROPERTIES (
'format-version'='3',
'write.delete.mode'='merge-on-read',
'write.write.mode'='merge-on-read',
'write.merge.mode'='merge-on-read'
)
- 挿入データの作成
- DataFrameを作成
- temp viewに登録し、後続のSparkSQLで参照できるようにしています。
from pyspark.sql.types import *
from datetime import datetime
# スキーマ定義
schema = StructType([
StructField("product_name", StringType(), True),
StructField("price", DecimalType(10, 2), True),
StructField("customer_id", LongType(), True),
StructField("order_id", StringType(), True),
StructField("datetime", TimestampType(), True),
StructField("category", StringType(), True)
])
# 10 件分のデータ
data = [
("tomato juice", 2.00, 1698, "DRE8DLTFNX0MLCE8DLTFNX0MLC", datetime.fromisoformat("2023-07-18 02:20:58"), "drink"),
("cocoa", 2.00, 1652, "DR1UNFHET81UNFHET8", datetime.fromisoformat("2024-08-26 11:36:48"), "drink"),
("espresso", 2.00, 1037, "DRBFZUJWPZ9SRABFZUJWPZ9SRA", datetime.fromisoformat("2024-04-19 12:17:22"), "drink"),
("broccoli", 1.00, 3092, "GRK0L8ZQK0L8ZQ", datetime.fromisoformat("2023-03-22 18:48:04"), "grocery"),
("nutmeg", 1.00, 3512, "GR15U0LKA15U0LKA", datetime.fromisoformat("2024-02-27 15:13:31"), "grocery"),
("orange juice", 3.50, 1200, "DR8XYZTFNX0MLCXYZTFNX0MLC", datetime.fromisoformat("2024-01-10 09:15:00"), "drink"),
("milk", 1.20, 2200, "DR12MILKET81UNFHET8", datetime.fromisoformat("2023-11-03 07:25:10"), "drink"),
("spinach", 0.80, 1800, "GRSPNQK0L8ZQK0L8ZQ", datetime.fromisoformat("2023-05-30 14:55:44"), "grocery"),
("pepper", 0.50, 2600, "GRPPR15U0LKA15U0LKA", datetime.fromisoformat("2024-03-12 16:05:21"), "grocery"),
("cinnamon", 1.50, 900, "GR5CINLKA15U0LKACIN", datetime.fromisoformat("2024-04-01 10:45:33"), "grocery")
]
# DataFrame を作成
df = spark.createDataFrame(data, schema)
# temp viewに登録
df.createOrReplaceTempView("tmp_df")
- INSERTの実行
- temp viewの中身をINSERTする
%%sql
INSERT INTO mycatalog.db.tbl_mor
SELECT * FROM tmp_df
- INSERT結果の確認
%%sql
SELECT * FROM mycatalog.db.tbl_mor
実行結果:
+------------+-----+-----------+--------------------+-------------------+--------+
|product_name|price|customer_id| order_id| datetime|category|
+------------+-----+-----------+--------------------+-------------------+--------+
|tomato juice| 2.00| 1698|DRE8DLTFNX0MLCE8D...|2023-07-18 02:20:58| drink|
| cocoa| 2.00| 1652| DR1UNFHET81UNFHET8|2024-08-26 11:36:48| drink|
| espresso| 2.00| 1037|DRBFZUJWPZ9SRABFZ...|2024-04-19 12:17:22| drink|
| broccoli| 1.00| 3092| GRK0L8ZQK0L8ZQ|2023-03-22 18:48:04| grocery|
| nutmeg| 1.00| 3512| GR15U0LKA15U0LKA|2024-02-27 15:13:31| grocery|
|orange juice| 3.50| 1200|DR8XYZTFNX0MLCXYZ...|2024-01-10 09:15:00| drink|
| milk| 1.20| 2200| DR12MILKET81UNFHET8|2023-11-03 07:25:10| drink|
| spinach| 0.80| 1800| GRSPNQK0L8ZQK0L8ZQ|2023-05-30 14:55:44| grocery|
| pepper| 0.50| 2600| GRPPR15U0LKA15U0LKA|2024-03-12 16:05:21| grocery|
| cinnamon| 1.50| 900| GR5CINLKA15U0LKACIN|2024-04-01 10:45:33| grocery|
+------------+-----+-----------+--------------------+-------------------+--------+
- DELETEの実行
%%sql
DELETE FROM mycatalog.db.tbl_mor
WHERE customer_id = '1698'
- DELETE結果の確認
%%sql
SELECT * FROM mycatalog.db.tbl_mor
確認結果:
+------------+-----+-----------+--------------------+-------------------+--------+
|product_name|price|customer_id| order_id| datetime|category|
+------------+-----+-----------+--------------------+-------------------+--------+
| espresso| 2.00| 1037|DRBFZUJWPZ9SRABFZ...|2024-04-19 12:17:22| drink|
| broccoli| 1.00| 3092| GRK0L8ZQK0L8ZQ|2023-03-22 18:48:04| grocery|
| nutmeg| 1.00| 3512| GR15U0LKA15U0LKA|2024-02-27 15:13:31| grocery|
|orange juice| 3.50| 1200|DR8XYZTFNX0MLCXYZ...|2024-01-10 09:15:00| drink|
| cocoa| 2.00| 1652| DR1UNFHET81UNFHET8|2024-08-26 11:36:48| drink|
| milk| 1.20| 2200| DR12MILKET81UNFHET8|2023-11-03 07:25:10| drink|
| spinach| 0.80| 1800| GRSPNQK0L8ZQK0L8ZQ|2023-05-30 14:55:44| grocery|
| pepper| 0.50| 2600| GRPPR15U0LKA15U0LKA|2024-03-12 16:05:21| grocery|
| cinnamon| 1.50| 900| GR5CINLKA15U0LKACIN|2024-04-01 10:45:33| grocery|
+------------+-----+-----------+--------------------+-------------------+--------+
削除した行が結果から除外されていることが確認できます。しかし、Deletion Vector が使われたかどうかはSQLの実行結果だけでは判断できません。
そこで、S3上のデータを確認し、Deletion Vector(Puffinファイル)が実際に作成されたかを確認します。
3.2 S3のデータを確認
S3上に作成されたファイルを見て、Deletion Vectorが実際に作成されたかを確認します。
Icebergのディレクトリ構造や各ファイルの役割については、以下の記事が非常に分かりやので参考にしてください。
メタデータ
以下のmetadata.jsonを確認します。
s3://<バケット名>/iceberg/db.db/tbl_mor/metadata/00000-6c6ef04f-0b20-4e1d-b7ce-6451162a832b.metadata.json
メタデータには様々な情報が記載されています。その中で、自身が設定した以下の2点を確認できます。
-
format-versionが 3 であること -
merge-on-readが指定されていること
//metadata.jsonの一部抜粋
{
"current-schema-id": 0,
"current-snapshot-id": null,
"default-sort-order-id": 0,
"default-spec-id": 0,
"format-version": 3, // フォーマットバージョンの指定
"last-column-id": 6,
"last-partition-id": 999,
"last-sequence-number": 0,
"last-updated-ms": 1766407192594,
"location": "s3://<バケット名>/iceberg/db.db/tbl_mor",
"metadata-log": [],
"next-row-id": 0,
"partition-specs": [
{
"fields": [],
"spec-id": 0
}
],
"partition-statistics": [],
"properties": {
"owner": "hadoop",
"write.delete.mode": "merge-on-read", // MORの設定内容
"write.merge.mode": "merge-on-read", // MORの設定内容
"write.parquet.compression-codec": "zstd",
"write.write.mode": "merge-on-read" // MORの設定内容
},
データファイル
次に、データファイルが格納されているディレクトリを確認します。
s3://<バケット名>/iceberg/db.db/tbl_mor/data/

ここには Parquetに加えて、〇〇-deletes.puffinファイル が作成されていることが確認できます。このPuffinファイルがDeletion Vectorによって作成されるビットマップ形式のファイルです。Puffinファイルのサイズは453 Byteと非常に小さく、従来の Table Spec 2 までで使用されていたParquetと比較して、削除情報をよりコンパクトに管理できていることが分かります。
4. 最後に
本記事では、Glue 5.1を使用してIceberg Table Spec 3を試してみました。今回は基本的な挙動の確認に留めましたが、Icebergの面白い点は、OSSとしてソースコードが公開されていることに加え、データファイルやメタデータを実際に確認できるため、内部仕様を調査しやすい点だと思います。
今後は、実際の挙動やソースコードを読み解きながら、Iceberg のさまざまな機能を紹介していければと思います。本記事が、Glue で最新の Iceberg 機能を試す際の参考になれば幸いです。
NTT DATA公式アカウントです。 技術を愛するNTT DATAの技術者が、気軽に楽しく発信していきます。 当社のサービスなどについてのお問い合わせは、 お問い合わせフォーム nttdata.com/jp/ja/contact-us/ へお願いします。