Open3
Parquet とその操作方法
背景
DuckDB + OPFS (Origin Private File System) により、ログ分析や可視化といった機能を静的サイト上に実現できることが知られている。
この構成において、データ形式に Parquet が利用されている事例が多いため、その利用方法を調査する。
サーバサイドでの利用
スタック
一旦以下を仮定する。
- 言語: TypeScript
- ランタイム: CloudFlare Worker
- ストレージ: CloudFlare R2
ライブラリ
採用:
-
hyparquet
: パーサライブラリ。書き込みはhyparquet-writer
で別途提供。
代替候補:
-
parquet-wasm
: クライアントサイドでの利用が適切なライブラリと考え、保留。 -
@dsnp/parquetjs
:parquet.js
のフォーク。このスタックで利用できるコードが書けなかったため、保留。
hyparquet での PoC コード
import { parquetWriteBuffer } from "hyparquet-writer";
import { parquetReadObjects } from "hyparquet";
// 書き込み
const writer = async (env: Cloudflare.Env) => {
const arrayBuffer = parquetWriteBuffer({
columnData: [
{ name: "id", data: [1, 2, 3], type: "INT32" },
{ name: "name", data: ["Alice", "Bob", "Charlie"], type: "STRING" },
],
});
const obj = await env.BUCKET.put("foo.parquet", arrayBuffer);
console.log("R2 Object:", obj);
};
// 読み込み
const reader = async (env: Cloudflare.Env) => {
const obj = await env.BUCKET.get("foo.parquet");
if (!obj || !obj.body) {
console.error("Object or body is null");
}
const arrayBuffer = await obj!.arrayBuffer();
const parquetData = await parquetReadObjects({
file: arrayBuffer,
});
console.log("Object:", parquetData);
};
実行結果
- R2 に parquet ファイルを作成
- R2 からファイルを取得、ログ出力で復元できることを確認
注意: 検証の都合上、以下のように Hono アプリケーションとして実行する。
index.ts
import { Hono } from "hono";
const app = new Hono<{ Bindings: Cloudflare.Env }>();
app.post("/write", async (c) => {
await writer(c.env);
return c.json({ success: true });
});
app.get("/read", async (c) => {
await reader(c.env);
return c.json({ success: true });
});
export default app;
表示例:
R2 Object: HeadResult {
// ...
key: 'foo.parquet'
}
[wrangler:info] GET /write 200 OK (102ms)
[wrangler:info] GET /read 200 OK (22ms)
Object: [
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' },
{ id: 3, name: 'Charlie' }
]
@dsnp/parquetjs
の使い方と制限事項
memo: 使い方
@dsnp/parquetjs
は thrift
に依存しており、アプリケーションで利用する場合は明示的にインストールする必要がある。
npm i thrift
型チェックは例えば次のような実装で通すことができる。
import * as parquet from "@dsnp/parquetjs";
export const writer = async (env: Cloudflare.Env) => {
const schema = new parquet.ParquetSchema({
id: { type: "INT32" },
name: { type: "UTF8" },
});
const buffer: Buffer = Buffer.alloc(0);
const writer = await parquet.ParquetWriter.openFile(schema, buffer);
await writer.appendRow({ id: 1, name: "Alice" });
await writer.appendRow({ id: 2, name: "Bob" });
await writer.appendRow({ id: 3, name: "Charlie" });
await writer.close();
const parquetBuffer = Buffer.concat([buffer]);
const obj = await env.BUCKET.put("bar.parquet", parquetBuffer);
console.log("R2 Object:", obj);
};
export const reader = async (env: Cloudflare.Env) => {
const obj = await env.BUCKET.get("bar.parquet");
if (!obj || !obj.body) {
console.error("Object or body is null");
return;
}
const arrayBuffer = await obj!.arrayBuffer();
const parquetBuffer = Buffer.from(arrayBuffer);
const reader = await parquet.ParquetReader.openFile(parquetBuffer);
const cursor = reader.getCursor();
let record = null;
while ((record = await cursor.next())) {
console.log("Record:", record);
}
await reader.close();
};
制限事項
以下のエラーに示すように fs.createWriteStream
が実装されていないとのことで、利用ができなかった。
ライブラリの実装の問題なのか、 worker 環境の問題なのかは原因の特定を行なっていない。
✘ [ERROR] Error: [unenv] fs.createWriteStream is not implemented yet!
at createNotImplementedError
(file:///(...)/node_modules/unenv/dist/runtime/_internal/utils.mjs:25:9)
at Object.fn [as createWriteStream]
(file:///(...)/node_modules/unenv/dist/runtime/_internal/utils.mjs:30:9)
at null.<anonymous>
(file:///(...)/node_modules/@dsnp/parquetjs/dist/lib/util.js:207:43)
at [object Object]
at Object.osopen
(file:///(...)/node_modules/@dsnp/parquetjs/dist/lib/util.js:206:12)
at ParquetWriter.openFile
(file:///(...)/node_modules/@dsnp/parquetjs/dist/lib/writer.js:84:49)
at writer (file:///(...)/src/io.ts:10:46)
at Array.<anonymous> (file:///(...)/src/index.ts:89:9)
at #dispatch
(file:///(...)/node_modules/hono/dist/hono-base.js:188:37)
at Object.fetch
(file:///(...)/node_modules/hono/dist/hono-base.js:214:17)