🦆

kaggleのコンペでDuckdbを使用して前処理を高速化しようとしたら失敗した話

2024/11/13に公開

参加させていただいたコンペ

人狼ゲームのログ解析による犯人予測と勝利チームの予測

人狼ゲームのプレイヤーログデータ(csvに記載)を用いて、犯人(人狼)予測と勝利チームの予測を行う機械学習モデルの構築を目指しました。
例えばカラムとしてmessage(発言内容),role(役職)などが記載されている6万行×10列の訓練データを用いて機械学習を行うというものでした。
特徴量に関しても例えばこの人は毎回人狼になるとか勝率が良い人もいるとか非常に凝った内容になって面白かったです。

もしよければ探してみて下さい。

記事を書いた経緯

今回、kaggleのコンペに初めて参加しました。正直なところ、機械学習に関しては素人で、ついていくのが精一杯でした。肝心のモデル作成の部分はチュートリアルをなぞるだけでしたが非常に勉強になりました。(精度も並)

ただ今回は前処理の部分を自分なりに変更してデータ前処理の高速化をしてみたいと考え、DuckDBを使用してみることにしました。

列指向のデータベースエンジンで、大量データのクエリ処理や集計が高速で行えるので。pandasでデータを処理している部分をDuckDBで置き換えれば、まあ早くなるかな?と軽く考えていました。

...が、サンプルコードをpandasで実行した場合の処理時間は約69秒だったのに対し、DuckDBに置き換えた場合は約80秒と、かえって遅くなってしまいました。

なので試してみた遅かったコードのどこが原因か探してみることにしました。
ちなみに速度検証に関しては

start_time = time.time()
###この部分に速度検証したいコードを記述###
end_time = time.time()
print(f"{end_time - start_time}")

を使用しました。(kaggle上では測定のたびにかなり実行時間が異なるので秒数はあくまでも参考)

検証

遅かったコード:Duckdbのfetchdf()を使用した。

def load_data_ac(file_paths):
    queries = []
    for file_path in file_paths:
        episode_number = int(os.path.splitext(os.path.basename(file_path))[0])
        
        sql_query = f"SELECT *, {episode_number} AS episode_number FROM read_json('{file_path}')"
        
        std_time = time.time()
        df = con.execute(sql_query).fetchdf()
        end_time = time.time()
        print(f"{end_time - std_time}")
        queries.append(df)
    return pd.concat(queries)

#帰ってきたクエリのリストを結合
load_data_ac(FILE_PATHS_TRAIN)

受け取った50個ほどのファイルパスの名前に人狼ゲームの1ゲーム目、2ゲーム目などの数字が1.jsonl,2.jsonlなどのように書かれていた。
そのためその数字を新たな列として加えつつSQLでファイル内容を読んでいる。

ただその後がコンペ終了時間ギリギリで書いていたからなのか結構変なことをしていて、
SQLを一度pandasのデータフレームに変換:fetchdf()
その後最初に用意してあるリストにそれを追加して、最後にconcatで縦結合している。

time関数で秒数を測ってみると1ファイルあたり...
a = con.execute(sql_query).fetchdf() #0.10069794654846191秒
でした。
1つのファイルごとに毎回pandasのデータフレームを作っているので遅くなっていました。
なのでデータフレームにするのは最後の一度だけで良いことがわかりました。

load_data_ac(FILE_PATHS_TRAIN)呼び出し速度は3.126620054244995秒

速かったコード:テンプレートでいただいていたコード

def load_data(file_paths):
    data = []
    for file_path in tqdm(file_paths):
        episode_number = int(os.path.splitext(os.path.basename(file_path))[0])
        with open(file_path, 'r', encoding='utf-8') as f:
            for line in f:
                record = json.loads(line)
                record['episode_number'] = episode_number
                data.append(record)
    return pd.DataFrame(data)

標準のpythonライブラリで各ファイルの内容を1行づつ読み込みリストに格納
最後に一回だけデータフレームにしているので速かったのだと思います。

load_data(FILE_PATHS_TRAIN)呼び出し速度は2.363025188446045秒

速いと思ったが失敗したコード:duckdbでUNIONALL

def load_data_ac_optimized(file_paths):
    # ファイルごとにクエリを作成し、同時に処理
    sql_queries = [
        f"SELECT *, {int(os.path.splitext(os.path.basename(file_path))[0])} AS episode_number FROM read_json_auto('{file_path}')"
        for file_path in file_paths
    ]
    # すべてのクエリをUNIONで結合
    
    sql_query = " UNION ALL ".join(sql_queries)
    # 一度fetchdfで取り出す
    return con.sql(sql_query).fetchdf()

load_data_ac_optimized(FILE_PATHS_TRAIN)

速度的には速かったが各列のデータ型が合っておらず使用できないというエラーとなったコード。
おそらく改良すれば使用できる。

意外と速くなかった?:事前にparquetに変換→plのデータフレームとしてload

def convert_jsonl_to_parquet(file_paths, output_dir):
    # 出力ディレクトリが存在しない場合作成
    os.makedirs(output_dir, exist_ok=True)
    
    for file_path in file_paths:
        # Parquetファイルの出力パスを作成 output_dirができる
        parquet_path = os.path.join(output_dir, os.path.basename(file_path).replace('.jsonl', '.parquet'))
        
        # DuckDBのCOPYコマンドを使ってJSONLからParquetに変換
        con.sql(f"COPY (SELECT * FROM read_json_auto('{file_path}')) TO '{parquet_path}' (FORMAT 'parquet');")

# 出力先ディレクトリを指定して変換を実行
convert_jsonl_to_parquet(FILE_PATHS_TEST, '/kaggle/working/test_parquet')
convert_jsonl_to_parquet(FILE_PATHS_TRAIN, '/kaggle/working/train_parquet')

parquetでcsvデータをparquetに上のコードで変換して...

def load_data_parquet(file_paths):
    data = []
    for file_path in tqdm(file_paths):
        # ファイル名からエピソード番号を取得
        episode_number = int(os.path.splitext(os.path.basename(file_path))[0])
        
        # Parquetファイルを読み込み、各レコードにエピソード番号を追加してリストに保存
        records = pl.read_parquet(file_path).to_dicts()
        for record in records:
            record['episode_number'] = episode_number
            data.append(record)
    
    # 最後にPolars DataFrameに変換
    return pl.DataFrame(data)

TRAIN_PARQUET = glob.glob("/kaggle/working/train_parquet/*.parquet")


df_parquet = load_data_parquet(TRAIN_PARQUET)

load_data_parquet(TRAIN_PARQUET)呼び出しが3.6074962615966797秒...
やり方が悪いのかとも思ったが後述するが列選択をしていないのもあるのかもしれないです。

他気づいたこと

con.execute と con.sqlの違い

#1. con.sqlとcon.executeの違い
con.sql: DuckDBが提供する関数で、クエリを簡単に実行し、結果をそのままデータフレームやテーブル形式で返してくれ、クエリの準備や実行、結果の取得までが一括で行われるとのこと。

con.execute: より低レベルな操作を行う関数で、executeでクエリを実行した後、fetchdfやfetchallを使ってデータを取り出す必要があって、これらの操作が別々になっているため、全体として少し時間がかかるとのこと。

更なる高速化...?

duckdbは列ごとの検索が高速になるがSELECT * FROM のように指定しても全てのデータをとってきているので当然その恩恵は得られない。
最終段階で得られる列が決定している状態ならSQL文をひとまとめにして、SELECT * FROM を極力減らすことが重要なのではないかと思う。
ただEDAなどの段階では色々列を見たりすると思うので有効ではないかもしれない。

全体の処理が意外と高速化されなかった原因

kaggle上元々インストールしていないライブラリを追加した。

実は元々kaggle上にはpandas他使用ライブラリはインストールされている状態で追加のライブラリを使用するときはpip install する必要がある。このインストール時間で余計に時間を食ってしまったのではないかと思います。

メモリ依存

duckdbはメモリ依存なのでkaggleノートブック自体の性能に左右されるのではないかと思いました。実機ならまた違うかなと思います。

次やりたいこと

時間を測る際はkaggle上ではだいぶブレるので一旦kaggle apiでローカルにデータを持ってきてからやろうと思います。またpandasやpolarsなどデータフレームになるべく落とさない方法で良い方法が見つかると良いと思いました。あとこういったkaggle上でもよりよいデータ可視化ができるような上手い方法も何か探したいと思います。あと現在参加している時系列系のコンペではもっと機械学習の手法を学びたいと思います!

Discussion