Icebergについて調べてみた
概要
対象者: データエンジニアやデータサイエンティストなど、ビッグデータを効率的に扱いたい技術者。
内容: Apache Icebergの基本的な特徴と機能(ACIDトランザクション、スキーマ進化、テーブルスナップショット)について解説。
記事を読むとわかること: Icebergの基本的な機能とその利点、具体的な使用例やコードサンプルを通じて、どのようにデータの整合性やスキーマ変更、過去のデータ参照が可能かが理解できる。
序章
Icebergについて調べる機会があり、ざっくりとまとめてみました。
詳しくはこちらドキュメントを読んでください。
Apache Icebergのドキュメント
Icebergとは
Iceberg(アイスバーグ)は、大量のデータ(ビックデータ)を効率よく扱う(高速で一貫性のあるクエリ)ためのオープンソースのテーブル形式です。
以下の特徴があります。
- ACID (atomicity, consistency, isolation, durability) transactions
- Schema evolution
- Hidden partitioning
- Table snapshots
ACID transactions
Icebergは、ACID(Atomicity, Consistency, Isolation, Durability)トランザクションをサポートしています。
例えば、2人が足し算と引き算を同時に行ってみます。
2つのスレッドがfare_amountに対して足し算と引き算を行うコードを作成し実行してみます。
import threading
# 事前に、nyc.taxisというIcebergを作成している
def update(user_id, fare_amount_delta):
query = f"""
UPDATE nyc.taxis
SET fare_amount = fare_amount + {fare_amount_delta}
WHERE VendorID = 1
"""
spark.sql(query)
print(f"User {user_id} updated the fare amount by {fare_amount_delta}")
threads = []
# ユーザー1は料金を5ドル増やす
thread1 = threading.Thread(target=update, args=(1, 5.0))
threads.append(thread1)
# ユーザー2は料金を1ドル減らす
thread2 = threading.Thread(target=update, args=(2, -1.0))
threads.append(thread2)
# 全てのスレッドを開始
for thread in threads:
thread.start()
# 全てのスレッドが完了するまで待機
for thread in threads:
thread.join()
User 2 updated the fare amount by -1.0
An error occurred while calling o36.sql.:
org.apache.iceberg.exceptions.ValidationException:
Found conflicting files that can contain records matching ref(name="VendorID") == 1:
結果は、Icebergの仕組みがconflictを検出して処理をabortしました。
このように、Icebergは複数のクライアントが同時に書き込みを行った場合でもデータの整合性を担保できます。
また、Icerbergとしては数種類の分離レベルをサポートしていますが、
snowflakeのIcebergとしては、Read Committedしか対応していません。
Read Committedは、non repeatable readとphantom readが発生する可能性がありデータの一貫性は落ちますが、処理速度が速い特徴があります
Icebergの同時実行制御について詳しくはこちらまで
Schema evolution
Icebergでは、テーブルを作り直すことなく様々なスキーマ変更を適用し、新旧データを一貫して処理出来ます。
DESC nyc.taxis
結果
col_name data_type comment
trip_distance float None
-- trip_distanceの名前をdistanceに変更
ALTER TABLE nyc.taxis RENAME COLUMN trip_distance TO distance
-- distanceにコメントを付与
ALTER TABLE nyc.taxis ALTER COLUMN distance COMMENT 'This is taximeter.'
-- distanceの型をdoubleに変更
ALTER TABLE nyc.taxis ALTER COLUMN distance TYPE double;
結果
col_name data_type comment
trip_distance double This is taximeter.
このように、テーブルを作り直すことなく様々なスキーマ変更が適用できます。
Table snapshots
Icebergは、過去のある時点の断面を遡って参照できる機能もあります。
from datetime import datetime
datetime.now().strftime('%Y-%m-%d %H:%M:%S')
-- 変更前
SELECT passenger_count FROM nyc.taxis limit 3
結果
'2024-09-02 05:58:18'
passenger_count
1.0
1.0
1.0
-- passenger_countを全部5にしてみる
UPDATE nyc.taxis SET passenger_count = 5
SELECT passenger_count FROM nyc.taxis limit 3
結果
passenger_count
5.0
5.0
5.0
SELECT passenger_count FROM nyc.taxis TIMESTAMP AS OF '2024-09-02 05:58:18' limit 3
結果
passenger_count
1.0
1.0
1.0
Discussion