AWSのIcebergテーブルのスナップショット履歴をCLIで可視化するツールを作ってみた
はじめに
AWS Community Builderのぺんぎん(@jitepengin)です。
Apache Iceberg は、近年注目されているオープンテーブルフォーマットの1つで、スキーマの進化やACIDトランザクション、タイムトラベルといった機能を備えています。
スナップショットベースでテーブル管理を行う際に、開発やテスト、そして実運用の中でスナップショットの履歴を追跡・操作したくなるケースも多々あると思います。
今回は学習を兼ねてスナップショットの履歴や詳細を確認できるツール「iceberg-navigator」を自作したので、このツールを紹介したいと思います。
解決したかった課題
- S3に存在するIcebergテーブルのスナップショット履歴をコマンドラインで手軽に確認したい。
- スナップショットの親子関係を辿ってバージョンの系譜を把握したい。
使用した技術とライブラリ
- PyIceberg:Icebergテーブル操作のためのPythonライブラリ
- PyArrow:Icebergスキーマ操作の依存ライブラリ(間接利用)
- Click:CLIインターフェースの構築
- NetworkX / Matplotlib:スナップショットの親子関係をDAGとして可視化するための描画ツール
実装のポイント
今回のツールのポイントは以下となります。
- PyIcebergを使ってIcebergカタログ、テーブルとスナップショットメタデータにアクセス。
- AWS Glue Iceberg RESTエンドポイントをPyIceberg経由で実行。
- ClickでシンプルなCLIインターフェースを構築。
- NetworkXとMatplotlibを活用して簡単なDAGを作成。
ソース
以下のリポジトリに配置しています。
CLIツールの使い方
AWS CLIの設定
事前にAWS CLIの設定を実施してください。
ライブラリのインストール
pip install -r requirements.txt
スナップショット一覧の取得
$ python -m iceberg_navigator list --table <dbname>.<tablename>
| Snapshot ID | Timestamp | Operation | Parent Snapshot ID | Total Size (MB) | Record Count |
|---------------------|----------------------|------------------|----------------------|-------------------|----------------|
| 1533347322559466931 | 2025-05-22T02:10:24Z | Operation.APPEND | null | 13.48 | 729,732 |
| 1485371543345582290 | 2025-05-22T02:10:54Z | Operation.DELETE | 1533347322559466931 | 0.00 | 0 |
| 67848960317145716 | 2025-05-22T02:15:45Z | Operation.APPEND | 1485371543345582290 | 13.48 | 729,732 |
| 3920289554540444894 | 2025-05-22T02:38:46Z | Operation.DELETE | 67848960317145716 | 0.00 | 0 |
| 6369576239134108166 | 2025-05-22T02:41:51Z | Operation.APPEND | 3920289554540444894 | 13.48 | 729,732 |
| 6216935665394419954 | 2025-05-22T02:41:54Z | Operation.APPEND | 6369576239134108166 | 26.96 | 1,459,464 |
| 9058990433822511495 | 2025-05-22T02:42:28Z | Operation.APPEND | 6216935665394419954 | 40.44 | 2,189,196 |
| 5224576979788468429 | 2025-05-22T02:46:53Z | Operation.DELETE | 9058990433822511495 | 0.00 | 0 |
| 8997131439115911397 | 2025-05-22T02:47:21Z | Operation.APPEND | 5224576979788468429 | 13.48 | 729,732 |
| 4246095293733855575 | 2025-08-02T22:51:16Z | Operation.DELETE | 8997131439115911397 | 0.00 | 0 |
| 8106328257365313720 | 2025-08-04T07:50:14Z | Operation.APPEND | 6369576239134108166 | 13.48 | 729,733 |
...
スナップショット詳細の表示
$ python -m iceberg_navigator show <Snapshot ID> --table <dbname>.<tablename>
Table: yellow_tripdata
Snapshot ID: 8106328257365313720
Timestamp: 2025-08-04T07:50:14Z
Operation: Operation.APPEND
Parent Snapshot ID: 6369576239134108166
Manifest List: s3://your-bucket/warehouse/yellow_tripdata/metadata/snap-8106328257365313720-1-a4fb8059-7bf8-4254-b640-bf1fcbf100dd.avro
Schema:
1: vendorid: optional int
2: tpep_pickup_datetime: optional timestamp
3: tpep_dropoff_datetime: optional timestamp
4: passenger_count: optional long
5: trip_distance: optional double
6: ratecodeid: optional long
7: store_and_fwd_flag: optional string
8: pulocationid: optional int
9: dolocationid: optional int
10: payment_type: optional long
11: fare_amount: optional double
12: extra: optional double
13: mta_tax: optional double
14: tip_amount: optional double
15: tolls_amount: optional double
16: improvement_surcharge: optional double
17: total_amount: optional double
18: congestion_surcharge: optional double
19: airport_fee: optional double
Summary:
added-data-files: 1
total-equality-deletes: 0
added-records: 1
total-position-deletes: 0
added-files-size: 3046
total-delete-files: 0
total-files-size: 14138545
total-data-files: 2
total-records: 729733
親スナップショットの比較
$ python -m iceberg_navigator compare <snapshot_id> --table <database>.<table>
----------------------------------------
Parent Snapshot
----------------------------------------
ID: 6369576239134108166
File Size: 13.48 MB
Records: 729,732
----------------------------------------
Current Snapshot
----------------------------------------
ID: 6216935665394419954
File Size: 26.96 MB
Records: 1,459,464
========================================
Summary
========================================
Added Records: 729,732
Deleted Records: 0
スナップショット系譜のグラフ表示
$ python -m iceberg_navigator graph --table <dbname>.<tablename>
DiGraph with 11 nodes and 10 edges
Snapshot graph saved to snapshot_graph.png
NetworkX + Matplotlib で親子関係をDAGとして可視化
ディレクトリ構成(抜粋)
iceberg_navigator/
├── cli.py
├── __main__.py
├── aws/
│ ├── auth.py
│ └── glue.py
├── commands/
│ ├── compare.py
│ ├── list.py
│ ├── show.py
│ └── graph.py
├── utils/
│ └── display.py
実装の概要
エントリーポイント
__main__.py
にて Click コマンドを定義:
import click
from iceberg_navigator.commands.list import list_snapshots
from iceberg_navigator.commands.show import show_snapshot
from iceberg_navigator.commands.graph import graph_snapshots
from iceberg_navigator.commands.compare import compare_snapshots
@click.group()
def cli():
"""Iceberg Navigator CLI"""
pass
cli.add_command(list_snapshots)
cli.add_command(show_snapshot)
cli.add_command(graph_snapshots)
cli.add_command(compare_snapshots)
if __name__ == "__main__":
cli()
AWS Glue Iceberg Catalogへの接続
Glue REST Catalog APIで接続します。
from urllib.parse import urlparse
from pyiceberg.catalog import load_catalog
class GlueCatalog:
def __init__(self, profile_name=None, region_name=None, catalog_id="AwsDataCatalog"):
import boto3
if not region_name:
session = boto3.Session(profile_name=profile_name)
region_name = session.region_name
if not region_name:
raise ValueError("region_name Error")
self.region_name = region_name
self.catalog_id = catalog_id
session = boto3.Session(profile_name=profile_name, region_name=region_name)
self.glue_client = session.client("glue", region_name=region_name)
def _get_catalog(self):
conf = {
"type": "rest",
"uri": f"https://glue.{self.region_name}.amazonaws.com/iceberg",
"s3.region": self.region_name,
"rest.sigv4-enabled": "true",
"rest.signing-name": "glue",
"rest.signing-region": self.region_name,
}
return load_catalog(**conf)
def get_table_location(self, table_identifier: str) -> str:
database, table = table_identifier.split(".", 1)
resp = self.glue_client.get_table(DatabaseName=database, Name=table)
return resp["Table"]["Parameters"]["metadata_location"]
def list_snapshots(self, table_identifier: str):
catalog = self._get_catalog()
namespace, table_name = table_identifier.split(".", 1)
table = catalog.load_table(f"{namespace}.{table_name}")
snapshots = []
for snap in table.snapshots():
total_bytes = int(snap.summary.get("total-files-size", 0)) if snap.summary else 0
total_records = int(snap.summary.get("total-records", 0)) if snap.summary else 0
snapshots.append({
"snapshot_id": str(snap.snapshot_id),
"timestamp": snap.timestamp_ms,
"operation": snap.summary.get("operation") if snap.summary else None,
"parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
"total_size_mb": round((total_bytes) / (1024 * 1024), 2),
"record_count": total_records
})
return snapshots
def show_snapshot(self, table_identifier: str, snapshot_id: str):
catalog = self._get_catalog()
namespace, table_name = table_identifier.split(".", 1)
table = catalog.load_table(f"{namespace}.{table_name}")
snap = table.snapshot_by_id(int(snapshot_id))
if not snap:
return {"error": f"snapshot_id {snapshot_id} not found"}
schema_columns = []
for idx, col in enumerate(table.schema().columns, start=1):
requiredness = "optional" if col.optional else "required"
schema_columns.append(f"{idx}: {col.name}: {requiredness} {col.field_type}")
summary_dict = {}
if snap.summary:
summary_dict["operation"] = snap.summary.operation
if hasattr(snap.summary, "additional_properties"):
summary_dict.update(snap.summary.additional_properties)
return {
"table": table_name,
"snapshot_id": str(snap.snapshot_id),
"timestamp": snap.timestamp_ms,
"operation": summary_dict.get("operation"),
"parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
"manifest_list": snap.manifest_list,
"schema": schema_columns,
"summary": summary_dict,
}
def compare_snapshots(self, table_identifier: str, snapshot_id: str):
catalog = self._get_catalog()
namespace, table_name = table_identifier.split(".", 1)
table = catalog.load_table(f"{namespace}.{table_name}")
current_snap = table.snapshot_by_id(int(snapshot_id))
if not current_snap:
return {"error": f"snapshot_id {snapshot_id} not found"}
parent_snap = table.snapshot_by_id(int(current_snap.parent_snapshot_id))
if not parent_snap:
return {"error": f"parent_snapshot not found"}
current_summary_dict = {}
if current_snap.summary:
current_summary_dict["operation"] = current_snap.summary.operation
if hasattr(current_snap.summary, "additional_properties"):
current_summary_dict.update(current_snap.summary.additional_properties)
parent_summary_dict = {}
if parent_snap.summary:
parent_summary_dict["operation"] = parent_snap.summary.operation
if hasattr(parent_snap.summary, "additional_properties"):
parent_summary_dict.update(parent_snap.summary.additional_properties)
current_size = int(current_snap.summary.get("total-files-size", 0))
current_records = int(current_snap.summary.get("total-records", 0))
parent_size = int(parent_snap.summary.get("total-files-size", 0))
parent_records = int(parent_snap.summary.get("total-records", 0))
added = current_records - parent_records if current_records > parent_records else 0
deleted = parent_records - current_records if parent_records > current_records else 0
return {
"current_snapshot_id": str(current_snap.snapshot_id),
"current_size": current_size,
"current_records": current_records,
"parent_snapshot_id": str(parent_snap.snapshot_id),
"parent_size": parent_size,
"parent_records": parent_records,
"added": added,
"deleted": deleted,
}
スナップショット一覧取得
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import format_snapshots_table
@click.command("list")
@click.option("--table", required=True, help="Table identifier, e.g. db.table")
def list_snapshots(table):
glue = GlueCatalog()
snapshots = glue.list_snapshots(table)
if not snapshots:
click.echo("No snapshots found.")
return
table_str = format_snapshots_table(snapshots)
click.echo(table_str)
スナップショット詳細表示
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import show_snapshot_details
@click.command(name="show")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def show_snapshot(table, snapshot_id):
glue_catalog = GlueCatalog()
snapshot = glue_catalog.show_snapshot(table, snapshot_id)
if snapshot is None or "error" in snapshot:
click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
return
show_snapshot_details(snapshot)
親スナップショットとの比較
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import compare_snapshot
@click.command(name="compare")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def compare_snapshots(table, snapshot_id):
glue_catalog = GlueCatalog()
comparison_result = glue_catalog.compare_snapshots(table, snapshot_id)
if comparison_result is None or "error" in comparison_result:
click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
return
compare_snapshot(comparison_result)
スナップショットグラフ描画
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import build_snapshot_graph, draw_graph
@click.command("graph")
@click.option("--table", required=True, help="Table name (e.g., db.table)")
@click.option("--output", default="snapshot_graph.png", help="Output image filename")
def graph_snapshots(table: str, output: str):
glue_catalog = GlueCatalog()
snapshots = glue_catalog.list_snapshots(table)
if not snapshots:
click.echo(f"No snapshots found for table {table}")
return
G = build_snapshot_graph(snapshots)
draw_graph(G, output)
click.echo(f"Snapshot graph saved to {output}")
if __name__ == "__main__":
graph_snapshots()
カタログアクセス方式について
PyIcebergは複数のカタログ実装をサポートしていますが、AWS環境では以下の2種類が主に利用されます。
- RestCatalog: GlueのIceberg REST APIを通じてIcebergメタデータにアクセス
- GlueCatalog: boto3のGlueクライアントを用いテーブル情報を取得
AWS公式ドキュメントや最近の動向を踏まえると、GlueのRESTエンドポイントを使うRestCatalogが主流と考えられます。
本ツールではPyIcebergからGlueのIceberg REST APIを通してカタログアクセスを実施しています。これにより標準的かつ軽量なアクセスを実現しています。
こちらの記事でアクセスパターンの比較をしているので参考にどうぞ!
PyIcebergの制約
PyIcebergはPythonでIcebergのメタデータを扱う強力なツールですが、現時点で以下の制約があります。
-
ロールバックなどのメタデータ操作は限定的
テーブルのスナップショット復元ができない。 -
RESTカタログ経由での一部機能制限
GlueのREST APIがまだ発展途上なため、すべてのIceberg機能にアクセスできないケースがある(特にロールバック周り)。 -
差分取得やスナップショット間操作は自前実装が必要
差分計算や複雑な履歴操作は現状、ユーザーがロジックを作り込む必要がある。
AWSでのIcebergテーブルロールバック操作
前述の通りPyIcebergでのロールバック操作はできません。
そして、Icebergと親和性の高いサービスとしてAthenaを思いつく方も多いと思います。
ですが、Athenaについても直接Icebergテーブルのスナップショットロールバックを行う手段はありません。
そのため、Icebergテーブルのロールバックを実現するにはGlueまたはEMRから操作を行うことになります。
今回紹介したCLIツールはGlueのRESTカタログを用いたスナップショット閲覧が中心ですが、将来的にロールバック操作を含めたIcebergメタデータ管理ツールとして拡張することも視野に入れています。
まとめ
今回は、スナップショットの履歴や詳細を確認できるツール「iceberg-navigator」を紹介しました。
Apache Icebergを学習したり、開発・運用するうえでスナップショットを操作する機会が多いと思います。
Icebergのスナップショット履歴は、データの変更履歴を把握したり、ロールバック可能な状態を保つうえで重要な情報です。
本記事で紹介したCLIツールを使えば、そうした履歴を手軽に取得し、開発やデバッグに役立てることができます。
本ツールは個人学習の一環であり、まだまだ発展途上ですが、AWSでのIceberg対応やPyIcebergの活用例として参考になれば幸いです。
興味を持っていただけた方は、ぜひ試してフィードバックをいただけると幸いです。
Discussion