🦆

ParquetとDuckdbを使用して1000万行のcsvの読み込み・計算高速化を試みた

2024/11/18に公開

参考にさせていただいた記事

参考記事はDaskを使用しています。
https://qiita.com/fujine/items/830b3d30d3e4d4b36005

経緯

大量のデータを持つcsvをpandasで読み込み計算するとメモリ不足や速度の問題が付きまとう。
そんなとき参考記事を見つけ、こういった場合にparquetが有用であることを思い出した。またデータのロードからparquet変換、データ前処理、計算作業もできるduckdbを併用することでさらに便利になるのではないかと思い実験してみました。

PCスペック

CPU:Apple M1
メモリ:16GB
MacbookAir

今回あつかうデータ

1000万行10列にランダムな浮動小数点数が入っている。
データサイズは1.79GB

出力データの構造

統計量 列1 列2... ...列9 列10
count 10000000 10000000... ...10000000 10000000
mean 0.499927 0.499967... ...0.499847 0.500084
std 0.288671 0.288675... ...0.288647 0.288660
min 2.40e-07 1.53e-07... ...8.83e-08 2.20e-08
25% 0.250015 0.249965... ...0.249865 0.250064
50% 0.499733 0.499909... ...0.499799 0.500097
75% 0.749895 0.749971... ...0.749763 0.750215
max 1.000000 1.000000... ...1.000000 1.000000

時間の計算方法

python の time.time()を使用して時間を測定する。(5回測定した平均値)

実験1:作成したcsvファイルをpandasで読み込みpandasで計算

参考記事同様に作成したcsvファイルをpandasで読み込んだ後,統計情報を表示する方法
かかった時間は18.46秒

import pandas as pd
import time
import sys

# プログラム開始時刻を記録
start_time = time.time()

try:
    # 'dummy_data.csv'というファイル名でCSVを読み込みます
    data = pd.read_csv('dummy_data.csv')

    # データの統計情報を取得します
    stat = data.describe()

    # 統計情報を表示します
    print(stat)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")

実験2:csv を一度pandasでparquetに変換してpandas読み込み計算

※別途pyarrowをインストール必要

csvからparquetに変換する時間は15.06秒となった...
しかしparquetを読み込み、同様の統計計算処理 11.64秒!
またデータサイズは789 MBとなった。(2分の1以下!)

合計時間は増えるが、計算処理は半分...とまではいかないが高速化したんじゃないかという感じ。

import pandas as pd
import time
import sys
pre_time = time.time()
# CSVをParquetに変換
csv_file = 'dummy_data.csv'
parquet_file = 'dummy_data.parquet'

try:
    # CSVファイルを読み込んでParquet形式で保存
    data = pd.read_csv(csv_file)
    data.to_parquet(parquet_file, index=False)
    
except Exception as e:
    print(f"CSVからParquetへの変換中にエラーが発生しました: {e}")
    sys.exit(1)
pos_time = time.time()
print(f"プログラムの実行時間: {pos_time - pre_time:.2f}秒")
# プログラム開始時刻を記録
start_time = time.time()

try:
    # Parquetファイルを読み込み
    data = pd.read_parquet(parquet_file)

    # データの統計情報を取得
    stat = data.describe()

    # 統計情報を表示します
    print(stat)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)
except Exception as e:
    print(f"データの読み込み中にエラーが発生しました: {e}")
    sys.exit(1)

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")

実験3:Pandasを用いてcsvをparquetへ変換していた部分をduckdbに置き換え&parquet読み込み部分もduckdbにした(計算はpandas)

csvからparquetへの変換は 4.21秒となった!
parquetを読み込み、同様の統計計算処理は6.85秒!
全体時間も当初のコードより短縮された🙌
ちなみにSELECT * ではなく一つの列を選択した場合だと1.71秒となったので特定の列のみ取得したい場合は有効

import duckdb
import time
import sys

pre_time = time.time()
csv_file = 'dummy_data.csv'
parquet_file = 'dummy_data.parquet'

try:
    # DuckDBを使用してCSVをParquetに変換
    con = duckdb.connect()  # DuckDBの接続を作成
    con.sql(f"COPY (SELECT * FROM read_csv_auto('{csv_file}')) TO '{parquet_file}' (FORMAT 'parquet');")
    print(f"{csv_file}を{parquet_file}に変換しました。")
except Exception as e:
    print(f"CSVからParquetへの変換中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

pos_time = time.time()
print(f"プログラムの実行時間: {pos_time - pre_time:.2f}秒")

# プログラム開始時刻を記録
start_time = time.time()

try:
    # ParquetファイルをDuckDBで読み込み
    con = duckdb.connect()  # 再び接続を作成
    df = con.sql(f"SELECT * FROM '{parquet_file}'").fetchdf()

    # データの統計情報を取得
    stat = df.describe()

    # 統計情報を表示
    print(stat)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)
except Exception as e:
    print(f"データの読み込み中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")

実験4:統計計算部分をpandasのdescribe()からduckdbに置き換え(ただし中央値などは計算無し)

parquetへの変換部分はあまり時間変化はなかったが、その後の計算部分は0.77秒!
ただしコード下部のようにデータ構造が変化してしまったこと、中央値が取得できなかったことなどが残念な点。

import duckdb
import time
import sys

pre_time = time.time()
csv_file = 'dummy_data.csv'
parquet_file = 'dummy_data.parquet'

try:
    # DuckDBを使用してCSVをParquetに変換
    con = duckdb.connect()  # DuckDBの接続を作成
    con.execute(f"COPY (SELECT * FROM read_csv_auto('{csv_file}')) TO '{parquet_file}' (FORMAT 'parquet');")
    print(f"{csv_file}を{parquet_file}に変換しました。")
except Exception as e:
    print(f"CSVからParquetへの変換中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

pos_time = time.time()
print(f"プログラムの実行時間: {pos_time - pre_time:.2f}秒")

# プログラム開始時刻を記録
start_time = time.time()

try:
    # ParquetファイルをDuckDBで読み込み
    con = duckdb.connect()  # 再び接続を作成

    # 各列に対して統計情報を取得するクエリ
    query = f"""
    SELECT 
        '列1' AS column_name, 
        COUNT(列1) AS count, 
        AVG(列1) AS mean, 
        MIN(列1) AS min, 
        MAX(列1) AS max, 
        STDDEV(列1) AS stddev
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列2', 
        COUNT(列2), 
        AVG(列2), 
        MIN(列2), 
        MAX(列2), 
        STDDEV(列2)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列3', 
        COUNT(列3), 
        AVG(列3), 
        MIN(列3), 
        MAX(列3), 
        STDDEV(列3)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列4', 
        COUNT(列4), 
        AVG(列4), 
        MIN(列4), 
        MAX(列4), 
        STDDEV(列4)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列5', 
        COUNT(列5), 
        AVG(列5), 
        MIN(列5), 
        MAX(列5), 
        STDDEV(列5)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列6', 
        COUNT(列6), 
        AVG(列6), 
        MIN(列6), 
        MAX(列6), 
        STDDEV(列6)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列7', 
        COUNT(列7), 
        AVG(列7), 
        MIN(列7), 
        MAX(列7), 
        STDDEV(列7)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列8', 
        COUNT(列8), 
        AVG(列8), 
        MIN(列8), 
        MAX(列8), 
        STDDEV(列8)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列9', 
        COUNT(列9), 
        AVG(列9), 
        MIN(列9), 
        MAX(列9), 
        STDDEV(列9)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列10', 
        COUNT(列10), 
        AVG(列10), 
        MIN(列10), 
        MAX(列10), 
        STDDEV(列10)
    FROM '{parquet_file}'
    """

    # 統計情報を取得
    stats = con.sql(query)

    # 統計情報を表示
    print(stats)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)
except Exception as e:
    print(f"データの読み込み中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")



# ┌─────────────┬──────────┬─────────────────────┬────────────────────────┬────────────────────┬─────────────────────┐
# │ column_name │  count   │        mean         │          min           │        max         │       stddev        │
# │   varchar   │  int64   │       double        │         double         │       double       │       double        │
# ├─────────────┼──────────┼─────────────────────┼────────────────────────┼────────────────────┼─────────────────────┤
# │ 列1         │ 10000000 │ 0.49997363703113196 │  3.209442711593624e-10 │ 0.9999999656430827 │ 0.28859636813140244 │
# │ 列2         │ 10000000 │  0.5000224119100443 │   9.88708384053183e-08 │ 0.9999999184711084 │   0.288666070122857 │
# │ 列3         │ 10000000 │  0.5001646258698798 │ 4.4403324350739126e-08 │ 0.9999999260758056 │ 0.28860387285912087 │
# │ 列4         │ 10000000 │  0.4997451253821639 │  7.392753542667663e-08 │ 0.9999999216304695 │   0.288696624563921 │
# │ 列5         │ 10000000 │  0.4999794226858744 │ 2.0042084547533534e-08 │ 0.9999999655836096 │ 0.28864317805683254 │
# │ 列6         │ 10000000 │  0.5002028919311465 │  1.172756992628976e-07 │ 0.9999999074513307 │  0.2887173415314276 │
# │ 列7         │ 10000000 │   0.499974034722065 │  5.191653708047639e-07 │ 0.9999999987997251 │   0.288661239064578 │
# │ 列8         │ 10000000 │  0.5000110029026893 │ 1.5417143539586675e-07 │ 0.9999999463445215 │  0.2886937845590032 │
# │ 列9         │ 10000000 │  0.5000169863884559 │  1.377724179185691e-07 │ 0.9999995820038601 │ 0.28866923297905317 │
# │ 列10        │ 10000000 │ 0.49981239991986165 │ 2.7163165272625633e-08 │ 0.9999999220848311 │  0.2886419021068985 │
# ├─────────────┴──────────┴─────────────────────┴────────────────────────┴────────────────────┴─────────────────────┤
# │ 10 rows                                                                                                6 columns │
# └──────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

実験5:duckdbで中央値など四分位範囲も計算する

計算部分は8.99秒となり結果としてはあまり良くなかった。
ORDER BY やGROUPを入れたからか...?
あと実験4と同様データ構造が変化(列と行が逆転)してしまっているのでここの問題も解決できればと思う。



import duckdb
import time
import sys

pre_time = time.time()
csv_file = 'dummy_data.csv'
parquet_file = 'dummy_data.parquet'

try:
    # DuckDBを使用してCSVをParquetに変換
    con = duckdb.connect()  # DuckDBの接続を作成
    con.execute(f"COPY (SELECT * FROM read_csv_auto('{csv_file}')) TO '{parquet_file}' (FORMAT 'parquet');")
    print(f"{csv_file}を{parquet_file}に変換しました。")
except Exception as e:
    print(f"CSVからParquetへの変換中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

pos_time = time.time()
print(f"プログラムの実行時間: {pos_time - pre_time:.2f}秒")

# プログラム開始時刻を記録
start_time = time.time()

try:
    # ParquetファイルをDuckDBで読み込み
    con = duckdb.connect()  # 再び接続を作成

    # 各列に対して統計情報を取得するクエリ
    query = f"""
    SELECT 
        '列1' AS column_name, 
        COUNT(列1) AS count, 
        AVG(列1) AS mean, 
        MIN(列1) AS min, 
        MAX(列1) AS max, 
        STDDEV(列1) AS stddev,
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列1) AS q1,
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列1) AS median,
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列1) AS q3
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列2', 
        COUNT(列2), 
        AVG(列2), 
        MIN(列2), 
        MAX(列2), 
        STDDEV(列2),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列2),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列2),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列2)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列3', 
        COUNT(列3), 
        AVG(列3), 
        MIN(列3), 
        MAX(列3), 
        STDDEV(列3),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列3),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列3),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列3)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列4', 
        COUNT(列4), 
        AVG(列4), 
        MIN(列4), 
        MAX(列4), 
        STDDEV(列4),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列4),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列4),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列4)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列5', 
        COUNT(列5), 
        AVG(列5), 
        MIN(列5), 
        MAX(列5), 
        STDDEV(列5),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列5),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列5),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列5)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列6', 
        COUNT(列6), 
        AVG(列6), 
        MIN(列6), 
        MAX(列6), 
        STDDEV(列6),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列6),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列6),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列6)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列7', 
        COUNT(列7), 
        AVG(列7), 
        MIN(列7), 
        MAX(列7), 
        STDDEV(列7),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列7),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列7),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列7)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列8', 
        COUNT(列8), 
        AVG(列8), 
        MIN(列8), 
        MAX(列8), 
        STDDEV(列8),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列8),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列8),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列8)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列9', 
        COUNT(列9), 
        AVG(列9), 
        MIN(列9), 
        MAX(列9), 
        STDDEV(列9),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列9),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列9),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列9)
    FROM '{parquet_file}'
    UNION ALL
    SELECT 
        '列10', 
        COUNT(列10), 
        AVG(列10), 
        MIN(列10), 
        MAX(列10), 
        STDDEV(列10),
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY 列10),
        PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY 列10),
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY 列10)
    FROM '{parquet_file}'
    """

    # 統計情報を取得
    stats = con.sql(query)

    # 統計情報を表示
    print(stats)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)
except Exception as e:
    print(f"データの読み込み中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")


#データ構造
# ┌─────────────┬──────────┬─────────────────────┬───┬─────────────────────┬─────────────────────┬─────────────────────┬────────────────────┐
# │ column_name │  count   │        mean         │ … │       stddev        │         q1          │       median        │         q3         │
# │   varchar   │  int64   │       double        │   │       double        │       double        │       double        │       double       │
# ├─────────────┼──────────┼─────────────────────┼───┼─────────────────────┼─────────────────────┼─────────────────────┼────────────────────┤
# │ 列1         │ 10000000 │ 0.49997363703113545 │ … │ 0.28859636813140366 │ 0.25017146296475634 │  0.4999567495815199 │ 0.7498039962861076 │
# │ 列2         │ 10000000 │   0.500022411910016 │ … │  0.2886660701228492 │ 0.25006191306815445 │  0.4999494247196515 │ 0.7500772993133467 │
# │ 列3         │ 10000000 │   0.500164625869882 │ … │  0.2886038728591226 │  0.2502857544480174 │  0.5002914313123676 │ 0.7501166075575045 │
# │ 列4         │ 10000000 │   0.499745125382161 │ … │  0.2886966245639178 │  0.2496954946391836 │ 0.49969925620313665 │ 0.7496399752443816 │
# │ 列5         │ 10000000 │ 0.49997942268589446 │ … │ 0.28864317805683354 │ 0.24998017690135335 │  0.5000690463765047 │ 0.7499377131344139 │
# │ 列6         │ 10000000 │  0.5002028919311468 │ … │ 0.28871734153142886 │ 0.25012429274894243 │  0.5004009405228993 │ 0.7502769785062803 │
# │ 列7         │ 10000000 │   0.499974034722065 │ … │   0.288661239064582 │  0.2499807787273045 │   0.499955546614117 │ 0.7499732945673792 │
# │ 列8         │ 10000000 │  0.5000110029026885 │ … │  0.2886937845589977 │ 0.24999584229212268 │ 0.49995571610122064 │ 0.7500625366610609 │
# │ 列9         │ 10000000 │  0.5000169863884575 │ … │ 0.28866923297905256 │ 0.24992688947850283 │  0.5000302850623182 │ 0.7500660607590525 │
# │ 列10        │ 10000000 │  0.4998123999198816 │ … │  0.2886419021068988 │ 0.24987186655067634 │  0.4996276557644865 │ 0.7497563383618168 │
# ├─────────────┴──────────┴─────────────────────┴───┴─────────────────────┴─────────────────────┴─────────────────────┴────────────────────┤
# │ 10 rows                                                                                                             9 columns (7 shown) │
# └─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

実験6:duckdbでSUMMARIZE SELECTを使用して統計量を出す。

duckdbにあるSUMMARIZE機能を使用するという方法をX(旧:Twitter)で教えていただいたきました。
ありがとうございます🙇
早速試してみましたが、実験3の計算部分より速く、5.66秒となりました!

import duckdb
import time
import sys

pre_time = time.time()
csv_file = 'dummy_data.csv'
parquet_file = 'dummy_data.parquet'

try:
    # DuckDBを使用してCSVをParquetに変換
    con = duckdb.connect()  # DuckDBの接続を作成
    con.execute(f"COPY (SELECT * FROM read_csv_auto('{csv_file}')) TO '{parquet_file}' (FORMAT 'parquet');")
    print(f"{csv_file}を{parquet_file}に変換しました。")
except Exception as e:
    print(f"CSVからParquetへの変換中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

pos_time = time.time()
print(f"プログラムの実行時間: {pos_time - pre_time:.2f}秒")

# プログラム開始時刻を記録
start_time = time.time()

try:
    # ParquetファイルをDuckDBで読み込み
    con = duckdb.connect()  # 再び接続を作成
    df = con.sql(f"""
            SUMMARIZE SELECT * FROM
                '{parquet_file}'
        """)
    # 統計情報を表示
    print(df)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)
except Exception as e:
    print(f"データの読み込み中にエラーが発生しました: {e}")
    sys.exit(1)
finally:
    con.close()  # 接続を閉じる

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")

┌─────────────┬─────────────┬──────────────────────┬────────────────────┬───────────────┬─────────────────────┬───┬─────────────────────┬─────────────────────┬────────────────────┬──────────┬─────────────────┐
│ column_name │ column_type │         min          │        max         │ approx_unique │         avg         │ … │         q25         │         q50         │        q75         │  count   │ null_percentage │
│   varchar   │   varchar   │       varchar        │      varchar       │     int64     │       varchar       │   │       varchar       │       varchar       │      varchar       │  int64   │  decimal(9,2)   │
├─────────────┼─────────────┼──────────────────────┼────────────────────┼───────────────┼─────────────────────┼───┼─────────────────────┼─────────────────────┼────────────────────┼──────────┼─────────────────┤
│ 列1         │ DOUBLE      │ 2.2791307041636344…  │ 0.9999998544036967 │      11160784 │ 0.500030235128893   │ … │ 0.2501956898019403  │ 0.49956083226006986 │ 0.7497265542520956 │ 10000000 │            0.00 │
│ 列2         │ DOUBLE      │ 2.4721744396050127…  │ 0.9999999093202314 │      10957256 │ 0.4999490827945614  │ … │ 0.25018422311253213 │ 0.4999451469260643  │ 0.7497575338949348 │ 10000000 │            0.00 │
│ 列3         │ DOUBLE      │ 8.38723414053888e-08 │ 0.9999997896040389 │       8909170 │ 0.5001358460863113  │ … │ 0.2504446993263612  │ 0.5004291550605585  │ 0.7504208874256808 │ 10000000 │            0.00 │
│ 列4         │ DOUBLE      │ 1.4933057834731756…  │ 0.9999998761574722 │       9969957 │ 0.5001115345533031  │ … │ 0.2502220561175387  │ 0.5006930068240103  │ 0.7499472005784501 │ 10000000 │            0.00 │
│ 列5         │ DOUBLE      │ 9.225930908129953e…  │ 0.9999999481944614 │      11114490 │ 0.49995271184906337 │ … │ 0.2500060251068368  │ 0.49996950306516524 │ 0.7497239240688653 │ 10000000 │            0.00 │
│ 列6         │ DOUBLE      │ 1.27283873330164e-07 │ 0.9999999920360164 │       8725686 │ 0.4998446918404695  │ … │ 0.24979789828972812 │ 0.49991357662824504 │ 0.7499164639130429 │ 10000000 │            0.00 │
│ 列7         │ DOUBLE      │ 6.671306274075306e…  │ 0.9999999494034826 │       8263148 │ 0.4999672271593585  │ … │ 0.25033621400961614 │ 0.5005443333629526  │ 0.7497276050220494 │ 10000000 │            0.00 │
│ 列8         │ DOUBLE      │ 3.356572098045518e…  │ 0.9999999644997    │      10578452 │ 0.5000305241032194  │ … │ 0.2503857297601811  │ 0.5000831453163934  │ 0.7499767283800606 │ 10000000 │            0.00 │
│ 列9         │ DOUBLE      │ 5.6380300317293575…  │ 0.9999999674801799 │       8989957 │ 0.5000318817607543  │ … │ 0.2500874679491636  │ 0.5002759711176985  │ 0.7501578830335551 │ 10000000 │            0.00 │
│ 列10        │ DOUBLE      │ 7.47319797156365e-08 │ 0.99999992970467   │      10174594 │ 0.5000169403269028  │ … │ 0.2501655412874882  │ 0.5005279561385526  │ 0.7500558681857152 │ 10000000 │            0.00 │
├─────────────┴─────────────┴──────────────────────┴────────────────────┴───────────────┴─────────────────────┴───┴─────────────────────┴─────────────────────┴────────────────────┴──────────┴─────────────────┤
│ 10 rows                                                                                                                                                                                 12 columns (11 shown) │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

実験7:pd.read_parquet(parquet_file,engine="pyarrow")

これもXで教えて頂いた方法です。pandasのread_parquetにはengineを指定できるオプションがありpyarrowを指定すると高速になるとのことでしたので実験してみました。ただデータ型はarrow型になります。Arrowは大規模データセットを扱う際のメモリ使用量を削減する...とのことでかなり有用な方法に思えます。
統計量計算部分は...5.18秒!他の方法より速度が速くなりましたが実験8の内容によると誤差の可能性あり。

import pandas as pd
import time
import sys
import pyarrow
pre_time = time.time()
# CSVをParquetに変換
csv_file = 'dummy_data.csv'
parquet_file = 'dummy_data.parquet'

try:
    # CSVファイルを読み込んでParquet形式で保存
    data = pd.read_csv(csv_file)
    data.to_parquet(parquet_file, index=False)
    
except Exception as e:
    print(f"CSVからParquetへの変換中にエラーが発生しました: {e}")
    sys.exit(1)
pos_time = time.time()
print(f"プログラムの実行時間: {pos_time - pre_time:.2f}秒")
# プログラム開始時刻を記録
start_time = time.time()

try:
    # Parquetファイルを読み込み
    data = pd.read_parquet(parquet_file,engine="pyarrow")

    # データの統計情報を取得
    stat = data.describe()

    # 統計情報を表示します
    print(stat)

except MemoryError:
    print("メモリ不足です。プログラムを終了します。")
    sys.exit(1)
except Exception as e:
    print(f"データの読み込み中にエラーが発生しました: {e}")
    sys.exit(1)

# プログラム終了時刻を記録
end_time = time.time()

# 実行時間を計算して表示
execution_time = end_time - start_time
print(f"プログラムの実行時間: {execution_time:.2f}秒")


                 列1            列2            列3            列4            列5            列6            列7            列8            列9           列10
count  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07  1.000000e+07
mean   5.000302e-01  4.999491e-01  5.001358e-01  5.001115e-01  4.999527e-01  4.998447e-01  4.999672e-01  5.000305e-01  5.000319e-01  5.000169e-01
std    2.886573e-01  2.886021e-01  2.886422e-01  2.886021e-01  2.886880e-01  2.887401e-01  2.887060e-01  2.886779e-01  2.886876e-01  2.886374e-01
min    2.279131e-07  2.472174e-07  8.387234e-08  1.493306e-07  9.225931e-09  1.272839e-07  6.671306e-08  3.356572e-08  5.638030e-08  7.473198e-08
25%    2.501824e-01  2.499767e-01  2.502900e-01  2.501551e-01  2.499427e-01  2.496742e-01  2.500156e-01  2.500648e-01  2.501199e-01  2.500667e-01
50%    5.000470e-01  4.999147e-01  5.001627e-01  5.003557e-01  4.999114e-01  4.998032e-01  4.999315e-01  4.999983e-01  4.999607e-01  5.000067e-01
75%    7.499432e-01  7.499244e-01  7.501417e-01  7.499873e-01  7.500158e-01  7.499145e-01  7.500101e-01  7.501241e-01  7.500557e-01  7.500280e-01
max    9.999999e-01  9.999999e-01  9.999998e-01  9.999999e-01  9.999999e-01  1.000000e+00  9.999999e-01  1.000000e+00  1.000000e+00  9.999999e-01

実験8:pd.read_parquet(parquet_file,engine="pyarrow",dtype_backend="pyarrow")

これもXで教えて頂いた方法です。(n回目)ひたすら自分の知識不足を痛感しますね...
dtype_backendもpyarrowに指定するという方法です。engineとdtype_backendの何が違うかわからなかったので公式のドキュメントを見てみました。
https://pandas.pydata.org/docs/reference/api/pandas.read_parquet.html

engine:
Parquet形式のファイルを読み込むために使用する ライブラリ(エンジン) を指定
'auto'が自動で選択される。
pyarrow が使用可能なら優先・利用できない場合は fastparquet が使用される...
ということは実験7の速度向上は誤差の可能性がありますね。一応日を跨いでいるのと5回の試行回数は少なかったかもしれません。次回速度系の検証をする際は方法を改善したいです。

dtype_backend:
DataFrame の データ型のバックエンド(内部的なデータ型管理方法) を指定。
'numpy_nullable':デフォルトだとこれが選択されており、numpyを使用している。
'pyarrow':今回明示して指定した方式でPyArrowなデータ型を使用している。
これによりメモリ効率が上がるとのこと。

コードは実験7のpd.read_parquetの部分をタイトルのように書き換えただけなので省略。

統計量計算部分は...3.87秒!速度が他の方法より速くなりました!!!(n回目)

結論

duckdbでのparquet変換・読み込みは速度向上に十分貢献することがわかった。ただし四分位範囲を含む統計量計算を考えるとpandas(pyarrowオプション)の方が速度的に有効そうです。

なんどか修正してきて改めて有用性について考えてみました。
pandasのpyarrowオプションについて速度向上は今回最も大きい結果になり驚いています。内部構造をarrow型?で持つことによってもしかしたら他の方法でも速度向上が見込めるかもしれません。pythonだから...という先入観で遅いと思っていたんですが,pandasも内部で色々指定できるようになっているみたいで自分の勉強不足を痛感しました。職場でpandasが浸透していて追加のライブラリを入れる必要がない場合、または入れたくない場合、kaggleなどデフォルトでpandasが入っている場合はこちらの方が良さそうだと思います。

duckdbで最初から最後まで変換をする方法も需要があると思いました。というのも今回pythonで使用していますが、duckdb-wasmなどjavascript(typescript)上で同様の計算がしたい時、dbtなどでTransform部分を完結させたい場合,AWS Athenaの代わりに使用したい場合などはSQLのみでこの方法も取れそうということはわかった気がします。

今後速度関係でやりたいこととしては例えばリアルタイムに近い速度でデータの取得を行いたいときに、パイプライン全体の速度向上をどういった方法で行えば良いかというのも別の機会で検証できたらと思っています。(あくまで小規模、安価な方法で)
ただしばらくは自作アプリのためのパイプライン作成に従事したいと思います。

結論としては、四分位範囲まで必要なくてデータ構造が変わっても問題ない場合は実験4の方法、pythonが使用できて四分位範囲も欲しいなら実験8の方法を使用していこうと思います。

Discussion