PyIceberg でデータレイクハウスを扱う
今までのあらすじ
Apache Sparkでデータレイクハウスを作ってみました。
今回はPyIcebergでデータレイクハウスを扱ってみたいと思います。
PyIcebergとは
PyIcebergのページにはこう書かれています。
PyIceberg is a Python implementation for accessing Iceberg tables, without the need of a JVM.
JavaVirtualMachineを使わないPythonだけでのインプリメンテーションらしいです。
早速使ってみよう
使い方は超簡単なので早速使ってみましょう。
といってもデータレイクハウスの入れ物であるPostgreSQLとMinIOは必要ですので私の投稿
を参考に構築しておいてください。
PyIcebergをインストールしよう
PyIceberはpipで取ってこられます。今回はS3(MinIO)とPostgreSQLをつかうのでこうなります。
$ pip install pyiceberg[s3fs,sql-postgres]
ついでに言うとpyarrow
を使うのでこれもインストールしておきます。
$ pip install pyarrow
あとはここのGetting startedを参考にPythonコードを作成します。
設定ファイルを書いてみよう
設定はPythonコードの中で書くか、
の通りに設定ファイル.pyiceberg.yaml
内で記述することとなります。私は後者を選びました。
こんな感じ(適当にあなたの環境に合わせてください)
catalog:
default:
type: sql
uri: postgresql+psycopg2://USER:PASSWORD@URL/DBNAME
init_catalog_tables: false
warehouse: s3://BUCKET/WAREHOSUE_NAME/
s3.endpoint: https://URL:PORT
s3.access-key-id: ADMIN_USERNAME
s3.secret-access-key: ADMIN_PASSWORD
s3.resolve-region: False
大文字になっているところが環境依存のところです。
PyIcebergと戯れる
では先に述べたGetting startedを踏襲しましょう。
まずは、データをダウンロードしておきます。
$ curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet
つぎに、コードを書きます。まずはスキーマ登録から。
# モジュールのインポート。
import pyarrow.parquet as pq
from pyiceberg.catalog import load_catalog
from datetime import datetime
# データのURLを念のため書いておきます。
# curl https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet -o /tmp/yellow_tripdata_2023-01.parquet
# データは毎回新しいものとするために時刻を取得します。
dt_now = datetime.now()
dt_now_str = dt_now.strftime("%Y%m%d%H%M%S%f")
# Parquetファイルからデータフレームを作ります。
df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")
# カタログは先ほどの.pyiceberg.yamlで作成したdefaultを使います。
catalog = load_catalog("default")
# 名前空間を作ります。
catalog.create_namespace("default"+dt_now_str)
# スキーマを登録するためにテーブルを作ります。
table = catalog.create_table(
"default.taxi_dataset"+dt_now_str,
schema=df.schema,
)
次に、実際のデータを書き込みます。
# テーブルの書き込み。
table.append(df)
最後に書き込まれたデータの行数をカウントします。
length = len(table.scan().to_arrow())
print(length)
実行してみましょう。
$ python3 test.py
Unable to resolve region for bucket real-data-lakehouse
3066766
1行目のエラーは無視してください(現在調査中)。きちんと書き込まれていますから。
行数が出てきました。
実際データが書き込まれているか確認します。
まずはMinIOから。
このようにdata
ディレクトリの中にParquetファイルと
metadata
ディレクトリの中にJSONとAvroファイルができていればOKです。
ついでなので、PostgreSQLの中も見ておきましょう。
データレコードも書かれていることが判りました。
さいごに(今後の課題)
Sparkで書いたときのディレクトリ構造の持ち方
と
PyIcebergで書いたときのディレクトリ構造の持ち方
では若干データ構造が違うようです。今後はこの差異を埋める方法を考えたいと思います。
あと、若干疑問が残る場所がありますのでそこもきちんと解消したいと思います。
Discussion