🐑

AWS Glue Python Shell×DuckDB×PyIcebergによるETLの実装

に公開

はじめに

AWS Community Builderのぺんぎん(@jitepengin)です。

以前、AWS Lambda×DuckDB×PyIcebergによるETLの実装という記事を投稿しました。
https://zenn.dev/penginpenguin/articles/77d4a9b1e90e3a

今回はLambdaと同様に軽量ETL処理に活用されることの多い、AWS Glue Python ShellをベースにしたApache Iceberg形式に対応したETLを実装したいと思います。

AWS Glue(Spark)やEMRだとコストがかかる、Lambdaだとタイムアウトが気になるといった場合にAWS Glue Python Shellが効果的だと思います。

AWS Glue Python Shellとは

AWS Glue Python Shellは、Apache Sparkを使わずにPythonスクリプトを実行できるAWS Glueの実行環境です。
Lambdaの15分制限に比べると長時間のバッチ処理が可能になります。デフォルトではなんと48時間です!
また、サーバレスで起動できるため、インフラの管理は不要で使った分だけ課金されます。

GlueやEMRと比較するとコストも安く済むケースが多いことから、軽量ETLやデータ変換処理で活用されます。

そんな便利なGlue Python Shellですが、Lamndaと比べるとトリガとなるイベントが少ないというデメリットがあります。
よくイベント駆動の処理で使われるS3のファイル配置トリガについても直接のトリガやEventBridge経由でのトリガも対応していません。
こういった場合のトリガには一般的にはLambdaまたはGlue Workflowを経由して起動されます。
これ以外にも様々な制約がありますが、うまく使いこなせば処理の幅が大きく広がる可能性を秘めたサービスとなります。

今回のアーキテクチャ

今回は、S3にファイルがアップロードされたことをトリガーに、Lambdaを経由してGlue Python Shellを実行し、変換後のデータをApache Iceberg形式でS3に保存します。
LambdaはあくまでもGlue Python Shellの起動だけ行い、ETLをGlue Python Shell側で行うことで処理時間の上限の15分を超える処理を行うことが狙いとなります。

ポイントは下記の各種ライブラリとなります。

  • DuckDB:データベースエンジンとして、メモリ内での高速なSQLクエリ処理を実現。
  • PyArrow:Arrowフォーマットを使用し、高速なデータの変換・転送をサポート。
  • PyIceberg:AWS Glue Catalogを活用して、Iceberg形式のテーブルにアクセス・操作。

DuckDBとは

DuckDBとは、組み込み型のOLAP(オンライン分析処理)向けデータベースエンジンです。
DuckDBは非常に軽量で、インメモリ処理が可能となるためLambdaのようなシンプルな環境でも効率的に動作させることができます。
特にデータ分析やETL処理のようなバッチ処理において強力なパフォーマンスを発揮すると言われています。

https://duckdb.org/

PyArrowとは

PyArrowとは、Apache ArrowプロジェクトのPythonバインディングで、メモリ内データの効率的な操作と転送を可能にするライブラリです。
Apache Arrowは、データ解析や分散コンピューティングに特化した高速なカラムナメモリフォーマットを提供します。
PyArrowは、このArrowフォーマットをPythonで利用するためのAPIを提供し、さまざまなデータ操作やデータ間のやり取りを効率的に行うことが可能です。

https://arrow.apache.org/

PyIcebergとは

PyIcebergとは、Apache IcebergのPythonライブラリで、大規模データセットのテーブル管理を容易にするライブラリです。
Apache Icebergは、クラウドネイティブのデータレイクを効率的に扱うために設計されたオープンソースのテーブルフォーマットであり、PyIcebergはそのPython実装を提供します。
PyIcebergを使用することで、Python環境でIcebergテーブルにアクセスし、高度なデータ操作が可能となります。

https://py.iceberg.apache.org/

Additional Python modules pathの設定

Additional Python modules pathには以下を設定してください。
割とライブラリの依存関係にはまりやすいので注意していただければと思います。

pyiceberg[glue]==0.9.1,pydantic==2.5.0,pydantic-core==2.14.1,annotated-types==0.6.0,duckdb==0.9.2,pyarrow==14.0.1,typing-extensions==4.8.0

サンプルコード

Lambdaのトリガー処理

S3のイベントを受けてGlue Python Shellを起動するだけのシンプルな処理です。

sample.py
import boto3

def lambda_handler(event, context):
    glue = boto3.client("glue")
    
    s3_bucket = event['Records'][0]['s3']['bucket']['name']
    s3_object_key = event['Records'][0]['s3']['object']['key']
            
    s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
    
    response = glue.start_job_run(
        JobName="Iceberg shell",
        Arguments={
            "--s3_input": s3_input_path
        }
    )
    print(f"Glue Job started: {response['JobRunId']}")
    return response

AWS Glue Python ShellのETL処理

Lambdaから起動され、パラメータをもとにDuckDBでファイルを取得、PyIcebergでS3のIcebergテーブルに書き込む処理です。
DuckDBではVendorID = 1でデータをフィルタリングしています。このようにSQLのような構文でクレンジングやフィルタリングを行うことが可能です。

今回はカタログアクセスにGlueCatalog形式を使っています。
AWS Glue Iceberg RESTエンドポイント方式(REST Catalog)でも同様の処理が可能となりますので、要件に応じて選択いただければと思います。

カタログアクセス方式の違いについては下記を参考にしていただければと思います。

https://zenn.dev/penginpenguin/articles/e44880aaa2d5e3

sample.py
import boto3
import sys
import os
from awsglue.utils import getResolvedOptions


def get_job_parameters():
    # get job parameters
    try:
        required_args = ['s3_input']
        
        args = getResolvedOptions(sys.argv, required_args)
        
        s3_file_path = args['s3_input']
        print(f"object: {s3_file_path}")

        return s3_file_path
        
    except Exception as e:
        print(f"parameters error: {e}")
        raise

def setup_duckdb_environment():
    # Properly set up the DuckDB environment
    try:
        home_dir = '/tmp'
        duckdb_dir = '/tmp/.duckdb'
        extensions_dir = '/tmp/.duckdb/extensions'
        
        os.environ['HOME'] = home_dir
        os.environ['DUCKDB_HOME'] = duckdb_dir
        os.environ['DUCKDB_CONFIG_PATH'] = duckdb_dir
        os.environ['DUCKDB_EXTENSION_DIRECTORY'] = extensions_dir
        
        os.makedirs(duckdb_dir, exist_ok=True)
        os.makedirs(extensions_dir, exist_ok=True)
        os.chmod(duckdb_dir, 0o755)
        os.chmod(extensions_dir, 0o755)
        
        print(f"DuckDB environment setup completed: {duckdb_dir}")
        return True
        
    except Exception as e:
        print(f"DuckDB environment setup error: {e}")
        return False

def read_parquet_with_duckdb(s3_input):
    # Read Parquet file from S3 using DuckDB
    import duckdb
    
    con = duckdb.connect(':memory:')
    
    try:
        con.execute("SET extension_directory='/tmp/.duckdb/extensions';")
        con.execute("INSTALL httpfs;")
        con.execute("LOAD httpfs;")
        
        session = boto3.Session()
        credentials = session.get_credentials()
        
        con.execute(f"SET s3_region='ap-northeast-1';")
        con.execute(f"SET s3_access_key_id='{credentials.access_key}';")
        con.execute(f"SET s3_secret_access_key='{credentials.secret_key}';")
        
        if credentials.token:
            con.execute(f"SET s3_session_token='{credentials.token}';")
        
        print(f"Reading data from S3: {s3_input}")
        sql = f"SELECT * FROM read_parquet('{s3_input}') WHERE VendorID = 1"
        res = con.execute(sql)
        
        return res.arrow()
        
    except Exception as e:
        print(f"DuckDB error: {e}")
        raise
    finally:
        con.close()

def write_iceberg_table(arrow_table):
    # Write to an Iceberg table
    try:
        print("Writing started...")
        
        from pyiceberg.catalog import load_catalog
        from pyiceberg.schema import Schema
        from pyiceberg.types import NestedField, StringType, IntegerType, DoubleType, TimestampType
        
        catalog_config = {
            "type": "glue",
            "warehouse": "s3://your-bucket/your-warehouse/", # Adjust to your environment.
            "region": "ap-northeast-1"
        }
        
        catalog = load_catalog("glue_catalog", **catalog_config)

        table_identifier = "icebergdb.yellow_tripdata"
        
        table = catalog.load_table(table_identifier)
        print(f"Target data to write: {arrow_table.num_rows:,} rows")
        
        # Write Arrow table to Iceberg table
        table.append(arrow_table)
        
     
        return True
        
    except Exception as e:
        print(f"Writing error: {e}")
        import traceback
        traceback.print_exc()
        return False

def main():  
    # Set up DuckDB environment
    if not setup_duckdb_environment():
        print("Failed to set up DuckDB environment")
        return
    
    try:
        import pyiceberg
        
        # Read data
        s3_input = get_job_parameters()

        arrow_tbl = read_parquet_with_duckdb(s3_input)
        print(f"Data read success: {arrow_tbl.shape}")
        
        # Write to Iceberg table
        if write_iceberg_table(arrow_tbl):
            print("\nWriting fully successful!")
  
        else:
            print("Writing failed")
        
    except Exception as e:
        print(f"Main error: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()

実行結果

サンプルデータは下記から取得しています。(サンプルとしてよく使われるNYCのタクシーのデータです)
https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

実行前

実行後

このようにIceberg形式のOTFにデータを登録することができました!

参考:Lambdaでの実装

以前の記事では、Lambda内でDuckDBとPyIcebergを使って直接ETL処理を行いました。
ただし、Lambdaは15分の時間制限があるため、大きなデータや複雑な処理では途中でタイムアウトする可能性やメモリ不足でエラーとなる可能性がありました。
同じライブラリ構成をGlue Python Shellに移すことで制約を回避することが可能となります。

sample.py
import duckdb
import pyarrow as pa
from pyiceberg.catalog.glue import GlueCatalog  

def lambda_handler(event, context):
    try:
        # Connect to DuckDB and set the home directory
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'") 
        
        # Install and load the httpfs extension
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")
        
        # Load data from S3 using DuckDB
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        
        print(f"s3_input_path: {s3_input_path}")
        
        query = f"""
            SELECT * FROM read_parquet('{s3_input_path}') WHERE VendorID = 1
        """
        # Execute SQL and retrieve results as a PyArrow Table
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
        
        print(f"Number of rows retrieved: {result_arrow_table.num_rows}")
        print(f"Data schema: {result_arrow_table.schema}")
        
        # Configure Glue Catalog (to access Iceberg table)
        catalog = GlueCatalog(region_name="ap-northeast-1", database="icebergdb", name="my_catalog")  # Adjust to your environment.

        # Load the table
        namespace = "icebergdb"  # Adjust to your environment.
        table_name = "yellow_tripdata"  # Adjust to your environment.
        iceberg_table = catalog.load_table(f"{namespace}.{table_name}")

        # Append data to the Iceberg table in bulk
        iceberg_table.append(result_arrow_table) 

        print("Data has been appended to S3 in Iceberg format.")
    
    except Exception as e:
        print(f"An error occurred: {e}")

AWS Glue Python Shell構成のメリット・デメリット

メリット

  • Lambdaより長時間実行でき、大きなファイルも処理可能。
  • サーバレスでスケーラビリティが高い。
  • DuckDBやPyIcebergなどのPythonのライブラリをそのまま利用可能。
  • Iceberg形式での保存によりスナップショットやタイムトラベルが可能。

デメリット

  • Lambdaより起動がやや遅く、Start-up timeとしてライブラリの読み込みの時間がかかる。
  • その名の通り、Pythonのみ対応し他の言語は使えない。
  • Sparkの分散処理が活用できないので大規模データや高負荷となる処理には不向き。
  • 依存ライブラリのバージョン管理は工夫が必要。

参考:Lambdaとの料金比較

Lambdaの実行上限の15分で比較しました。

Lambda

メモリは意外と使ってしまうので2048 MBで設定しています。

AWS Glue Python Shell

条件によってはAWS Glue Python Shellのほうが安くなる場合もあります。

まとめ

今回はAWS Glue Python Shellを活用してIceberg形式のテーブルに対してETL処理を実装しました。
AWS Glue Python Shellは、LambdaとGlue(Spark)の中間に位置するサーバレス実行環境として、軽量ながら長時間のETL処理を可能にします。
今回のようにDuckDBとPyIcebergを組み合わせれば、低コストで柔軟なデータ処理パイプラインを構築できます。

特に起動時間が長いことに大きなメリットがありますので、Lambdaでは時間が足りないが、Glue(Spark)やEMRはオーバースペックというケースで選択肢になると思います。

Icebergテーブルにはスキーマの進化やタイムトラベル等様々なメリットがあります。
Icebergは、OTFの主流とも言える存在になりつつあり、今後ますます利用される機会が増えると考えられます。

今回の記事が、Icebergテーブルを扱う際の軽量なデータ処理やリアルタイムETLを検討している方の参考になれば幸いです。

Discussion