😇

DynamoDB を使って CQRS + ES の夢を見る

2025/02/21に公開

HERP Careers の松山です。
前置きとして、HERP Careers として Event Sourcing をやってみたの記事ではないのでご注意ください。

CQRS + ES を個人開発でやってみたという話です。

CQRS + ES 熱が再燃したきっかけ 1

今回試してみるモチベーションの一つの話をします。
HERP Careers は複数のデータソースを掛け合わせて構築しているサービスです。一開発者として日々そのサービスを実装しています。そうするとそのデータソースが Event を起点にしていたら解決する課題が結構あるなぁって思うことがあります。
とくにほぼリアルタイムでデータを届けたいような時です。

ただだからと言って、流石に慣れていないイベント駆動の実装のパターンをプロダクトに導入するのもリスクがあるので、まずは自分の習熟度を上げていこうと思います。

補足として一時期 CQRS + ES は初期構築のハードルが高いなー
って諦めていたのですが、https://x.com/j5ik2o かとじゅんさんが
DynamoDB で Event Sourcing する adapter と使い方を公開していただいたので
個人開発でのハードルが下がっていました。

※ちなみに、HERP Careers でもかとじゅんさんの実装を参考にさせていただいている部分があります。

https://github.com/j5ik2o/event-store-adapter-js

CQRS + ES 熱が再燃したきっかけ 2

こちらも動機になるのですが結構衝撃的だったので紹介しておきます!

もともと自分は CQRS + ES は複雑なドメインがある場合にのみ有効であって、書き込みよりも読み込みメインのアプリやシンプルなアプリには TooMuch だと思っていました。

そんな中で参加した CQRS + ES のカンファレンスで、どのくらいの規模のアプリケーションから CQRS+ES を取り入れるべきですか?という質問を登壇者に相談しました。そうすると、5/5 でどんなアプリケーションでも導入する。今は慣れてしまってそちらの方が簡単なのと、いずれ複雑になっていくと言っていました。

いずれ複雑になるのは確かにと思いつつ、
慣れてしまったら快適になるなら個人開発でもいけるのか!と思い作りかけのアプリを CQRS + ES で書き直した次第です。

CQRS + ES の利点

最大の利点は、Read モデルと Write モデルを分離できることです。
実際に実装してみて、そう感じたのは次の点です。

  • 今までは想像以上に Read モデルを意識して DB 設計をしていた(取得する時にこの方が便利だからこのカラムはここに持っておこう。そしてそれは余程のケースがない限りスキーマ変更しないから慎重に設計しておこう...)
  • ログテーブルの管理が不要になった(従来は重要なテーブルの更新時に、実行された CRUD 操作とスナップショットをログテーブルに保存していた)

この 2 つのことが頭から追い出せて純粋にモデリングできることがなんと幸せなんだろうって思いました!設計時のメリットがすごく大きい気がしています!

最終的にアプリケーションコードがすごくシンプルになった(ライブラリでうまく隠蔽しているのもあります。)

それ以外の技術的なメリットは巷にいっぱい落ちているのでそちらに関しては今回はほとんど触れずに実装に行きます!

(お気持ちだけメリットの紹介)

  • 二層コミットが避けられる
  • ドメインイベントが監査ログにもなる
  • 障害発生時のトレースが楽

技術スタック

  • Expo
  • Next.js
  • Tailwind
  • Hono
  • DynamoDB
  • Supabase

システム構成

Expo から Next.js の API Route にデータを取りにいく構成です。
API Route では Hono x (zod open api)が待ち受けています。
Hono から DynamoDB に登録
Hono から Supabase へのデータ取得
DynamoDB Streams から Supabase のデータベースを更新

する構成です。

ディレクトリ構造

見る人が見たらどこに何があるのかはわかるはず。。

├── expo
│   ├── app
│   │   ├── (home)
│   │       ├── receipts
│   │       └── settings
│   ├── assets
│   │   └── display
│   ├── components
│   ├── contexts
│   ├── features
│   │   ├── receipt
│   │   └── users
│   ├── hooks
│   └── lib
│      └── client
├── rmu
│   ├── cdk
│   │   ├── bin
│   │   ├── lib
│   │   └── test
│   └── lambda
└── web
    ├── src
    │   ├── app
    │   │   └── api
    │   │       └── hono
    │   │           └── [...route]
    │   ├── features
    │   │   ├── families
    │   │   │   ├── command
    │   │   │   ├── domain
    │   │   │   │   └── family
    │   │   │   │       └── events
    │   │   │   ├── repository
    │   │   │   └── routes
    │   │   ├── reciepts
    │   │   │   ├── command
    │   │   │   ├── domain
    │   │   │   │   ├── receipt
    │   │   │   │   │   └── events
    │   │   │   │   └── receipt-item
    │   │   │   │       └── events
    │   │   │   ├── query
    │   │   │   ├── repository
    │   │   │   ├── routes
    │   │   │   └── service
    │   │   └── users
    │   │       ├── command
    │   │       ├── domain
    │   │       │   └── user
    │   │       │       └── events
    │   │       ├── repository
    │   │       └── routes
    │   ├── lib
    │   │   ├── client
    │   │   ├── server
    │   │   │   ├── event-store
    │   │   │   │   └── internal
    │   │   │   │       └── test
    │   │   │   ├── ocr
    │   │   │   └── rmu
    │   │   │       └── update-read-model
    │   │   │           ├── family
    │   │   │           ├── file
    │   │   │           ├── receipt
    │   │   │           ├── receipt-item
    │   │   │           └── user
    │   │   └── util
    │   └── types
    ├── supabase
    │   └── migrations
    └── types

サービスは以下の 3 つで構成されています:

  • web: Next.js で構築された Web アプリケーション
  • expo: iOS/Android 向けのモバイルアプリケーション
  • rmu (Read Model Updater): Lambda 上で動作するイベントハンドラー

API は hono を使用して OpenAPI/TypeScript の型定義とともに実装し、expo アプリケーションからそれらを呼び出す構成となっています。

ユーザー機能の実装フロー

ユーザー機能の実装を見ていくと、以下のような流れで処理が行われています:

  1. ルーティング(routes/sign-in.ts)
    • hono を使用して OpenAPI スキーマベースのルートを定義
    • POST リクエストを /api/hono/protected/auth/sign-in で受け付け
    • Clerk によるユーザー ID の認証を実施
// routes/sign-in.ts
export const signInRoute = createRoute({
    method: 'post',
    path: '/api/hono/protected/auth/sign-in',
    request: {
        body: {
            content: {
                'application/json': {
                    schema: SignInRequestSchema,
                },
            },
        },
    },
    responses: {
        201: {
            content: {
                'application/json': {
                    schema: SignInResponseSchema,
                },
            },
            description: 'User signed in successfully',
        },
        // ... エラーレスポンスの定義
    },
});
  1. コマンド処理(command/sign-in.ts)
    • ユーザーの新規作成または更新を実行
    • 新規ユーザーの場合はデフォルトの家族集約も同時に作成 (※この辺が悩ましいです)
// command/sign-in.ts
export const signIn = (
    userId: string,
    requestData: SignInRequest,
    dynamodbClient: DynamoDBClient,
): TE.TaskEither<CommandError, SignInResponse> => {
    const executorId = userId;
    const eventStore = EventStoreFactory.ofDynamoDB<UserId, User, UserEvent>(
        dynamodbClient,
        process.env.AWS_DYNAMODB_TABLE_NAME || 'dev-journal',
        // ... その他の設定
    );

    return pipe(
        TE.of({ userId, requestData }),
        TE.chain((data) =>
            TE.tryCatch(
                async () => {
                    const maybeUser = await userRepository.findById(
                        UserId.of(data.userId),
                    );
                    if (!maybeUser) {
                        // 新規ユーザー作成のロジック
                        const defaultFamilyId = FamilyId.generate();
                        const [user, event] = User.create({
                            email: requestData.email,
                            currentFamilyId: defaultFamilyId,
                            executorId,
                        });
                        await userRepository.store(event, user)();
                        // デフォルト家族の作成
                        const [family, familyEvent] = Family.create({
                            /*...*/
                        });
                        await familyRepository.store(familyEvent, family)();
                        return user;
                    }
                    // 既存ユーザーの更新ロジック
                    const result = maybeUser.update({
                        /*...*/
                    });
                    // ...
                },
                // エラーハンドリング
            ),
        ),
        // レスポンスの整形
    );
};
  1. ドメインモデル(domain/user/)
    • User 集約がイベントソーシングの中心として機能
    • イベントの定義と処理
// domain/user/user.ts
export class User implements Aggregate<User, UserId> {
    readonly typeName = 'User';

    static create({
        email,
        executorId,
        currentFamilyId,
    }: {
        email: string;
        executorId: string;
        currentFamilyId: FamilyId;
    }): [User, UserCreated] {
        const id = executorId;
        const sequenceNumber = 1;
        const version = 1;

        const user = new User({
            id: UserId.of(id),
            email,
            sequenceNumber,
            version,
            currentFamilyId,
        });

        const event = UserCreated.of({
            aggregateId: UserId.of(id),
            email,
            executorId,
            sequenceNumber,
            currentFamilyId,
        });

        return [user, event];
    }

    // イベントの再生処理
    static replay(events: UserEvent[], snapshot: User): User {
        return events.reduce((user, event) => {
            switch (event.symbol) {
                case UserCreatedTypeSymbol:
                    return snapshot;
                case UserUpdatedTypeSymbol: {
                    const updateEvent = event as UserUpdated;
                    const result = snapshot.update({
                        email: updateEvent.email,
                        executorId: event.executorId,
                    });
                    if (E.isLeft(result)) {
                        throw new Error(result.left.message);
                    }
                    return result.right[0];
                }
                default:
                    throw new Error('Unknown event type');
            }
        }, snapshot);
    }
}
  1. リポジトリ(repository/user-repository.ts)
    • DynamoDB をイベントストアとして利用
    • イベントの永続化とスナップショットの管理
// repository/user-repository.ts
export class UserRepositoryImpl implements UserRepository {
    store(
        event: UserEvent,
        snapshot: User,
    ): TE.TaskEither<RepositoryError, void> {
        if (event.isCreated || this.snapshotDecider?.(event, snapshot)) {
            return this.storeEventAndSnapshot(event, snapshot);
        }
        return this.storeEvent(event, snapshot.version);
    }

    async findById(id: UserId): Promise<User | undefined> {
        const snapshot = await this.eventStore.getLatestSnapshotById(id);
        if (snapshot === undefined) {
            return undefined;
        }
        const events = await this.eventStore.getEventsByIdSinceSequenceNumber(
            id,
            snapshot.sequenceNumber + 1,
        );
        return User.replay(events, snapshot);
    }
}

その他 local 開発のために

  • LocalStack の導入(DynamoDB の emulator)
  • DynamoDB Streams はポーリングしてメイン関数にイベントを渡す

DynamoDB Stream のポーリングの例(詳しくはかとじゅんさんの example のリポジトリに)

async function main() {
  console.log("Starting DynamoDB Stream listener...");

  while (true) {
    for (const tableName of TABLE_NAMES) {
      try {
        await streamDriver(tableName);
      } catch (error) {
        console.error(
          `An error occurred while processing table ${tableName}, but stream processing will continue:`,
          error,
        );
      }
    }

    // ポーリング間隔を設定
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
}
  • Supabase local の導入(npx supabase start)
  • 複数のサーバーの立ち上げに concurrency を使用

concurrency の例

    "dev": "concurrently --kill-others --names \"next,dynamodb,export,expo\" \"npm run next:dev\" \"npm run dynamodb:start\" \"npm run export:type:start\" \"npm run expo:dev\"",

を使って npm run dev で Expo から Next.js、DynamoDB の立ち上げなど、依存関係のあるものとないものを並列に実行しつつ、変更検知までなんとか実装しました。この辺はまたリアクションがあれば追記していきたいと思います!

難しいと感じる点

フロントへのフィードバックが遅い

即座に Read Model に書き込まれません。Command -> Event Store -> Stream -> Read Model
と順を追って更新していくため、登録したのにフロントに反映されていない状態が発生しがちです。
そしてしばらくすると反映されます。
toB ではフロントに Read Model の内容が反映されるまで待っても問題ない時もありますが、
toC だとユーザー体験を優先させたいため、一度 Read Model に直接書き込まざるを得ないかという気持ちです。

集約を跨ぐケースがある(1 リクエストで複数トランザクションが走る)

ユーザーが初めて新規登録した時に家族という集約も作成したいというケースがありました。家族 -> ユーザー の依存関係のためです。ユーザーの集約と家族の集約はデータのライフサイクルが完全に異なるため、別々の集約であることは間違いありません。ただし、UserCreated のイベントの時にRMUで Read Model で家族を構築すると、DynamoDB には家族集約が存在しないのに Read Model には存在することになってしまいます。

迷ったのですが結果的には、1 リクエストで別々のトランザクションを実行させることにしました。Saga を実装することも考えましたが、個人開発ではそこまでの実装は避けたかったため、RDB で一連の処理の成功可否を管理し、データの整合性を確認できるようにしました。

まとめ

CQRS+ES は以前よりもだいぶハードルが下がりました。
ただし、従来のステートソーシングに慣れていると、まだ使い方に迷う部分もあります。
この点については、引き続き CQRS+ES での個人開発を進めながら習熟度あげていこうと思います。
最終的にはPO に対してもこちらの方が優れていることを説得できるようになったらいいなって思います。
(LLM も発展し、「該当箇所を参考にして」!といった実装ができるようになったため、以前はデメリットと考えていた「多くのドメインオブジェクトを実装しなければならない」という課題も解消されたと感じています。)
今回のリポジトリもリアクションがあればテンプレート化して共有したい気持ちです。

Discussion