🐦

AWSのIcebergテーブルのスナップショット履歴をCLIで可視化するツールを作ってみた

に公開

はじめに

AWS Community Builderのぺんぎん(@jitepengin)です。

Apache Iceberg は、近年注目されているオープンテーブルフォーマットの1つで、スキーマの進化やACIDトランザクション、タイムトラベルといった機能を備えています。

スナップショットベースでテーブル管理を行う際に、開発やテスト、そして実運用の中でスナップショットの履歴を追跡・操作したくなるケースも多々あると思います。

今回は学習を兼ねてスナップショットの履歴や詳細を確認できるツール「iceberg-navigator」を自作したので、このツールを紹介したいと思います。

解決したかった課題

  • S3に存在するIcebergテーブルのスナップショット履歴をコマンドラインで手軽に確認したい。
  • スナップショットの親子関係を辿ってバージョンの系譜を把握したい。

使用した技術とライブラリ

  • PyIceberg:Icebergテーブル操作のためのPythonライブラリ
  • PyArrow:Icebergスキーマ操作の依存ライブラリ(間接利用)
  • Click:CLIインターフェースの構築
  • NetworkX / Matplotlib:スナップショットの親子関係をDAGとして可視化するための描画ツール

実装のポイント

今回のツールのポイントは以下となります。

  • PyIcebergを使ってIcebergカタログ、テーブルとスナップショットメタデータにアクセス。
  • AWS Glue Iceberg RESTエンドポイントをPyIceberg経由で実行。
  • ClickでシンプルなCLIインターフェースを構築。
  • NetworkXとMatplotlibを活用して簡単なDAGを作成。

ソース

以下のリポジトリに配置しています。
https://github.com/dataPenginPenguin/iceberg_navigator

CLIツールの使い方

AWS CLIの設定

事前にAWS CLIの設定を実施してください。

ライブラリのインストール

pip install -r requirements.txt

スナップショット一覧の取得

$ python -m iceberg_navigator list --table <dbname>.<tablename>

| Snapshot ID         | Timestamp            | Operation        | Parent Snapshot ID   |   Total Size (MB) |   Record Count |
|---------------------|----------------------|------------------|----------------------|-------------------|----------------|
| 1533347322559466931 | 2025-05-22T02:10:24Z | Operation.APPEND | null                 |             13.48 |        729,732 |
| 1485371543345582290 | 2025-05-22T02:10:54Z | Operation.DELETE | 1533347322559466931  |              0.00 |              0 |
| 67848960317145716   | 2025-05-22T02:15:45Z | Operation.APPEND | 1485371543345582290  |             13.48 |        729,732 |
| 3920289554540444894 | 2025-05-22T02:38:46Z | Operation.DELETE | 67848960317145716    |              0.00 |              0 |
| 6369576239134108166 | 2025-05-22T02:41:51Z | Operation.APPEND | 3920289554540444894  |             13.48 |        729,732 |
| 6216935665394419954 | 2025-05-22T02:41:54Z | Operation.APPEND | 6369576239134108166  |             26.96 |      1,459,464 |
| 9058990433822511495 | 2025-05-22T02:42:28Z | Operation.APPEND | 6216935665394419954  |             40.44 |      2,189,196 |
| 5224576979788468429 | 2025-05-22T02:46:53Z | Operation.DELETE | 9058990433822511495  |              0.00 |              0 |
| 8997131439115911397 | 2025-05-22T02:47:21Z | Operation.APPEND | 5224576979788468429  |             13.48 |        729,732 |
| 4246095293733855575 | 2025-08-02T22:51:16Z | Operation.DELETE | 8997131439115911397  |              0.00 |              0 |
| 8106328257365313720 | 2025-08-04T07:50:14Z | Operation.APPEND | 6369576239134108166  |             13.48 |        729,733 |
...

スナップショット詳細の表示

$ python -m iceberg_navigator show <Snapshot ID> --table <dbname>.<tablename>

Table: yellow_tripdata

Snapshot ID: 8106328257365313720
Timestamp: 2025-08-04T07:50:14Z
Operation: Operation.APPEND
Parent Snapshot ID: 6369576239134108166
Manifest List: s3://your-bucket/warehouse/yellow_tripdata/metadata/snap-8106328257365313720-1-a4fb8059-7bf8-4254-b640-bf1fcbf100dd.avro

Schema:
  1: vendorid: optional int
  2: tpep_pickup_datetime: optional timestamp
  3: tpep_dropoff_datetime: optional timestamp
  4: passenger_count: optional long
  5: trip_distance: optional double
  6: ratecodeid: optional long
  7: store_and_fwd_flag: optional string
  8: pulocationid: optional int
  9: dolocationid: optional int
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double

Summary:
  added-data-files: 1
  total-equality-deletes: 0
  added-records: 1
  total-position-deletes: 0
  added-files-size: 3046
  total-delete-files: 0
  total-files-size: 14138545
  total-data-files: 2
  total-records: 729733

親スナップショットの比較

$ python -m iceberg_navigator compare <snapshot_id> --table <database>.<table>

----------------------------------------
Parent Snapshot
----------------------------------------
ID:         6369576239134108166
File Size:  13.48 MB
Records:    729,732

----------------------------------------
Current Snapshot
----------------------------------------
ID:         6216935665394419954
File Size:  26.96 MB
Records:    1,459,464

========================================
Summary
========================================
Added Records:   729,732
Deleted Records: 0

スナップショット系譜のグラフ表示

$ python -m iceberg_navigator graph --table <dbname>.<tablename>

DiGraph with 11 nodes and 10 edges
Snapshot graph saved to snapshot_graph.png

NetworkX + Matplotlib で親子関係をDAGとして可視化

ディレクトリ構成(抜粋)

iceberg_navigator/
├── cli.py
├── __main__.py
├── aws/
│   ├── auth.py
│   └── glue.py
├── commands/
│   ├── compare.py
│   ├── list.py
│   ├── show.py
│   └── graph.py
├── utils/
│   └── display.py

実装の概要

エントリーポイント

__main__.py にて Click コマンドを定義:

__main__.py
import click
from iceberg_navigator.commands.list import list_snapshots
from iceberg_navigator.commands.show import show_snapshot
from iceberg_navigator.commands.graph import graph_snapshots
from iceberg_navigator.commands.compare import compare_snapshots

@click.group()
def cli():
    """Iceberg Navigator CLI"""
    pass

cli.add_command(list_snapshots)
cli.add_command(show_snapshot)
cli.add_command(graph_snapshots)
cli.add_command(compare_snapshots)

if __name__ == "__main__":
    cli()

AWS Glue Iceberg Catalogへの接続

Glue REST Catalog APIで接続します。

glue.py
from urllib.parse import urlparse
from pyiceberg.catalog import load_catalog

class GlueCatalog:
    def __init__(self, profile_name=None, region_name=None, catalog_id="AwsDataCatalog"):
        import boto3
        if not region_name:
            session = boto3.Session(profile_name=profile_name)
            region_name = session.region_name
            if not region_name:
                raise ValueError("region_name Error")
        self.region_name = region_name
        self.catalog_id = catalog_id

        session = boto3.Session(profile_name=profile_name, region_name=region_name)
        self.glue_client = session.client("glue", region_name=region_name)

    def _get_catalog(self):
        conf = {
            "type": "rest",
            "uri": f"https://glue.{self.region_name}.amazonaws.com/iceberg",
            "s3.region": self.region_name,
            "rest.sigv4-enabled": "true",
            "rest.signing-name": "glue",
            "rest.signing-region": self.region_name,
        }
        return load_catalog(**conf)

    def get_table_location(self, table_identifier: str) -> str:
        database, table = table_identifier.split(".", 1)
        resp = self.glue_client.get_table(DatabaseName=database, Name=table)
        return resp["Table"]["Parameters"]["metadata_location"]

    def list_snapshots(self, table_identifier: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snapshots = []
        for snap in table.snapshots():
            total_bytes = int(snap.summary.get("total-files-size", 0)) if snap.summary else 0
            total_records = int(snap.summary.get("total-records", 0)) if snap.summary else 0

            snapshots.append({
                "snapshot_id": str(snap.snapshot_id),
                "timestamp": snap.timestamp_ms,
                "operation": snap.summary.get("operation") if snap.summary else None,
                "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
                "total_size_mb": round((total_bytes) / (1024 * 1024), 2),
                "record_count": total_records
            })

        return snapshots

    def show_snapshot(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        snap = table.snapshot_by_id(int(snapshot_id))
        if not snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        schema_columns = []
        for idx, col in enumerate(table.schema().columns, start=1):
            requiredness = "optional" if col.optional else "required"
            schema_columns.append(f"{idx}: {col.name}: {requiredness} {col.field_type}")

        summary_dict = {}
        if snap.summary:
            summary_dict["operation"] = snap.summary.operation
            if hasattr(snap.summary, "additional_properties"):
                summary_dict.update(snap.summary.additional_properties)


        return {
            "table": table_name,
            "snapshot_id": str(snap.snapshot_id),
            "timestamp": snap.timestamp_ms,
            "operation": summary_dict.get("operation"),
            "parent_id": str(snap.parent_snapshot_id) if snap.parent_snapshot_id else None,
            "manifest_list": snap.manifest_list,
            "schema": schema_columns,
            "summary": summary_dict,
        }

    def compare_snapshots(self, table_identifier: str, snapshot_id: str):
        catalog = self._get_catalog()
        namespace, table_name = table_identifier.split(".", 1)
        table = catalog.load_table(f"{namespace}.{table_name}")

        current_snap = table.snapshot_by_id(int(snapshot_id))
        if not current_snap:
            return {"error": f"snapshot_id {snapshot_id} not found"}

        parent_snap = table.snapshot_by_id(int(current_snap.parent_snapshot_id))
        if not parent_snap:
            return {"error": f"parent_snapshot not found"}

        current_summary_dict = {}
        if current_snap.summary:
            current_summary_dict["operation"] = current_snap.summary.operation
            if hasattr(current_snap.summary, "additional_properties"):
                current_summary_dict.update(current_snap.summary.additional_properties)

        parent_summary_dict = {}
        if parent_snap.summary:
            parent_summary_dict["operation"] = parent_snap.summary.operation
            if hasattr(parent_snap.summary, "additional_properties"):
                parent_summary_dict.update(parent_snap.summary.additional_properties)


        current_size = int(current_snap.summary.get("total-files-size", 0))
        current_records = int(current_snap.summary.get("total-records", 0))

        parent_size = int(parent_snap.summary.get("total-files-size", 0))
        parent_records = int(parent_snap.summary.get("total-records", 0))

        added = current_records - parent_records if current_records > parent_records else 0
        deleted = parent_records - current_records if parent_records > current_records else 0

        return {
                "current_snapshot_id": str(current_snap.snapshot_id),
                "current_size": current_size,
                "current_records": current_records,
                "parent_snapshot_id": str(parent_snap.snapshot_id),
                "parent_size": parent_size,
                "parent_records": parent_records,
                "added": added,
                "deleted": deleted,
            }

スナップショット一覧取得

list.py
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import format_snapshots_table

@click.command("list")
@click.option("--table", required=True, help="Table identifier, e.g. db.table")
def list_snapshots(table):

    glue = GlueCatalog()
    snapshots = glue.list_snapshots(table)
    if not snapshots:
        click.echo("No snapshots found.")
        return

    table_str = format_snapshots_table(snapshots)
    click.echo(table_str)

スナップショット詳細表示

show.py
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import show_snapshot_details

@click.command(name="show")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def show_snapshot(table, snapshot_id):
    glue_catalog = GlueCatalog()
    snapshot = glue_catalog.show_snapshot(table, snapshot_id)
    if snapshot is None or "error" in snapshot:
        click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
        return

    show_snapshot_details(snapshot)

親スナップショットとの比較

compare.py
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import compare_snapshot

@click.command(name="compare")
@click.argument("snapshot_id")
@click.option('--table', required=True, help="Table identifier, e.g. db.table")
def compare_snapshots(table, snapshot_id):
    glue_catalog = GlueCatalog()
    comparison_result = glue_catalog.compare_snapshots(table, snapshot_id)

    if comparison_result is None or "error" in comparison_result:
        click.echo(f"Snapshot {snapshot_id} not found in table {table}.")
        return

    compare_snapshot(comparison_result)

スナップショットグラフ描画

graph.py
import click
from iceberg_navigator.aws.glue import GlueCatalog
from iceberg_navigator.utils.display import build_snapshot_graph, draw_graph

@click.command("graph")
@click.option("--table", required=True, help="Table name (e.g., db.table)")
@click.option("--output", default="snapshot_graph.png", help="Output image filename")
def graph_snapshots(table: str, output: str):
    glue_catalog = GlueCatalog()
    snapshots = glue_catalog.list_snapshots(table)
    if not snapshots:
        click.echo(f"No snapshots found for table {table}")
        return

    G = build_snapshot_graph(snapshots)
    draw_graph(G, output)
    click.echo(f"Snapshot graph saved to {output}")

if __name__ == "__main__":
    graph_snapshots()

カタログアクセス方式について

PyIcebergは複数のカタログ実装をサポートしていますが、AWS環境では以下の2種類が主に利用されます。

  • RestCatalog: GlueのIceberg REST APIを通じてIcebergメタデータにアクセス
  • GlueCatalog: boto3のGlueクライアントを用いテーブル情報を取得

AWS公式ドキュメントや最近の動向を踏まえると、GlueのRESTエンドポイントを使うRestCatalogが主流と考えられます。
本ツールではPyIcebergからGlueのIceberg REST APIを通してカタログアクセスを実施しています。これにより標準的かつ軽量なアクセスを実現しています。

こちらの記事でアクセスパターンの比較をしているので参考にどうぞ!

https://zenn.dev/penginpenguin/articles/e44880aaa2d5e3

PyIcebergの制約

PyIcebergはPythonでIcebergのメタデータを扱う強力なツールですが、現時点で以下の制約があります。

  • ロールバックなどのメタデータ操作は限定的
    テーブルのスナップショット復元ができない。

  • RESTカタログ経由での一部機能制限
    GlueのREST APIがまだ発展途上なため、すべてのIceberg機能にアクセスできないケースがある(特にロールバック周り)。

  • 差分取得やスナップショット間操作は自前実装が必要
    差分計算や複雑な履歴操作は現状、ユーザーがロジックを作り込む必要がある。

AWSでのIcebergテーブルロールバック操作

前述の通りPyIcebergでのロールバック操作はできません。
そして、Icebergと親和性の高いサービスとしてAthenaを思いつく方も多いと思います。
ですが、Athenaについても直接Icebergテーブルのスナップショットロールバックを行う手段はありません。
そのため、Icebergテーブルのロールバックを実現するにはGlueまたはEMRから操作を行うことになります。

今回紹介したCLIツールはGlueのRESTカタログを用いたスナップショット閲覧が中心ですが、将来的にロールバック操作を含めたIcebergメタデータ管理ツールとして拡張することも視野に入れています。

まとめ

今回は、スナップショットの履歴や詳細を確認できるツール「iceberg-navigator」を紹介しました。
Apache Icebergを学習したり、開発・運用するうえでスナップショットを操作する機会が多いと思います。

Icebergのスナップショット履歴は、データの変更履歴を把握したり、ロールバック可能な状態を保つうえで重要な情報です。
本記事で紹介したCLIツールを使えば、そうした履歴を手軽に取得し、開発やデバッグに役立てることができます。

本ツールは個人学習の一環であり、まだまだ発展途上ですが、AWSでのIceberg対応やPyIcebergの活用例として参考になれば幸いです。

興味を持っていただけた方は、ぜひ試してフィードバックをいただけると幸いです。

Discussion