家計簿で学ぶ KurrentDB Projection — Rust と TypeScript でサーバサイド常時計算
はじめに
前回 Event Sourcing で家計簿 CLI を作っていたのですが、残高を確認するたびに全イベントを読み直して計算していました。
income alice 100000 salary
income alice 50000 bonus
expense alice 30000 food lunch
expense alice 25000 food dinner
...
balance alice ← ここで全件 read して計算(遅い)
イベントが増えるほど遅くなります。
しんどいですね。
KurrentDB には Projection というサーバサイドで JavaScript を常時実行して集計結果を保持する仕組みがあります。
これを使えば get_state で一発取得できます。
この記事では Event Sourcing の家計簿 CLI に Projection を追加し、残高・カテゴリ集計・予算超過アラートを実装するまでを紹介します。
全体像
Before:
income/expense → KurrentDB に追記
balance → 全イベントを読み直して計算(遅い)
After:
income/expense → KurrentDB に追記
balance → Projection が裏で常時計算 → get_state で一発取得(速い)
stats → カテゴリ別集計、全体サマリーも Projection が提供
alert → 予算超過時に Projection が別ストリームに emit → subscribe で監視
Projection は KurrentDB 内蔵の V8 エンジンで JavaScript を実行します。
SQL や独自 DSL ではなく JavaScript 限定です。
調査は このブランチ の VERIFY_QUERY.md に、検証結果は VERIFY_RESULT.md に残しています。
TypeScript で型安全に書いて ES5 にコンパイルし、Rust 側で include_str!() を使ってバイナリに埋め込む構成にしました。
projections/*.ts → tsc → projections/dist/*.js → include_str!() → Rust binary
環境
# Cargo.toml
[dependencies]
kurrentdb = "1.0.0"
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
# docker-compose.yml
services:
kurrentdb:
image: eventstore/eventstore:23.10.2-alpha-arm64v8
container_name: kurrentdb
ports:
- "2113:2113"
environment:
- EVENTSTORE_INSECURE=true
- EVENTSTORE_RUN_PROJECTIONS=All
- EVENTSTORE_START_STANDARD_PROJECTIONS=true
volumes:
- kurrentdb-data:/var/lib/eventstore
volumes:
kurrentdb-data:
EVENTSTORE_START_STANDARD_PROJECTIONS=true がポイントです。
fromCategory() を使うには KurrentDB の標準 Projection($by_category 等)が起動している必要があります。
TypeScript で Projection を書く
types.d.ts — 型定義は自作する
KurrentDB は Projection 用の TypeScript 型定義を 公式提供していません(Issue #1805)。
fromCategory() や emit() は KurrentDB の V8 エンジンがグローバルに提供する関数なので、自分で declare する必要があります。
interface ProjectionEvent<T> {
body: T
bodyRaw: string
streamId: string
eventType: string
sequenceNumber: number
metadataRaw: Record<string, unknown>
isJson: boolean
partition: string
}
interface WhenHandlers<S> {
$init?: () => S
$any?: (state: S, event: ProjectionEvent<unknown>) => void
[eventType: string]: ((state: S, event: ProjectionEvent<any>) => void) | (() => S) | undefined
}
interface ProjectionBuilder<S = unknown> {
when(handlers: WhenHandlers<S>): ProjectionChain
foreachStream(): { when(handlers: WhenHandlers<S>): ProjectionChain }
outputState(): void
}
interface ProjectionChain {
outputState(): ProjectionChain
transformBy(handler: (state: any) => any): ProjectionChain
filterBy(handler: (state: any) => boolean): ProjectionChain
}
declare function fromAll(): ProjectionBuilder<any>
declare function fromCategory(name: string): ProjectionBuilder<any>
declare function fromStream(name: string): ProjectionBuilder<any>
declare function fromStreams(streams: string[]): ProjectionBuilder<any>
declare function emit(streamId: string, eventType: string, eventBody: unknown, metadata?: unknown): void
declare function linkTo(streamId: string, event: unknown, metadata?: unknown): void
declare function log(message: string): void
fromCategory() の戻り値は ProjectionBuilder<any> にしています。
<unknown> だと when() に渡すハンドラの state 型が合わずエラーになります。
tsconfig.json
{
"compilerOptions": {
"target": "ES5",
"module": "none",
"strict": true,
"outDir": "dist",
"rootDir": ".",
"declaration": false,
"sourceMap": false
},
"include": ["*.ts"],
"exclude": ["node_modules"]
}
module: "none" にしているのは、Projection にモジュールシステムがないからです。
グローバルスコープで fromCategory() 等が使えます。
balance.ts — アカウント別残高
interface BalanceState {
balance: number
total_income: number
total_expense: number
event_count: number
}
interface IncomeEvent {
amount: number
description: string
}
interface ExpenseEvent {
amount: number
category: string
description: string
}
fromCategory('account')
.foreachStream()
.when({
$init(): BalanceState {
return { balance: 0, total_income: 0, total_expense: 0, event_count: 0 }
},
income(s: BalanceState, e: ProjectionEvent<IncomeEvent>) {
s.balance += e.body.amount
s.total_income += e.body.amount
s.event_count += 1
},
expense(s: BalanceState, e: ProjectionEvent<ExpenseEvent>) {
s.balance -= e.body.amount
s.total_expense += e.body.amount
s.event_count += 1
}
})
.outputState()
重要なポイントがいくつかあります。
-
fromCategory('account')— ストリーム名account-*のイベントを対象にする -
.foreachStream()— ストリームごとに独立した state を持つ(alice と bob で別々の残高) -
$init()— state の初期値 -
income/expense— イベントタイプ名に対応するハンドラ。Rust のEventData::json("income", ...)の第1引数と一致させる -
.outputState()—get_stateAPI で state を取得可能にする
category.ts — カテゴリ別支出集計
interface CategoryEntry {
total: number
count: number
}
interface CategoryState {
categories: Record<string, CategoryEntry>
}
interface ExpenseEvent {
amount: number
category: string
description: string
}
fromCategory('account')
.when({
$init(): CategoryState {
return { categories: {} }
},
expense(s: CategoryState, e: ProjectionEvent<ExpenseEvent>) {
var cat = e.body.category
if (!s.categories[cat]) {
s.categories[cat] = { total: 0, count: 0 }
}
s.categories[cat].total += e.body.amount
s.categories[cat].count += 1
}
})
.outputState()
balance.ts との違いは .foreachStream() がないことです。
全アカウントの expense を横断集計しています。
alert.ts — 予算超過アラート
interface AlertTotals {
totals: Record<string, number>
}
interface ExpenseEvent {
amount: number
category: string
description: string
}
fromCategory('account')
.when({
$init(): AlertTotals {
return { totals: {} }
},
expense(s: AlertTotals, e: ProjectionEvent<ExpenseEvent>) {
var cat = e.body.category
if (!s.totals[cat]) s.totals[cat] = 0
s.totals[cat] += e.body.amount
if (s.totals[cat] > 50000) {
var account = e.streamId.replace('account-', '')
emit('alert-' + account, 'BudgetExceeded', {
category: cat,
total: s.totals[cat],
triggered_by: e.body.amount,
account: account
})
}
}
})
ここが一番おもしろいところです。
emit() を使って新しいストリーム alert-{account} にイベントを書き込んでいます。
Projection が「Read Model を作る」だけでなく「派生イベントを生成する」ことができるんですね。
.outputState() がないのもポイントです。
この Projection は state を公開する必要がなく、emit でイベントを生成することが目的です。
summary.ts — 全アカウントサマリー
interface SummaryState {
total_income: number
total_expense: number
net: number
accounts: Record<string, number>
}
interface IncomeEvent {
amount: number
description: string
}
interface ExpenseEvent {
amount: number
category: string
description: string
}
fromCategory('account')
.when({
$init(): SummaryState {
return { total_income: 0, total_expense: 0, net: 0, accounts: {} }
},
income(s: SummaryState, e: ProjectionEvent<IncomeEvent>) {
if (!s.accounts[e.streamId]) s.accounts[e.streamId] = 0
s.accounts[e.streamId] += e.body.amount
s.total_income += e.body.amount
s.net += e.body.amount
},
expense(s: SummaryState, e: ProjectionEvent<ExpenseEvent>) {
if (!s.accounts[e.streamId]) s.accounts[e.streamId] = 0
s.accounts[e.streamId] -= e.body.amount
s.total_expense += e.body.amount
s.net -= e.body.amount
}
})
.outputState()
accounts マップでアカウントごとの残高も保持しています。
e.streamId はイベントのストリーム名(account-alice 等)がそのまま入ります。
ビルド
cd projections
npm install --save-dev typescript
npx tsc
ls dist/
# balance.js category.js alert.js summary.js
Rust 側の実装
src/projections.rs — JS埋め込みと State 型
use serde::Deserialize;
use std::collections::HashMap;
pub const BALANCE_JS: &str = include_str!("../projections/dist/balance.js");
pub const CATEGORY_JS: &str = include_str!("../projections/dist/category.js");
pub const ALERT_JS: &str = include_str!("../projections/dist/alert.js");
pub const SUMMARY_JS: &str = include_str!("../projections/dist/summary.js");
pub const BALANCE_NAME: &str = "account-balance";
pub const CATEGORY_NAME: &str = "category-expense";
pub const ALERT_NAME: &str = "budget-alert";
pub const SUMMARY_NAME: &str = "account-summary";
pub const ALL_PROJECTIONS: &[(&str, &str, bool)] = &[
(BALANCE_NAME, BALANCE_JS, false),
(CATEGORY_NAME, CATEGORY_JS, false),
(ALERT_NAME, ALERT_JS, true),
(SUMMARY_NAME, SUMMARY_JS, false),
];
#[derive(Debug, Deserialize)]
pub struct BalanceState {
pub balance: f64,
pub total_income: f64,
pub total_expense: f64,
pub event_count: f64,
}
#[derive(Debug, Deserialize)]
pub struct CategoryEntry {
pub total: f64,
pub count: f64,
}
#[derive(Debug, Deserialize)]
pub struct CategoryState {
pub categories: HashMap<String, CategoryEntry>,
}
#[derive(Debug, Deserialize)]
pub struct SummaryState {
pub total_income: f64,
pub total_expense: f64,
pub net: f64,
pub accounts: HashMap<String, f64>,
}
#[derive(Debug, Deserialize)]
pub struct BudgetExceededEvent {
pub category: String,
pub total: f64,
pub triggered_by: f64,
pub account: String,
}
include_str!() はコンパイル時にファイルを読み込むので cargo build の前に npx tsc が必要です。
ALL_PROJECTIONS の3つ目の bool は emit フラグです。alert だけ true にしています。
Projection を登録する
use kurrentdb::{Client, CreateProjectionOptions, ProjectionClient};
use crate::projections::ALL_PROJECTIONS;
pub async fn run(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
let proj_client = ProjectionClient::from(client.clone());
for &(name, query, emit) in ALL_PROJECTIONS {
let options = CreateProjectionOptions::default()
.emit(emit)
.track_emitted_streams(emit);
match proj_client.create(name, query.to_string(), &options).await {
Ok(()) => println!(" ✓ {}", name),
Err(e) => {
let msg = e.to_string();
if msg.contains("Conflict") || msg.contains("already exists") {
println!(" - {} (already exists)", name);
} else {
eprintln!(" ✗ {} : {}", name, e);
}
}
}
}
println!("\nprojection setup complete.");
Ok(())
}
ProjectionClient::from(client.clone()) で Client から変換できます。内部の gRPC 接続を共有するので新しい接続は作られません。
emit() を使う Projection には CreateProjectionOptions::default().emit(true).track_emitted_streams(true) が必要です。
これを忘れると emit が無視されます。
balance コマンドを get_state に書き換える
Before(全イベント読み直し):
let mut read = client.read_stream(stream, &options).await?;
while let Some(event) = read.next().await? {
// 1件ずつ apply...
}
After(Projection の get_state):
use kurrentdb::{Client, GetStateProjectionOptions, ProjectionClient};
use crate::projections::{BalanceState, BALANCE_NAME};
pub async fn run(client: &Client, account: &str) -> Result<(), Box<dyn std::error::Error>> {
let proj_client = ProjectionClient::from(client.clone());
let stream = format!("account-{}", account);
let options = GetStateProjectionOptions::default().partition(stream);
let state: BalanceState = proj_client
.get_state::<_, BalanceState>(BALANCE_NAME, &options)
.await??;
if state.event_count == 0.0 {
println!("No events found.");
return Ok(());
}
println!("{}: {} ({} events)", account, format_amount_f64(state.balance), state.event_count as u64);
println!(" income: {:>9}", format_amount_f64(state.total_income));
println!(" expense: {:>9}", format_amount_f64(state.total_expense));
Ok(())
}
await?? — 二重の ? です。
get_state() の戻り値が Result<serde_json::Result<T>> なので、外側が gRPC エラー、内側が JSON デシリアライズエラーです。
.partition("account-alice") は foreachStream() を使った Projection で必須です。
どのストリームの state を取得するか指定します。
アラート監視
use kurrentdb::{Client, StreamPosition, SubscribeToStreamOptions};
use crate::projections::BudgetExceededEvent;
pub async fn run(client: &Client, account: &str) -> Result<(), Box<dyn std::error::Error>> {
let stream = format!("alert-{}", account);
println!("watching alerts for {}... (Ctrl+C to stop)", account);
let options = SubscribeToStreamOptions::default().start_from(StreamPosition::Start);
let mut subscription = client.subscribe_to_stream(stream, &options).await;
loop {
let event = subscription.next().await?;
let recorded = event.get_original_event();
let alert = recorded.as_json::<BudgetExceededEvent>()?;
println!(
"[ALERT] {} - category '{}' exceeded budget: {} (triggered by +{})",
alert.account, alert.category,
format_amount_f64(alert.total), format_amount_f64(alert.triggered_by),
);
}
}
ここでは ProjectionClient を使いません。
alert.ts が emit() で作ったストリームは普通のストリームなので、通常の Client で subscribe できます。
動かしてみる
# KurrentDB 起動
docker compose up -d
# TypeScript ビルド → Rust ビルド
cd projections && npx tsc && cd ..
cargo build
# Projection 登録
cargo r -- projection setup
# ✓ account-balance
# ✓ category-expense
# ✓ budget-alert
# ✓ account-summary
# Projection 状態確認
cargo r -- projection status
# NAME STATUS PROGRESS
# ------------------------------------------
# account-balance Running 100.0%
# category-expense Running 100.0%
# budget-alert Running 100.0%
# account-summary Running 100.0%
# テストデータ投入
cargo r -- income alice 100000 salary
cargo r -- expense alice 30000 food lunch
cargo r -- expense alice 25000 food dinner
# 残高確認(一瞬で返ってくる)
cargo r -- balance alice
# alice: 45,000 (3 events)
# income: 100,000
# expense: 55,000
# カテゴリ集計
cargo r -- stats category
# CATEGORY TOTAL COUNT
# -----------------------------------
# food 55,000 2
# 全体サマリー
cargo r -- stats summary
# === Account Summary ===
#
# income: 100,000
# expense: 55,000
# net: 45,000
#
# ACCOUNT BALANCE
# ---------------------------------------
# alice 45,000
アラートも試しましょう。
ターミナルを2つ開きます。
# ターミナル A
cargo r -- alert watch alice
# watching alerts for alice... (Ctrl+C to stop)
# [ALERT] alice - category 'food' exceeded budget: 55,000 (triggered by +25,000)
最初の2件で food カテゴリが 55,000 に達しているので、過去のアラートがすぐ表示されます。
# ターミナル B
cargo r -- expense alice 10000 food snack
ターミナル A に新しいアラートが表示されます。
[ALERT] alice - category 'food' exceeded budget: 65,000 (triggered by +10,000)
ハマりポイント
実装中に何度か苦しんだので共有します。
1. JavaScript の number は全部 f64
Rust 側の Deserialize 型を u64 にすると死にます。
event_count のような整数に見えるフィールドでも f64 にしてください。
2. foreachStream() と partition の関係
.foreachStream() を使うと各ストリームが独立した state を持ちます。
get_state() で取得するときは .partition("account-alice") で指定。
使わない場合はグローバルに1つの state になるので partition 不要です。
3. Apple Silicon と V8
KurrentDB の amd64 イメージを Rosetta で動かすと emit() が機能しません。
arm64 ネイティブの eventstore/eventstore:23.10.2-alpha-arm64v8 を使いましょう。
Projection の outputState() は amd64 でも動くので、emit() だけ動かないのが厄介です。
4. emit() には明示的な有効化が必要
CreateProjectionOptions::default().emit(true) を忘れると、Projection は正常に動いているように見えるのに emit 先のストリームが空っぽという状況になります。
5. include_str!() はコンパイル時評価
TypeScript を変更したら npx tsc → cargo build の順で実行する必要があります。
cargo build だけだと古い JS が埋め込まれます。
6. types.d.ts は公式提供されていない
Issue #1805 でリクエストされていますが、まだ対応されていません。
自作するしかないのが現状です。
おわりに
Projection のおかげで「全イベント読み直して計算」から「get_state で一発取得」に変わりました。
event が増えても balance コマンドの速度は変わりません。
TypeScript で Projection を書いて ES5 にコンパイルし、Rust に埋め込む構成は少し変わっていますが、型安全に Projection を書けるのは安心感があります。
emit() を使った派生イベント生成は Event Sourcing の可能性を広げてくれますね。
今回は予算超過アラートでしたが、集計結果を別ストリームに流してダッシュボード的なものを作ることもできそうです。
ただ、慣れ親しんだ JS/TS で query を書けるのはいいものの、運用の観点で見ると「先月の食費いくらだっけ?」を SELECT SUM(amount) FROM expenses WHERE category = 'food' でさっと取りたくなりますよね。
KurrentDB の get_state は事前に定義した Projection の結果しか返せないので、アドホックな集計には向きません。
Projection の投影先を PostgreSQL 等の RDB にしておけば、Event Sourcing の利点を活かしつつ SQL で自由に集計できます。
次はそのあたりを試してみたいと思っています。
Discussion