Zenn
🐥

AWSでの効率的なデータ処理を考える~データフォーマット~ その2 変換方法あれこれ

2025/03/22に公開
1

はじめに

前回の記事内でDuckDBを使ったデータ変換処理をサンプルコードとして提示しました。

https://zenn.dev/penginpenguin/articles/5dc60be6c3113a

この記事に対してXでコメントをいただきました。

parquetへの変換は COPY table_name TO 's3://s3_bucket/filename.parquet'; で対応可能
もともとのコードと上記のCOPY~形式に変えた場合のparquet file sizeや実行速度、parquet v2 format指定での影響も気になる

というわけで今回の記事では上記の点について検証してみたいと思います。

サンプルコード(CSVをメモリ上に展開して変換する場合)

こちらはCSVをメモリ上に展開して変換する場合のコードです。
メモリに一度格納することで、例えば、データの前処理(フィルタリングや変換)のような柔軟なデータ処理が可能となります。

sample.py
import duckdb
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
import time
from io import BytesIO

def lambda_handler(event, context):
    try:
        # 1. DuckDB接続とホームディレクトリ設定
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'")  # Lambdaの一時ディレクトリに設定
        
        # httpfs 拡張モジュールのインストールとロード
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")
        
        # 2. S3からCSVを読み込む
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        
        print(f"Processing file: {s3_input_path}")
        
        query = f"""
            SELECT * FROM read_csv_auto('{s3_input_path}')
        """

        start_time = time.time()
        result_arrow_table = duckdb_connection.execute(query).fetch_arrow_table()
        
        print(f"Loaded {result_arrow_table.num_rows} rows from CSV")
        
        # 3. Parquetに変換
        output_buffer = BytesIO()
        pq.write_table(result_arrow_table, output_buffer)
        output_buffer.seek(0)
        
        # 4. S3にアップロード
        s3_output_bucket = "任意のバケット"
        s3_client = boto3.client('s3')
        output_key = s3_object_key.replace('.csv', '.parquet')
        s3_output_path = f"s3://{s3_output_bucket}/{output_key}"
        
        s3_client.upload_fileobj(output_buffer, s3_output_bucket, output_key)
        end_time = time.time() - start_time
        
        print(f"File converted and saved to {s3_output_path} in {end_time:.4f} sec")
    
    except Exception as e:
        print(f"Error occurred: {e}")

サンプルコード(COPYコマンドで直接バケットに出力する場合 Parquet v1)

こちらはCSVをCOPYコマンドで直接バケットに出力する場合のコードです。
この場合、シンプルかつ高速に処理が可能となります。

sample.py
import duckdb
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
import time
from io import BytesIO

def lambda_handler(event, context):
    try:
        # 1. DuckDB接続とホームディレクトリ設定
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'")  # Lambdaの一時ディレクトリに設定
        
        # httpfs 拡張モジュールのインストールとロード
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # 2. S3からCSVを読み込んでParquetに書き出し
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        
        print(f"Processing file: {s3_input_path}")
        
        # 3. COPYコマンドでCSVをParquetに変換してS3に保存
        s3_output_bucket = "任意のバケット"
        output_key = s3_object_key.replace('.csv', '.parquet')
        s3_output_path = f"s3://{s3_output_bucket}/{output_key}"

        query = f"""
            COPY (SELECT * FROM read_csv_auto('{s3_input_path}')) 
            TO '{s3_output_path}' (FORMAT 'parquet');
        """
        
        start_time = time.time()
        duckdb_connection.execute(query)
        end_time = time.time() - start_time
        print(f"File converted and saved to {s3_output_path} in {end_time:.4f} sec")
    
    except Exception as e:
        print(f"Error occurred: {e}")

サンプルコード(COPYコマンドで直接バケットに出力する場合 Parquet v2)

こちらはCSVをCOPYコマンドで直接バケットに出力し、Parquet v2で指定した場合のコードです。

sample.py
import duckdb
import boto3
import pyarrow as pa
import pyarrow.parquet as pq
import time
from io import BytesIO

def lambda_handler(event, context):
    try:
        # 1. DuckDB接続とホームディレクトリ設定
        duckdb_connection = duckdb.connect(database=':memory:')
        duckdb_connection.execute("SET home_directory='/tmp'")  # Lambdaの一時ディレクトリに設定
        
        # httpfs 拡張モジュールのインストールとロード
        duckdb_connection.execute("INSTALL httpfs;")
        duckdb_connection.execute("LOAD httpfs;")

        # 2. S3からCSVを読み込んでParquetに書き出し
        s3_bucket = event['Records'][0]['s3']['bucket']['name']
        s3_object_key = event['Records'][0]['s3']['object']['key']
        s3_input_path = f"s3://{s3_bucket}/{s3_object_key}"
        
        print(f"Processing file: {s3_input_path}")
        
        # 3. COPYコマンドでCSVをParquetに変換してS3に保存
        s3_output_bucket = "任意のバケット"
        output_key = s3_object_key.replace('.csv', '.parquet')
        s3_output_path = f"s3://{s3_output_bucket}/{output_key}"

        query = f"""
            COPY (SELECT * FROM read_csv_auto('{s3_input_path}')) 
            TO '{s3_output_path}' (PARQUET_VERSION v2);
        """
        
        start_time = time.time()
        duckdb_connection.execute(query)
        end_time = time.time() - start_time
        print(f"File converted and saved to {s3_output_path} in {end_time:.4f} sec")
    
    except Exception as e:
        print(f"Error occurred: {e}")

検証

CSVをメモリ上に展開して変換する場合

試行回数 変換後ファイルサイズ(MB) 処理時間(s) メモリ使用量(MB)
1回目 309.4 48.6834 2585
2回目 309.4 49.0462 2574
3回目 309.4 48.7816 2574

検証結果

  • ファイルサイズ:309.4MB
  • 処理時間:平均48.8秒
  • メモリ使用量:平均2577MB

考察

  • メモリにすべてのCSVデータをロードしてから変換を行う方法は、比較的ファイルサイズの圧縮率が高いという結果(309.4MB)。
  • 処理時間は48.7秒前後で、メモリ使用量はかなり高め(2577MB程度)
  • データの前処理(フィルタリング、変換など)が可能で、柔軟性が高い点が利点だが、大量のデータを扱う際にはメモリ消費が問題になる可能性がある。

COPYコマンドで直接バケットに出力する場合 Parquet v1

試行回数 変換後ファイルサイズ(MB) 処理時間(s) メモリ使用量(MB)
1回目 451.7 37.9470 633
2回目 451.7 39.1853 632
3回目 451.7 38.2245 616

検証結果

  • ファイルサイズ:451.7MB
  • 処理時間:平均38.1秒
  • メモリ使用量:平均627MB

考察

  • COPYコマンドで直接S3に出力する方法は、メモリ使用量と処理時間の両方で改善。特にメモリ使用量(627MB)は大幅に削減されているが、ファイルサイズ(451.7MB)はメモリ展開方式に比べて大きく、圧縮率は低め。
  • メモリの節約と処理速度の向上に対して、圧縮率が犠牲になっている印象。これはデータをメモリにロードすることなく直接出力できるため、ディスクIOが減少するためかもしれない。

COPYコマンドで直接バケットに出力する場合 Parquet v2

Parquet v1よりファイルサイズの圧縮率が低く、メモリ使用量が若干少ない

試行回数 変換後ファイルサイズ(MB) 処理時間(s) メモリ使用量(MB)
1回目 421.1 37.7041 620
2回目 421.1 38.4100 624
3回目 421.1 38.2079 610

検証結果

  • ファイルサイズ:421.1MB
  • 処理時間:平均38.1秒
  • メモリ使用量:平均618MB

考察

  • Parquet v2を使用した場合、Parquet v1と比較して、メモリ使用量は若干少なくなっているが、圧縮率(421.1MB)は高くなっている。

検証結果まとめ

メモリ展開:ファイルサイズの圧縮率が高いが、メモリ消費が非常に高く、処理時間も長め。
COPYコマンド:メモリ消費と処理時間が大きく改善され、メモリ使用量が少ないが、圧縮率は低め。

Parquet v1 vs Parquet v2:Parquet v2のほうが圧縮率とメモリ使用量的に優位、今回のケースでは処理時間に優位な差はなかった。

選定方法

データ量が大きい場合は、メモリ使用量や処理時間が重要な要素となり、COPYコマンドを使った方法のほうが優位だと思われます。
ただし、データの圧縮が最優先の場合、メモリ展開での処理が有効ですが、メモリ使用率と処理時間が大きくなる可能性があるため大量データの取り扱いには注意が必要です。

最終的にはデータ量やパフォーマンス要件に依存するため、プロジェクトの要件に応じて選定いただければと思います。

まとめ

今回はいただいたコメントをもとにParquetへの変換方法のパフォーマンスを確認しました。
検証結果より単にParquetにすればいいというわけではなく、どういった要素を優先するかによって最適な変換方法を選定する必要があることがわかりました。

近年のデータが大規模化している中で、処理の効率化を進めるためには、どのようなデータ量やシステム要件を考慮するかによって最適なアプローチを選ぶことが重要です。

今回の記事が、最適なアーキテクチャ設計を考える一助になれば幸いです。

1

Discussion

ログインするとコメントできます