Open12
DuckDB メモ v2
前提
- 自社製品では JSONL 形式のログが色々出力される
- 自社製品は分散システムなためログも分散する
- 自社サービスのログファイルは大きくても無圧縮で 1 日 100 GB いかない程度
- Fluent Bit で gzip で圧縮した JSONL 形式のログを S3 互換オブジェクトストレージ (以下 S3) に保存する
モチベーション
-
自社パッケージ製品のクラウドサービスで、顧客にログ解析情報を提供したい
- その仕組みを OSS 化して、自社パッケージ製品を利用している顧客が利用できるようにしたい
- S3 から gzip で圧縮された JSONL ログ(複数) を取得して、横断的に解析したい
- 自動でスキーマは作成してほしい
- 解析結果を DuckDB ファイルとして出力し S3 へ保存したい
- 出力した DuckDB ファイルを DuckDB-Wasm で読み込ませたい
-
コストを低くしたい
- 転送量が安い S3 を利用すればかなりコストが下げられる
- 顧客のブラウザ上だけで解析結果を自由にいじれるようにしたい
- 毎回 API 叩かれたくない
- 一定期間でファイルを削除するのも S3 なら気軽にできる
- 解析が DuckDB + S3 で、表示が DuckDB-Wasm + S3 スケールが容易になる
- DuckDB を利用する事で自社以外でも同じ解析の仕組みが利用できるようになる
- 自社製品と関連付けた解析システムとして OSS にできる
選定理由
- DuckDB
- SQL で書きたい
- ライセンスが MIT
- 安定性を優先している
- https://duckdblabs.com/
- DuckDB-Wasm がある
- ブラウザで完結したい
- サービス側の負荷を最小限にしたい
- S3 互換オブジェクトストレージ
- 安い
- 汎用的
- 永続化
ざっくり
- Fluent Bit から gizp で圧縮した JSONL を S3 に保存する
- DuckDB から S3 へ圧縮されたファイルを取得する
create table egg as select * from read_json_auto('s3://spam/egg.jsonl.gz');
- URL にはワイルドカードが使用可能
- 読み込んだ結果にフィルターをかけて必要なテーブルだけが入った duckbox 形式で出力する
- EXPORT DATABSE を利用する
- duckdb ではなく duckbox らしい
- https://duckdb.org/docs/sql/statements/export.html
- 読み込んだ結果にフィルターかけて、テーブルを Parquet 形式で出力する
- DuckDB-Wasm で S3 からファイルをダウンロードする
- DuckDB-Wasm は aws 拡張が使えず
s3://
は利用できない - Parquet を HTTPS 経由でダウンロードする
- S3 の署名付き URL を利用する
- ATTACH を利用する
- DuckDB-Wasm は aws 拡張が使えず
- DuckDB-Wasm の情報をグラフライブラリを利用して描画する
おまけ
- sqlc-duckdb の開発を検討中
- DuckDB のパーサーを利用する
- Go / TypeScript / Python 向けに出力する
DuckDB 処理フロー
- トリガーで情報を渡して、それに沿った duckdb ファイルを出力する仕組み
systemd/Timers 案
採用
- 定期的にジョブキューを見に行って、ジョブがあったら DuckDB を実行する仕組み
- S3 から集計して DuckDB ファイルを吐き出して S3 に保存
- ジョブが完了するタイミングで Postgres のレコードを更新する
- Postgres のジョブキューモドキを利用する
- ジョブワーカーを増やすだけでスケールすることができる
GitHub Actions 案
不採用
- キューに入れて処理するなら GitHub Actions を使えるのではないか?
- Self-hosted を利用すれば好き放題できる
- https://github.com/opt-nc/setup-duckdb-action
自社サービスで利用する際のコスト
かなり雑に計算してます
- S3 互換オブジェクトストレージ
- Akamai Connected Cloud の Object Storage を利用
- 250 GB で月 5 ドル
- 転送量は 1 TB 無料枠ありで 1 TB で月 5 ドル
- API の回数などは課金対象ではない
- 一定期間でログは削除
- 14 日 or 30 日をイメージ
- ジョブワーカー
- CPU 2 コア メモリ 4 GB の場合
- 月 36 ドル
- メモリが欲しい場合
- https://www.linode.com/pricing/#compute-high-memory
- 月 60 ドルでメモリ 24 GB のマシンが借りれる
- CPU 2 コア メモリ 4 GB の場合
ざっくりコスト計算
- 転送量に 1 TB 利用
- 0
- ディスク容量 1 TB
- 20 ドル
- ジョブワーカー
- 既にある
月 20 ドルの追加で実現できそう。
pg_duckdb
DuckDB の仕組みを Postgres で利用できるようにする Postgres 拡張。
Secrets Manager
Cloudflare R2
CREATE SECRET secret5 (
TYPE R2,
KEY_ID 'AKIAIOSFODNN7EXAMPLE',
SECRET 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
ACCOUNT_ID 'my_account_id'
);
to_timestamp
timestamp への変換。
time_bucket
TimescaleDB にある time_bucket そのまま。
time_bucket(INTERVAL '2 weeks', TIMESTAMP '1992-04-20 15:26:00', TIMESTAMP '1992-04-01 00:00:00')
CREATE SECRET に Akamai Connected Cloud の Object Storage を利用する
- ENDPOINT を明示的に指定するだけでよい
- あとは KEY_ID に Access Key 、SECRET に Secret Key を指定する
CREATE SECRET secret (
TYPE S3,
KEY_ID '...',
SECRET '...',
ENDPOINT 'jp-osa-1.linodeobjects.com',
);
EXPLAIN ANALYZE
JSONL / JSONL (gzip) / Parquet (zstd) のファイルサイズ (バイト)
このログは 時雨堂 の WebRTC SFU Sora が出力するログの1つです。
クライアント側で 15 秒間隔で取得している WebRTC 統計情報 を WebRTC DataChannel 経由で Sora へ吸い上げて、Sora が JSONL として出力しています。
101_452_550 rtc_stats.jsonl
10_864_324 rtc_stats.jsonl.gz
2_390_235 rtc_stats.parquet
jsonl 読み込みからの parquet ファイルの生成
D EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl');
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││ Query Profiling Information ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.281s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│ QUERY │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ EXPLAIN_ANALYZE │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ BATCH_CREATE_TABLE_AS │
│ ──────────────────── │
│ 1 Rows │
│ (0.14s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ TABLE_SCAN │
│ ──────────────────── │
│ Function: READ_JSON │
│ │
│ Projections: │
│ connection_id │
│ id │
│ label │
│ timestamp │
│ type │
│ version │
│ node_name │
│ session_id │
│ role │
│ channel_id │
│ bundle_id │
│ client_id │
│ multistream │
│ spotlight │
│ simulcast │
│ log_written │
│ rtc_data │
│ rtc_id │
│ rtc_timestamp │
│ rtc_type │
│ │
│ 83911 Rows │
│ (0.25s) │
└───────────────────────────┘
D EXPLAIN ANALYZE copy rtc_stats to 'rtc_stats.parquet' (FORMAT parquet, COMPRESSION zstd);
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││ Query Profiling Information ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE copy rtc_stats to 'rtc_stats.parquet' (FORMAT parquet, COMPRESSION zstd);
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.320s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│ QUERY │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ EXPLAIN_ANALYZE │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ BATCH_COPY_TO_FILE │
│ ──────────────────── │
│ 1 Rows │
│ (0.07s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ TABLE_SCAN │
│ ──────────────────── │
│ rtc_stats │
│ │
│ Projections: │
│ connection_id │
│ id │
│ label │
│ timestamp │
│ type │
│ version │
│ node_name │
│ session_id │
│ role │
│ channel_id │
│ bundle_id │
│ client_id │
│ multistream │
│ spotlight │
│ simulcast │
│ log_written │
│ rtc_data │
│ rtc_id │
│ rtc_timestamp │
│ rtc_type │
│ │
│ 83911 Rows │
│ (0.04s) │
└───────────────────────────┘
D EXPLAIN ANALYZE select * from parquet_scan('rtc_stats.parquet');
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││ Query Profiling Information ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE select * from parquet_scan('rtc_stats.parquet');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.0908s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│ QUERY │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ EXPLAIN_ANALYZE │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ TABLE_SCAN │
│ ──────────────────── │
│ Function: │
│ PARQUET_SCAN │
│ │
│ Projections: │
│ connection_id │
│ id │
│ label │
│ timestamp │
│ type │
│ version │
│ node_name │
│ session_id │
│ role │
│ channel_id │
│ bundle_id │
│ client_id │
│ multistream │
│ spotlight │
│ simulcast │
│ log_written │
│ rtc_data │
│ rtc_id │
│ rtc_timestamp │
│ rtc_type │
│ │
│ 83911 Rows │
│ (0.09s) │
└───────────────────────────┘
jsonl.gz からの読み込み
D EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl.gz');
┌─────────────────────────────────────┐
│┌───────────────────────────────────┐│
││ Query Profiling Information ││
│└───────────────────────────────────┘│
└─────────────────────────────────────┘
EXPLAIN ANALYZE create table rtc_stats as select * from read_json('rtc_stats.jsonl.gz');
┌────────────────────────────────────────────────┐
│┌──────────────────────────────────────────────┐│
││ Total Time: 0.572s ││
│└──────────────────────────────────────────────┘│
└────────────────────────────────────────────────┘
┌───────────────────────────┐
│ QUERY │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ EXPLAIN_ANALYZE │
│ ──────────────────── │
│ 0 Rows │
│ (0.00s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ BATCH_CREATE_TABLE_AS │
│ ──────────────────── │
│ 1 Rows │
│ (0.13s) │
└─────────────┬─────────────┘
┌─────────────┴─────────────┐
│ TABLE_SCAN │
│ ──────────────────── │
│ Function: READ_JSON │
│ │
│ Projections: │
│ connection_id │
│ id │
│ label │
│ timestamp │
│ type │
│ version │
│ node_name │
│ session_id │
│ role │
│ channel_id │
│ bundle_id │
│ client_id │
│ multistream │
│ spotlight │
│ simulcast │
│ log_written │
│ rtc_data │
│ rtc_id │
│ rtc_timestamp │
│ rtc_type │
│ │
│ 83911 Rows │
│ (0.29s) │
└───────────────────────────┘
PostgreSQL Extension
DuckDB から PostgreSQL を引くことができる。
Secret Manager
登録は Secret Manager を利用する。CREATE SECRET 時に名前も付けられるので付けておいても良い。
CREATE SECRET spam (
TYPE POSTGRES,
HOST '127.0.0.1',
PORT 5432,
DATABASE postgres,
USER 'postgres',
PASSWORD ''
);
ATTACH
- Secret Manager で名前を付けている場合は SECRET で指定するのがオススメ
- READ_ONLY にすることで事故を防ぐ
- AS postgres_db の部分で認識できる
ATTACH '' AS postgres_db (TYPE POSTGRES, SECRET spam, READ_ONLY);
使う
-
SHOW ALL TABLES;
でテーブルが見れる - PostgreSQL 側に egg というテーブルがあるとする
CREATE TABLE egg SELECT * FROM postgres_db.egg
これで DuckDB 側に持ってこれる。
DETACH
DETACH postgres_db;
COPY ... TO
- テーブルを Parquet 化できる
- CREATE TABLE で DuckDB 側に持ってきてからやったほうがいい
COPY egg TO 'egg.parquet (FORMAT Parquet, COMPRESSION zstd)';
複数ファイル読み込み時のファイル名の取得
read_json_auto("log*.jsonl.zst", filename=true)
filename=true
を指定することで複数ファイル読み込み時に、ファイル名はカラム名 filename
で取れるようになる。