🧊

Apache Iceberg BigQuey/BigLake テーブルを触ってみた

2024/12/14に公開

この記事は BigQuery Advent Calendar 2024 の 14 日目の記事。
今回は忙しくてキャッチアップできていない Iceberg X BigQuery について色々手を動かしてみる。

Apach Iceberg について

Apache Iceberg は、データレイクハウスの管理を目的としたオープンソースである。
類似の技術として Apache Hudi や Delta lake などがあり、これらはまとめてオープンテーブルフォーマット(OTF)と呼ばれる。
Iceberg には以下ような特徴がある。

  • スキーマの変更(カラムの追加、削除、リネームなど)を柔軟にサポート
  • 任意の時点のデータを簡単にクエリすることができ、データの変更履歴を追跡可能
  • ACIDトランザクションを提供し、一貫性のあるデータ管理を可能
  • Spark、Flink、Trinoなど、さまざまなビッグデータ処理エンジンとの互換性を有する

BigQuery と Iceberg について

2024 年に BigQuery では Iceberg 形式の外部テーブルを通じてクエリする機能が発表された。
https://cloud.google.com/blog/ja/products/data-analytics/announcing-bigquery-tables-for-apache-iceberg

この記事では発表になった Apache Iceberg BigQuery テーブルの概要について紹介している。

  • 自律的な最適化
    • 小さなファイルを適切なサイズに結合し、データの再クラスタリングやファイルのガベージコレクションを自動で行うため手動での OPTIMIZE や VACUUM 操作は不要
  • 高スループットのストリーミング挿入
    • BigQuery Storage Write API と連携する Vortex により、取り込んだデータを行指向形式で保存し、定期的に Parquet 形式へ変換
    • Pub/Sub や Datastream を使用して専用インフラなしでデータを取り込むことが可能
  • スケーラブルなメタデータ管理
    • メタデータはBigQueryのスケーラブルなシステムで管理され、オブジェクトストレージに依存せず高い変更レートを実現
    • トランザクションログの直接変更を防ぐことで信頼性の高い監査履歴を提供
  • セキュリティとガバナンスの統合
    • Storage APIによる細かいセキュリティポリシーに加え、Dataplexを介したデータガバナンス、データ品質管理、データリネージをサポート

記事執筆時は pre-GA の状態らしく、本番環境などへの適応は避けたほうが良さそう。

Apache iceberg BigLake/BigQuery テーブルの違い

まず Apache iceberg 用の BigLake 外部テーブルと Apache Iceberg 用の BigQuery テーブルという 2 つのソリューションがある。
Apache Iceberg 用の BigLake 外部テーブルは、読み取り専用テーブルであるが、BigQuery テーブルは変更も可能という違いがある。
前者がソース層で後者が Transform されたテーブルとして扱うとよいだろう。

Apache Iceberg 用の BigQuery テーブルを作成

まずはテストとなる paquet ファイルを作る。

>>> import pandas as pd
>>> import pyarrow as pa
>>> import pyarrow.parquet as pq
>>> df = pd.read_csv('data.csv')
>>> df
   id name
0   1  bob
1   2  tom
2   3  sam
>>> table = pa.Table.from_pandas(df)
>>> pq.write_table(table, 'data.parquet')

続いて Iceberg テーブルを作成する。
iceberg_dataset はテスト用に作ったデータセットである。
GCS のバケットも必要なので作っておく。(バケット名は仮名)
Iceberg テーブルにはメタ情報を保管する metadata/ と実際のデータを格納するための data/ というディレクトリが必要である。
そのためバケット内の構成として以下のようにするのがおすすめ。

gs://hoge_iceberg_test/
    |_ テーブル名/
    |    |_ metadata/
    |    |_ data/
    |_ テーブル名/

また GCS のバケットとの外部接続を作成する必要があったため、別途作っておく。

CREATE TABLE iceberg_dataset.iceberg_bigquery_table (
id int64,
name string
)
WITH CONNECTION `asia-northeast1.biglake_storage_connector`
OPTIONS (
file_format = 'PARQUET',
table_format = 'ICEBERG',
storage_uri = 'gs://hoge_iceberg_test/iceberg_bigquery_table/');

するとバケットに gs://hoge_iceberg_test/iceberg_bigquery_table/metadata/v0.metadata.json というファイルができるはず。
続いて先ほど作ったデータをロードする。

bq load \
    --source_format=PARQUET \
    iceberg_dataset.iceberg_bigquery_table \
    ~/Downloads/data.parquet

ガイドにあった --copy_files_only オプションは bq コマンドをアップデートしても動かずだったので削除した。
実行後にストレージに /data/xxxx.paquet というファイルができており、これがデータの実体だとわかる。
試しにクエリしてみると先ほど作ったデータが表示された。

別テーブルからの作成

また別のテーブルから Iceberg テーブルを作ることも AS を使うと可能。
これにより lake 層においた BigLake テーブルをクエリして作る Warehouse 層のテーブルで、データを Iceberg 形式で管理することができる。

CREATE OR REPLACE TABLE `iceberg_dataset.warehouse_users` (
    id int64,
    name string,
    age int
)
WITH CONNECTION `asia-northeast1.biglake_storage_connector`
OPTIONS (
file_format = 'PARQUET',
table_format = 'ICEBERG',
storage_uri = 'gs://hoge_iceberg_test/warehouse_users/')
AS SELECT id,
    name,
    age
FROM `iceberg_dataset.lake_users`
;

値の更新

INSERT/UPDATE/DELETE が使えるか試してみた。
いずれもデータの変更が確認できた。

INSERT INTO `iceberg_dataset.iceberg_bigquery_table` VALUES (5, 'hoge');

UPDATE `iceberg_dataset.iceberg_bigquery_table` SET name = 'fuga' WHERE id = 5;

DELETE FROM `iceberg_dataset.iceberg_bigquery_table` WHERE id = 5;

SQL の数だけ /data/ 配下には paquet ファイルが増えていることも確認できた。
これは Iceberg の特徴だが、BigQuery テーブルの場合ファイルが増えすぎたときの最適化などは自動でやってくれるらしいが、今回その様子までは確認できず…

> gsutil ls gs://hoge_iceberg_test/iceberg_bigquery_table/data/

gs://hoge_iceberg_test/iceberg_bigquery_table/data/436cceda-d0cf-422f-8f9b-d5dd8c204e39-3c924776629eb1d3-f-00000-of-00001.parquet
gs://hoge_iceberg_test/iceberg_bigquery_table/data/79527fba-702d-4228-9136-750e2e3ca141-fd940097b5835acc-f-00000-of-00001.parquet
gs://hoge_iceberg_test/iceberg_bigquery_table/data/f64d0aa7-a6de-49e2-9317-f99d36208c0c-cdc1fb17d3aec6d3-f-00000-of-00001.parquet
gs://hoge_iceberg_test/iceberg_bigquery_table/data/f7b26578-f110-4180-8366-3dbd646c3470-8bb305c216604ef0-f-00000-of-00001.parquet

スキーマの変更

まずはカラムを追加してみる。

ALTER TABLE `iceberg_dataset.iceberg_bigquery_table`
ADD COLUMN age integer;

この状態でクエリすると age は null で表示された。
BigQuery コンソール上ではスキーマの変更は確認できたが GCS バケットにある metadata/ 配下のスキーマには反映されておらず…
メタデータの変更を加えるには EXPORT TABLE METADATA を使う。

EXPORT TABLE METADATA FROM `iceberg_dataset.iceberg_bigquery_table`

そうすると最新のメタデータファイルができて age も追加されていることが確認できた。

> gsutil cat gs://hoge_iceberg_test/iceberg_bigquery_table/metadata/vXXXX.metadata.json
{... "schema":{"fields":[{"id":1,"name":"id","required":false,"type":"long"},{"id":2,"name":"name","required":false,"type":"string"},{"id":3,"name":"age","required":false,"type":"long"}], ... }}%

Apache Iceberg 用の BigLake 外部テーブルを作成

続いては BigLake の外部テーブルを作ってみる。
テーブルを作る際に Iceberg のメタデータファイルが必要なため、今回は BigQuery テーブルの方で作ったテーブルのメタデータを Export して使う。
冒頭に説明した通り INSERT/UPDATE/DELETE などは実行ができない。

CREATE EXTERNAL TABLE `iceberg_dataset.iceberg_biglake_table`
WITH CONNECTION `asia-northeast1.biglake_storage_connector`
OPTIONS (
    format = 'ICEBERG',
    uris = ["gs://hoge_iceberg_test/iceberg_bigquery_table/metadata/vXXXX.metadata.json"]
)

スキーマの変更

スキーマの変更は GCS にあるメタデータファイルを更新して bq コマンド or CREATE OR REPLACE EXTERNAL TABLE で更新することができた。
ここらへん BigLake Metastore を使用してテーブルを作ると自動でスキーマを変更してくれるらしいが、お金がかかりそうなので断念。

本来の運用ではスキーマが更新されるたびにメタデータのファイルが複数作られることになると思われるので、その変更をどう BigLake テーブルに伝えるかが難点になりそう。

まとめ

BigQuery が対応した Apache Iceberg のテーブルを触ることができた。
感想としては実際のデータを用意するところや OTF 形式に整形するところが難しく手元で検証するのが厄介なところが多かった。
Spark もあまり経験がなかったので、ガイドにあった手順などはなるべく使わないようにしている。

また試していない機能が沢山あると思うので、引き続き検証を進めて実践投入できるか判断したい。

AWS の Data Firehose はストリームデータを Iceberg テーブルとして配信できる機能があるみたいだが、
Cloud Pub/Sub や Datastream あたりが Iceberg に対応してすると一気に利用シーンが増えそうだなと思った。
また Dataproc や Vertex AI などからも Iceberg テーブルにすることで扱いやすくなる点もあると思うので、使用感などは探ってみたい。

Discussion