❄️

The Definitive Guide: Apache Icebergへの移行とSnowflake

2024/11/17に公開

記事の概要

前提:Icebergの特徴

  • オープンな仕様
  • オープンソースライブラリ
  • 透明性の高いプロジェクトガバナンス
  • プロジェクトガバナンスの多様性
  • ベンダーロックインのなさ
  • 多様なエコシステム

Icebergへの移行

必要とされる対応

  1. 「既存のデータ構造の適合」
  2. 「データ取り込みパイプラインの修正」
  3. 「データ処理ワークフローの更新」

Icebergと互換性のある形式でデータストレージを再構築すること も必要です. 特にIaaS/SaaSなどを利用している場合、ストレージ層を自分たちで用意し管理していない場合も存在すると思いますが、Icebergを扱うのならば往々にしてそれらを用意し、管理することになると思います.

移行前の事前評価

既存のデータ構造の評価

データの互換性の確保
要件に合わせたデータの型の調整の他、カラム名の変更なども行う必要がある場合があります.
Icebergはリスト、構造体、マップを提供しており、対応範囲は広く様々な既存のデータ形式に柔軟に対応が可能です. 一方、その柔軟性を持ってしても全てに対応はできません.
JSONなど他のプラットフォームでは独自のフィールド型を持っている可能性があるものについては、データをマップに変換するか、文字列として保存するかを決める必要があります.

処理パイプラインの評価

処理プログラムの書き換え
データ処理パイプラインを更新して、Icebergテーブルへのデータの書き込みを行う形に作り変える必要があります. ETLの変更と、データが正しくパーティション分割されていることを確認することが必要です.

ワークフロー/タスクスケジューリングの評価

処理の実行タイミング,権限,依存関係の見直し
新しいストレージ形式でデータの依存関係、スケジュール、およびデータアクセスがどのように変化するかを確認します.

移行手法:インプレース移行とシャドウ移行

🔸インプレース移行

既存のデータファイルを使用して Apache Iceberg テーブルを構築し移行する手法
公式参考図Apache Iceberg公式HP:Migration Approachesより

🔸シャドウ移行

Apache Icebergに複製データセットを作成し、古いデータセットから移行する手法

メリデメ簡易表

インプレース移行 シャドウ移行
メリット シンプルで初期ストレージコストが安い 元のデータが保存されるため、テストと検証が可能
※既存のシステムへの影響は最小限
デメリット データを直接変更し、問題が発生した場合簡単にロールバックできない.
※Iceberg がサポートするファイル形式を使用するテーブルでのみ機能
複雑で移行中にストレージコストが増加する可能性がある

🔸インプレース移行の手順

  • 1回のトランザクションでテーブル全体を移行するのか、パーティションごとにインクリメンタルに移行するのか
  • 古いテーブルの代わりに新しいテーブルを使用するように、すべての読み取りと書き込みの処理をいつ変更すべきか

< 基本的な移行ステップ >

  1. 古いテーブルのパーティション内のファイル数とレコード数を確定
  2. パーティションの既存ファイルを既存のApache Icebergテーブルに移行
  3. 同じパーティション内のファイル数とレコード数をIcebergテーブル内で確認し、一致することを確認

小/中規模なデータセットの移行

小規模から中規模のデータセットは、1つのトランザクションで簡単に移行が可能であるため、一括でやってしまうのも手である.

大規模なデータセットの移行

大規模なデータセットは、Apache Icebergテーブルにパーティションを1つずつ追加しながら段階的に移行する方法を推奨される.

各ジョブの間にレコード番号とファイル番号のチェックを実行することで、新しいIcebergテーブルが既存のデータファイルを使用して古いテーブルを正確に複製していることを確認することが可能

🔸シャドウ移行の手順

  • 段階的に新しいシステムを導入する時間を確保するために、段階的なアプローチを取る必要がある
  1. 移行前システムへの書き込み、移行前システムからの読み込み
    Icebergテーブルを作成していきます.
  2. 移行前システムと移行後システムに書き込み、移行前システムから読み込む
    移行前システムと移行後システム両方にデータ(テーブル)を書き込んでいきます.
    移行後システムにはIceberg形式での書き込みです.
  3. 移行前システムと移行後システムへの書き込み、移行後システムからの読み取り
    両システム間でデータの一貫性が保たれていることを確認しつつ、読み取りを移行後システムのIcebergテーブルとします.
  4. 移行後システムへの書き込み、移行後システムからの読み取り
    徐々に移行後システムを廃止し、移行後システムのIcebergテーブルのみにデータを書き込むようにします.

移行手法

🔸各エンジン上でSQLを利用し Apache Icebergへ

  • 新しいIcebergテーブルを作成するパターン
    • CREATE TABLE…AS SELECT (CTAS) ステートメントを使って新しいテーブルにデータを移行
    • COPY INTOコマンド(Snowflake等)やINSERT INTO SELECTコマンドを使用して、JSON/CSVなどのテーブル以外のソースから既存のIcebergテーブルにデータを挿入
  • 既存のIcebergテーブルにデータを追加するパターン
    • COPY INTO ステートメントを使用して、テーブル以外のソース (ファイル) からデータを挿入
    • INSERT INTO SELECT ステートメントを使用して、別のテーブルからデータを挿入

Example : 新しいIcebergテーブルを作成するパターン

Apache Spark SQLを使用し、新しいIcebergテーブルを作成、ソースからのデータを入力する

CREATE TABLE catalog.database.tableX
USING iceberg PARTITIONED BY (month(ts_field))
AS
SELECT *, CAST(old_field AS <data_type>) AS updated_field FROM source_table;

上記の例では tableX という新規カタログに新しい Apache Iceberg テーブルを作成している.

  • SELECT クエリの結果を複製し、更新する古いフィールドのデータ型を変換
  • 月変換を使用してパーティション分割スキームを指定
  1. 段階的に移行すること

単一のパーティションまたはデータのサブセットに対してCTAS ステートメントを使用する. その後、INSERT INTO SELECT ステートメントを使用して、他のパーティションのデータを徐々に追加します.
データの完全性を確認するために、パーティションごとにレコード数チェックを実行することがポイントになります.

  1. 並列処理を行うこと

  2. 監視とログ記録を行うこと


発展:SnowflakeのテーブルをIcebergテーブルに変換する

SnowflakeのテーブルはデータをSnowflakeマネージドなストレージに保管しています. このテーブルをIcebergテーブルに変換し、テーブルのデータ本体はユーザが指定したストレージに保管する形とします.

CREATE OR REPLACE ICEBERG TABLE my_iceberg_table (
    boolean_col boolean,
    int_col int,
    long_col long,
    float_col float,
    double_col double,
    decimal_col decimal(10,5),
    string_col string,
    fixed_col fixed(10),
    binary_col binary,
    date_col date,
    time_col time,
    timestamp_ntz_col timestamp_ntz(6),
    timestamp_ltz_col timestamp_ltz(6)
  )
  CATALOG = 'SNOWFLAKE'
  EXTERNAL_VOLUME = 'my_ext_vol'
  BASE_LOCATION = 'my/relative/path/from/extvol';

Example : 既存のIcebergテーブルにデータを追加するパターン

COPYコマンド

Snowflakeの場合

COPY INTO customer_iceberg_ingest
  FROM @my_int_parquet_stage
  FILE_FORMAT = 'my_parquet_format'
  LOAD_MODE = ADD_FILES_COPY
  MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;

COPYコマンドをIebergテーブルへのデータ追加に利用することが可能なサービスにおいては下記を注意して行う.

  • CSV、JSON、または Parquet ファイル内のデータがターゲット Iceberg テーブルのスキーマに準拠していること
  • Iceberg テーブルを日付でパーティション分割して段階的に取り込む場合は、ファイルを日付固有のフォルダーに編成して、各増分ステップで取り込むデータを簡単に指定できるようにすること
  • 適宜データの特性に応じて、日付と時刻の形式、区切り文字、NULL 値の処理など、FILE_FORMAT 句内で追加のオプションを指定すること
INSERTコマンド

INSERT INTO SELECT で任意のテーブルから Apache Iceberg テーブルにデータを挿入します.
何も特殊なことは行いません.

INSERT INTO tableB
SELECT * FROM tableA;

参考:SnowflakeのIcebergテーブルのデータ量を確認する

Snowflake Information Schema スキーマまたは Account Usage スキーマの TABLE_STORAGE_METRICS ビューと TABLES ビューをクエリすることで、Icebergテーブルがどれだけのストレージを使用しているかを追跡することが可能である.

SELECT metrics.* FROM
  snowflake.account_usage.table_storage_metrics metrics
  INNER JOIN snowflake.account_usage.tables tables
  ON (
    metrics.id = tables.table_id
    AND metrics.table_schema_id = tables.table_schema_id
    AND metrics.table_catalog_id = tables.table_catalog_id
  )
  WHERE tables.is_iceberg='YES';

🔸純粋なParquetデータセットから Apache Icebergへ

Apache Icebergはadd_filesプロシージャを提供しており、新しいテーブルを作成することなく既存のIcebergテーブルにファイルをインポートすることができます.
このプロシージャを使用すると、以前のスナップショットをすべて失効、履歴を保存せずに Delta Lake および Apache Hudi テーブルからデータを移行することができます.

Example

  • s3://parquet-files/tablesにあるテーブル内のすべてのファイルをmy_tableに追加
CALL catalog.system.add_files(
 table => database.iceberg_table',
 source_table => 's3://parquet-files/tables',
 partition_filter => map('partition_colume', 'partition_value'),
 check_duplicate_files => true
)

新しいファイルのメタデータを作成し、Icebergテーブルのファイルセットの一部として扱う

Delta LakeまたはApache HudiテーブルからApache Icebergテーブルに履歴を保存せずにデータを移行するには以下の2ステップを行うことになります.

  1. 現在のスナップショットのファイルのみを保持するために、デルタレイクまたはHudiテーブルの以前のスナップショットをすべて失効させる

  2. add_filesプロシージャを使用して、前の例に示したように、現在のスナップショットから対象のIcebergテーブルにファイルをインポート


発展:Snowflakeで管理されていないIcebergテーブルをIcebergカタログとしてSnowflakeを使用するテーブルに変換する
//オブジェクトストレージ内のIcebergファイルからIcebergテーブルを作成
CREATE ICEBERG TABLE myIcebergTable
  EXTERNAL_VOLUME='icebergMetadataVolume'
  CATALOG='icebergCatalogInt'
  METADATA_FILE_PATH='path/to/metadata/v1.metadata.json';

//テーブルメタデータを最新のメタデータファイルと同期
ALTER ICEBERG TABLE myIcebergTable REFRESH 'metadata/v2.metadata.json';

//テーブルをIcebergカタログとしてSnowflakeを使用するように変換
ALTER ICEBERG TABLE myIcebergTable CONVERT TO MANAGED
  BASE_LOCATION = myBaseLocation;

🔸Delta Lake から Apache Icebergへ

データの履歴を保持し、移行を実現することが可能です.

することで既存の Delta Lake テーブルを、元のテーブルのデータファイルを使用して新しい Iceberg テーブルにスナップショットすることができる.

Example

import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.catalog.Catalog;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.delta.DeltaLakeToIcebergMigrationActionsProvider;

// 元のDelta Tableの場所
String sourceDeltaLakeTableLocation = "s3://my-bucket/delta-table";

//新しいApache Icebergテーブルの場所
String destTableLocation = "s3://my-bucket/iceberg-table";

// 新しいテーブルの名前
TableIdentifier destTableIdentifier = TableIdentifier.of("my_database", "my_table");

// iceberg catalogにテーブルを追加
Catalog icebergCatalog = ...;

// 更新
Configuration hadoopConf = ...;
DeltaLakeToIcebergMigrationActionsProvider.defaultActions()
 .snapshotDeltaLakeTable(sourceDeltaLakeTableLocation)
 .as(destTableIdentifier)
 .icebergCatalog(icebergCatalog)
 .tableLocation(destTableLocation)
 .deltaLakeConfiguration(hadoopConf)
 .tableProperty

🔸Apache Hudi から Apache Icebergへ

調査中(本書記載の手法は技術上の問題により現在未だ実装されていません)

まとめ

現在Apache Icebergは様々なサービスで採用が進み、オープンフォーマットをリードしています. データを扱う上で、その周辺知識を持つことは非常に大きなメリットになると思います.

名前の揺らぎについて

「データベース:Database」と「カタログ:Catalog」と「スキーマ:Schema

サービス間で名前の揺れが見られることがわかります. これはそのサービスの要件や考え方により異なるためですが、昨今それらがお互いの領域をカバーすることにより、ほとんど同じ意味を持つが名前が異なる場合が出てきています.

階層:
  • Catalogを最上位にする場合
    catalogdatabasetable
    もしくは
    catalogschematable
    Databricksなどは database = schemaであり、create schemacreate databaseは同じ意味を持ちます

  • Databaseを最上位にする場合
    databaseschematable

参考

Apache Iceberg公式HP
書籍:Apache Iceberg: The Definitive Guide

Snowflake Data Heroes

Discussion