AWS Lambda × PyIceberg のカタログアクセスパターン比較
はじめに
Apache Icebergはデータレイクにおける信頼性の高いテーブル形式として注目されています。
ACIDトランザクションやスキーマの進化、タイムトラベルなどの機能を活用することで柔軟なデータ管理が可能となります。
以前、下記の記事でLambdaからIcebergへの書き込みを行いました。
こちらの記事では、Iceberg形式のテーブルにGlueCatalogを使用して書き込みを行いましたが、AWS Glue Iceberg RESTエンドポイントも活用可能です。
今回は、以前紹介したGlueCatalog形式とAWS Glue Iceberg RESTエンドポイント形式(REST Catalog)の実装方法の違いと比較について紹介したいと思います。
2つの書き込み方式の概要
前述したようにAWS Lambda上で小〜中規模のETLを実現する際、Apache Iceberg形式のテーブルへ直接データを書き込む手法として以下の2パターンがあります。
パターン1:GlueCatalog形式
Lambda関数内でDuckDB/PyArrowを使ったデータ前処理後、PyIcebergを利用してIcebergテーブルにGlueCatalog経由で書き込む方式です。
下記のライブラリを活用して実装しています。
- DuckDB:データベースエンジンとして、メモリ内での高速なSQLクエリ処理を実現。
- PyArrow:Arrowフォーマットを使用し、高速なデータの変換・転送をサポート。
- PyIceberg:AWS Glue Catalogを活用して、Iceberg形式のテーブルにアクセス・操作。
ポイントはIcebergを操作するためのライブラリであるPyIcebergを活用することとなります。
パターン1:GlueCatalog形式 サンプルコード
import duckdb
import pyarrow as pa
from pyiceberg.catalog.glue import GlueCatalog
def lambda_handler(event, context):
try:
# DuckDB接続とホームディレクトリ設定
duckdb_connection = duckdb.connect(database=':memory:')
duckdb_connection.execute("SET home_directory='/tmp'")
# httpfs 拡張モジュールのインストールとロード
duckdb_connection.execute("INSTALL httpfs;")
duckdb_connection.execute("LOAD httpfs;")
# S3からDuckDBでデータをロード
s3_bucket = event['Records'][0]['s3']['bucket']['name']
s3_object_key = event['Records'][0]['s3']['object']['key']
s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
print(f"s3_input_path: {s3_input_path}")
query = f"""
SELECT * FROM read_parquet('{s3_input_path}') WHERE VendorID = 1
"""
# SQLを実行して結果をPyArrow Table形式で取得
result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
print(f"取得したデータ数: {result_arrow_table.num_rows}")
print(f"データスキーマ: {result_arrow_table.schema}")
# Glue Catalogの設定(Icebergテーブルにアクセス)
catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog") # それぞれ環境に合わせて設定してください。
# テーブルの読み込み
namespace = "icebergdb" # 環境に合わせて設定してください。
table_name = "yellow_tripdata" # 環境に合わせて設定してください。
iceberg_table = catalog.load_table(f"{namespace}.{table_name}")
# Icebergテーブルにデータを一括追加
iceberg_table.append(result_arrow_table)
print("データがIceberg形式でS3に追加されました。")
except Exception as e:
print(f"エラーが発生しました: {e}")
パターン1:GlueCatalog形式 実行結果
サンプルデータは下記から取得しています。(サンプルとしてよく使われるNYCのタクシーのデータです)
このようにIceberg形式のOTFにデータを登録することができました!
パターン2:AWS Glue Iceberg RESTエンドポイント方式(REST Catalog)
AWS Glueが提供するIceberg RESTエンドポイント(REST Catalog仕様準拠)を、PyIcebergクライアントから呼び出してテーブル操作を行う方式です。
下記のライブラリを活用して実装しています。
- DuckDB:データベースエンジンとして、メモリ内での高速なSQLクエリ処理を実現。
- PyArrow:Arrowフォーマットを使用し、高速なデータの変換・転送をサポート。
- PyIceberg:AWS Glue Catalogを活用して、Iceberg形式のテーブルにアクセス・操作。
ポイントはAWS Glue Iceberg RESTエンドポイント方式を経由して書き込む部分となります。
パターン2:AWS Glue Iceberg RESTエンドポイント方式(REST Catalog) サンプルコード
ポイント下記のようにエンドポイント経由でアクセスする点となります。
# リージョン指定
sts = boto3.client('sts')
region = sts._client_config.region_name
# Catalog設定
catalog_properties = {
"type": "rest",
"uri": f"https://glue.{region}.amazonaws.com/iceberg",
"s3.region": region,
"rest.sigv4-enabled": "true",
"rest.signing-name": "glue",
"rest.signing-region": region
}
# DB設定
database_name = "icebergdb" # 環境に合わせて設定してください。
table_name = "yellow_tripdata" # 環境に合わせて設定してください。
# Glue Catalogの設定(API経由でIcebergテーブルにアクセス)
catalog = load_catalog(**catalog_properties)
import boto3
import duckdb
import pyarrow as pa
from pyiceberg.catalog import load_catalog # GlueCatalogを使ってIcebergテーブルにアクセス
def lambda_handler(event, context):
try:
# DuckDB接続とホームディレクトリ設定
duckdb_connection = duckdb.connect(database=':memory:')
duckdb_connection.execute("SET home_directory='/tmp'") # Lambdaの一時ディレクトリに設定
# httpfs 拡張モジュールのインストールとロード
duckdb_connection.execute("INSTALL httpfs;")
duckdb_connection.execute("LOAD httpfs;")
# S3からDuckDBでデータをロード
s3_bucket = event['Records'][0]['s3']['bucket']['name']
s3_object_key = event['Records'][0]['s3']['object']['key']
s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
print(f"s3_input_path: {s3_input_path}")
query = f"""
SELECT * FROM read_parquet('{s3_input_path}') WHERE VendorID = 1
"""
# SQLを実行して結果をPyArrow Table形式で取得
result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
print(f"取得したデータ数: {result_arrow_table.num_rows}")
print(f"データスキーマ: {result_arrow_table.schema}")
# リージョン指定
sts = boto3.client('sts')
region = sts._client_config.region_name
# Catalog設定
catalog_properties = {
"type": "rest",
"uri": f"https://glue.{region}.amazonaws.com/iceberg",
"s3.region": region,
"rest.sigv4-enabled": "true",
"rest.signing-name": "glue",
"rest.signing-region": region
}
# DB設定
database_name = "icebergdb" # 環境に合わせて設定してください。
table_name = "yellow_tripdata" # 環境に合わせて設定してください。
# Glue Catalogの設定(API経由でIcebergテーブルにアクセス)
catalog = load_catalog(**catalog_properties)
# テーブルの読み込み
table = catalog.load_table(f"{database_name}.{table_name}")
# Icebergテーブルにデータを一括追加
table.append(result_arrow_table) # PyArrow Table を追加
print("データがIceberg形式でS3に追加されました。")
except Exception as e:
print(f"エラーが発生しました: {e}")
パターン2:AWS Glue Iceberg RESTエンドポイント方式(REST Catalog) 実行結果
こちらも同様にサンプルデータは下記から取得しています。
同様にIceberg形式のOTFにデータを登録することができました!
それぞれのパターンの違い
どちらも実装に大きな差がないのでは?と思った方も多いと思います。(私もそう思います。)
確かにどちらの方式も PyIcebergを使用している点では同じですが、PyIcebergを通じて接続する「Catalogのバックエンド」が異なるという明確な違いがあります。
共通点
- 両者ともPythonライブラリとしてPyIcebergを使用する。
- load_table()でIcebergテーブルを取得し、append()を使ってデータを書き込む。
- 書き込まれるデータは Iceberg の仕様(マニフェスト、データファイル、スナップショット)に準拠して S3 に配置される。
両者の違い
差異が出るポイントとしては操作するカタログ種別にあります。
項目 | 方式1:GlueCatalog形式 | 方式2:AWS Glue Iceberg REST エンドポイント方式(REST catalog) |
---|---|---|
PyIcebergのCatalog種別 | GlueCatalog |
REST catalog |
アクセス方式 | AWS SDKを使ってローカルでGlue Catalogを操作 | Iceberg公式のREST Catalog仕様でCatalog操作 |
実行方法 | PyIcebergがGlue APIを直接呼び出す | PyIcebergがREST エンドポイントを実行 |
大きなポイントは方式2:AWS Glue Iceberg RESTエンドポイント方式はApache Iceberg REST仕様で規定されたAPIオペレーションに対応しているということです。
つまり、方式1はAWSの仕様に沿った形で動作し、方式2はIcebergの標準仕様に沿った形で動作します。
Icebergの標準仕様に従った実装を行いたい場合は、AWS Glue Iceberg RESTエンドポイント方式を採用するのがいいかと思います。
AWSの記事やドキュメント上はAWS Glue Iceberg RESTエンドポイント方式を活用しているパターンが多いのでこちらをメインに考えるのがいいかもしれません。
とはいえ、Lambdaからアクセスする場合はそんなに大きな差はないのかなと思います。
まとめ
今回は、LambdaからIcebergテーブルを操作する場合の2つの方式を紹介しました。
いずれもPyIcebergを使用することには変わりませんが、動作方式が異なる点に注意が必要です。
この違いを意識して、実装を検討いただければと思います。
今回の記事が、Icebergテーブルを扱う際の軽量なデータ処理を検討している方の参考になれば幸いです。
Discussion