🍣

Serverless FrameworkとTypeScriptでCQRS/EventSourcingを実現する 実装編1

7 min read

こちらの記事の続きです。

https://zenn.dev/makumattun/articles/474fbe08a5111c
※この記事ではEventStoreとmain関数の流れまでを紹介します。
この部分

イベントソーシングでテーブル格納時にやりたいこと

やりたいこと
・一つのテーブルにすべてのイベントを格納したい
・複数のドメインイベントをAcidに登録したい
・整合性を持たせるために誤って2回登録のイベントが発火されたときは失敗させたい

ここらへんを満たすようにテーブルの設計をする必要があります。

DynamoDBのインデックス設計

ごちゃごちゃ記載するよりも、EventStoreテーブルの定義を記載します。

属性値

Name Type Default Nullable Comment
resourceId S false
version N false

主キー(複合主キー)

Name Type Definition
Primary Key Partition key and sort key [{ AttributeName: "resourceId", KeyType: "HASH" } { AttributeName: "version", KeyType: "RANGE" }]

はじめてPartitionKeyやsortKeyが出てきたので解説します。EventStoreは大量のイベントデータを保持するので、分散型のデータベースでなければ耐えられません。PartitionKeyはDynamoDBの内部で水平分割するために使います。DynamoDBが自動で水平分割するのでどれだけデータが増えても無限にスケールします!すごい!SortKeyはDynamoDBのテーブル定義は設定するかしないかは任意です。

指定しない場合は、PartitionKey = PrimaryKeyとなって、同じPartitionKeyで登録ができません。実際はPartitionKeyがユーザーIDだったり、エンティティのIDに該当して、複数のイベントでstateを構築するのにそれでは困ります。

今回は誤って複数回実行された場合や、異なるユーザーが同時に同じ作業をしたときにどちらも登録されては困る場合の制御のためにVersionでロックする必要があります。

なのでソートするかはどうかはおいてても、複合主キー(2つのカラムで)リソースをユニークに特定するためのキーを作る必要があります。

ソートする必要性もあります。パフォーマンスの向上のためにSnapshotという概念を実装するときにソートされていたほうが良いためです。(そもそもDynamoDBでは、複合主キーはPartitionKey1つとSortKey1つしか指定できないので、それ以上のIndexなどの要件があればちょっと困ります)

例えば、このようにデータが登録されていきます。

そしてロックのイメージを掴んでもらうためにこちらをご覧ください。

実際にテーブルに入っている値

1項目(1レコード)はこのような感じです。

・複数のドメインイベントをAcidに登録したい

こちらの要件を満たすためにイベントは1レコードにstringで複数登録します。それがeventsカラムです。

これだと、先に書いた悲観的ロックで書いたように在庫チェックして登録するかしないかの判定ができないではないかと思うかもしれませんが、それはしない前提です。楽観ロックでのロックによって保証されていることとします。
なので、最初に設計したPartitionKeyとSortKeyに加えて、こちらでEventStoreになります。

最後に見切れているので1レコード分載せておきます

EventStoreTableのテーブル定義まとめ

Name Type Default Nullable Comment
resourceId S false リソースの主キーになるカラムですUser系のイベントならUserIdに相当
version N false Versionを使った楽観ロックに使います。
commitId S false committedAtとresouceIdの文字列連結したものです
committedAt N false テーブルに保存した時間です。
events S false Domainイベントの集合です。JSON Objectをstringに変換してます

コマンドのバリデーション

Serverless FrameworkとTypeScriptでCQRSを実現する 概要編
で紹介した。コマンドの方から実装を見ていきます。

送信されてきた書き込み用のリクエストパラメータのことです。
ここでは次の機能を考えてみます。
twitterのようなアプリを作る前提でblockしたときの挙動です。
(なぜlikeやretweetではなくblockにしたのかは謎。)

こちらをコマンド(書き込み用のリクエストパラメータ)とします

{
    "payload": {"userId":"167","blockUserId":"19"},
    "type": "unblock"
}

今回はjoiをつかってvalidationをおこないます。joiってめちゃくちゃ使いやすくて驚きました。

    if (command.type === "block") {
      // 1. どんなリクエストが来るのかを定義
      const blockEventPayloadSchema = Joi.object({
        userId: Joi.string().required(),
        blockUserId: Joi.string().required()
      })

      // 2. コマンドが決まったフォーマットかどうかを確認、これはcommandがtypeとpayloadという属性を持っているかどうかのみを見ます。
      checkValidCommandScheme(command)
      const { value, error } = blockEventPayloadSchema.validate(command.payload);
      if (error) {
        throw new InvalidCommandFormatException(error);
      }

コマンドをドメインイベントに変換

コマンドはDDDでいう集約の更新単位になりますので、コマンドとイベントの関係は1対1もしくは1対多です。

      // 3. コマンドをイベント(ドメインイベント)に変換する
      const event: Event[] = [...createUnBlockTweetUserEvent({ userId: value.userId, unBlockUserId: value.blockUserId })]

今回の場合は、コマンドとイベントは1対1なので一つのイベントを作成します。
関数自体も非常にシンプルです。

export const createBlockTweetUserEvent = ({
  userId,
  blockUserId
}): Event[] => {

  return [
    {
      type: 'BLOCK_TWEET_USER',
      payload: {
        userId,
        blockUserId,
      }
    }
  ]
}

イベントの登録

イベントの登録をそのまま保存するだけでもいいのですが、EventStoreをそのまま運用するとパフォーマンスが著しく悪化するのでSnapshotという概念を実装しています。これはまた説明が大変なので別記事で解説します。

      // 4. イベントをステートに変換する
      // イベントからステートへの変換はドメインロジックなので実際はDomainそうにあるReducerを呼び出す。
      const snapshot = await eventStoreRepository.getByIdUsingSnapshot({
        resourceId: value.userId,
        reducerId: reducerId,
        reducerVersion: reducerVersion,
        reducer: fromEventToBlockTweetUserReducer
      })

      // 5. イベントを保存する
      const result = await eventStoreRepository.save({ events: event, resourceId: value.userId, expectedVersion: snapshot.version + 1 })

イベントの保存では、expectedVersionで最新のversion + 1でインクリメントしています。これは、この処理自体が楽観ロックでロックするほどのものではないので、サーバーサイドのこの部分でこのように対応することでロックしたくないところでロックされてエラーになることを防ぎます。

まとめ

こちらが全体の流れをまとめたものになります。

    if (command.type === "block") {
      // 1. どんなリクエストが来るのかを定義
      const blockEventPayloadSchema = Joi.object({
        userId: Joi.string().required(),
        blockUserId: Joi.string().required()
      })

      // 2. コマンドの決まったフォーマットかどうかを確認
      checkValidCommandScheme(command)
      const { value, error } = blockEventPayloadSchema.validate(command.payload);
      if (error) {
        throw new InvalidCommandFormatException(error);
      }

      // 3. コマンドをイベント(ドメインイベント)に変換する
      // 1トランザクションで処理したいドメインイベントを複数作る
      const event: Event[] = [...createBlockTweetUserEvent({ userId: value.userId, blockUserId: value.blockUserId })]

      // 4. イベントをステートに変換する
      // イベントからステートへの変換はドメインロジックなので実際はDomainそうにあるReducerを呼び出す。
      const snapshot = await eventStoreRepository.getByIdUsingSnapshot({
        resourceId: value.userId,
        reducerId: reducerId,
        reducerVersion: reducerVersion,
        reducer: fromEventToBlockTweetUserReducer
      })

      // 5. イベントを保存する
      const result = await eventStoreRepository.save({ events: event, resourceId: value.userId, expectedVersion: snapshot.version + 1 })

      // 6.保存されたイベントは、EventBusを通じて、他のアプリケーションに展開される。

度々になりますが、この記事で紹介した

https://zenn.dev/makumattun/articles/474fbe08a5111c
この部分に相当します。

次の記事でRepositoryの実装とSnapshotのテーブルなどを見ていこうと思います。

参考

こちらの記事を参考に複雑になってた設計をリファクタリングしています。

https://medium.com/@domagojk/serverless-event-sourcing-in-aws-lambda-dynamodb-sqs-7237d79aed27