💫

PyIceberg でデータレイクハウスを扱う

に公開

今までのあらすじ

https://zenn.dev/evakichi/books/ab58dab238083e

Apache Sparkでデータレイクハウスを作ってみました。
今回はPyIcebergでデータレイクハウスを扱ってみたいと思います。

https://py.iceberg.apache.org/

PyIcebergとは

PyIcebergのページにはこう書かれています。

PyIceberg is a Python implementation for accessing Iceberg tables, without the need of a JVM.

JavaVirtualMachineを使わないPythonだけでのインプリメンテーションらしいです。

早速使ってみよう

使い方は超簡単なので早速使ってみましょう。

といってもデータレイクハウスの入れ物であるPostgreSQLとMinIOは必要ですので私の投稿

https://zenn.dev/evakichi/books/ab58dab238083e/viewer/6d9742

を参考に構築しておいてください。

PyIcebergをインストールしよう

PyIceberはpipで取ってこられます。今回はS3(MinIO)とPostgreSQLをつかうのでこうなります。

$ pip install pyiceberg[s3fs,sql-postgres]

ついでに言うとpyarrowを使うのでこれもインストールしておきます。

$ pip install pyarrow

あとはここのGetting startedを参考にPythonコードを作成します。

設定ファイルを書いてみよう

設定はPythonコードの中で書くか、

https://py.iceberg.apache.org/configuration/

の通りに設定ファイル.pyiceberg.yaml内で記述することとなります。私は後者を選びました。

こんな感じ(適当にあなたの環境に合わせてください)

.pyiceberg.yaml
catalog:
  default:
    type: sql
    uri: postgresql+psycopg2://USER:PASSWORD@URL/DBNAME
    init_catalog_tables: false
    warehouse: s3://BUCKET/WAREHOSUE_NAME/
    s3.endpoint: https://URL:PORT
    s3.access-key-id: ADMIN_USERNAME
    s3.secret-access-key: ADMIN_PASSWORD
    s3.resolve-region: False

大文字になっているところが環境依存のところです。

PyIcebergと戯れる

では先に述べたGetting startedを踏襲しましょう。
まずは、データをダウンロードしておきます。

$ curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet

つぎに、コードを書きます。まずはスキーマ登録から。

test.py
# モジュールのインポート。
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from datetime import datetime

# データのURLを念のため書いておきます。
# curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet

# データは毎回新しいものとするために時刻を取得します。
dt_now = datetime.now()
dt_now_str = dt_now.strftime("%Y%m%d%H%M%S%f")

# Parquetファイルからデータフレームを作ります。
df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")

# カタログは先ほどの.pyiceberg.yamlで作成したdefaultを使います。
catalog = load_catalog("default")

# 名前空間を作ります。
catalog.create_namespace("default"+dt_now_str)

# スキーマを登録するためにテーブルを作ります。
table = catalog.create_table(
    "default.taxi_dataset"+dt_now_str,
    schema=df.schema,
)

次に、実際のデータを書き込みます。

test.py
# テーブルの書き込み。
table.append(df)

最後に書き込まれたデータの行数をカウントします。

test.py
length = len(table.scan().to_arrow())
print(length)

実行してみましょう。

$ python3 test.py
Unable to resolve region for bucket real-data-lakehouse
3066766

1行目のエラーは無視してください(現在調査中)。きちんと書き込まれていますから。
行数が出てきました。
実際データが書き込まれているか確認します。
まずはMinIOから。

このようにdataディレクトリの中にParquetファイルと

metadataディレクトリの中にJSONとAvroファイルができていればOKです。

ついでなので、PostgreSQLの中も見ておきましょう。

データレコードも書かれていることが判りました。

さいごに(今後の課題)

Sparkで書いたときのディレクトリ構造の持ち方


PyIcebergで書いたときのディレクトリ構造の持ち方

では若干データ構造が違うようです。今後はこの差異を埋める方法を考えたいと思います。

あと、若干疑問が残る場所がありますのでそこもきちんと解消したいと思います。

Discussion