DynamoDBを使ったCQRS/Event Sourcingシステムの構築方法(言語・F/W非依存)

2021/05/24に公開

CQRS/Event SourcingといえばAkka/Scalaがオススメと言い続けてきたけど、言語やフレームワーク非依存というか、そういう縛りが緩い方法を考えた(実際に検証したわけではないですが、実装できるつもりで書いてます)ので、以下にまとめます。

前提

クラウド環境はAWS。コマンド側DBをDynamoDB。DynamoDBにそれなりに詳しい人向けに基礎的な部分の解説も省いてます。クエリ側DBは要件に応じて選択してください。とりあえずAuroraのつもりで書きます。
コマンド側で発生したイベントをクエリ側に伝搬させるために、DynamoDB Streamsを利用します。クエリ側のRead APIはRead DBを読むだけなので解説は省きます。

ドメインはショッピングカートです。

アプリケーションは伝統的なステートレスウェブアプリケーションを想定します。アプリケーションの最新状態(State)はDynamoDBにあります。発生したイベント(Event)もDynamoDBに保存します。これらの書き込みは同一トランザクションで行うのでTransactWriteItems APIを利用します。

ちなみにStateが2箇所に出現しますが、Cart(State)は正規化されているのに対し、CartReadModel(State)は非正規化されている前提となります。

https://docs.aws.amazon.com/ja_jp/amazondynamodb/latest/developerguide/transaction-apis.html

事前準備

集約用テーブル、ドメインイベント用テーブルを準備する

ドメインモデル用テーブルは集約の最新状態を格納するためのテーブル。カラム構成は永続化できればなんでもいいですが、楽観的ロックができる必要があります。ここではversionを使う想定。

  • cartId(パーティションキー)
  • userAccountId
  • cartItems
  • createdAt
  • updatedAt
  • version

ドメインイベント用テーブルのパーティションキーは集約ID、ソートキーはイベントの連番にしてください。集約のversion+1をセットするのが一番簡単でしょう。イベントのボディは何でもOKですが、JSONにすることが多いです。

  • carId(パーティションキー)
  • sequenceNumber(ソートキー)
  • eventType
  • body
  • createdAt

ユースケース

コマンド側のユースケースを例示します。集約とドメインイベントを同じトランザクションで書きます。以下のコード例では具体的なAPI呼び出しがありませんが、TransactWriteItems APIをリポジトリ内部で呼び出していると思ってください。

そのうえで、集約の書き込みは楽観的ロックで不整合が起きないようにします。楽観的ロックに失敗したらドメインイベントも書き込まれませんし、逆に成功するとドメインイベントも正しい順序で書き込みができます。

カートの作成

コードは疑似コードとしてのScala。新規作成のときは、楽観的ロックもなにもないので、attribute_not_existsで集約IDが存在していないことを条件にします。

class NewCartCommandProcessor(
  cartRepository: CartRepository
) {
  def execute(ownerAccountId: UserAccountId): Unit = {
    // (1) カートを生成すると同時に、カートが生成されたイベントも生成する
    val (cart, cartCreatedEvent) = Cart(CartId.generate(), ownerAccountId)
    // (2) newCartとcartCreatedEventを同じトランザクションでattribute_not_exists(newCart.id)として条件付き書き込みする
    // newCart.versionには初期値をセットして更新する
    cartRepository.store(newCart, cartCreatedEvent)
  }
}

カートアイテムの更新

更新時は楽観的ロックを使います。

class AddCartItemCommandProcessor(
  cartRepository: CartRepository
) {
  def execute(cartId: CartId, cartItem: CartItem, version: Long): Unit = {
    // (1) 現在の集約を取得する
    val cart = cartRepository.findById(cartId, version)
    // (2) 集約の振る舞いを呼び出し、成功した場合は新しい集約とドメインイベントが手に入る
    val (newCart, cartAddedEvent) = cart.addCartItem(cartItem)
    // (3) newCartとcartAddedEventを同じトランザクションで、#version = versionとして条件付き書き込みする
    // SET cartAddedEvent.sequenceNumber = newCart.version + 1 する
    // SET newCart.version = newCart.version + 1 する
    cartRepository.store(newCart, cartAddedEvent)
  }
}

Akkaと比較して

ちなみに Akka/Scala では (1)の処理がワークロードが発生した初回のリクエストのみ行われ、集約アクターとしてキャッシュされるのでリクエスト毎に不要。ワークロードがなくなると集約アクターは破棄されるのでメモリも圧迫しない。また、イベントから状態を作り出すので(3)のnewCartの毎回の保存が不要。ただしイベントが大量な場合のショートカットとしてイベントN件に1回スナップショットを保存することができる。この考え方により、集約アクターの状態がDBと完全同期するので、純粋に書き込みのみに専念できるようになる。キャッシュについてはどれぐらい性能的に優位差があるか分からないが、DAXを使うこともできるので検討する価値はありそう。

RMU(Read Model Updater)をどのように実装するか

RMU=Read Model Updaterとは、ドメインイベントからリードモデルを構築する責務です。

ドメインイベントをコンシュームしてリードモデルを構築する必要があります。
ドメインイベントをコンシュームして、リードDBにクエリサイドで利用するデータを構築します。やり方は2つぐらいあります。

  1. KCL v1 + DynamoDB Kinesis Adaptor Clientを追加
  1. Lambdaを使う

1)はJavaでの実装になります。ちなみにDynamoDB StreamsではKCLv1のみになりますので注意。2)は起動が速い言語が向いてます。JVMとかはちょっとしんどいと思います。GraalVMならいいかも。(最近リリースされましたが、Kinesis Data Streams for DynamoDBというのはありますが、あれは順序保証されないので要件に合わないので注意してください)

ここではLambda前提で書きます。以下のように、読み込んだイベントに応じてリードモデルをリードDBに構築してください。実際のコードはもっち複雑になるかもしれませんが、イメージとして把握してもらえば。

// Lambdaの例
exports.handler = function(event, context, callback) {
    console.log(JSON.stringify(event, null, 2));
    event.Records.forEach(function(record) {
        var cartEvent = record.dynamodb;
        switch cartEvent.eventType {
          case "create_cart":
            createCart(cartEvent.body); // SQL実行
            break;
          case "add_cart_item":
            addCartItem(cartEvent.body); // SQL実行
            break;
          // ...
        }
    });
    callback(null, "message");
};

注意としては以下。

  • イベントのコンシュームは順序が守られる必要があります。DynamoDB Streamsではパーティションキー毎に順序保証されるので、パーティションキーを必ず集約IDにする必要があります。
  • リードモデルのカラムにLastSequenceNumberが必要で、最後に処理したイベントのシーケンズ番号を書き込んでください。処理しようとするイベントがすでに古いシーケンズ番号の場合はスキップしましょう。

Pros vs Cons

  • Pros
    • トランザクション書き込みができるNoSQLがあれば、言語やフレームワークを選ばない(Akkaを使わなくてもCQRS/Event Sourcingが実現できる)
  • Cons
    • Akkaの方式と比べると書き込みのオーバーヘッドが高い

まとめ

この方法は書き込みのオーバーヘッドが高そうですが、言語やF/Wの縛りは緩いと思います。性能への影響は実際に検証してみないとなんとも言えないですね。DynamoDBを例にして書きましたが、条件付きでトランザクション書き込みができて、変更データをキャプチャできるならDynamoDB以外でも適用可能な考え方だと思います。ご参考までに。

Discussion