🌟

Rust で ADBC 経由で DuckDB にクエリを投げる

に公開

Observable Plot のドキュメントを読んでいると、Apache Arrow 使えるよ、という記述がありました。「ということは、データベースからフロントエンドまで変換なしでデータ渡したりできる...?」とふと思いました。それについて調べたことのメモ。

Plot.lineY(linedata, {
  x: (d) => d.hour,
  y: (d) => d.value,
  stroke: (d) => d.sensor
}).plot()

For greater efficiency, Plot also supports columnar data: you can use an Apache Arrow table as data instead of an array of objects. ^0.6.16 You can even pass parallel arrays of values, or Apache Arrow vectors, to each channel.

イメージ

よくあるパターンとして、フロントエンドは JSON しか読めないので、そこに来るまでのどこかでデータを JSON に変換するという処理が挟まります。DB と接続するバックエンドのサーバーで JSON 化するとか、あるいは DB の側で直接 JSON を吐き出すとか、そんな感じです。

DB  ---------->  backend  --(JSON)-->  frontend
DB  --(JSON)-->  backend  --(JSON)-->  frontend

この、JSON にエンコードして、それをまたクライアント側でデコードして...、という処理を私たちは当たり前のように受け入れていますが、Apache Arrow を使うと、そうした処理を挟まず、右から左に受け流すようにして効率的にデータを流すことができるはずです。

DB  --(Arrow)-->  backend  --(Arrow)-->  frontend

参考:

https://www.clear-code.com/blog/2025/2/20/how-arrow-format-accelerate-query-result-transfer.html

まあ、というのは理論上の話で、ブラウザで捌けるくらいのデータ量であれば対して差がない可能性もけっこうあります。でもまあ、いちおう気になったのでやり方くらいは調べておこう、ということで。

ADBC? Arrow Flight SQL?

まず、ここは私が混乱してるだけかもですが、こういう時に使うのは ADBC なのか Arrow Flight SQL なのかどっち?、というのが悩む点です。と思っていたら、公式の FAQ にも書かれていました。

https://arrow.apache.org/adbc/current/faq.html#how-do-adbc-and-arrow-flight-sql-differ

曰く、

  • ADBC は「client API specification」
  • Arrow Flight SQL は「wire protocol」

とのことです。ADBC は蛇口みたいなもので、ひねれば Arrow 形式のデータが出てくるけど、実際のデータの転送がどういう形式かは不明。Arrow Flight SQL はそのデータ転送の部分の話。ということだと私は理解しました。

理解しましたといいつつあんまり理解していないのですが、Arrow Flight SQL を使うというのはドライバを自分で書くということで険しい道であり、すでに ADBC があるならそっちを使った方が楽、というのはわかりました。

既存のツール

とか言って小難しく調べてるけど、まあ Observable Plot のドキュメントにもわざわざ言及されてるくらいだし、デファクトになってるツールがあるんでしょ? 私もそう思っていました。でもないっぽいんですよね...。

追記:これなのでは??? DB との接続に ADBC を使えるなら完璧だった...

https://roapi.github.io/docs/

Rust

前置きが長くなりましたが、ということで、HTTP でリクエストしたら ADBC 経由でデータベースにクエリを実行して、結果を Aapche Arrow 形式で返してくれるやつをつくることにしました。

ADBC の公式ページでは Rust は書かれていないんですが、実際にはあります。adbc_core という crate です。

https://docs.rs/adbc_core/latest/adbc_core/

使い方はこんな感じです。結果は RecordBatchReader という型に入ってくるのですが、使い方はまたあとで説明します。

// まずドライバを作成
let mut driver = adbc_core::driver_manager::ManagedDriver::load_dynamic_from_filename(
    "/path/to/driver",
    None,
    adbc_core::options::AdbcVersion::default(),
)?;

// DBを作成。URI やパスワードはここで指定する。
let mut db = driver.new_database_with_opts([
    (OptionDatabase::Uri, "postgresql://..."),
    (OptionDatabase::Username, "user"),
    (OptionDatabase::Password, "pass"),
])?;

// コネクションを作成。カタログやスキーマ、auto commit の設定など。
let mut conn = db.new_connection_with_opts([
    (OptionConnection::CurrentCatalog, "catalog"),
])?;

// 新しいステートメントを作成して実行
let mut stmt = conn.new_statement()?;
stmt.set_sql_query("SELECT * FROM tbl")?;
let record_batch_reader = stmt.execute()?;

DuckDB の場合

私はとりあえず DuckDB を使ったのですが、DuckDB の場合はこうなります。コネクションの設定とかはないので、指定するものは少ないです。ちょっと分かりづらい点としては、load_dynamic_from_filename() の第二引数 entrypoint は多くのデータベースで None なのですが、DuckDB の場合は Some(b"duckdb_adbc_init") を指定する必要があります。

let mut driver = adbc_core::driver_manager::ManagedDriver::load_dynamic_from_filename(
    "/path/to/libduckdb.dylib", // OS によって違う
    Some(b"duckdb_adbc_init"),
    adbc_core::options::AdbcVersion::default(),
)?;

let mut db = driver.new_database()?;

let mut conn = db.new_connection()?;

...

(補足)duckdb-rs でもいい

私の目的は ADBC の使い方を知ることだったので adbc_core を使っていますが、DuckDB のみに限るなら duckdb-rs でもいいと思います。たしか Apache Arrow 形式でデータを返してくれるはず。

ADBC の利点を挙げるなら、duckdb-rs の場合は DuckDB 自体を静的リンクすることになるのでビルドに時間がかかる&サイズが大きいですが(たぶん。未確認です)、ADBC はドライバを動的に読み込むので軽い、というのはあります。

RecordBatchReader の扱い方

RecordBatchReaderはイテレータになっていて、next()Result<RecordBatch> を取り出すことができます。

最終的には Arrow IPC format にしたいので、そのためには arrow_ipc::writer::StreamWriter を使います。以下のコードでは、いったんサイズを調べてから全部一気に書き出すようにして処理しています。

// まずサイズを調べる
let mut result_bytes: usize = 0;
let mut batches: Vec<REcordBatch> = Vec::new();
for b in record_batch_reader {
    result_bytes += b?.get_array_memory_size();
    batches.push(b);
}

// 判明したサイズに基づいて結果の Vec をアロケートする
let mut result: Vec<u8> = Vec::with_capacity(result_bytes);
let schema_ref = batches[0].schema(); // Assuming all batches are the same schema
let mut writer = arrow_ipc::writer::StreamWriter::try_new(&mut result, &*schema_ref).unwrap();

for batch in &batches {
    writer.write(batch)?;
}

result.into_response()

ちなみに、たぶん一気に処理するのではなくて、処理できたところから返していった方がいいと思うのですが、そのあたりの実装の仕方がよくわからなかったのでこうなっています。もう少し詳しく書くと、私は poem を使って HTTP サーバーを実装したのですが、poem には Body::from_bytes_stream() が、axum には Body::from_stream() があり、ここに Stream を渡せばいいんだと思うんですが、RecordBatchReaderStream に変換する方法がわかりませんでした...。

JavaScript側

Apache Arrow の JavaScript / TypeScript 実装には tableFromIPC() という関数があるのでこれを使えば OK です。簡単ですね。

import { tableFromIPC } from 'apache-arrow';

const data_url = 'http://localhost...';
const dataFrame = await tableFromIPC(fetch(data_url));

感想

やり残した点としては、これでデータのシリアライズのコストは回避できたはずですが、無駄なバッファリングが発生してしまっています。そもそも HTTP のストリーム処理?について理解できていないので、ここから先はもうちょっとちゃんと勉強しないと進めなそうです。

(ちなみに、今回のコードは、公開するつもりですが、もうちょっとやりたいことがあるので、それが終わったら別の記事と合わせて公開します)

Discussion