Open12

Event-Driven Architecture in Golang読書メモ(仕切り直し)

geibeegeibee

前提

  • 小規模なブログアプリケーションがある。
  • ユーザー、記事、購入の3つのドメインがある
    • ユーザーは記事を購入できる。Zennのようなイメージ
  • アプリケーションはモジュラモノリスで、モジュール間はgRPC間で通信する
geibeegeibee

4章

用語の定義

  • ドメインイベント: 境界付けられたコンテキスト内での変更の伝搬
  • イベントソーシングイベント: ほぼドメインイベントと同じだが、ドメインイベントがいつ発生したかとかのメタデータも含む(ドメインイベント自体はプロセスが終わると消える)。
  • 統合イベント: 境界付けられたコンテキスト間での変更の伝搬

実装

ドメインイベントの実装のために以下の3つを作る

  • 集約にイベントを追加するための入れ物
  • ドメインイベント
  • イベントディスパッチャー

まず、あらゆる集約にEventを扱うインターフェースを定義したい。
それ専用の構造体を使って、関数の処理を委譲する。

internal/ddd/aggregate.go
type AggregateBase struct {
	ID string
	events []Event
}

func (a *AggregateBase) AddEvent(event Event) {
	a.events = append(a.events, event)
}

func (a *AggregateBase) GetID() string {
	return a.ID
}

func (a *AggregateBase) GetEvents() []Event {
	return a.events
}

これに合わせて、ドメインオブジェクトが作成されたときにイベントを発生させる。

orders/internal/domain/order.go
    // CreateOrder
    order.AddEvent(&OrderCreated{
		Order: order,
	})

ドメインイベント自体は、サービスでユニークなイベント名を返す関数を持つように定義する。

orders/internal/domain/order_event.go
package domain

type OrderCreated struct {
	Order *Order
}

func (OrderCreated) EventName() string {
	return "orders.OrderCreated"
}

orders/internal/application/create_order.go
type CreateOrderHandler struct {
	orders          domain.OrderRepository
+	domainPublisher ddd.Publisher
}

+ func NewCreateOrderHandler(orders domain.OrderRepository, domainPublisher ddd.Publisher) CreateOrderHandler {
	return CreateOrderHandler{
		orders:          orders,
+		domainPublisher: domainPublisher,
	}
}

func (h CreateOrderHandler) CreateOrder(ctx context.Context, cmd CreateOrder) error {
	order, err := domain.NewOrder(cmd.ID, cmd.BuyerID, cmd.OrderedAt, cmd.OrderItems, cmd.TotalPrice)
	if err != nil {
		return err
	}

	err = h.orders.Save(ctx, order)
	if err != nil {
		return err
	}

+	if err = h.domainPublisher.Publish(ctx, order.GetEvents()...); err != nil {
+		return err
+	}

	return nil
}

上記のようなイベントは、以下のような型アサーションによってイベントを判別する

orderCreated := event.(*domain.OrderCreated)

イベントを処理する機構(Dispatcher)については、実装の詳細は別記事: リンク
概要は以下の通り

  • Subscribe: イベントとイベントハンドラを受け取って、Dispatcherに登録
  • Publish: 対象のイベントのハンドラをすべて呼び出す
geibeegeibee
  1. domainEventHandlerが満たすべきインターフェースを定義する
    • これによって、モジュールを問わず色々なイベントハンドラを定義できる
  2. Dispatcherが(event, handler)をSubscribeするようにインターフェースを定義する
    • Subscribeでは、Dispatcherがeventに対応するhandlerを持つようにする
  3. Dispatcherに具体的な(module-specificな)イベント、ハンドラを定義

現在の状況(Orderモジュールについて)
CreateOrderを実行すると、CreateOrderイベントのハンドラがすべて実行される。
CreateOrderイベントをSubscribeしているため。
→モジュール内でDispatcherを通じて処理が疎結合になっている

  1. メインプロセスはDispatcherを用いてCreateOrderイベントをSubscribeする
  2. ユースケースの処理の実行
    例: CreateOrder
    • ドメイン層でメソッド(NewOrder)を呼び出し、その中でorderにOrderCreatedイベントを追加
      • (orderのドメインメソッドは自分に起きたイベントのスライスを保持する)
    • orderをDBへの永続化(イベントは永続化しない)
    • イベントをPublishする
      • (ユースケース層で、orderが持っているイベントのスライスに対応するハンドラを全部呼び出す)
  3. Publishされた時点で、Dispatcherによって、Publishされたイベントに対応するハンドラがすべて呼び出される

つまり、RegisterCustomer → NotifyCustomer →...みたいな連鎖をオーケストレーションロジックで実装するのではなく、Dispatcherでつないでいる

原著のChapter04完了時点では、RegisteredOrder等は誰からもSubscribeされていないので、空振りするだけのPublishが定義されている。これは不要なので定義しないものとする

ここまでいって、やっとドメインイベントの実装が終わった。
ドメインイベントの定義上、これはあくまでOrderという境界付けられたコンテキスト内のみにイベントを伝搬しているに過ぎない。加えて現時点では以下の状態であることに留意

  • ドメインイベントはイミュータブルだから、とりあえずどんなデータでもイベントに入れる
  • ドメインイベントなのでコンテキスト外からのSubscribeはない(誰がSubscribeしているかをすべて把握できる)
  • イベントのライフタイムはとても短く、DBにもキューにも永続化はされない

また、現時点では疎結合と言いつつ処理はすべて同期処理である。
5章からは、イベントソーシングイベントを実装していく

geibeegeibee

5章

概要

イベントソーシングとは、CRUDのイベントのすべてを「イベントの追加」として扱う考え方。
例えば以下のような感じ。

普通のCRUD イベントソーシング 時刻
注文の追加 注文追加イベントの発生 10:00
注文の変更 注文変更イベントの発生 10:05
注文の取消 注文取消イベントの発生 10:10

例えば上記の例では注文の取消が最新のイベントなので、注文の最新状態は「取消」になる。

イベントソーシングの実装においては、イベントソースは強い一貫性と楽観的な並行性を持つべき
例えば、2つ以上のイベントが同時に発行されたとき、最初のイベントだけがinsertされて、2つめのイベントはリトライされるべき。

→これ、なんでだろう。タイムスタンプが逆転してしまうと困るから?上記の発言におけるイベントのスコープがわからない。例えば全然関係ない注文のイベントについては好き勝手に挿入されていいような?

イベントソーシングとイベントストリーミング

  • イベントストリーミング: コンテキストをまたぐ状態変化の伝搬。メッセージブロー。カーを用いる。結果整合性
  • イベントソーシング: 単一コンテキスト内で履歴を管理する。データストアに永続化される。強い整合性が必要。

イベントソーシングのためのイベントの改善

イベントソーシングの実装の下準備として、イミュータブルなEvent型を定義

internal/ddd/event.go
type EventPayload interface{}

type EventOptions interface {
	configureEvent(*event)
}

type Event interface {
	ID() string
	EventName() string
	Payload() EventPayload
	Metadata() Metadata
	OccurredAt() time.Time
}

type event struct {
	id         string
	name       string
	payload    EventPayload
	metadata   Metadata
	occurredAt time.Time
}

var _ Event = (*event)(nil)

type EventHandler func(ctx context.Context, event Event) error

func NewEvent(name string, payload EventPayload, options ...EventOptions) event {
	e := event{
		id:         uuid.New().String(),
		name:       name,
		payload:    payload,
		metadata:   make(Metadata),
		occurredAt: time.Now(),
	}

	for _, option := range options {
		option.configureEvent(&e)
	}
	return e
}

func (e event) ID() string            { return e.id }
func (e event) EventName() string     { return e.name }
func (e event) Payload() EventPayload { return e.payload }
func (e event) Metadata() Metadata    { return e.metadata }
func (e event) OccurredAt() time.Time { return e.occurredAt }

このコードのポイント

  • GetterとConstructerのみを定義することで、eventのインスタンスはイミュータブルになる

疑問

  • EventPayloadインターフェースがinterface{}である意味。これはanyと変わらないのでは。後に関数をもたせる可能性があるからかな。
  • metadata.goの意味
    • Setter, Getterを定義している。これはすべての属性をパブリックにするのと何が違うのか。
      • optionsで渡されるキーの先頭が大文字だろうが小文字だろうがアクセスできるようになる。

続いて、Aggregateも微修正する。
イベントをバージョン管理できるようにするのが一番大きな目的。
また、イベントソーシングするとなるとイベントのメタデータ(いつ、どこで、誰が、どの集約を、どうするかを示す情報)がたくさん必要になるため、柔軟なメタデータを保持できるように変更

go

-func (a *AggregateBase) AddEvent(event Event) {
-       a.events = append(a.events, event)
+type AggregateEvent interface {
+       Event
+       AggregateName() string
+       AggregateID() string
+       AggregateVersion() int
+}
+
+type aggregateEvent struct {
+       event
+}
+
+var _ AggregateEvent = (*aggregateEvent)(nil)
+
+const (
+       AggregateNameKey    = "aggregate-name"
+       AggregateIDKey      = "aggregate-id"
+       AggregateVersionKey = "aggregate-version"
+)
+
+func (a *AggregateBase) AddEvent(name string, payload EventPayload, options ...EventOption) {
+       options = append(
+               options,
+               Metadata{
+                       AggregateNameKey: a.name,
+                       AggregateIDKey:   a.id,
+               },
+       )
+       a.events = append(
+               a.events,
+               aggregateEvent{
+                       event: NewEvent(name, payload, options...),
+               },
+       )
 }

+
+func (e aggregateEvent) AggregateName() string { return e.metadata.Get(AggregateNameKey).(string) }
+func (e aggregateEvent) AggregateID() string   { return e.metadata.Get(AggregateIDKey).(string) }
+func (e aggregateEvent) AggregateVersion() int { return e.metadata.Get(AggregateVersionKey).(int) }

このコードのポイ ント

  • イベントを集約に所属させる。これによって、集約ごとの複数のイベントが表現できる。
    • 例えば、注文ID=1のイベントID=3みたいな。
  • メタデータを持つようになったイベントに合わせてAddEventを変えた
    • optionsにAggregateの情報を入れて、新しいイベントを作成してeventsに追加する
  • AddEventでEventNameとPayloadを別々に受け取る。これによって、名前とペイロードの定義が疎結合になるため、1つのPayloadの定義を複数のイベントで使い回せるようになった。
geibeegeibee

上記の変更はあくまでイベントと集約を微修正したにすぎない。
ここから、イベントソーシングのための本格的な実装に動く

イベントソーシングの実装

特定の集約にイベントソーシングを追加できるようにするため、internal/esパッケージを作成する。

既存の集約にバージョンを追加

internal/es/aggregate.go
type Aggregate struct {
	ddd.Aggregate
	version int
}

func NewAggregate(id, name string) Aggregate {
	return Aggregate{
		Aggregate: ddd.NewAggregate(id, name),
		version:   0,
	}
}

AddEventでイベントを追加するときに、バージョンをインクリメントする

internal/es/aggregate.go

func (a *Aggregate) AddEvent(
	name string,
	payload ddd.EventPayload,
	options ...ddd.EventOption,
) {
	options = append(
		options,
		ddd.Metadata{
			ddd.AggregateVersionKey: a.PendingVersion() + 1,
		},
	)
}

func (a Aggregate) PendingVersion() int { return a.version + len(a.Events()) }

ドメインイベント側の変更

  • CreateOrdersで、直接order.ID=idみたいな感じで割り当ててはだめ。あくまでイベントを発生させるべき
  • イベント自体は、変更を追跡するのが主な目的になるため、イベントに含める値は絞る

集約側の変更

  • 集約はイベントの集まりとして扱われるため、集約そのもののテーブルは不要
    • 例えば、注文という集約をイベントソーシングで扱う場合、注文テーブルは不要
    • また、RepositoryはLoad()とSave()メソッドのみ持つ
geibeegeibee

変更点のまとめ(抜粋)

https://github.com/geibee/event-driven-from-scratch/compare/Chapter4...main

インフラストラクチャ層(1/2)

理解のしやすさのためにインフラストラクチャ層は2段階に分けて説明する。

  • イベントソーシングをする集約は、internal/es/aggregate.goに新しい構造体を定義する
    • 集約にバージョンをもたせる
internal/es/aggregate.go
type Aggregate struct {
	ddd.Aggregate
	version int
}
  • バージョン付きの集約のイベント操作を追加する
    • AddEvent: イベントを足す際に、最新のバージョン情報をメタデータとして付与する
      • 最新のバージョン情報=集約のバージョン+現在のイベントの長さ + 1
        • 自分のイベントを足すぶん+1する
internal/es/aggregate.go
func (a *Aggregate) AddEvent(
	name string,
	payload ddd.EventPayload,
	options ...ddd.EventOption,
) {
	options = append(
		options,
		ddd.Metadata{
			ddd.AggregateVersionKey: a.PendingVersion() + 1,
		},
	)
}

func (a Aggregate) PendingVersion() int { return a.version + len(a.Events()) }
  • バージョン付きの集約のイベント操作を追加する(続き)
    • CommitEvent: イベントの長さ分集約自体のバージョンを上げ、イベントはすべて消去する
      • イベントをすべて反映させた状態の集約のバージョンを作成している
    • ClearEvents: イベントをすべて消去
      • (イベントソーシングに限らない)Aggregateにメソッドを移譲している
internal/es/aggregate.go
func (a *Aggregate) CommitEvents() {
	a.version += len(a.Events())
	a.ClearEvents()
}

ドメイン層

集約の構造体定義クラスの変更

イベントをラップして、集約の情報をイベントに持たせる

internal/ddd/aggregate.go
type AggregateEvent interface {
  Event
  AggregateName() string
  AggregateID() string
  AggregateVersion() int
}

それに付随してイベントの追加ロジックを変える。

  • 引数としてイベントの名前を与える
    • 名前とイベントのペイロードを分ける(同じペイロードを複数の名前で使いまわす)
  • 集約のオプションに集約の名前と集約のIDをPushする。
internal/ddd/aggregate.go
func (a *Aggregate) AddEvent(name string, payload EventPayload, options ...EventOption) {
	options = append(
		options,
		Metadata{
			AggregateNameKey: a.name,
			AggregateIDKey:   a.id,
		},
	)
	a.events = append(
		a.events,
		aggregateEvent{
			event: NewEvent(name, payload, options...),
		},
	)
}

ドメインオブジェクト自体の変更

  • Orderオブジェクトを作成するときに、イベントソーシング用の集約を付与する
orders/internal/domain/order.go
type Order struct {
-	ddd.AggregateBase
+	es.Aggregate
	BuyerID    string
	OrderedAt  time.Time
	OrderItems []*OrderItem
	Price       float64
}

インターフェース層

変更なし

ユースケース層

イベントソーシングにおいては、CRUDにおけるUpdate, Deleteは存在しない。
=イベントの履歴を読み出すか、新しいイベントに追加するかしかない。

参照

Readが注文レコードの取得(=Find)から、注文履歴レコードの読み出し(=Load)に変わる

orders/internal/application/get_order.go
-func (h GetOrderHandler) GetOrder(ctx context.Context, cmd GetOrder) (*domain.Order, error) {
-	order, err := h.orders.Find(ctx, cmd.ID, cmd.BuyerID)
+func (h GetOrderHandler) GetOrder(ctx context.Context, query GetOrder) (*domain.Order, error) {
+	order, err := h.orders.Load(ctx, query.ID)
	if err != nil {
		return nil, err
	}

更新

Publisherを構造体に含めたり、ドメインイベントの作成時に他のドメインメソッドを呼び出す(=Publish)することはやめる。

orders/internal/application/create_order.go
type CreateOrderHandler struct {
	orders          domain.OrderRepository
-	domainPublisher ddd.Publisher
}

func (h CreateOrderHandler) CreateOrder(ctx context.Context, cmd CreateOrder) error {
	order, err := domain.CreateOrder(cmd.ID, cmd.BuyerID, cmd.OrderedAt, cmd.OrderItems, cmd.TotalPrice)
	if err != nil {
		return err
    }

-	if err = h.domainPublisher.Publish(ctx, order.GetEvents()...); err != nil {
-		return err
-	}
	return nil
}
geibeegeibee

上記を踏まえ、リポジトリ層では以下に注目する

リポジトリ層

通常のイベント

イベントそのものを永続化する
Loadでは、stream_id(=集約のID), stream_name(=集約の名前), stream_version(=集約のバージョン)をキーにしてイベントをクエリする。

※本書では、イベントソーシングとは集約をappend-onlyなストリームと定義している

internal/postgres/event_store.go
// 他の部分は本質から外れるためSQL文のみ抽出
func (s EventStore) Load(ctx context.Context, aggregate es.EventSourcedAggregate) (err error) {
	const query = `SELECT stream_version, event_id, event_name, event_data, occurred_at FROM %s WHERE stream_id = $1 AND stream_name = $2 AND stream_version > $3 ORDER BY stream_version ASC`

	aggregateID := aggregate.ID()
	aggregateName := aggregate.AggregateName()

	var rows *sql.Rows

	rows, err = s.db.QueryContext(ctx, s.table(query), aggregateID, aggregateName, aggregate.Version())

Saveでは、stream_id, stream_name, stream_versionに加えて、event_id, event_name, event_dataをINSERTする。
これは集約のイベントの数分ループを回し、それぞれのイベントのID, イベント名、イベントのデータを元にデータを更新する

internal/postgres/event_store.go
func (s EventStore) Save(ctx context.Context, aggregate es.EventSourcedAggregate) error {
	const query = `INSERT INTO %s (stream_id, stream_name, stream_version, event_id, event_name, event_data, occurred_at) VALUES ($1, $2, $3, $4, $5, $6, $7)`
(中略)
	aggregateID := aggregate.ID()
	aggregateName := aggregate.AggregateName()
(中略)
	for _, event := range aggregate.Events() {
(中略)
			if _, err = tx.ExecContext(
			ctx, s.table(query)
			aggregateID, aggregateName, event.AggregateVersion(), event.ID(), event.EventName(), payloadData, event.OccurredAt(),
geibeegeibee

ドメイン層(2/2)

集約のもつ属性に関わらずイベントのストリームとして永続化するために、再利用可能なリポジトリとStoreを作成する。
※こんなに汎化された機能はドメイン層ではないのではないかと思うが、一旦おいておく

イベントソーシングのためには、集約が大量に持っているイベントを適用したりするためのメソッドがたくさん必要。
したがって、イベントソーシングを実装するための集約のインターフェースを定義する。

internal/es/aggregate_store.go
type EventSourcedAggregate interface {
	ID() string
	AggregateName() string
	AddEvent(string, ddd.EventPayload, ...ddd.EventOption)
	Events() []ddd.AggregateEvent
	ClearEvents()
	ApplyEvent(ddd.Event) error
	CommitEvents()
	Version() int
	PendingVersion() int
}

また、上記のインターフェースを永続化するためのリポジトリインターフェースを定義する。

internal/es/aggregate_store.go
type AggregateStore interface {
	Load(ctx context.Context, aggregate EventSourcedAggregate) error
	Save(ctx context.Context, aggregate EventSourcedAggregate) error
}

リポジトリ層

イベントソーシングを踏まえたリポジトリ層を実装する。

  • AggreggateStoreは実質的なDB。
  • ジェネリクスを使っているが、これはLoadとSaveの処理の中で活きる
internal/es/aggregate_repository.go
type AggregateRepository[T EventSourcedAggregate] struct {
	aggregateName string
	registry      registry.Registry
	store         AggregateStore // Load, Saveの処理を移譲する
}

Loadの実装を見ていく。

  • レジストリのビルド
  • 型検査
  • DBのクエリ
internal/es/aggregate_repository.go
func (r AggregateRepository[T]) Load(
	ctx context.Context, aggregateID string,
) (agg T, err error) {
	var v any

    // レジストリの作成
	v, err = r.registry.Build(
		r.aggregateName,
		ddd.SetID(aggregateID),
		ddd.SetName(r.aggregateName),
	)
	if err != nil {
		return agg, err
	}

	var ok bool
    // 型検査
	if agg, ok = v.(T); !ok {
		return agg, fmt.Errorf("%T is not the expected type %T", v, agg)
	}

    // DBのクエリ
	if err = r.store.Load(ctx, agg); err != nil {
		return agg, err
	}

	return agg, nil
}

インフラ層

レジストリは内部のUtilライブラリ的なもの。
本システムでは、イベントストアを他のイベントと共有することになる。
Prototype(Clone)デザインパターンのPrototype Registryと同じようなもの。
Prototypeデザインパターンとは、オブジェクトを完璧に複製するためのデザインパターン。
詳細はこちらの記事にまとめた。
https://zenn.dev/regmarmcem/articles/a7220b40edbf23

geibeegeibee

今回のレジストリは上記PrototypeRegistryに近い。
そして、本システムではどうやってserdeするかを定義するためにこのPrototypeパターンを使っている。

以上を踏まえてレジストリが何をしているかを改めて考えていく。
まず、レジストリにプロトタイプを登録する。
これは型をkey, インスタンスを作成するためのfactory methodをfnとして渡し、シリアライズの関数をs, デシリアライズの関数をdとして渡す。

internal/registry/registry.go
func (r *registry) register(key string, fn func() interface{}, s Serializer, d Deserializer, o []BuildOption) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	if _, exists := r.registered[key]; exists {
		return errors.New(fmt.Sprintf("キーに対応するデータがすでに登録されています。key: %s", key))
	}

	r.registered[key] = registered{
		factory:      fn,
		serializer:   s,
		deserializer: d,
		options:      o,
	}

	return nil
}

そしてこれをJSONでserdeするために以下のようなJSON用のラッパーがある。
※JSON以外の形式でシリアライズすることがあるのかという疑問はおいておく

internal/registry/serdes/json.go
type JsonSerde struct {
	r registry.Registry
}

var _ registry.Serde = (*JsonSerde)(nil)

func NewJsonSerde(r registry.Registry) *JsonSerde {
	return &JsonSerde{r: r}
}

func (c JsonSerde) Register(v registry.Registrable, options ...registry.BuildOption) error {
	return registry.Register(c.r, v, c.serialize, c.deserialize, options)
}

func (c JsonSerde) RegisterKey(key string, v interface{}, options ...registry.BuildOption) error {
	return registry.RegisterKey(c.r, key, v, c.serialize, c.deserialize, options)
}

func (c JsonSerde) RegisterFactory(key string, fn func() interface{}, options ...registry.BuildOption) error {
	return registry.RegisterFactory(c.r, key, fn, c.serialize, c.deserialize, options)
}

func (JsonSerde) serialize(v interface{}) ([]byte, error) {
	return json.Marshal(v)
}

func (JsonSerde) deserialize(data []byte, v interface{}) error {
	return json.Unmarshal(data, v)
}

registryでインスタンスを作成するのがBuild。
keyで指定した型を作るだけだが、その際にBuildOptionに指定した関数を適用する。

internal/registry/registry.go
func (r *registry) Build(key string, options ...BuildOption) (interface{}, error) {
	reg, exists := r.registered[key]
	if !exists {
		return nil, errors.New(fmt.Sprintf("キーに対応する値は登録されていません。key: %s", key))
	}

    // Factory Methodによってインスタンスを作成
	v := reg.factory()
	uos := append(r.registered[key].options, options...)

    // ビルドオプションとして与えられた関数をすべてインスタンスに適用
	for _, option := range uos {
		err := option(v)
		if err != nil {
			return nil, err
		}
	}

	return v, nil
}

このBuildは、registryに登録されたSerialize/Deserializeメソッドの中でインスタンスを作成するために使われるのに加え、イベントのLoadでも使われる。

internal/es/aggregate_repository.go
func (r AggregateRepository[T]) Load(
	ctx context.Context, aggregateID string,
) (agg T, err error) {
	var v any
	v, err = r.registry.Build(
		r.aggregateName,
        // BuildOptionとしてsetID, setNameを与えることで、作成時に特定のIDとNameを付与する
		ddd.SetID(aggregateID),
		ddd.SetName(r.aggregateName),
	)
	if err != nil {
		return agg, err
	}

	var ok bool
	if agg, ok = v.(T); !ok {
		return agg, fmt.Errorf("%T is not the expected type %T", v, agg)
	}

	if err = r.store.Load(ctx, agg); err != nil {
		return agg, err
	}

	return agg, nil
}

geibeegeibee

レジストリの使い方まとめ

  • 後で回収する型をregisterする(ゼロ値が望ましい)
    • 型はSerializer/Deserializerも一緒にregisterする
  • レジストリがデータストアから取得して作ったインスタンスは、一度もSerializeされたことのないデータと同じように扱える。
    • だから、インスタンスの型特有のserdeに関わる処理を、インスタンスの利用側で用意する必要はない
    • レジストリを使わないと、型ごとにマッパーを用意したり、map[string]interface{}として扱うしかなくなる
  • レジストリがprivate fieldを含む複雑な型を扱うには、今回はBuildOptionで制御する
    • これは多分JSONにシリアライズするときに可視性を制御するのと同じ感じ
    • 今回はidとnameをddd.SetID(), ddd.SetName()でセットするが、そういう指定をもっと細かくやりたければBuildOptionを使う

レジストリに関する所感

  • 型ごとにserdeするのにこんな大層な実装いるんだろうか。どうせJSONしか使ってないのに。
  • Loadで使われているが、このレジストリとジェネリクスを組み合わせることで、型ごとにDBとのMapperを用意しなくてすむ。
    • そのメリットは大きい気はするが、コードの見通しはかなり悪い。。。
geibeegeibee

CQRS

概要

  • イベントソーシングの実装にはCQRSがほぼ必須
    • たとえば、特定の注文イベントをキーに、それに紐づく商品情報やユーザーの詳細を確認したい場合は、CQRSがないとかなり実装が難しい。
      • すべてのイベントをロードしてフィルターして、、、みたいな操作が必要
        ※一方、CQRSにはイベントソーシングは必須ではない

実装

Read用のモデルを作る

  • それ用のRepositoryを作って、Load/Save以外のメソッドを実装する。
    • All()、Find()、...
    • ここでは、イベントソーシングを導入する前のRepositoryをリネームして使う
      ※この時点では、イベントストリームであるエンティティと閲覧用のエンティティをどのように同期するかは未定

イベントとRead用のモデルを同期する

Q. Save/Publishの順番をどうするか?

  • Save()→Publish()の順に実行すると、集約のオブジェクトからイベントが消えてしまうので、後続のPublish()で使えない。
  • 一方、Publish()→Save()の順だと、Save()が絶対に失敗しないようなイベントを扱うように運用で強制しないといけない。またそもそもこの呼び出し順も強制しないといけない。
  • Save()の中でPublish()を実行しちゃうのもありだが、SaveとPublishが密結合になる。

A. Chain of Responsibility Patternを実装したmiddlewareを用いる。

ポイント

メモ

データの編集はドメインイベントによって直接行われない。

  • ドメインファンクションなどのメソッドはすべてイベントを出力する
  • すべての編集はイベントの適用によって行われる

ドメインイベントのあり方としてドメインイベントが集約の情報をすべて持つのは何の問題もない。しか