🐻‍❄️

Rust版のpolarsでhive形式のparquetファイルの読み書きを行うメモ(ver: rs-0.43.1)

2024/10/09に公開

※この記事は不完全な部分を含む、ワークアラウンド的なところがあるのでご容赦ください
(polarsのアップデートで将来的に色々変わるかも?)

はじめに

最近pandasの対抗として台頭してきたpolars:

https://github.com/pola-rs/polars

について、

  • PythonではなくRustで使いたい
  • hive形式(パーティションあり)で分散されたparquetファイルを読み書きしたい

といったことがあり調べていた。
Python版ではユーザーガイドのIO/Hiveのページ:

https://docs.pola.rs/user-guide/io/hive/

にやり方がまとめられているが、何故かRustを使う場合の例が記載されておらず、
「polars自体の実装はRustなのだからRustで出来ないはずなくない?内部的にはRustのコードが動いているはずだし」
と思って調査した結果、予想外にかなりハマったのでメモ。

結論など

実際に動かしたコード例は

https://github.com/junkor-1011/polars-rust-partitioned-parquet-example-2024/blob/zenn_20241008_2/src/main.rs

に掲載している。↑のコードについて、大まかには

  1. テストデータとして適当なcsvを読み込み
  2. hive形式のparquetとして再保存
    • その際、適当なカラムをパーティションとして指定
  3. ↑で書き込んだhive形式のparquetを改めて読み込み
    • 読み込みではパーティションはよしなに読み込ませる(オプションなどで指定したりしない)

のような処理を行っている。

なお、polarsをプロジェクトに加える際、例えば

# -F でfeaturesを追加
cargo add polars -F polars-io -F parquet -F lazy

のようにして、polars-io, parquet, lazyの3つのfeaturesを有効にしておくこと:

Cargo.toml
[dependencies]
polars = { version = "0.43.1", features = ["polars-io", "parquet", "lazy"] }

書き込み

まずhive形式(parquet)での書き込みについて、先ほどのコード例から関係ある部分を抜粋&少しだけ書き換えると以下のような感じ:

use polars::prelude::*;

// (中略)

let mut df: DataFrame // 書き込み対象のpolars DataFrame
let output_path: &str = "/path/to/dir-name" // 書き込み先のディレクトリパス
let partition_by: Vec<&str> // partitionに使うカラム名を順に格納したVec

let _ = write_partitioned_dataset( // polars::preludeから読み込める関数: write_partitioned_datasetを使う
    &mut df,
    std::path::Path::new(output_path), // 書き込み先のパス(今回はhive形式なので、ここで指定した名前のディレクトリが作成される)
    partition_by,
    &ParquetWriteOptions::default(), // 特にこだわりが無ければdefault()を指定しておく
    4_294_967_296, // Ref: https://github.com/pola-rs/polars/blob/rs-0.43.1/py-polars/polars/dataframe/frame.py#L3651
).unwrap();

python版での記述df.write_parquet("<書き込み先のパス>", partition_by=[])とはだいぶ書きぶりが異なり、
適当な関数write_partitioned_datasetを探し出してくる必要があることに加え、オプション引数などもないため、やや冗長な記載になっている。
(Python版ではwrite_partitioned_datasetの5つの引数のうち正味はじめの3つしか使用しておらず、最後2つは適当な値がセットされる)

ここで、最後の引数: chunksizeでは謎の数字4_294_967_296を渡しているが、
python版のpolarsのwrite_parquetの実装をたどると、

https://github.com/pola-rs/polars/blob/rs-0.43.1/py-polars/polars/dataframe/frame.py#L3651

のように上記の数字が普通にハードコードされていたためそれに倣っている。

ちなみに、write_partitioned_dataset関数について、

https://github.com/pola-rs/polars/blob/rs-0.43.1/crates/polars-io/src/partition.rs#L190

を見ると

Write a partitioned parquet dataset. This functionality is unstable.

などと書かれており、

  • まだ完成形では無さそう
  • 保存形式はparquetしか対応してなさそう

みたいなことを推測できる。

読み込み

書き込みのときと同様に抜粋&少しだけ書き換えて掲載すると、

use polars::prelude::*;

// (中略)

let target_path: &str = "/path/to/target_dir" // 読み込み対象のディレクトリパス

let lf: LazyFrame = LazyFrame::scan_parquet_files( // LazyFrameとして読み込み
    vec![target_path.into()].into(), // 引数の型をあわせるためにVecに入れたりintoしたりする
    ScanArgsParquet::default(), // 特段事情がなければデフォルトで良いらしい
).unwrap();

let df: DataFrame = lf.collect().unwrap(); // collectすることでDataFrameに変換

といった感じ。

読み込みに使っているscan_parquet_filesに関しては、

https://docs.pola.rs/api/rust/dev/polars_lazy/frame/struct.LazyFrame.html#method.scan_parquet_files

なども参照。

(なお、公式ドキュメントなどに詳細な記載がなく、色々試行錯誤してたらそれっぽく動いた、みたいな感じなので詳細は色々と分かっていない。。。)

補足

途中にも書いたが、unstableといった記述もソースコード中にはあり、将来的にはもう少しいい感じのやり方がオフィシャルに出てくるかもしれない。
(というかそうなってほしい)

また、ちょくちょくソースコードのリンクを貼っていることから察することができるように(?)、今回の検証はpolarsの具体的な実装を追跡+時々エスパーすることでなんとかそれっぽく動くところまで持っていっている。

polarsのソースコードの大まかな構成としては、

polars
├── crates # (Rustで実装)
│   ├── polars-** # 数々のcrateに分割されたサブモジュール
│   ├── polars # Rust版のpolarsの本体
│   └── polars-python # PyO3/maturinによる、Rust実装からのPythonモジュール生成
└── py-polars # Python版のpolarsの本体 (Pythonで実装)

みたいな感じになっている。

Rust版だとpolarsクレートが他のpolars-**クレートを適宜呼び出すような実装になっている。

一方、Python版ではpolars-pythonクレートにてPyO3/maturin:

https://github.com/PyO3/maturin

を使っており、ここでRustのコードをPythonコードに変換させている。
そのため、Python版の処理を追跡していくと概ね

といった流れでコードを追うことになる。
(PyO3/maturinで生成されるPythonコード自体はリポジトリに含まれていないため、maturin初心者だとpolars-pythoncrateまで追跡するのが難しいかも?)

今回、Python版のpolarsが動く際、具体的にどのようにして各crateを呼び出して動かしているのか(どの関数を使い、どのような引数を与えるか、など)を調べて、それをRust側で実装する際に参考にしている。

正直よく分からなかった部分も多々あるが、内部実装を見てみるのは個人的に結構勉強になったように思う。
ので、(参考になる人がいるのかよくわからないが、)一応メモしておく。

あとがき(感想など)

冒頭で

「polars自体の実装はRustなのだからRustで出来ないはずなくない?内部的にはRustのコードが動いているはずだし」

などと書いたが、
これ自体は間違いではないと思うものの、一方で、Rustで使いやすいインターフェースが用意されているのか?ということとは必ずしも一致しないと感じた。

一方、それはそれとしてhive形式のファイルの取り扱いについて、以前はローカルでもApache Sparkをpysparkなどの形で使って処理することが多かった(※今後も別にやめないと思う)が、
一方で今回のpolarsや、他にもduckdbなどローカルでサクッと動かせるツールが出てきており、使える選択肢が増えてきて便利になってきたように思う。

GitHubで編集を提案

Discussion