Zenn
🦆

AWS Lambda×DuckDB×PyIcebergによるETLの実装

2024/12/20に公開

はじめに

以前、AWS Lambda×DuckDBによる軽量ETLの実装という記事を投稿しました。
https://zenn.dev/penginpenguin/articles/b508f04a3431a8

今回は最近話題のApache Iceberg形式に対応したLambdaベースのETLを実装したいと思います。
そもそもAWS GlueやEMRを使えば簡単に出来るのですが、軽量データセットの場合だとコスト面が合わないということもあると思います。
そういった場合に今回のようなAWS Lambdaを使ったETLが効果を発揮します。

※ちなみに個人アカウントでGlueを使ってIcebergのテーブルをいじっていたら2万近くかかってしまったので、個人で使用する場合にも効果的かと思います!

今回のアーキテクチャ

今回のアーキテクチャはとてもシンプルです。
S3のファイルアップロードをトリガーにLambdaを起動し、S3(Iceberg)に書き込むものとなります。

ポイントはLambdaで使用する各種ライブラリとなります。

  • DuckDB:データベースエンジンとして、メモリ内での高速なSQLクエリ処理を実現。
  • PyArrow:Arrowフォーマットを使用し、高速なデータの変換・転送をサポート。
  • PyIceberg:AWS Glue Catalogを活用して、Iceberg形式のテーブルにアクセス・操作。

DuckDBとは

DuckDBとは、組み込み型のOLAP(オンライン分散処理)向けデータベースエンジンです。
DuckDBは非常に軽量で、インメモリ処理が可能となるためLambdaのようなシンプルな環境でも効率的に動作させることができます。
特にデータ分析やETL処理のようなバッチ処理において強力なパフォーマンスを発揮すると言われています。

https://duckdb.org/

PyArrowとは

PyArrowとは、Apache ArrowプロジェクトのPythonバインディングで、メモリ内データの効率的な操作と転送を可能にするライブラリです。
Apache Arrowは、データ解析や分散コンピューティングに特化した高速なカラムナメモリフォーマットを提供します。
PyArrowは、このArrowフォーマットをPythonで利用するためのAPIを提供し、さまざまなデータ操作やデータ間のやり取りを効率的に行うことが可能です。

https://arrow.apache.org/

PyIcebergとは

PyIcebergとは、Apache IcebergのPythonライブラリで、大規模データセットのテーブル管理を容易にするライブラリです。
Apache Icebergは、クラウドネイティブのデータレイクを効率的に扱うために設計されたオープンソースのテーブルフォーマットであり、PyIcebergはそのPython実装を提供します。
PyIcebergを使用することで、Python環境でIcebergテーブルにアクセスし、高度なデータ操作が可能となります。

https://py.iceberg.apache.org/

Lambdaへの組み込み

Lambdaへの組み込みはLambdaレイヤーを使いました。
下記のコマンドでレイヤー用のライブラリをダウンロードし、zipで固めていただければOKです。

mkdir python 
pip install -t python --platform manylinux2014_x86_64 --only-binary=:all: pyiceberg[glue,duckdb]

https://py.iceberg.apache.org/#installation

オプションにglue,duckdbを付加することで同時にglue用の拡張と、DuckDB及びPyArrowがインストールされます。
ちなみにこのままだとLambdaレイヤーの250MBを超過してしまうため、boto3を削除しました。

素直にコンテナイメージ化したほうがいいのかなと思います。

サンプルコード

サンプルコードではETLのフィルタとしてWHERE VendorID = 1を追加しています。

sample.py
import duckdb
import pyarrow as pa
from pyiceberg.catalog.glue import GlueCatalog  

def lambda_handler(event, context):
    try:
        # DuckDB接続とホームディレクトリ設定
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'") 
        
        # httpfs 拡張モジュールのインストールとロード
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")
        
        # S3からDuckDBでデータをロード
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        
        
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        
        print(f"s3_input_path: {s3_input_path}")
        
        query = f"""
            SELECT * FROM read_parquet('{s3_input_path}') WHERE VendorID = 1
        """
        # SQLを実行して結果をPyArrow Table形式で取得
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
        
        print(f"取得したデータ数: {result_arrow_table.num_rows}")
        print(f"データスキーマ: {result_arrow_table.schema}")
        
        # Glue Catalogの設定(Icebergテーブルにアクセス)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # それぞれ環境に合わせて設定してください。

        # テーブルの読み込み
        namespace = "icebergdb" # 環境に合わせて設定してください。
        table_name = "yellow_tripdata" # 環境に合わせて設定してください。
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Icebergテーブルにデータを一括追加
        iceberg_table.append(result_arrow_table) 

        print("データがIceberg形式でS3に追加されました。")
    
    except Exception as e:
        print(f"エラーが発生しました: {e}")

実行結果

サンプルデータは下記から取得しています。(サンプルとしてよく使われるNYCのタクシーのデータです)
https://aws.amazon.com/marketplace/pp/prodview-okyonroqg5b2u

トリガー起動前

トリガー起動後


このようにIceberg形式のOTFにデータを登録することができました!

今回は特段フィルタもチェック処理も実装していませんが、内部で何かしらの処理を行うことも可能です。

今回の構成のメリット・デメリット

メリット

メリデメは過去の記事と基本的には同じです。
コストをかけずにIcebergを操作できるのが大きなポイントです。

  • 導入が簡単:Lambdaレイヤーで簡単に導入可能。
  • コストメリットのあるIceberg操作:GlueやEMRを使わず、コストをかけずにIcebergの操作が可能
  • Lambdaで簡単に開発可能:DuckDBにて提供されている関数を活用することで簡単に開発が可能。
  • SQLライクな処理:SQLの構文(PostgreSQL互換)で簡単に操作できる。
  • 効率的な処理が可能:Lambda上でもインメモリ処理により効率的なデータ処理が可能。
  • S3トリガーでリアルタイムETL処理が可能:S3トリガーを使うことでファイル格納をトリガーにETL処理を即時実行可能。

デメリット

  • 最大メモリサイズの制限がある:Lambdaの制限により最大メモリサイズが10240 MBとなる。場合によってはメモリが枯渇する可能性がある。(この辺りのメモリ効率は未検証なのでどこまで耐えられるかは不明です…)
  • 実行時間の制限がある:Lambdaの制限により実行時間は15分が最大となります。大規模データセットの場合、15分を超えてしまう可能性があるため、場合によってはコンテナサービス等も検討する必要があります。

さらなる拡張

今回はデータを取得してそのまま登録するだけの処理を行いました。
Iceberg形式ということで下記のようなWrite-Audit-Publish(WAP)パターンの実装も可能です。
WAP処理を行うことでデータの品質を担保することが可能となります。

Glue Data Qualityを使うと便利だと思います。

WAP処理については下記にまとめているので参考にどうぞ。

https://zenn.dev/penginpenguin/articles/bedb295e9dbe45

ちなみにDuckDBのIceberg拡張機能も便利に使えそうです。
※書き込みは現時点では未対応とのこと…

https://duckdb.org/docs/extensions/iceberg.html

まとめ

今回はAWS Lambdaを活用してIceberg形式のテーブルに対してETL処理を実装しました。
AWSでIcebergにETLを実行する場合、通常はAWS GlueやEMRが候補に挙がりますが小規模なデータセットではコスト面で適さない場合があります。
そのようなケースでは、今回紹介した組み合わせがコストパフォーマンスに優れた選択肢となると思います。

Lambdaを利用することで、S3トリガーを活用したリアルタイムETL処理が可能になります。
これを活かすことで軽量でシンプルなデータ処理フローを構築できる点が大きなメリットです。

とはいえ、全ては適材適所なのでなんでもLambdaではなく、要件や制約等様々な要素を加味してサービス選定を行うのがいいと思います。

また、Icebergテーブルにはスキーマの進化やタイムトラベル等様々なメリットがあります。
Icebergは、OTFの主流とも言える存在になりつつあり、今後ますます利用される機会が増えると考えられます。

今回の記事が、Icebergテーブルを扱う際の軽量なデータ処理やリアルタイムETLを検討している方の参考になれば幸いです。

Discussion

ログインするとコメントできます