Open15

DuckDB メモ v2

voluntasvoluntas

前提

  • 自社製品では JSONL 形式のログが色々出力される
  • 自社製品は分散システムなためログも分散する
  • 自社サービスのログファイルは大きくても無圧縮で 1 日 100 GB いかない程度
  • Fluent Bit で gzip で圧縮した JSONL 形式のログを S3 互換オブジェクトストレージ (以下 S3) に保存する

モチベーション

  • 自社パッケージ製品のクラウドサービスで、顧客にログ解析情報を提供したい
    • その仕組みを OSS 化して、自社パッケージ製品を利用している顧客が利用できるようにしたい
  • S3 から gzip で圧縮された JSONL ログ(複数) を取得して、横断的に解析したい
  • 自動でスキーマは作成してほしい
  • 解析結果を DuckDB ファイルとして出力し S3 へ保存したい
  • 出力した DuckDB ファイルを DuckDB-Wasm で読み込ませたい
  • コストを低くしたい
    • 転送量が安い S3 を利用すればかなりコストが下げられる
  • 顧客のブラウザ上だけで解析結果を自由にいじれるようにしたい
    • 毎回 API 叩かれたくない
  • 一定期間でファイルを削除するのも S3 なら気軽にできる
  • 解析が DuckDB + S3 で、表示が DuckDB-Wasm + S3 スケールが容易になる
  • DuckDB を利用する事で自社以外でも同じ解析の仕組みが利用できるようになる
    • 自社製品と関連付けた解析システムとして OSS にできる

選定理由

  • DuckDB
  • S3 互換オブジェクトストレージ
    • 安い
    • 汎用的
    • 永続化

ざっくり

おまけ

  • sqlc-duckdb の開発を検討中
  • DuckDB のパーサーを利用する
  • Go / TypeScript / Python 向けに出力する
Hidden comment
voluntasvoluntas

DuckDB 処理フロー

  • トリガーで情報を渡して、それに沿った duckdb ファイルを出力する仕組み

systemd/Timers 案

採用

  • 定期的にジョブキューを見に行って、ジョブがあったら DuckDB を実行する仕組み
    • S3 から集計して DuckDB ファイルを吐き出して S3 に保存
    • ジョブが完了するタイミングで Postgres のレコードを更新する
  • Postgres のジョブキューモドキを利用する
  • ジョブワーカーを増やすだけでスケールすることができる

GitHub Actions 案

不採用

voluntasvoluntas

自社サービスで利用する際のコスト

かなり雑に計算してます

  • S3 互換オブジェクトストレージ
    • Akamai Connected Cloud の Object Storage を利用
    • 250 GB で月 5 ドル
    • 転送量は 1 TB 無料枠ありで 1 TB で月 5 ドル
    • API の回数などは課金対象ではない
    • 一定期間でログは削除
      • 14 日 or 30 日をイメージ
  • ジョブワーカー

ざっくりコスト計算

  • 転送量に 1 TB 利用
    • 0
  • ディスク容量 1 TB
    • 20 ドル
  • ジョブワーカー
    • 既にある

月 20 ドルの追加で実現できそう。

voluntasvoluntas

CREATE SECRET に Akamai Connected Cloud の Object Storage を利用する

  • ENDPOINT を明示的に指定するだけでよい
  • あとは KEY_ID に Access Key 、SECRET に Secret Key を指定する
CREATE SECRET secret (
    TYPE S3,
    KEY_ID '...',
    SECRET '...',
    ENDPOINT 'jp-osa-1.linodeobjects.com',
);
voluntasvoluntas

EXPLAIN ANALYZE

JSONL / JSONL (gzip) / Parquet (zstd) のファイルサイズ (バイト)

このログは 時雨堂WebRTC SFU Sora が出力するログの1つです。

クライアント側で 15 秒間隔で取得している WebRTC 統計情報WebRTC DataChannel 経由で Sora へ吸い上げて、Sora が JSONL として出力しています。

101_452_550 rtc_stats.jsonl
 10_864_324 rtc_stats.jsonl.gz
  2_390_235 rtc_stats.parquet

jsonl 読み込みからの parquet ファイルの生成

D EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl');
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.281s              ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│           QUERY           │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      EXPLAIN_ANALYZE      │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│   BATCH_CREATE_TABLE_AS   │
│    ────────────────────   │
│           1 Rows          │
│          (0.14s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         TABLE_SCAN        │
│    ────────────────────   │
│    Function: READ_JSON    │
│                           │
│        Projections:       │
│       connection_id       │
│             id            │
│           label           │
│         timestamp         │
│            type           │
│          version          │
│         node_name         │
│         session_id        │
│            role           │
│         channel_id        │
│         bundle_id         │
│         client_id         │
│        multistream        │
│         spotlight         │
│         simulcast         │
│        log_written        │
│          rtc_data         │
│           rtc_id          │
│       rtc_timestamp       │
│          rtc_type         │
│                           │
│         83911 Rows        │
│          (0.25s)          │
└───────────────────────────┘
D EXPLAIN ANALYZE copy rtc_stats to 'rtc_stats.parquet' (FORMAT parquet, COMPRESSION zstd);
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE copy rtc_stats to 'rtc_stats.parquet' (FORMAT parquet, COMPRESSION zstd);
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.320s              ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│           QUERY           │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      EXPLAIN_ANALYZE      │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│     BATCH_COPY_TO_FILE    │
│    ────────────────────   │
│           1 Rows          │
│          (0.07s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         TABLE_SCAN        │
│    ────────────────────   │
│         rtc_stats         │
│                           │
│        Projections:       │
│       connection_id       │
│             id            │
│           label           │
│         timestamp         │
│            type           │
│          version          │
│         node_name         │
│         session_id        │
│            role           │
│         channel_id        │
│         bundle_id         │
│         client_id         │
│        multistream        │
│         spotlight         │
│         simulcast         │
│        log_written        │
│          rtc_data         │
│           rtc_id          │
│       rtc_timestamp       │
│          rtc_type         │
│                           │
│         83911 Rows        │
│          (0.04s)          │
└───────────────────────────┘
D EXPLAIN ANALYZE select * from parquet_scan('rtc_stats.parquet');
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE select * from parquet_scan('rtc_stats.parquet');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.0908s             ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│           QUERY           │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      EXPLAIN_ANALYZE      │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         TABLE_SCAN        │
│    ────────────────────   │
│         Function:         │
│        PARQUET_SCAN       │
│                           │
│        Projections:       │
│       connection_id       │
│             id            │
│           label           │
│         timestamp         │
│            type           │
│          version          │
│         node_name         │
│         session_id        │
│            role           │
│         channel_id        │
│         bundle_id         │
│         client_id         │
│        multistream        │
│         spotlight         │
│         simulcast         │
│        log_written        │
│          rtc_data         │
│           rtc_id          │
│       rtc_timestamp       │
│          rtc_type         │
│                           │
│         83911 Rows        │
│          (0.09s)          │
└───────────────────────────┘

jsonl.gz からの読み込み

D EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl.gz');
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││    Query Profiling Information    ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl.gz');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││              Total Time: 0.572s              ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│           QUERY           │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│      EXPLAIN_ANALYZE      │
│    ────────────────────   │
│           0 Rows          │
│          (0.00s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│   BATCH_CREATE_TABLE_AS   │
│    ────────────────────   │
│           1 Rows          │
│          (0.13s)          │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│         TABLE_SCAN        │
│    ────────────────────   │
│    Function: READ_JSON    │
│                           │
│        Projections:       │
│       connection_id       │
│             id            │
│           label           │
│         timestamp         │
│            type           │
│          version          │
│         node_name         │
│         session_id        │
│            role           │
│         channel_id        │
│         bundle_id         │
│         client_id         │
│        multistream        │
│         spotlight         │
│         simulcast         │
│        log_written        │
│          rtc_data         │
│           rtc_id          │
│       rtc_timestamp       │
│          rtc_type         │
│                           │
│         83911 Rows        │
│          (0.29s)          │
└───────────────────────────┘
voluntasvoluntas

PostgreSQL Extension

PostgreSQL Extension – DuckDB

DuckDB から PostgreSQL を引くことができる。

Secret Manager

登録は Secret Manager を利用する。CREATE SECRET 時に名前も付けられるので付けておいても良い。

CREATE SECRET spam (
    TYPE POSTGRES,
    HOST '127.0.0.1',
    PORT 5432,
    DATABASE postgres,
    USER 'postgres',
    PASSWORD ''
);

ATTACH

  • Secret Manager で名前を付けている場合は SECRET で指定するのがオススメ
  • READ_ONLY にすることで事故を防ぐ
  • AS postgres_db の部分で認識できる
ATTACH '' AS postgres_db (TYPE POSTGRES, SECRET spam, READ_ONLY);

使う

  • SHOW ALL TABLES; でテーブルが見れる
  • PostgreSQL 側に egg というテーブルがあるとする
CREATE TABLE egg AS SELECT * FROM postgres_db.egg

これで DuckDB 側に持ってこれる。

DETACH

DETACH postgres_db;

COPY ... TO

  • テーブルを Parquet 化できる
  • CREATE TABLE で DuckDB 側に持ってきてからやったほうがいい
COPY egg TO 'egg.parquet (FORMAT Parquet, COMPRESSION zstd)';
voluntasvoluntas

Grafana DuckDB Data Source

MotherDuck 手動で開発が進められている模様。

https://github.com/grafana/grafana/issues/80948#issuecomment-2386523407

Hi everyone and thank you for your patience. I'm happy to announce that we signed an agreement with MotherDuck, and the plugin development for DuckDB has already started 🎊

皆さん、お待たせして申し訳ありません。お知らせできることを嬉しく思います。私たちはMotherDuckとの契約を締結し、DuckDB用のプラグイン開発がすでに開始されています🎊

voluntasvoluntas

* で使える便利な句

https://duckdb.org/docs/sql/expressions/star#exclude-clause

EXCLUDE

* を使った時に特定の値だけ抜いてくれる EXCLUDE

SELECT * EXCLUDE (spam, egg) FROM ham;

spam と egg カラムを除いてくれる。

REPLACE

* を使った時に特定のカラムを置き換えてくれる REPLACE

SELECT * REPLACE (spam / 1_000 AS spam) FROM egg;

spam を 1000 で割った値に置き換えてくれる。