Open3

Parquet とその操作方法

Ryo KAJIRyo KAJI

背景

DuckDB + OPFS (Origin Private File System) により、ログ分析や可視化といった機能を静的サイト上に実現できることが知られている。
この構成において、データ形式に Parquet が利用されている事例が多いため、その利用方法を調査する。

Ryo KAJIRyo KAJI

サーバサイドでの利用

スタック

一旦以下を仮定する。

  • 言語: TypeScript
  • ランタイム: CloudFlare Worker
  • ストレージ: CloudFlare R2

ライブラリ

採用:

  • hyparquet: パーサライブラリ。書き込みは hyparquet-writer で別途提供。

代替候補:

  • parquet-wasm : クライアントサイドでの利用が適切なライブラリと考え、保留。
  • @dsnp/parquetjs: parquet.js のフォーク。このスタックで利用できるコードが書けなかったため、保留。

hyparquet での PoC コード

import { parquetWriteBuffer } from "hyparquet-writer";
import { parquetReadObjects } from "hyparquet";

// 書き込み
const writer = async (env: Cloudflare.Env) => {
  const arrayBuffer = parquetWriteBuffer({
    columnData: [
      { name: "id", data: [1, 2, 3], type: "INT32" },
      { name: "name", data: ["Alice", "Bob", "Charlie"], type: "STRING" },
    ],
  });
  const obj = await env.BUCKET.put("foo.parquet", arrayBuffer);
  console.log("R2 Object:", obj);
};

// 読み込み
const reader = async (env: Cloudflare.Env) => {
  const obj = await env.BUCKET.get("foo.parquet");

  if (!obj || !obj.body) {
    console.error("Object or body is null");
  }

  const arrayBuffer = await obj!.arrayBuffer();

  const parquetData = await parquetReadObjects({
    file: arrayBuffer,
  });

  console.log("Object:", parquetData);
};

実行結果

  1. R2 に parquet ファイルを作成
  2. R2 からファイルを取得、ログ出力で復元できることを確認

注意: 検証の都合上、以下のように Hono アプリケーションとして実行する。

index.ts
import { Hono } from "hono";
const app = new Hono<{ Bindings: Cloudflare.Env }>();

app.post("/write", async (c) => {
  await writer(c.env);
  return c.json({ success: true });
});

app.get("/read", async (c) => {
  await reader(c.env);
  return c.json({ success: true });
});

export default app;

表示例:

R2 Object: HeadResult {
  // ...
  key: 'foo.parquet'
}
[wrangler:info] GET /write 200 OK (102ms)
[wrangler:info] GET /read 200 OK (22ms)
Object: [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
  { id: 3, name: 'Charlie' }
]
Ryo KAJIRyo KAJI

memo: @dsnp/parquetjs の使い方と制限事項

使い方

@dsnp/parquetjsthrift に依存しており、アプリケーションで利用する場合は明示的にインストールする必要がある。

npm i thrift

型チェックは例えば次のような実装で通すことができる。

import * as parquet from "@dsnp/parquetjs";

export const writer = async (env: Cloudflare.Env) => {
  const schema = new parquet.ParquetSchema({
    id: { type: "INT32" },
    name: { type: "UTF8" },
  });

  const buffer: Buffer = Buffer.alloc(0);
  const writer = await parquet.ParquetWriter.openFile(schema, buffer);
  await writer.appendRow({ id: 1, name: "Alice" });
  await writer.appendRow({ id: 2, name: "Bob" });
  await writer.appendRow({ id: 3, name: "Charlie" });
  await writer.close();

  const parquetBuffer = Buffer.concat([buffer]);

  const obj = await env.BUCKET.put("bar.parquet", parquetBuffer);
  console.log("R2 Object:", obj);
};

export const reader = async (env: Cloudflare.Env) => {
  const obj = await env.BUCKET.get("bar.parquet");

  if (!obj || !obj.body) {
    console.error("Object or body is null");
    return;
  }

  const arrayBuffer = await obj!.arrayBuffer();
  const parquetBuffer = Buffer.from(arrayBuffer);

  const reader = await parquet.ParquetReader.openFile(parquetBuffer);
  const cursor = reader.getCursor();

  let record = null;
  while ((record = await cursor.next())) {
    console.log("Record:", record);
  }

  await reader.close();
};

制限事項

以下のエラーに示すように fs.createWriteStream が実装されていないとのことで、利用ができなかった。
ライブラリの実装の問題なのか、 worker 環境の問題なのかは原因の特定を行なっていない。

[ERROR] Error: [unenv] fs.createWriteStream is not implemented yet!

      at createNotImplementedError
  (file:///(...)/node_modules/unenv/dist/runtime/_internal/utils.mjs:25:9)
      at Object.fn [as createWriteStream]
  (file:///(...)/node_modules/unenv/dist/runtime/_internal/utils.mjs:30:9)
      at null.<anonymous>
  (file:///(...)/node_modules/@dsnp/parquetjs/dist/lib/util.js:207:43)
      at [object Object]
      at Object.osopen
  (file:///(...)/node_modules/@dsnp/parquetjs/dist/lib/util.js:206:12)
      at ParquetWriter.openFile
  (file:///(...)/node_modules/@dsnp/parquetjs/dist/lib/writer.js:84:49)
      at writer (file:///(...)/src/io.ts:10:46)
      at Array.<anonymous> (file:///(...)/src/index.ts:89:9)
      at #dispatch
  (file:///(...)/node_modules/hono/dist/hono-base.js:188:37)
      at Object.fetch
  (file:///(...)/node_modules/hono/dist/hono-base.js:214:17)