🙆‍♀️

Icebergについて調べてみた

2024/09/04に公開

概要

対象者: データエンジニアやデータサイエンティストなど、ビッグデータを効率的に扱いたい技術者。

内容: Apache Icebergの基本的な特徴と機能(ACIDトランザクション、スキーマ進化、テーブルスナップショット)について解説。

記事を読むとわかること: Icebergの基本的な機能とその利点、具体的な使用例やコードサンプルを通じて、どのようにデータの整合性やスキーマ変更、過去のデータ参照が可能かが理解できる。

序章

Icebergについて調べる機会があり、ざっくりとまとめてみました。
詳しくはこちらドキュメントを読んでください。

Apache Icebergのドキュメント

https://iceberg.apache.org/

Icebergとは

Iceberg(アイスバーグ)は、大量のデータ(ビックデータ)を効率よく扱う(高速で一貫性のあるクエリ)ためのオープンソースのテーブル形式です。
以下の特徴があります。

  1. ACID (atomicity, consistency, isolation, durability) transactions
  2. Schema evolution
  3. Hidden partitioning
  4. Table snapshots

ACID transactions

Icebergは、ACID(Atomicity, Consistency, Isolation, Durability)トランザクションをサポートしています。

例えば、2人が足し算と引き算を同時に行ってみます。
2つのスレッドがfare_amountに対して足し算と引き算を行うコードを作成し実行してみます。

import threading

# 事前に、nyc.taxisというIcebergを作成している

def update(user_id, fare_amount_delta):
    query = f"""
        UPDATE nyc.taxis
        SET fare_amount = fare_amount + {fare_amount_delta}
        WHERE VendorID = 1
    """
    spark.sql(query)
    print(f"User {user_id} updated the fare amount by {fare_amount_delta}")

threads = []

# ユーザー1は料金を5ドル増やす
thread1 = threading.Thread(target=update, args=(1, 5.0))
threads.append(thread1)

# ユーザー2は料金を1ドル減らす
thread2 = threading.Thread(target=update, args=(2, -1.0))
threads.append(thread2)

# 全てのスレッドを開始
for thread in threads:
    thread.start()

# 全てのスレッドが完了するまで待機
for thread in threads:
    thread.join()
User 2 updated the fare amount by -1.0

An error occurred while calling o36.sql.:
org.apache.iceberg.exceptions.ValidationException:
Found conflicting files that can contain records matching ref(name="VendorID") == 1:

結果は、Icebergの仕組みがconflictを検出して処理をabortしました。
このように、Icebergは複数のクライアントが同時に書き込みを行った場合でもデータの整合性を担保できます。

また、Icerbergとしては数種類の分離レベルをサポートしていますが、
snowflakeのIcebergとしては、Read Committedしか対応していません。
Read Committedは、non repeatable readとphantom readが発生する可能性がありデータの一貫性は落ちますが、処理速度が速い特徴があります
https://docs.snowflake.com/en/user-guide/tables-iceberg-transactions#queries

Icebergの同時実行制御について詳しくはこちらまで
https://medium.com/snowflake/how-apache-iceberg-enables-acid-compliance-for-data-lakes-9069ae783b60

Schema evolution

Icebergでは、テーブルを作り直すことなく様々なスキーマ変更を適用し、新旧データを一貫して処理出来ます。

DESC nyc.taxis

結果

col_name	data_type	comment
trip_distance	float	None
-- trip_distanceの名前をdistanceに変更
ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance
-- distanceにコメントを付与
ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'This is taximeter.'
-- distanceの型をdoubleに変更
ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;

結果

col_name	data_type	comment
trip_distance	double	This is taximeter.

このように、テーブルを作り直すことなく様々なスキーマ変更が適用できます。

Table snapshots

Icebergは、過去のある時点の断面を遡って参照できる機能もあります。

from datetime import datetime
datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-- 変更前
SELECT passenger_count FROM nyc.taxis limit 3

結果

'2024-09-02 05:58:18'
passenger_count
1.0
1.0
1.0
-- passenger_countを全部5にしてみる
UPDATE nyc.taxis SET passenger_count = 5
SELECT passenger_count FROM nyc.taxis limit 3

結果

passenger_count
5.0
5.0
5.0
SELECT passenger_count FROM nyc.taxis TIMESTAMP AS OF '2024-09-02 05:58:18' limit 3

結果

passenger_count
1.0
1.0
1.0
ちゅらデータ株式会社

Discussion