Event-Driven Architecture in Golang読書メモ(仕切り直し)
以下の本を読んでいく
原著のソースコードはサービス数が多すぎて写経が大変すぎるので、サブセットを作成して仕切り直し。
上記のスクラップの続き。具体的には以下の続きから始める
前提
- 小規模なブログアプリケーションがある。
- ユーザー、記事、購入の3つのドメインがある
- ユーザーは記事を購入できる。Zennのようなイメージ
- アプリケーションはモジュラモノリスで、モジュール間はgRPC間で通信する
4章
用語の定義
- ドメインイベント: 境界付けられたコンテキスト内での変更の伝搬
- イベントソーシングイベント: ほぼドメインイベントと同じだが、ドメインイベントがいつ発生したかとかのメタデータも含む(ドメインイベント自体はプロセスが終わると消える)。
- 統合イベント: 境界付けられたコンテキスト間での変更の伝搬
実装
ドメインイベントの実装のために以下の3つを作る
- 集約にイベントを追加するための入れ物
- ドメインイベント
- イベントディスパッチャー
まず、あらゆる集約にEventを扱うインターフェースを定義したい。
それ専用の構造体を使って、関数の処理を委譲する。
type AggregateBase struct {
ID string
events []Event
}
func (a *AggregateBase) AddEvent(event Event) {
a.events = append(a.events, event)
}
func (a *AggregateBase) GetID() string {
return a.ID
}
func (a *AggregateBase) GetEvents() []Event {
return a.events
}
これに合わせて、ドメインオブジェクトが作成されたときにイベントを発生させる。
// CreateOrder
order.AddEvent(&OrderCreated{
Order: order,
})
ドメインイベント自体は、サービスでユニークなイベント名を返す関数を持つように定義する。
package domain
type OrderCreated struct {
Order *Order
}
func (OrderCreated) EventName() string {
return "orders.OrderCreated"
}
type CreateOrderHandler struct {
orders domain.OrderRepository
+ domainPublisher ddd.Publisher
}
+ func NewCreateOrderHandler(orders domain.OrderRepository, domainPublisher ddd.Publisher) CreateOrderHandler {
return CreateOrderHandler{
orders: orders,
+ domainPublisher: domainPublisher,
}
}
func (h CreateOrderHandler) CreateOrder(ctx context.Context, cmd CreateOrder) error {
order, err := domain.NewOrder(cmd.ID, cmd.BuyerID, cmd.OrderedAt, cmd.OrderItems, cmd.TotalPrice)
if err != nil {
return err
}
err = h.orders.Save(ctx, order)
if err != nil {
return err
}
+ if err = h.domainPublisher.Publish(ctx, order.GetEvents()...); err != nil {
+ return err
+ }
return nil
}
上記のようなイベントは、以下のような型アサーションによってイベントを判別する
orderCreated := event.(*domain.OrderCreated)
イベントを処理する機構(Dispatcher)については、実装の詳細は別記事: リンク
概要は以下の通り
- Subscribe: イベントとイベントハンドラを受け取って、Dispatcherに登録
- Publish: 対象のイベントのハンドラをすべて呼び出す
- domainEventHandlerが満たすべきインターフェースを定義する
- これによって、モジュールを問わず色々なイベントハンドラを定義できる
- Dispatcherが(event, handler)をSubscribeするようにインターフェースを定義する
- Subscribeでは、Dispatcherがeventに対応するhandlerを持つようにする
- Dispatcherに具体的な(module-specificな)イベント、ハンドラを定義
現在の状況(Orderモジュールについて)
CreateOrderを実行すると、CreateOrderイベントのハンドラがすべて実行される。
CreateOrderイベントをSubscribeしているため。
→モジュール内でDispatcherを通じて処理が疎結合になっている
- メインプロセスはDispatcherを用いてCreateOrderイベントをSubscribeする
- ユースケースの処理の実行
例: CreateOrder- ドメイン層でメソッド(NewOrder)を呼び出し、その中でorderにOrderCreatedイベントを追加
- (orderのドメインメソッドは自分に起きたイベントのスライスを保持する)
- orderをDBへの永続化(イベントは永続化しない)
- イベントをPublishする
- (ユースケース層で、orderが持っているイベントのスライスに対応するハンドラを全部呼び出す)
- ドメイン層でメソッド(NewOrder)を呼び出し、その中でorderにOrderCreatedイベントを追加
- Publishされた時点で、Dispatcherによって、Publishされたイベントに対応するハンドラがすべて呼び出される
つまり、RegisterCustomer → NotifyCustomer →...みたいな連鎖をオーケストレーションロジックで実装するのではなく、Dispatcherでつないでいる
原著のChapter04完了時点では、RegisteredOrder等は誰からもSubscribeされていないので、空振りするだけのPublishが定義されている。これは不要なので定義しないものとする
ここまでいって、やっとドメインイベントの実装が終わった。
ドメインイベントの定義上、これはあくまでOrderという境界付けられたコンテキスト内のみにイベントを伝搬しているに過ぎない。加えて現時点では以下の状態であることに留意
- ドメインイベントはイミュータブルだから、とりあえずどんなデータでもイベントに入れる
- ドメインイベントなのでコンテキスト外からのSubscribeはない(誰がSubscribeしているかをすべて把握できる)
- イベントのライフタイムはとても短く、DBにもキューにも永続化はされない
また、現時点では疎結合と言いつつ処理はすべて同期処理である。
5章からは、イベントソーシングイベントを実装していく
5章
概要
イベントソーシングとは、CRUDのイベントのすべてを「イベントの追加」として扱う考え方。
例えば以下のような感じ。
普通のCRUD | イベントソーシング | 時刻 |
---|---|---|
注文の追加 | 注文追加イベントの発生 | 10:00 |
注文の変更 | 注文変更イベントの発生 | 10:05 |
注文の取消 | 注文取消イベントの発生 | 10:10 |
例えば上記の例では注文の取消が最新のイベントなので、注文の最新状態は「取消」になる。
イベントソーシングの実装においては、イベントソースは強い一貫性と楽観的な並行性を持つべき
例えば、2つ以上のイベントが同時に発行されたとき、最初のイベントだけがinsertされて、2つめのイベントはリトライされるべき。
→これ、なんでだろう。タイムスタンプが逆転してしまうと困るから?上記の発言におけるイベントのスコープがわからない。例えば全然関係ない注文のイベントについては好き勝手に挿入されていいような?
イベントソーシングとイベントストリーミング
- イベントストリーミング: コンテキストをまたぐ状態変化の伝搬。メッセージブロー。カーを用いる。結果整合性
- イベントソーシング: 単一コンテキスト内で履歴を管理する。データストアに永続化される。強い整合性が必要。
イベントソーシングのためのイベントの改善
イベントソーシングの実装の下準備として、イミュータブルなEvent型を定義
type EventPayload interface{}
type EventOptions interface {
configureEvent(*event)
}
type Event interface {
ID() string
EventName() string
Payload() EventPayload
Metadata() Metadata
OccurredAt() time.Time
}
type event struct {
id string
name string
payload EventPayload
metadata Metadata
occurredAt time.Time
}
var _ Event = (*event)(nil)
type EventHandler func(ctx context.Context, event Event) error
func NewEvent(name string, payload EventPayload, options ...EventOptions) event {
e := event{
id: uuid.New().String(),
name: name,
payload: payload,
metadata: make(Metadata),
occurredAt: time.Now(),
}
for _, option := range options {
option.configureEvent(&e)
}
return e
}
func (e event) ID() string { return e.id }
func (e event) EventName() string { return e.name }
func (e event) Payload() EventPayload { return e.payload }
func (e event) Metadata() Metadata { return e.metadata }
func (e event) OccurredAt() time.Time { return e.occurredAt }
このコードのポイント
- GetterとConstructerのみを定義することで、eventのインスタンスはイミュータブルになる
疑問
- EventPayloadインターフェースがinterface{}である意味。これはanyと変わらないのでは。後に関数をもたせる可能性があるからかな。
- metadata.goの意味
- Setter, Getterを定義している。これはすべての属性をパブリックにするのと何が違うのか。
- optionsで渡されるキーの先頭が大文字だろうが小文字だろうがアクセスできるようになる。
- Setter, Getterを定義している。これはすべての属性をパブリックにするのと何が違うのか。
続いて、Aggregateも微修正する。
イベントをバージョン管理できるようにするのが一番大きな目的。
また、イベントソーシングするとなるとイベントのメタデータ(いつ、どこで、誰が、どの集約を、どうするかを示す情報)がたくさん必要になるため、柔軟なメタデータを保持できるように変更
-func (a *AggregateBase) AddEvent(event Event) {
- a.events = append(a.events, event)
+type AggregateEvent interface {
+ Event
+ AggregateName() string
+ AggregateID() string
+ AggregateVersion() int
+}
+
+type aggregateEvent struct {
+ event
+}
+
+var _ AggregateEvent = (*aggregateEvent)(nil)
+
+const (
+ AggregateNameKey = "aggregate-name"
+ AggregateIDKey = "aggregate-id"
+ AggregateVersionKey = "aggregate-version"
+)
+
+func (a *AggregateBase) AddEvent(name string, payload EventPayload, options ...EventOption) {
+ options = append(
+ options,
+ Metadata{
+ AggregateNameKey: a.name,
+ AggregateIDKey: a.id,
+ },
+ )
+ a.events = append(
+ a.events,
+ aggregateEvent{
+ event: NewEvent(name, payload, options...),
+ },
+ )
}
+
+func (e aggregateEvent) AggregateName() string { return e.metadata.Get(AggregateNameKey).(string) }
+func (e aggregateEvent) AggregateID() string { return e.metadata.Get(AggregateIDKey).(string) }
+func (e aggregateEvent) AggregateVersion() int { return e.metadata.Get(AggregateVersionKey).(int) }
このコードのポイ ント
- イベントを集約に所属させる。これによって、集約ごとの複数のイベントが表現できる。
- 例えば、注文ID=1のイベントID=3みたいな。
- メタデータを持つようになったイベントに合わせてAddEventを変えた
- optionsにAggregateの情報を入れて、新しいイベントを作成してeventsに追加する
- AddEventでEventNameとPayloadを別々に受け取る。これによって、名前とペイロードの定義が疎結合になるため、1つのPayloadの定義を複数のイベントで使い回せるようになった。
上記の変更はあくまでイベントと集約を微修正したにすぎない。
ここから、イベントソーシングのための本格的な実装に動く
イベントソーシングの実装
特定の集約にイベントソーシングを追加できるようにするため、internal/es
パッケージを作成する。
既存の集約にバージョンを追加
type Aggregate struct {
ddd.Aggregate
version int
}
func NewAggregate(id, name string) Aggregate {
return Aggregate{
Aggregate: ddd.NewAggregate(id, name),
version: 0,
}
}
AddEventでイベントを追加するときに、バージョンをインクリメントする
func (a *Aggregate) AddEvent(
name string,
payload ddd.EventPayload,
options ...ddd.EventOption,
) {
options = append(
options,
ddd.Metadata{
ddd.AggregateVersionKey: a.PendingVersion() + 1,
},
)
}
func (a Aggregate) PendingVersion() int { return a.version + len(a.Events()) }
ドメインイベント側の変更
- CreateOrdersで、直接order.ID=idみたいな感じで割り当ててはだめ。あくまでイベントを発生させるべき
- イベント自体は、変更を追跡するのが主な目的になるため、イベントに含める値は絞る
集約側の変更
- 集約はイベントの集まりとして扱われるため、集約そのもののテーブルは不要
- 例えば、注文という集約をイベントソーシングで扱う場合、注文テーブルは不要
- また、RepositoryはLoad()とSave()メソッドのみ持つ
変更点のまとめ(抜粋)
インフラストラクチャ層(1/2)
理解のしやすさのためにインフラストラクチャ層は2段階に分けて説明する。
- イベントソーシングをする集約は、internal/es/aggregate.goに新しい構造体を定義する
- 集約にバージョンをもたせる
type Aggregate struct {
ddd.Aggregate
version int
}
- バージョン付きの集約のイベント操作を追加する
- AddEvent: イベントを足す際に、最新のバージョン情報をメタデータとして付与する
- 最新のバージョン情報=集約のバージョン+現在のイベントの長さ + 1
- 自分のイベントを足すぶん+1する
- 最新のバージョン情報=集約のバージョン+現在のイベントの長さ + 1
- AddEvent: イベントを足す際に、最新のバージョン情報をメタデータとして付与する
func (a *Aggregate) AddEvent(
name string,
payload ddd.EventPayload,
options ...ddd.EventOption,
) {
options = append(
options,
ddd.Metadata{
ddd.AggregateVersionKey: a.PendingVersion() + 1,
},
)
}
func (a Aggregate) PendingVersion() int { return a.version + len(a.Events()) }
- バージョン付きの集約のイベント操作を追加する(続き)
- CommitEvent: イベントの長さ分集約自体のバージョンを上げ、イベントはすべて消去する
- イベントをすべて反映させた状態の集約のバージョンを作成している
- ClearEvents: イベントをすべて消去
- (イベントソーシングに限らない)Aggregateにメソッドを移譲している
- CommitEvent: イベントの長さ分集約自体のバージョンを上げ、イベントはすべて消去する
func (a *Aggregate) CommitEvents() {
a.version += len(a.Events())
a.ClearEvents()
}
ドメイン層
集約の構造体定義クラスの変更
イベントをラップして、集約の情報をイベントに持たせる
type AggregateEvent interface {
Event
AggregateName() string
AggregateID() string
AggregateVersion() int
}
それに付随してイベントの追加ロジックを変える。
- 引数としてイベントの名前を与える
- 名前とイベントのペイロードを分ける(同じペイロードを複数の名前で使いまわす)
- 集約のオプションに集約の名前と集約のIDをPushする。
func (a *Aggregate) AddEvent(name string, payload EventPayload, options ...EventOption) {
options = append(
options,
Metadata{
AggregateNameKey: a.name,
AggregateIDKey: a.id,
},
)
a.events = append(
a.events,
aggregateEvent{
event: NewEvent(name, payload, options...),
},
)
}
ドメインオブジェクト自体の変更
- Orderオブジェクトを作成するときに、イベントソーシング用の集約を付与する
type Order struct {
- ddd.AggregateBase
+ es.Aggregate
BuyerID string
OrderedAt time.Time
OrderItems []*OrderItem
Price float64
}
インターフェース層
変更なし
ユースケース層
イベントソーシングにおいては、CRUDにおけるUpdate, Deleteは存在しない。
=イベントの履歴を読み出すか、新しいイベントに追加するかしかない。
参照
Readが注文レコードの取得(=Find)から、注文履歴レコードの読み出し(=Load)に変わる
-func (h GetOrderHandler) GetOrder(ctx context.Context, cmd GetOrder) (*domain.Order, error) {
- order, err := h.orders.Find(ctx, cmd.ID, cmd.BuyerID)
+func (h GetOrderHandler) GetOrder(ctx context.Context, query GetOrder) (*domain.Order, error) {
+ order, err := h.orders.Load(ctx, query.ID)
if err != nil {
return nil, err
}
更新
Publisherを構造体に含めたり、ドメインイベントの作成時に他のドメインメソッドを呼び出す(=Publish)することはやめる。
type CreateOrderHandler struct {
orders domain.OrderRepository
- domainPublisher ddd.Publisher
}
func (h CreateOrderHandler) CreateOrder(ctx context.Context, cmd CreateOrder) error {
order, err := domain.CreateOrder(cmd.ID, cmd.BuyerID, cmd.OrderedAt, cmd.OrderItems, cmd.TotalPrice)
if err != nil {
return err
}
- if err = h.domainPublisher.Publish(ctx, order.GetEvents()...); err != nil {
- return err
- }
return nil
}
上記を踏まえ、リポジトリ層では以下に注目する
- ドメインイベントのPublish相当の機能はどこに消えてしまった?
- Load, Saveの具体的な実装
リポジトリ層
通常のイベント
イベントそのものを永続化する
Loadでは、stream_id(=集約のID), stream_name(=集約の名前), stream_version(=集約のバージョン)をキーにしてイベントをクエリする。
※本書では、イベントソーシングとは集約をappend-onlyなストリームと定義している
// 他の部分は本質から外れるためSQL文のみ抽出
func (s EventStore) Load(ctx context.Context, aggregate es.EventSourcedAggregate) (err error) {
const query = `SELECT stream_version, event_id, event_name, event_data, occurred_at FROM %s WHERE stream_id = $1 AND stream_name = $2 AND stream_version > $3 ORDER BY stream_version ASC`
aggregateID := aggregate.ID()
aggregateName := aggregate.AggregateName()
var rows *sql.Rows
rows, err = s.db.QueryContext(ctx, s.table(query), aggregateID, aggregateName, aggregate.Version())
Saveでは、stream_id, stream_name, stream_versionに加えて、event_id, event_name, event_dataをINSERTする。
これは集約のイベントの数分ループを回し、それぞれのイベントのID, イベント名、イベントのデータを元にデータを更新する
func (s EventStore) Save(ctx context.Context, aggregate es.EventSourcedAggregate) error {
const query = `INSERT INTO %s (stream_id, stream_name, stream_version, event_id, event_name, event_data, occurred_at) VALUES ($1, $2, $3, $4, $5, $6, $7)`
(中略)
aggregateID := aggregate.ID()
aggregateName := aggregate.AggregateName()
(中略)
for _, event := range aggregate.Events() {
(中略)
if _, err = tx.ExecContext(
ctx, s.table(query)
aggregateID, aggregateName, event.AggregateVersion(), event.ID(), event.EventName(), payloadData, event.OccurredAt(),
ドメイン層(2/2)
集約のもつ属性に関わらずイベントのストリームとして永続化するために、再利用可能なリポジトリとStoreを作成する。
※こんなに汎化された機能はドメイン層ではないのではないかと思うが、一旦おいておく
イベントソーシングのためには、集約が大量に持っているイベントを適用したりするためのメソッドがたくさん必要。
したがって、イベントソーシングを実装するための集約のインターフェースを定義する。
type EventSourcedAggregate interface {
ID() string
AggregateName() string
AddEvent(string, ddd.EventPayload, ...ddd.EventOption)
Events() []ddd.AggregateEvent
ClearEvents()
ApplyEvent(ddd.Event) error
CommitEvents()
Version() int
PendingVersion() int
}
また、上記のインターフェースを永続化するためのリポジトリインターフェースを定義する。
type AggregateStore interface {
Load(ctx context.Context, aggregate EventSourcedAggregate) error
Save(ctx context.Context, aggregate EventSourcedAggregate) error
}
リポジトリ層
イベントソーシングを踏まえたリポジトリ層を実装する。
- AggreggateStoreは実質的なDB。
- ジェネリクスを使っているが、これはLoadとSaveの処理の中で活きる
type AggregateRepository[T EventSourcedAggregate] struct {
aggregateName string
registry registry.Registry
store AggregateStore // Load, Saveの処理を移譲する
}
Loadの実装を見ていく。
- レジストリのビルド
- 型検査
- DBのクエリ
func (r AggregateRepository[T]) Load(
ctx context.Context, aggregateID string,
) (agg T, err error) {
var v any
// レジストリの作成
v, err = r.registry.Build(
r.aggregateName,
ddd.SetID(aggregateID),
ddd.SetName(r.aggregateName),
)
if err != nil {
return agg, err
}
var ok bool
// 型検査
if agg, ok = v.(T); !ok {
return agg, fmt.Errorf("%T is not the expected type %T", v, agg)
}
// DBのクエリ
if err = r.store.Load(ctx, agg); err != nil {
return agg, err
}
return agg, nil
}
インフラ層
レジストリは内部のUtilライブラリ的なもの。
本システムでは、イベントストアを他のイベントと共有することになる。
Prototype(Clone)デザインパターンのPrototype Registryと同じようなもの。
Prototypeデザインパターンとは、オブジェクトを完璧に複製するためのデザインパターン。
詳細はこちらの記事にまとめた。
今回のレジストリは上記PrototypeRegistryに近い。
そして、本システムではどうやってserdeするかを定義するためにこのPrototypeパターンを使っている。
以上を踏まえてレジストリが何をしているかを改めて考えていく。
まず、レジストリにプロトタイプを登録する。
これは型をkey, インスタンスを作成するためのfactory methodをfnとして渡し、シリアライズの関数をs, デシリアライズの関数をdとして渡す。
func (r *registry) register(key string, fn func() interface{}, s Serializer, d Deserializer, o []BuildOption) error {
r.mu.Lock()
defer r.mu.Unlock()
if _, exists := r.registered[key]; exists {
return errors.New(fmt.Sprintf("キーに対応するデータがすでに登録されています。key: %s", key))
}
r.registered[key] = registered{
factory: fn,
serializer: s,
deserializer: d,
options: o,
}
return nil
}
そしてこれをJSONでserdeするために以下のようなJSON用のラッパーがある。
※JSON以外の形式でシリアライズすることがあるのかという疑問はおいておく
type JsonSerde struct {
r registry.Registry
}
var _ registry.Serde = (*JsonSerde)(nil)
func NewJsonSerde(r registry.Registry) *JsonSerde {
return &JsonSerde{r: r}
}
func (c JsonSerde) Register(v registry.Registrable, options ...registry.BuildOption) error {
return registry.Register(c.r, v, c.serialize, c.deserialize, options)
}
func (c JsonSerde) RegisterKey(key string, v interface{}, options ...registry.BuildOption) error {
return registry.RegisterKey(c.r, key, v, c.serialize, c.deserialize, options)
}
func (c JsonSerde) RegisterFactory(key string, fn func() interface{}, options ...registry.BuildOption) error {
return registry.RegisterFactory(c.r, key, fn, c.serialize, c.deserialize, options)
}
func (JsonSerde) serialize(v interface{}) ([]byte, error) {
return json.Marshal(v)
}
func (JsonSerde) deserialize(data []byte, v interface{}) error {
return json.Unmarshal(data, v)
}
registryでインスタンスを作成するのがBuild。
keyで指定した型を作るだけだが、その際にBuildOptionに指定した関数を適用する。
func (r *registry) Build(key string, options ...BuildOption) (interface{}, error) {
reg, exists := r.registered[key]
if !exists {
return nil, errors.New(fmt.Sprintf("キーに対応する値は登録されていません。key: %s", key))
}
// Factory Methodによってインスタンスを作成
v := reg.factory()
uos := append(r.registered[key].options, options...)
// ビルドオプションとして与えられた関数をすべてインスタンスに適用
for _, option := range uos {
err := option(v)
if err != nil {
return nil, err
}
}
return v, nil
}
このBuildは、registryに登録されたSerialize/Deserializeメソッドの中でインスタンスを作成するために使われるのに加え、イベントのLoadでも使われる。
func (r AggregateRepository[T]) Load(
ctx context.Context, aggregateID string,
) (agg T, err error) {
var v any
v, err = r.registry.Build(
r.aggregateName,
// BuildOptionとしてsetID, setNameを与えることで、作成時に特定のIDとNameを付与する
ddd.SetID(aggregateID),
ddd.SetName(r.aggregateName),
)
if err != nil {
return agg, err
}
var ok bool
if agg, ok = v.(T); !ok {
return agg, fmt.Errorf("%T is not the expected type %T", v, agg)
}
if err = r.store.Load(ctx, agg); err != nil {
return agg, err
}
return agg, nil
}
レジストリの使い方まとめ
- 後で回収する型をregisterする(ゼロ値が望ましい)
- 型はSerializer/Deserializerも一緒にregisterする
- レジストリがデータストアから取得して作ったインスタンスは、一度もSerializeされたことのないデータと同じように扱える。
- だから、インスタンスの型特有のserdeに関わる処理を、インスタンスの利用側で用意する必要はない
- レジストリを使わないと、型ごとにマッパーを用意したり、map[string]interface{}として扱うしかなくなる
- レジストリがprivate fieldを含む複雑な型を扱うには、今回はBuildOptionで制御する
- これは多分JSONにシリアライズするときに可視性を制御するのと同じ感じ
- 今回はidとnameをddd.SetID(), ddd.SetName()でセットするが、そういう指定をもっと細かくやりたければBuildOptionを使う
レジストリに関する所感
- 型ごとにserdeするのにこんな大層な実装いるんだろうか。どうせJSONしか使ってないのに。
- Loadで使われているが、このレジストリとジェネリクスを組み合わせることで、型ごとにDBとのMapperを用意しなくてすむ。
- そのメリットは大きい気はするが、コードの見通しはかなり悪い。。。
CQRS
概要
- イベントソーシングの実装にはCQRSがほぼ必須
- たとえば、特定の注文イベントをキーに、それに紐づく商品情報やユーザーの詳細を確認したい場合は、CQRSがないとかなり実装が難しい。
- すべてのイベントをロードしてフィルターして、、、みたいな操作が必要
※一方、CQRSにはイベントソーシングは必須ではない
- すべてのイベントをロードしてフィルターして、、、みたいな操作が必要
- たとえば、特定の注文イベントをキーに、それに紐づく商品情報やユーザーの詳細を確認したい場合は、CQRSがないとかなり実装が難しい。
実装
Read用のモデルを作る
- それ用のRepositoryを作って、Load/Save以外のメソッドを実装する。
- All()、Find()、...
- ここでは、イベントソーシングを導入する前のRepositoryをリネームして使う
※この時点では、イベントストリームであるエンティティと閲覧用のエンティティをどのように同期するかは未定
イベントとRead用のモデルを同期する
Q. Save/Publishの順番をどうするか?
- Save()→Publish()の順に実行すると、集約のオブジェクトからイベントが消えてしまうので、後続のPublish()で使えない。
- 一方、Publish()→Save()の順だと、Save()が絶対に失敗しないようなイベントを扱うように運用で強制しないといけない。またそもそもこの呼び出し順も強制しないといけない。
- Save()の中でPublish()を実行しちゃうのもありだが、SaveとPublishが密結合になる。
A. Chain of Responsibility Patternを実装したmiddlewareを用いる。
ポイント
-
イベントハンドラの扱い
- ただのfunc型ではなく、EventHandler interfaceにする(Figure 5.17)
- これはhttpパッケージが好きなルーターを定義できるのと似ている
- これによって、ログモジュールを挟み込める(httpルーターとまるっきり一緒)
- ただのfunc型ではなく、EventHandler interfaceにする(Figure 5.17)
-
EventStoreとSnapshotStoreの違いを知るためには以下の章以降を熟読。
- EventStore
- 全体的にちゃんと読めていないので、以下以降を読む
https://learning.oreilly.com/library/view/event-driven-architecture-in/9781803238012/B18368_05.xhtml#:-:text=Connecting the domain events with the read model - 多分注文のようなshort-livedな集約のイベントと、Storeのようなlong-livedな集約のイベントがあるとき、long-livedなイベント全てを呼び出すのはコストが高いため、スナップショットを取るということの模様
メモ
データの編集はドメインイベントによって直接行われない。
- ドメインファンクションなどのメソッドはすべてイベントを出力する
- すべての編集はイベントの適用によって行われる
ドメインイベントのあり方としてドメインイベントが集約の情報をすべて持つのは何の問題もない。しか