📊

家計簿で学ぶ KurrentDB Projection — Rust と TypeScript でサーバサイド常時計算

に公開

はじめに

前回 Event Sourcing で家計簿 CLI を作っていたのですが、残高を確認するたびに全イベントを読み直して計算していました。

income alice 100000 salary
income alice 50000 bonus
expense alice 30000 food lunch
expense alice 25000 food dinner
...
balance alice  ← ここで全件 read して計算(遅い)

イベントが増えるほど遅くなります。
しんどいですね。

KurrentDB には Projection というサーバサイドで JavaScript を常時実行して集計結果を保持する仕組みがあります。
これを使えば get_state で一発取得できます。

この記事では Event Sourcing の家計簿 CLI に Projection を追加し、残高・カテゴリ集計・予算超過アラートを実装するまでを紹介します。

全体像

Before:
  income/expense → KurrentDB に追記
  balance        → 全イベントを読み直して計算(遅い)

After:
  income/expense → KurrentDB に追記
  balance        → Projection が裏で常時計算 → get_state で一発取得(速い)
  stats          → カテゴリ別集計、全体サマリーも Projection が提供
  alert          → 予算超過時に Projection が別ストリームに emit → subscribe で監視

Projection は KurrentDB 内蔵の V8 エンジンで JavaScript を実行します。
SQL や独自 DSL ではなく JavaScript 限定です。
調査は このブランチVERIFY_QUERY.md に、検証結果は VERIFY_RESULT.md に残しています。

TypeScript で型安全に書いて ES5 にコンパイルし、Rust 側で include_str!() を使ってバイナリに埋め込む構成にしました。

projections/*.ts → tsc → projections/dist/*.js → include_str!() → Rust binary

環境

# Cargo.toml
[dependencies]
kurrentdb = "1.0.0"
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
tokio = { version = "1", features = ["full"] }
# docker-compose.yml
services:
  kurrentdb:
    image: eventstore/eventstore:23.10.2-alpha-arm64v8
    container_name: kurrentdb
    ports:
      - "2113:2113"
    environment:
      - EVENTSTORE_INSECURE=true
      - EVENTSTORE_RUN_PROJECTIONS=All
      - EVENTSTORE_START_STANDARD_PROJECTIONS=true
    volumes:
      - kurrentdb-data:/var/lib/eventstore

volumes:
  kurrentdb-data:

EVENTSTORE_START_STANDARD_PROJECTIONS=true がポイントです。
fromCategory() を使うには KurrentDB の標準 Projection($by_category 等)が起動している必要があります。

TypeScript で Projection を書く

types.d.ts — 型定義は自作する

KurrentDB は Projection 用の TypeScript 型定義を 公式提供していませんIssue #1805)。
fromCategory()emit() は KurrentDB の V8 エンジンがグローバルに提供する関数なので、自分で declare する必要があります。

interface ProjectionEvent<T> {
  body: T
  bodyRaw: string
  streamId: string
  eventType: string
  sequenceNumber: number
  metadataRaw: Record<string, unknown>
  isJson: boolean
  partition: string
}

interface WhenHandlers<S> {
  $init?: () => S
  $any?: (state: S, event: ProjectionEvent<unknown>) => void
  [eventType: string]: ((state: S, event: ProjectionEvent<any>) => void) | (() => S) | undefined
}

interface ProjectionBuilder<S = unknown> {
  when(handlers: WhenHandlers<S>): ProjectionChain
  foreachStream(): { when(handlers: WhenHandlers<S>): ProjectionChain }
  outputState(): void
}

interface ProjectionChain {
  outputState(): ProjectionChain
  transformBy(handler: (state: any) => any): ProjectionChain
  filterBy(handler: (state: any) => boolean): ProjectionChain
}

declare function fromAll(): ProjectionBuilder<any>
declare function fromCategory(name: string): ProjectionBuilder<any>
declare function fromStream(name: string): ProjectionBuilder<any>
declare function fromStreams(streams: string[]): ProjectionBuilder<any>
declare function emit(streamId: string, eventType: string, eventBody: unknown, metadata?: unknown): void
declare function linkTo(streamId: string, event: unknown, metadata?: unknown): void
declare function log(message: string): void

fromCategory() の戻り値は ProjectionBuilder<any> にしています。
<unknown> だと when() に渡すハンドラの state 型が合わずエラーになります。

tsconfig.json

{
  "compilerOptions": {
    "target": "ES5",
    "module": "none",
    "strict": true,
    "outDir": "dist",
    "rootDir": ".",
    "declaration": false,
    "sourceMap": false
  },
  "include": ["*.ts"],
  "exclude": ["node_modules"]
}

module: "none" にしているのは、Projection にモジュールシステムがないからです。
グローバルスコープで fromCategory() 等が使えます。

balance.ts — アカウント別残高

interface BalanceState {
  balance: number
  total_income: number
  total_expense: number
  event_count: number
}

interface IncomeEvent {
  amount: number
  description: string
}

interface ExpenseEvent {
  amount: number
  category: string
  description: string
}

fromCategory('account')
  .foreachStream()
  .when({
    $init(): BalanceState {
      return { balance: 0, total_income: 0, total_expense: 0, event_count: 0 }
    },
    income(s: BalanceState, e: ProjectionEvent<IncomeEvent>) {
      s.balance += e.body.amount
      s.total_income += e.body.amount
      s.event_count += 1
    },
    expense(s: BalanceState, e: ProjectionEvent<ExpenseEvent>) {
      s.balance -= e.body.amount
      s.total_expense += e.body.amount
      s.event_count += 1
    }
  })
  .outputState()

重要なポイントがいくつかあります。

  • fromCategory('account') — ストリーム名 account-* のイベントを対象にする
  • .foreachStream() — ストリームごとに独立した state を持つ(alice と bob で別々の残高)
  • $init() — state の初期値
  • income / expense — イベントタイプ名に対応するハンドラ。Rust の EventData::json("income", ...) の第1引数と一致させる
  • .outputState()get_state API で state を取得可能にする

category.ts — カテゴリ別支出集計

interface CategoryEntry {
  total: number
  count: number
}

interface CategoryState {
  categories: Record<string, CategoryEntry>
}

interface ExpenseEvent {
  amount: number
  category: string
  description: string
}

fromCategory('account')
  .when({
    $init(): CategoryState {
      return { categories: {} }
    },
    expense(s: CategoryState, e: ProjectionEvent<ExpenseEvent>) {
      var cat = e.body.category
      if (!s.categories[cat]) {
        s.categories[cat] = { total: 0, count: 0 }
      }
      s.categories[cat].total += e.body.amount
      s.categories[cat].count += 1
    }
  })
  .outputState()

balance.ts との違いは .foreachStream() がないことです。
全アカウントの expense を横断集計しています。

alert.ts — 予算超過アラート

interface AlertTotals {
  totals: Record<string, number>
}

interface ExpenseEvent {
  amount: number
  category: string
  description: string
}

fromCategory('account')
  .when({
    $init(): AlertTotals {
      return { totals: {} }
    },
    expense(s: AlertTotals, e: ProjectionEvent<ExpenseEvent>) {
      var cat = e.body.category
      if (!s.totals[cat]) s.totals[cat] = 0
      s.totals[cat] += e.body.amount

      if (s.totals[cat] > 50000) {
        var account = e.streamId.replace('account-', '')
        emit('alert-' + account, 'BudgetExceeded', {
          category: cat,
          total: s.totals[cat],
          triggered_by: e.body.amount,
          account: account
        })
      }
    }
  })

ここが一番おもしろいところです。
emit() を使って新しいストリーム alert-{account} にイベントを書き込んでいます。
Projection が「Read Model を作る」だけでなく「派生イベントを生成する」ことができるんですね。

.outputState() がないのもポイントです。
この Projection は state を公開する必要がなく、emit でイベントを生成することが目的です。

summary.ts — 全アカウントサマリー

interface SummaryState {
  total_income: number
  total_expense: number
  net: number
  accounts: Record<string, number>
}

interface IncomeEvent {
  amount: number
  description: string
}

interface ExpenseEvent {
  amount: number
  category: string
  description: string
}

fromCategory('account')
  .when({
    $init(): SummaryState {
      return { total_income: 0, total_expense: 0, net: 0, accounts: {} }
    },
    income(s: SummaryState, e: ProjectionEvent<IncomeEvent>) {
      if (!s.accounts[e.streamId]) s.accounts[e.streamId] = 0
      s.accounts[e.streamId] += e.body.amount
      s.total_income += e.body.amount
      s.net += e.body.amount
    },
    expense(s: SummaryState, e: ProjectionEvent<ExpenseEvent>) {
      if (!s.accounts[e.streamId]) s.accounts[e.streamId] = 0
      s.accounts[e.streamId] -= e.body.amount
      s.total_expense += e.body.amount
      s.net -= e.body.amount
    }
  })
  .outputState()

accounts マップでアカウントごとの残高も保持しています。
e.streamId はイベントのストリーム名(account-alice 等)がそのまま入ります。

ビルド

cd projections
npm install --save-dev typescript
npx tsc
ls dist/
# balance.js  category.js  alert.js  summary.js

Rust 側の実装

src/projections.rs — JS埋め込みと State 型

use serde::Deserialize;
use std::collections::HashMap;

pub const BALANCE_JS: &str = include_str!("../projections/dist/balance.js");
pub const CATEGORY_JS: &str = include_str!("../projections/dist/category.js");
pub const ALERT_JS: &str = include_str!("../projections/dist/alert.js");
pub const SUMMARY_JS: &str = include_str!("../projections/dist/summary.js");

pub const BALANCE_NAME: &str = "account-balance";
pub const CATEGORY_NAME: &str = "category-expense";
pub const ALERT_NAME: &str = "budget-alert";
pub const SUMMARY_NAME: &str = "account-summary";

pub const ALL_PROJECTIONS: &[(&str, &str, bool)] = &[
    (BALANCE_NAME, BALANCE_JS, false),
    (CATEGORY_NAME, CATEGORY_JS, false),
    (ALERT_NAME, ALERT_JS, true),
    (SUMMARY_NAME, SUMMARY_JS, false),
];

#[derive(Debug, Deserialize)]
pub struct BalanceState {
    pub balance: f64,
    pub total_income: f64,
    pub total_expense: f64,
    pub event_count: f64,
}

#[derive(Debug, Deserialize)]
pub struct CategoryEntry {
    pub total: f64,
    pub count: f64,
}

#[derive(Debug, Deserialize)]
pub struct CategoryState {
    pub categories: HashMap<String, CategoryEntry>,
}

#[derive(Debug, Deserialize)]
pub struct SummaryState {
    pub total_income: f64,
    pub total_expense: f64,
    pub net: f64,
    pub accounts: HashMap<String, f64>,
}

#[derive(Debug, Deserialize)]
pub struct BudgetExceededEvent {
    pub category: String,
    pub total: f64,
    pub triggered_by: f64,
    pub account: String,
}

include_str!() はコンパイル時にファイルを読み込むので cargo build の前に npx tsc が必要です。

ALL_PROJECTIONS の3つ目の bool は emit フラグです。alert だけ true にしています。

Projection を登録する

use kurrentdb::{Client, CreateProjectionOptions, ProjectionClient};
use crate::projections::ALL_PROJECTIONS;

pub async fn run(client: &Client) -> Result<(), Box<dyn std::error::Error>> {
    let proj_client = ProjectionClient::from(client.clone());

    for &(name, query, emit) in ALL_PROJECTIONS {
        let options = CreateProjectionOptions::default()
            .emit(emit)
            .track_emitted_streams(emit);
        match proj_client.create(name, query.to_string(), &options).await {
            Ok(()) => println!("  ✓ {}", name),
            Err(e) => {
                let msg = e.to_string();
                if msg.contains("Conflict") || msg.contains("already exists") {
                    println!("  - {} (already exists)", name);
                } else {
                    eprintln!("  ✗ {} : {}", name, e);
                }
            }
        }
    }

    println!("\nprojection setup complete.");
    Ok(())
}

ProjectionClient::from(client.clone())Client から変換できます。内部の gRPC 接続を共有するので新しい接続は作られません。

emit() を使う Projection には CreateProjectionOptions::default().emit(true).track_emitted_streams(true) が必要です。
これを忘れると emit が無視されます。

balance コマンドを get_state に書き換える

Before(全イベント読み直し):

let mut read = client.read_stream(stream, &options).await?;
while let Some(event) = read.next().await? {
    // 1件ずつ apply...
}

After(Projection の get_state):

use kurrentdb::{Client, GetStateProjectionOptions, ProjectionClient};
use crate::projections::{BalanceState, BALANCE_NAME};

pub async fn run(client: &Client, account: &str) -> Result<(), Box<dyn std::error::Error>> {
    let proj_client = ProjectionClient::from(client.clone());
    let stream = format!("account-{}", account);
    let options = GetStateProjectionOptions::default().partition(stream);

    let state: BalanceState = proj_client
        .get_state::<_, BalanceState>(BALANCE_NAME, &options)
        .await??;

    if state.event_count == 0.0 {
        println!("No events found.");
        return Ok(());
    }

    println!("{}: {} ({} events)", account, format_amount_f64(state.balance), state.event_count as u64);
    println!("  income:  {:>9}", format_amount_f64(state.total_income));
    println!("  expense: {:>9}", format_amount_f64(state.total_expense));

    Ok(())
}

await?? — 二重の ? です。
get_state() の戻り値が Result<serde_json::Result<T>> なので、外側が gRPC エラー、内側が JSON デシリアライズエラーです。

.partition("account-alice")foreachStream() を使った Projection で必須です。
どのストリームの state を取得するか指定します。

アラート監視

use kurrentdb::{Client, StreamPosition, SubscribeToStreamOptions};
use crate::projections::BudgetExceededEvent;

pub async fn run(client: &Client, account: &str) -> Result<(), Box<dyn std::error::Error>> {
    let stream = format!("alert-{}", account);

    println!("watching alerts for {}... (Ctrl+C to stop)", account);

    let options = SubscribeToStreamOptions::default().start_from(StreamPosition::Start);
    let mut subscription = client.subscribe_to_stream(stream, &options).await;

    loop {
        let event = subscription.next().await?;
        let recorded = event.get_original_event();
        let alert = recorded.as_json::<BudgetExceededEvent>()?;

        println!(
            "[ALERT] {} - category '{}' exceeded budget: {} (triggered by +{})",
            alert.account, alert.category,
            format_amount_f64(alert.total), format_amount_f64(alert.triggered_by),
        );
    }
}

ここでは ProjectionClient を使いません。
alert.ts が emit() で作ったストリームは普通のストリームなので、通常の Client で subscribe できます。

動かしてみる

# KurrentDB 起動
docker compose up -d

# TypeScript ビルド → Rust ビルド
cd projections && npx tsc && cd ..
cargo build

# Projection 登録
cargo r -- projection setup
#   ✓ account-balance
#   ✓ category-expense
#   ✓ budget-alert
#   ✓ account-summary

# Projection 状態確認
cargo r -- projection status
# NAME                 STATUS        PROGRESS
# ------------------------------------------
# account-balance      Running       100.0%
# category-expense     Running       100.0%
# budget-alert         Running       100.0%
# account-summary      Running       100.0%

# テストデータ投入
cargo r -- income alice 100000 salary
cargo r -- expense alice 30000 food lunch
cargo r -- expense alice 25000 food dinner

# 残高確認(一瞬で返ってくる)
cargo r -- balance alice
# alice: 45,000 (3 events)
#   income:   100,000
#   expense:   55,000

# カテゴリ集計
cargo r -- stats category
# CATEGORY           TOTAL  COUNT
# -----------------------------------
# food              55,000      2

# 全体サマリー
cargo r -- stats summary
# === Account Summary ===
#
#   income:       100,000
#   expense:       55,000
#   net:           45,000
#
# ACCOUNT                    BALANCE
# ---------------------------------------
# alice                       45,000

アラートも試しましょう。
ターミナルを2つ開きます。

# ターミナル A
cargo r -- alert watch alice
# watching alerts for alice... (Ctrl+C to stop)
# [ALERT] alice - category 'food' exceeded budget: 55,000 (triggered by +25,000)

最初の2件で food カテゴリが 55,000 に達しているので、過去のアラートがすぐ表示されます。

# ターミナル B
cargo r -- expense alice 10000 food snack

ターミナル A に新しいアラートが表示されます。

[ALERT] alice - category 'food' exceeded budget: 65,000 (triggered by +10,000)

ハマりポイント

実装中に何度か苦しんだので共有します。

1. JavaScript の number は全部 f64

Rust 側の Deserialize 型を u64 にすると死にます。
event_count のような整数に見えるフィールドでも f64 にしてください。

2. foreachStream() と partition の関係

.foreachStream() を使うと各ストリームが独立した state を持ちます。
get_state() で取得するときは .partition("account-alice") で指定。
使わない場合はグローバルに1つの state になるので partition 不要です。

3. Apple Silicon と V8

KurrentDB の amd64 イメージを Rosetta で動かすと emit() が機能しません。
arm64 ネイティブの eventstore/eventstore:23.10.2-alpha-arm64v8 を使いましょう。
Projection の outputState() は amd64 でも動くので、emit() だけ動かないのが厄介です。

4. emit() には明示的な有効化が必要

CreateProjectionOptions::default().emit(true) を忘れると、Projection は正常に動いているように見えるのに emit 先のストリームが空っぽという状況になります。

5. include_str!() はコンパイル時評価

TypeScript を変更したら npx tsccargo build の順で実行する必要があります。
cargo build だけだと古い JS が埋め込まれます。

6. types.d.ts は公式提供されていない

Issue #1805 でリクエストされていますが、まだ対応されていません。
自作するしかないのが現状です。

おわりに

Projection のおかげで「全イベント読み直して計算」から「get_state で一発取得」に変わりました。
event が増えても balance コマンドの速度は変わりません。

TypeScript で Projection を書いて ES5 にコンパイルし、Rust に埋め込む構成は少し変わっていますが、型安全に Projection を書けるのは安心感があります。

emit() を使った派生イベント生成は Event Sourcing の可能性を広げてくれますね。
今回は予算超過アラートでしたが、集計結果を別ストリームに流してダッシュボード的なものを作ることもできそうです。

ただ、慣れ親しんだ JS/TS で query を書けるのはいいものの、運用の観点で見ると「先月の食費いくらだっけ?」を SELECT SUM(amount) FROM expenses WHERE category = 'food' でさっと取りたくなりますよね。
KurrentDB の get_state は事前に定義した Projection の結果しか返せないので、アドホックな集計には向きません。
Projection の投影先を PostgreSQL 等の RDB にしておけば、Event Sourcing の利点を活かしつつ SQL で自由に集計できます。
次はそのあたりを試してみたいと思っています。

Discussion