イベントストアでDynamoDBを採用したときにつまづいたこと(整合性の設計)
はじめに
イベントソーシングとは
イベントソーシングは、システムの状態を「最新のデータ」としてではなく、「起きたことの記録(イベント)の列」として保存するアーキテクチャパターンです。
たとえば注文管理システムであれば、「現在の注文合計金額」を直接保存するのではなく、「商品Aを追加した」「クーポンを適用した」「支払いが完了した」といったイベントを積み重ねます。現在の状態が必要なときは、その列を最初から順に再生(リプレイ)して導出します。
状態をイベントの列から導出するとき、毎回最初から再生すると時間がかかります。そのため、ある時点の状態を「スナップショット」として保存しておき、次回はスナップショットの時点から再生するのが一般的です。
スナップショット(seqNr=10 時点の状態)
+ イベント seqNr=11
+ イベント seqNr=12
+ イベント seqNr=13
↓
現在の状態
この「スナップショット + その後のイベント列を取得して状態を復元する」処理が、今回問題の起点になりました。
この記事について
DynamoDB を使って EventStore を実装したとき、「書いた直後に読んだら古いデータが返ってくる」という問題に遭遇しました。この記事では、その原因と対処、そして「なぜ最初の設計がまずかったか」を説明します。
DynamoDB の整合性モデル
まず前提知識として、DynamoDB の読み取り整合性モデルを整理します。
DynamoDB には2種類の読み取りモードがあります。
| 読み取り種別 | 対象 | 特徴 |
|---|---|---|
| 強力な整合性のある読み込み | プライマリテーブルのみ | 書き込み後に即座に最新値が見える。Read Capacity Unit(RCU、読み取りコストの単位)は2倍 |
| 結果整合性のある読み込み | プライマリテーブル / GSI | 伝播が完了していれば最新値。未完了なら古い値が返ることがある |
重要な制約として、GSI(Global Secondary Index)は強力な整合性のある読み込みを指定できません。GSI に対するクエリは常に結果整合性のある読み込みになります。これは AWS の公式ドキュメントにも明記されています。
書き込みから GSI への伝播はミリ秒〜秒単位の遅延があります。
書き込み
│
▼
プライマリテーブル(即座に反映)
│
│ 非同期伝播(遅延あり)
▼
GSI
この性質は DynamoDB の仕様であり、バグではありません。ただし、読み取り先として GSI を選んだ場合、書き込み直後の読み取りが古い状態を返す可能性があることを意味します。
問題の構造 — 再構築パスに GSI を使っていた
EventStore を使う Repository の Save() は、内部でおおよそ次の処理を行います。
- loadInternal: スナップショット + その後のイベント列を取得し、集約の最新状態を復元する
- ApplyCommand: コマンドを集約に適用してイベントを生成する
- PersistEvent: 生成したイベントをプライマリテーブルに書き込む
なぜ Save() の中でロードするのか
Save(ctx, aggID, cmd) という API の形にしたのは、アプリケーションコードに永続化のメタ情報(SeqNr、Version)を渡させたくなかったためです。
イベントソーシングの実装では、楽観ロックのために「現在のバージョン番号」や「最後のイベント番号」を呼び出し元が管理するパターンもあります。しかしその場合、アプリケーションコードは「次のコマンドを実行するために今の SeqNr はいくつか」を意識し続けなければなりません。
// アプリコードにメタ情報が漏れるパターン(今回は採用しなかった)
loaded, seqNr := repo.Load(ctx, id)
repo.Save(ctx, id, cmd, seqNr) // seqNr をアプリ側で管理
// 今回の設計: アプリコードはドメインのことだけ考える
repo.Save(ctx, id, cmd) // メタ情報は内部で管理
アプリコードをドメインロジックだけに集中させるために、Save() の内部で毎回最新状態を読み直すことにしました。DynamoDB の整合性さえ担保できれば、この設計は成立すると思っていました。
問題は 1. の「イベント列を取得する」部分にありました。当初の実装では、このクエリに GSI を使っていました。
// 変更前: GSI を使ったクエリ
QueryInput{
TableName: journalTable,
IndexName: "journal-aid-index", // GSI
KeyConditionExpression: "#aid = :aid AND #seq_nr > :seq_nr",
// ConsistentRead は指定不可(GSI は常に結果整合性)
}
Save() を連続で呼んだとき、何が起きるか見てみます。
1回目の Save() でプライマリテーブルに書き込んだイベントが、2回目の Save() の時点で GSI にまだ伝播していない。その結果、集約を blank state から再構築してしまい、「集約が存在しない」という偽のエラーが発生します。
コントラクトの問題として捉える
この問題を「DynamoDB の GSI の落とし穴」として片付けることもできますが、根本は設計上のコントラクト不整合でした。
Repository の loadInternal は「書いた直後に読んでも必ず最新のイベントが見える」ことを暗黙に期待しています。しかし EventStore インターフェースは、イベントを取得するメソッドについてそのような保証を表明していませんでした。
インターフェースが「イベントを返せばよい」という緩い契約にとどまっていたため、DynamoDB の実装者が結果整合性のある GSI クエリを選んでも「仕様を満たしている」と判断できてしまいます。インターフェースを満たした実装が、Repository の前提を壊す——これがコントラクト不整合の実体です。
修正の方向は「EventStore の実装は、集約の再構築に使う read について、書き込み直後でも最新のイベントを返すことを保証しなければならない」という契約を明示することです。DynamoDB の実装でいえば、GSI ではなくプライマリテーブルに対して強力な整合性のある読み込みを使うことが、その契約を満たす唯一の手段です。
実装の変更 — プライマリテーブル + ConsistentRead へ
LoadStreamAfter の DynamoDB 実装を、GSI クエリからプライマリテーブルの強整合性読み取りに変更します。
// 変更後: プライマリテーブルを ConsistentRead で読む
QueryInput{
TableName: journalTable,
// IndexName なし(プライマリテーブル直読み)
KeyConditionExpression: "#pk = :pk AND #sk > :sk",
FilterExpression: "#aid = :aid",
ConsistentRead: true, // 強整合性
ScanIndexForward: true,
}
ConsistentRead: true を指定することで、書き込み直後の読み取りでも最新のイベントが必ず返るようになります。
なぜ FilterExpression が必要か
プライマリテーブルに変更したことで、1点追加の考慮が必要になりました。
この EventStore のテーブル設計では、書き込みのホットスポットを避けるために、集約を複数のシャードに分散しています。パーティションキーは "TypeName-{シャードID}" の形式で、1つのシャードには複数の集約のイベントが混在します。
パーティションキー: "Order-3"(シャード3)
ソートキー: "Order-abc-00000000000000000001" ← 集約 abc のイベント
ソートキー: "Order-xyz-00000000000000000001" ← 集約 xyz のイベント(同じシャード)
ソートキー: "Order-abc-00000000000000000002" ← 集約 abc のイベント
GSI を使っていたときは aid = "Order-abc" で集約単位に絞り込めましたが、プライマリテーブルではソートキーの範囲クエリになります。#sk > "Order-abc-..." という条件は辞書順の比較なので、Order-xyz-... のようなキーも通過してしまいます(xyz > abc)。
そこで FilterExpression: "#aid = :aid" を追加し、集約 abc のイベントだけを返すようにします。
また、1回のクエリで全件返るとは限らないため、LastEvaluatedKey を使ったページネーションループも必要になります。
for {
out, err := s.client.Query(ctx, query)
// ... イベントを追加 ...
if len(out.LastEvaluatedKey) == 0 {
break
}
query.ExclusiveStartKey = out.LastEvaluatedKey
}
トレードオフ
この変更には明確なコスト面のトレードオフがあります。
RCU コストの増加
強力な整合性のある読み込みは、結果整合性のある読み込みの2倍の RCU を消費します。集約の再構築頻度が高いシステムでは、DynamoDB のコストが増加します。
FilterExpression によるスキャン非効率
DynamoDB の FilterExpression はページサイズを計算した後に適用されます。同一シャードに他の集約のイベントが多い場合、1回のクエリで返る件数が少なくなり、ページネーション回数が増えることがあります。
これらのトレードオフを承知した上で、この設計を選んだ理由は「集約の再構築は correctness-critical である」という判断です。書き込み後の読み取りが古い状態を返した場合、アプリケーションコードが偽のドメインエラーを受け取ることになります。この問題をアプリケーション側でリトライや SeqNr 管理によって回避しようとすると、EventStore が本来隠蔽すべき永続化の詳細がアプリケーションのコードに漏れ出します。
コストを内部で吸収することで、アプリケーションのコードはドメインロジックだけに集中できます。
現在の見解 — この設計は根本的な解決ではない
ConsistentRead への変更で整合性の問題は解消されましたが、振り返ると「Save() が毎回ロードする」という設計自体が問題の根本だったと考えています。
Save() の内部で毎回 DynamoDB を読み直すということは、コマンドを実行するたびに read + write の2回のアクセスが発生することを意味します。ConsistentRead に切り替えたことでそのコストはさらに増加しています。
現在、より根本的な解決策として LoadedAggregate という opaque な型を導入する方向で開発を進めています。
// ロードした結果を opaque な handle として受け取る
loaded, err := repo.LoadForCommand(ctx, id)
// handle を渡すので Save 時の DynamoDB read が不要
loaded, err = repo.SaveLoaded(ctx, loaded, cmd)
LoadedAggregate は内部に seqNr / version を持ちますが、アプリコードからはフィールドにアクセスできません。SaveLoaded は渡された handle の状態をそのまま使うため、DynamoDB から再読み込みをしません。
この設計であれば、「書いた直後に読み直す」操作そのものが発生しないので、GSI の整合性問題が起きる余地もありません。ConsistentRead による RCU コストの増加も解消されます。
今回の LoadStreamAfter への変更は「コントラクトを正しく表明して整合性を保証した」という意味では正しい変更でしたが、コストを払い続ける設計です。LoadedAggregate への移行が完了した時点で、根本から作り直すことになります。
また、そもそも同じ集約に対して Save() を連続で呼ぶような使い方自体、集約設計の観点からも疑問が残ります。DDD における集約は、1つのコマンドに対して1つのトランザクション境界を持つのが基本です。同じ集約に対して短時間に複数のコマンドを連続して適用するユースケースは、集約の粒度が細かすぎるか、あるいは複数のコマンドを1つにまとめられるサインである可能性があります。今回の整合性問題は、その設計上の違和感を技術的な症状として表面化させたとも言えます。
まとめ
DynamoDB で EventStore を実装したとき、GSI に対するクエリは ConsistentRead を指定できないため、書き込み直後の読み取りで古いデータが返る可能性があります。集約の再構築パスにこのクエリを使っていた結果、連続する Save() で偽のエラーが発生しました。
この問題はインターフェースのコントラクト不整合でもありました。GetEventsSince という名前では整合性の要件が不明確だったため、LoadStreamAfter にリネームし「集約再構築のための read」という意図を明示しました。実装は GSI クエリからプライマリテーブルの ConsistentRead: true に変更し、強整合性を保証します。
ただし、この変更は暫定的な対処です。Save() が毎回ロードするという設計自体を見直し、ロード済みの状態を opaque な handle として次の操作に渡す設計に移行することで、再読み込み自体をなくすことが根本的な解決になります。
Discussion