💭

CSV.GZファイルにSQLを実行した結果->Polars DataFrame->2次元配列(ベクタ)に変換してCSVに出力する

2023/10/02に公開

初めに

現職でcsv.gzファイルを解凍してcsvのデータを加工するといった処理フローを構築しておりますが、そもそもcsv.gzファイルにSQLを実行した結果をcsvファイルに出力すれば1ステップ省けるという着想に基づき実験的に処理を書いてみました。

csv.gzファイルへのSQL実行結果をPolars DataFrameへ変換する処理およびPolars DataFrameから2次元の配列(ベクタ)に変換する処理は今回の処理の目的からすると必須ではありません。

しかしながら、他の用途で使える可能性もあるので実装しています。
特にPolars DataFrameから2次元の配列(ベクタ)に変換する処理はPolars DataFrameをGoogle Sheets APIでGoogle Sheetsのシートに書き込む際に使えると思います。

試すこと

csv.gzファイルにSQLを実行し、その実行結果をcsvに書き込む処理を下記の仕様で実装しました。

  1. csv.gzファイルに対してApache Arrow DataFusionを介してSQLを実行し、実行結果をPolars DataFrameに変換

  2. Polars DataFrameを2次元の配列(ベクタ)に変換

  3. 2次元の配列(ベクタ)を一行ずつcsvファイルに書き込む

開発環境

  • Windows10
  • Visual Studio Code
  • WSL2
  • Docker
  • Rust : 1.73.0

やったこと

Rustのビルド環境の構築

WSL2にDockerをセットアップし、コンテナでRustのビルド環境を構築しました。

</> Dockerfile

FROM debian:bullseye

RUN apt-get update \
    && apt-get install -y build-essential \
    && apt-get install -y curl \
    && apt-get install -y sudo \
    && apt-get -y install libpq-dev \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

# user追加
ARG USERNAME=user
ARG GROUPNAME=general
ARG UID=1000
ARG GID=1000
ARG PASSWORD=user
RUN groupadd -g $GID $GROUPNAME && \
    useradd -m -s /bin/bash -u $UID -g $GID -G sudo $USERNAME && \
    echo $USERNAME:$PASSWORD | chpasswd && \
    echo "$USERNAME   ALL=(ALL) NOPASSWD:ALL" >> /etc/sudoers


USER $USERNAME
WORKDIR /home/$USERNAME/

# フォルダ作成
RUN mkdir -p $HOME/project \
    && chmod +wrx $HOME/project \
    && mkdir -p $HOME/data \
    && chmod +wrx $HOME/data

# rust
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs > $HOME/rustup.sh \
    && chmod +x $HOME/rustup.sh \
    && $HOME/rustup.sh -y --default-toolchain nightly 

CMD ["bash"]

プロジェクトの作成

下記のコマンドでプロジェクトを作成しました。

cargo new decompress-csv-gz-slow

Cargo.tomlファイルに依存クレートを設定しました。

</> Cargo.toml

[package]
name = "decompress-csv-gz-slow"
version = "0.1.0"
edition = "2021"

[dependencies]
arrow = "42.0.0"
parquet = "42.0.0"
anyhow = "1.0.71"
csv = "1.2.2"
datafusion = "27.0.0"
tokio = "1.29.1"
polars = "0.31.1"

実装

csv.gzファイルの列レイアウトからスキーマ情報を生成

csv.gzファイルの1行目のデータを読み取り、後続のデータ抽出(加工)SQL実行用のスキーマ(データ型は全て文字列型)を生成する

let ctx = SessionContext::new();
// query compressed CSV with specific options
let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".gz");
ctx.register_csv("_table", &input_file_path, csv_options).await?;// create a plan to run a SQL query for Header Fields
let _sql = r#"SELECT * FROM _table  LIMIT 1 OFFSET 0"#;
let df = ctx.sql(&_sql).await?;

// convert to Result<Vec<RecordBatch>>
let batches = df.collect().await?;

// convert to polars dataframe
let fields: Vec<String> =  (&batches[0]).schema().fields.to_vec().iter().map(|s| s.name().to_string()).collect();
println!("{:?}", &fields);
let query_fields: Vec<Field> = fields.clone().iter().map(|s| Field::new(s, DataType::Utf8, true)).collect();
println!("{:?}", &query_fields);
let _schema = Schema::new(query_fields);

スキーマ(データ型は全て文字列型)を設定してテーブル登録

前述のSQL用のスキーマ(データ型は全て文字列型)で固定することで後続のPolars DataFrameのスキーマを全て文字列型で作成可能となり、変換処理を簡略化できる

// create a plan to run a SQL query
// register the table
let ctx = SessionContext::new();
// query compressed CSV with specific options
let csv_options = CsvReadOptions::default()
.has_header(true)
.file_compression_type(FileCompressionType::GZIP)
.schema(&_schema)
.file_extension(".gz");
ctx.register_csv("_table", &input_file_path, csv_options).await?;

データ行数の取得

後述の処理でSQLのoffsetがデータ行数を超えたら処理を止めるためにデータ行数を取得する

let _sql = r#"SELECT * FROM _table"#;
let df = ctx.sql(&_sql).await?;
let count: u64 = df.clone().count().await?.try_into().unwrap();
println!("total data count -> {:?}", &count);

外部のSQLファイルを読み込む

コマンドライン引数で指定したSQLファイルパスよりSQLを読み込む

// SQLファイルを開く
let mut sqlfile = File::open(sql_file_path)?;
// ファイルの内容を格納するための文字列を作成
let mut query_txt = String::new();
// ファイルの内容を読み込む
sqlfile.read_to_string(&mut query_txt)?; 
let _sql = format!(r#"{query_txt} "#);
println!("sql -> {:?}", &_sql);

SQLファイル(data_ex.sql)

テーブル名を_tableで固定する以外は自由に変更可である。

select * from _table

SQL実行結果をPolars DataFrameに変換

SQLを実行してVec<RecordBatch>型のオブジェクトに格納(let batches = df.collect().await?)し、Vec<RecordBatch>型のオブジェクトをPolars DataFrameオブジェクトに変換する(let df: DataFrame_Polars = DataFrame_Polars::new(series_vec)?)処理です。Polars DataFrameのスキーマは全て文字列型とし、処理を簡略化しています。

ソースコードが読みづらくて申し訳ないです。変数dfに格納されるオブジェクトが途中からPolars DataFrameに変更されますので注意してください。

let df = ctx.sql(&_sql).await?;
let df = df.limit(offset.try_into().unwrap(), Some(limit.try_into().unwrap()))?;
println!("offset -> {:?}", &offset);
// convert to Result<Vec<RecordBatch>>
let output_file_path = output_file_path.replace(".csv",format!("_{exec_cnt}_.csv").as_str());
let mut wtr = WriterBuilder::new().quote_style(QuoteStyle::Always).from_path(output_file_path)?;            
let batches = df.collect().await?;
for (_idx,batch) in batches.iter().enumerate() { 
    let columns =  batch.columns();
    let mut series_vec: Vec<Series> = vec![];
    for (i,el) in columns.iter().enumerate() {
        let el = el.clone();
        let value_array: StringArray = el.as_any().downcast_ref::<StringArray>().unwrap().clone();
        let values: Vec<String> = value_array.iter().map(|s| s.unwrap().to_string()).collect();
        let _s = Series::new(&fields[i], &values);
        series_vec.push(_s);
    }
    
    let df: DataFrame_Polars = DataFrame_Polars::new(series_vec)?;

Polars DataFrameを2次元配列(ベクタ)に変換

Polars DataFrameを変換して2次元配列(let mut row_vecs: Vec<Vec<String>> = vec![])に格納します。Polars DataFrameをベクタに変換するメソッドを見つけることができませんでしたので使えそうなメソッドを組み合わせて実装しております。。

// 行iteration,行方向2次元配列への変換
let height = df.clone().height();
// println!("{:?}", &height);
let col_headers: Vec<String> = df.clone().get_columns().iter().map(|s| s.name().to_string()).collect();
// println!("{:?}", &col_headers);
let mut row_vecs: Vec<Vec<String>> = vec![];
for _idx in 0..=height-1 {
    let sl = df.clone().slice(_idx.try_into()?, 1); // _idx行目の1行分のレコード(Dataframe)を取得
    // println!("{:?}", &sl);
    let _vec = sl.get(0).unwrap(); //1行分のレコードのため、index=0
    // println!("{:?}", &_vec);
    // let _vec_iter = _vec.iter().clone();
    let row_vec: Vec<String> =  _vec.iter().map(|s| s.get_str().unwrap().to_string()).collect();
    // println!("{:?}", &row_vec);
    row_vecs.push(row_vec.clone());
}

コードの全体像

コードの全体像は下記となります。

</> src/main.rs

use std::fs::File;
use std::sync::Arc;
use std::env;
use std::io::prelude::*;

use anyhow::{Context,Result};
use arrow::datatypes::{DataType, Field, Schema};

use arrow::array::StringArray;
use datafusion::datasource::file_format::file_type::FileCompressionType;
use datafusion::prelude::*;
use polars::prelude::*;
use polars::series::Series;
use polars::frame::DataFrame as DataFrame_Polars;
use csv::{ReaderBuilder,WriterBuilder,QuoteStyle};


#[tokio::main]
async fn main() -> Result<()> {
    // コマンドライン引数を取得
    let args: Vec<String> = env::args().collect();
    let chunk: u64 = args[1].parse().unwrap(); //"100000";
    let sql_file_path: String = args[2].to_string();
    let input_file_path = &args[3]; //"./data/*.gz";
    let output_file_path = &args[4]; //"./data/output.csv";    
    // schema生成用に読み込む
    // register the table
    let ctx = SessionContext::new();
    // query compressed CSV with specific options
    let csv_options = CsvReadOptions::default()
    .has_header(true)
    .file_compression_type(FileCompressionType::GZIP)
    .file_extension(".gz");
    ctx.register_csv("_table", &input_file_path, csv_options).await?;

    // create a plan to run a SQL query for Header Fields
    let _sql = r#"SELECT * FROM _table  LIMIT 1 OFFSET 0"#;
    let df = ctx.sql(&_sql).await?;

    // convert to Result<Vec<RecordBatch>>
    let batches = df.collect().await?;

    // convert to polars dataframe
    let fields: Vec<String> =  (&batches[0]).schema().fields.to_vec().iter().map(|s| s.name().to_string()).collect();
    println!("{:?}", &fields);
    let query_fields: Vec<Field> = fields.clone().iter().map(|s| Field::new(s, DataType::Utf8, true)).collect();
    println!("{:?}", &query_fields);
    let _schema = Schema::new(query_fields);

    // SQL実行
    let limit = chunk;
    let mut offset: u64 = 0;
    let mut exec_cnt = 0;
    // create a plan to run a SQL query
    // register the table
    let ctx = SessionContext::new();
    // query compressed CSV with specific options
    let csv_options = CsvReadOptions::default()
    .has_header(true)
    .file_compression_type(FileCompressionType::GZIP)
    .schema(&_schema)
    .file_extension(".gz");
    ctx.register_csv("_table", &input_file_path, csv_options).await?;
    let _sql = r#"SELECT * FROM _table"#;
    let df = ctx.sql(&_sql).await?;
    let count: u64 = df.clone().count().await?.try_into().unwrap();
    println!("total data count -> {:?}", &count);
    // SQLファイルを開く
    let mut sqlfile = File::open(sql_file_path)?;
    // ファイルの内容を格納するための文字列を作成
    let mut query_txt = String::new();
    // ファイルの内容を読み込む
    sqlfile.read_to_string(&mut query_txt)?; 
    let _sql = format!(r#"{query_txt} "#);
    println!("sql -> {:?}", &_sql);
    loop {
        if offset > count {
            println!("break offset -> {:?}", &offset);
            break;
        }
        
        let df = ctx.sql(&_sql).await?;
        let df = df.limit(offset.try_into().unwrap(), Some(limit.try_into().unwrap()))?;
        println!("offset -> {:?}", &offset);
        // convert to Result<Vec<RecordBatch>>
        let output_file_path = output_file_path.replace(".csv",format!("_{exec_cnt}_.csv").as_str());
        let mut wtr = WriterBuilder::new().quote_style(QuoteStyle::Always).from_path(output_file_path)?;            
        let batches = df.collect().await?;
        for (_idx,batch) in batches.iter().enumerate() { 
            let columns =  batch.columns();
            let mut series_vec: Vec<Series> = vec![];
            for (i,el) in columns.iter().enumerate() {
                let el = el.clone();
                let value_array: StringArray = el.as_any().downcast_ref::<StringArray>().unwrap().clone();
                let values: Vec<String> = value_array.iter().map(|s| s.unwrap().to_string()).collect();
                let _s = Series::new(&fields[i], &values);
                series_vec.push(_s);
            }

            let df: DataFrame_Polars = DataFrame_Polars::new(series_vec)?;
            // println!("{:?}", &df);

            // 行iteration,行方向2次元配列への変換
            let height = df.clone().height();
            // println!("{:?}", &height);
            let col_headers: Vec<String> = df.clone().get_columns().iter().map(|s| s.name().to_string()).collect();
            // println!("{:?}", &col_headers);
            let mut row_vecs: Vec<Vec<String>> = vec![];
            for _idx in 0..=height-1 {
                let sl = df.clone().slice(_idx.try_into()?, 1); // _idx行目の1行分のレコード(Dataframe)を取得
                // println!("{:?}", &sl);
                let _vec = sl.get(0).unwrap(); //1行分のレコードのため、index=0
                // println!("{:?}", &_vec);
                // let _vec_iter = _vec.iter().clone();
                let row_vec: Vec<String> =  _vec.iter().map(|s| s.get_str().unwrap().to_string()).collect();
                // println!("{:?}", &row_vec);
                row_vecs.push(row_vec.clone());
            }
            // println!("{:?}", &row_vecs);

            // 行方向2次元配列 -> CSV出力
            // let _col: Vec<String> = col_headers.iter().map(|s| "\"".to_string() + s.as_str() + "\"").collect();
            if offset % limit == 0 && _idx == 0{
                wtr.write_record(&col_headers)?;
            }

            for row_vec in row_vecs {
                wtr.write_record(&row_vec)?;
            }
        }
        // offsetを更新
        offset += limit;
        // exec_cntを更新
        exec_cnt += 1;
        wtr.flush()?;
    }


    Ok(())
}

ビルド

下記のコマンドでバイナリを作成しました。Release用のビルドを実施。

cargo build --release

動作確認

input.csv.gzにSQLを実行した結果をcsvファイルに出力しました。

</> input.csv.gz
日本政府が公開しているデータからダウンロードしたデータに一部加工(ダブルクォートチェック用)を加えて使用しました。
下記の内容のデータで、データ行数は34,612行となります。

"所有者コード","所有者名","住所"
"44754","(株) 愛知オートグループ,test1","愛知県名古屋市港区油屋町3-28-3-40G"
"44755","(株)BIRD MOTOR","福岡県春日市下白水64-5"
"44756","RR general trading(同)","千葉県富津市富津1649"
"44757","(有) 愛知車輌","愛知県名古屋市中川区下之一色町字戌亥島9-7"
.
.
.

下記のコマンドを実行しました。

$ ./decompress-csv-gz-slow 10000 ./data_ex.sql ./input.csv.gz ./output.csv

実行結果は下記の通り確認できました。

10,000行ずつでファイルを分割して出力するため、ファイルが4つ作成(output_0_.csv~output_3_.csv)される

["所有者コード", "所有者名", "住所"]
[Field { name: "所有者コード", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "所有者名", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "住所", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]
total data count -> 34612
sql -> "select * from _table "
offset -> 0
offset -> 10000
offset -> 20000
offset -> 30000
break offset -> 40000

</> output_0_.csv

"所有者コード","所有者名","住所"
"44754","(株) 愛知オートグループ,test1","愛知県名古屋市港区油屋町3-28-3-40G"
"44755","(株)BIRD MOTOR","福岡県春日市下白水64-5"
"44756","RR general trading(同)","千葉県富津市富津1649"
"44757","(有) 愛知車輌","愛知県名古屋市中川区下之一色町字戌亥島9-7"
.
.
.

</> output_1_.csv

"所有者コード","所有者名","住所"
"50078","タカダ自動車(株)","愛知県宝飯郡小坂井町伊奈字市場321"
"49784","(株)ホンダ四輪販売三河","愛知県岡崎市六名南2丁目4-13"
"18821","(有)ワカヤマ","愛知県江南市古知野町高瀬32"
"08930","名古屋日産中村販売(株)","愛知県名古屋市中村区十王町3-39"
.
.
.

</> output_2_.csv

"所有者コード","所有者名","住所"
"31078","(株)プラチナ","島根県松江市八幡町795-1"
"31079","(有) 大山伝宝商店","青森県八戸市江陽5丁目16-23"
"31090","(株) FATE","兵庫県宝塚市山本丸橋4丁目80"
"31091","(株)草原商会","鹿児島県曽於郡大崎町神領2374-5"
.
.
.

</> output_3_.csv

"所有者コード","所有者名","住所"
"10919","オートプラザ山陰(有)","鳥取県鳥取市叶96"
"24967","三宝自動車(有)","兵庫県西脇市大木町293-1"
"14637","(有)ラープ","奈良県大和高田市大谷377-32"
"13445","(有)ワールド貿易","和歌山県和歌山市直川1734"
.
.
. 

終わりに

上記処理はCloud Run Jobsで駆動できるかも検証しました。
コンテナ内にGCSバケットのフォルダをマウントしてGCSのフォルダにcsvファイルを書き込むことまではできましたが、データ行数の多いcsv.gzファイルに対してはSQLのoffset(とlimit)を使って分割処理を行う手法のため、offsetの仕様の問題でクエリの速度が遅くなっていきます。
そのため、実践で利用する場合はファイル分割処理などでcsv.gzファイル1つあたりの最大データ行数を1,000万行くらいに調整する対応が必要かと思います。

参考情報

slice - DataFrame in polars::frame - Rust (docs.rs)

collect - DataFrame in datafusion::dataframe - Rust (docs.rs)

Discussion