🐡

RustでApache Parquetの読み書きをしてみた

に公開

目的

CSVの代替としてApache Paquetを扱う話がでてきたので、Rustで簡単な読み書きをやってみます。

Apache Parquet

本家サイトApache Parquetによれば、
「Apache Parquetは、効率的なデータの保存と取得のために設計された、オープンソースの列指向データファイル形式です。複雑なデータを一括処理するための高性能な圧縮およびエンコードスキームを提供し、多くのプログラミング言語や分析ツールでサポートされています。」
とのことです。

自分の理解だと検索のパフォーマンスが良くて、サイズも小さくて、汎用性もあって使い勝手の良いベターCSVという認識です。

Rustのライブラリは本家がサポートしています。

コード

本家のexampleを参考に色々書いてみました。

CSVからparquetに変換

CSVのフォーマットをmake_schemaでしっかりと定義します。CSVファイルから推測する方法もあるんですが、うまくいかなかったので明示しています。

input.csv
id,title,content,page
1,aaa,a,10
2,bbb,b,20
3,ccc,c,30
4,ddd,d,40
5,eee,e,50
6,fff,f,60
7,ggg,g,70
8,hhh,h,80
9,iii,i,90
10,jjj,j,100
schema.rs
use arrow::datatypes::{DataType, Field, Schema};
pub fn make_schema() -> Schema {
    Schema::new(vec![
        Field::new("id", DataType::UInt64, false),
        Field::new("title", DataType::Utf8, false),
        Field::new("content", DataType::Utf8, false),
        Field::new("page", DataType::UInt64, false),
    ])
}
move_csv.rs
use std::sync::Arc;
use crate::schema::make_schema;
use std::fs::File;
use arrow::csv;
use parquet::arrow::ArrowWriter;

pub fn execute(input: &str, output: &str) -> anyhow::Result<()> {
    // スキーマー定義
    let schema = Arc::new(make_schema());

    // CSVファイルを読み込み、Parquetファイルに変換
    let file = File::open(input)?;
    let mut csv = csv::ReaderBuilder::new(Arc::clone(&schema)).with_header(true)
        .build(file)?;

    let output = File::create(output)?;
    let mut writer = ArrowWriter::try_new(output, Arc::clone(&schema), None)?;
    while let Some(batch) = csv.next() {
        let batch = batch?;
        writer.write(&batch)?;
    }
    writer.close()?;
    Ok(())
}

全部読み

print_batchesというきれいに表示する関数が用意されています。

read_all.rs
use std::fs::File;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use arrow::util::pretty::print_batches;

pub fn execute(input: &str) -> anyhow::Result<()> {
    let file = File::open(input)?;
    let parquet_reader = ParquetRecordBatchReaderBuilder::try_new(file)?
        .with_batch_size(8192)
        .build()?;
    let mut batches = Vec::new();
    for batch in parquet_reader {
        batches.push(batch?);
    }
    print_batches(&batches)?;
    Ok(())
}
+----+-------+---------+------+
| id | title | content | page |
+----+-------+---------+------+
| 1  | aaa   | a       | 10   |
| 2  | bbb   | b       | 20   |
| 3  | ccc   | c       | 30   |
| 4  | ddd   | d       | 40   |
| 5  | eee   | e       | 50   |
| 6  | fff   | f       | 60   |
| 7  | ggg   | g       | 70   |
| 8  | hhh   | h       | 80   |
| 9  | iii   | i       | 90   |
| 10 | jjj   | j       | 100  |
+----+-------+---------+------+

フィルターとプロジェクション

filterして一部のみ読み込みます。今回IDが1か5を取得していますが、一気にフィルターする方法がなくて、バラバラチェックしたもをのORで結合しています。
もっといい方法がありそうです。

filter.rs
use std::time::SystemTime;
use arrow::array::{BooleanArray, Scalar, UInt64Array};
use arrow::compute::kernels::cmp::eq;
use arrow::error::ArrowError;
use arrow::util::pretty::print_batches;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
use futures_util::stream::TryStreamExt;

// 2つのBooleanArrayをORで結合する
fn merge_bool_array(a: Result<BooleanArray, ArrowError>, b: Result<BooleanArray, ArrowError>) -> Result<BooleanArray, ArrowError> {
    if let Ok(a) = a {
        if let Ok(b) = b {
            // resとres2の結果をORで結合。個別に分解して行う
            Ok(a.iter().zip(b.iter()).map(|(a, b)| Some(a.unwrap() | b.unwrap())).collect())
        } else {
            Ok(a)
        }
    } else {
        a
    }
}

pub async fn execute(input: &str) -> anyhow::Result<()> {
    let file = tokio::fs::File::open(input).await?;

    let mut builder = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(8192);

    // 3列のみを読み込む
    let file_metadata = builder.metadata().file_metadata().clone();
    let mask = ProjectionMask::roots(file_metadata.schema_descr(), [0, 1, 2]);
    builder = builder.with_projection(mask);

    // IDが1または5の行のみを抽出
    let scalar1 = UInt64Array::from(vec![1]);
    let scalar2 = UInt64Array::from(vec![5]);
    let filter = ArrowPredicateFn::new(
        ProjectionMask::roots(file_metadata.schema_descr(), [0]),
        move |record_batch| {
            let res1 = eq(record_batch.column(0), &Scalar::new(&scalar1));
            let res2 = eq(record_batch.column(0), &Scalar::new(&scalar2));
            merge_bool_array(res1, res2)
        },
    );

    let row_filter = RowFilter::new(vec![Box::new(filter)]);
    builder = builder.with_row_filter(row_filter);

    let stream = builder.build()?;

    let start = SystemTime::now();

    let result = stream.try_collect::<Vec<_>>().await?;

    println!("took: {} ms", start.elapsed()?.as_millis());

    print_batches(&result)?;
    Ok(())
}
took: 1 ms
+----+-------+---------+
| id | title | content |
+----+-------+---------+
| 1  | aaa   | a       |
| 5  | eee   | e       |
+----+-------+---------+

書き込み

列指向なフォーマットなので列ごとに値を作って書き込んでいます。
書き込みをを分けることで可能です。

maker.rs
use std::sync::Arc;

use arrow::array::RecordBatch;
use parquet::arrow::AsyncArrowWriter;
use tokio::fs::File;
use crate::schema::make_schema;

pub async fn execute(input: &str) -> anyhow::Result<()> {
    let file = File::create(input).await.unwrap();
    let schema = Arc::new(make_schema());
    let mut writer: AsyncArrowWriter<File> = AsyncArrowWriter::try_new(file, Arc::clone(&schema), None)?;
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(arrow::array::UInt64Array::from(vec![1, 2, 3, 4, 5])),
            Arc::new(arrow::array::StringArray::from(vec!["x", "y", "z", "w", "a"])),
            Arc::new(arrow::array::StringArray::from(vec!["xxx", "yyy", "zzz", "www", "aaa"])),
            Arc::new(arrow::array::UInt64Array::from(vec![100, 200, 300, 400, 500])),
        ],
    )?;
    writer.write(&batch).await?;
    writer.flush().await?;

    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![
            Arc::new(arrow::array::UInt64Array::from(vec![6, 7, 8, 9, 10])),
            Arc::new(arrow::array::StringArray::from(vec!["b", "c", "d", "e", "f"])),
            Arc::new(arrow::array::StringArray::from(vec!["bbb", "ccc", "ddd", "eee", "fff"])),
            Arc::new(arrow::array::UInt64Array::from(vec![600, 700, 800, 900, 1000])),
        ],
    )?;
    writer.write(&batch).await?;
    writer.flush().await?;

    writer.close().await?;
    Ok(())
}

一括読み込み

+----+-------+---------+------+
| id | title | content | page |
+----+-------+---------+------+
| 1  | x     | xxx     | 100  |
| 2  | y     | yyy     | 200  |
| 3  | z     | zzz     | 300  |
| 4  | w     | www     | 400  |
| 5  | a     | aaa     | 500  |
| 6  | b     | bbb     | 600  |
| 7  | c     | ccc     | 700  |
| 8  | d     | ddd     | 800  |
| 9  | e     | eee     | 900  |
| 10 | f     | fff     | 1000 |
+----+-------+---------+------+

本家にあるグループごとに読み込むサンプルを使うと上記の結果は以下のようになります。

+----+-------+---------+------+
| id | title | content | page |
+----+-------+---------+------+
| 1  | x     | xxx     | 100  |
| 2  | y     | yyy     | 200  |
| 3  | z     | zzz     | 300  |
| 4  | w     | www     | 400  |
| 5  | a     | aaa     | 500  |
+----+-------+---------+------+
+----+-------+---------+------+
| id | title | content | page |
+----+-------+---------+------+
| 6  | b     | bbb     | 600  |
| 7  | c     | ccc     | 700  |
| 8  | d     | ddd     | 800  |
| 9  | e     | eee     | 900  |
| 10 | f     | fff     | 1000 |
+----+-------+---------+------+

メイン

main.rs
pub mod group;
pub mod maker;
pub mod move_csv;
pub mod read_all;
pub mod filter;
pub mod schema;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    move_csv::execute("../input.csv", "output.parquet")?;
    read_all::execute("output.parquet")?;
    filter::execute("output.parquet").await?;
    group::execute("output.parquet").await?;
    maker::execute("result.parquet").await?;
    read_all::execute("result.parquet")?;
    group::execute("result.parquet").await?;

    Ok(())
}
Cargo.toml
[package]
name = "sample"
version = "0.1.0"
edition = "2024"

[dependencies]
anyhow = "1"
arrow = { version = "54.2.1", features = ["prettyprint"] }
bytes = "1.10.1"
futures-util = "0.3.31"
parquet = { version = "54", features = ["json", "async"] }
sysinfo = "0.33.1"
tokio = { version="1.43.0", features=["macros", "rt-multi-thread", "fs"] }

まとめ

上記サンプルはコードにまとめてあります。

一通り読み書きをしてみました。
読み込みはもっと良い方法がありそうなのと、書き込みはもっと色々できそうです。
本家の書き込みのサンプルを見るとブルームフィルターを使っています。色々とパフォーマンスを考慮したカスタマイズができるようです。

Discussion