🛠️

【ドメイン駆動設計】ドメインイベントの実装方法 - 2.イベントストアへの格納

2022/08/14に公開

なにこれ

以下の記事の続きです。「2.イベントストア格納処理方式」の実装方法をまとめた記事です。
https://zenn.dev/taiyou/articles/7b0325591bf1f8

イベントストア

導入メリット

  1. 発行されたドメインイベントを「他の境界づけられたコンテキスト」に通知する際のキューとして使えること
    • もちろん、自身のコンテキスト内で処理するためのキューとしても使える
  2. REST API経由でイベントストアに格納されたイベントをクライアントに提供できる
  3. 「ドメインエキスパートが気にかける、何かの出来事」を履歴として記録できる
    • ドメインエキスパートや開発者があとで確認できる
    • 政府から法的な監査の要請である出来事を記録する必要に導入することもある = 「政府が気にかける、何かの出来事」
    • ログ出力などによる記録ではなく、"集約における振る舞いの結果"を記録
  4. データ分析や機械学習の学習データとして、イベントストアのデータを使える
    • 例えば、zennの運営者)記事が作成されてから、Likeされるまでの時間はどのくらいか?を記事テーブルの作成日時とイベントストアの作成日時から計算できる
  5. イベントを使って、集約のインスタンスを再構築できる(イベントソーシングの一環)

1.イベントストアへの格納方法

この節では、発行されたイベントをイベントストアまで格納する方法を紹介します。格納されたイベントストアを「他の境界づけられたコンテキスト」に転送する方法については、次節で紹介します。

処理の流れ

サブスクライバでイベントを受信したら(handleEventが呼び出されたら)、次の流れでイベントストアまで格納されます。

  1. apppendメソッドを実行します。
    • ドメインイベント指定でappendを実行します。
    • イベントストアを保存するDBは会社・チームにより異なるため、アプリケーション側でEventStoreを継承した実装クラスMySQLEventStoreをDIしておく。
event.event_store.py
class EventStore(abc.ABC):
    @abc.abstractmethod
    def append(self, domain_event: DomainEvent) -> EventStore:
        pass
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
    def append(self, domain_event: DomainEvent) -> EventStore:
        print("appendを実行します!!")
        # ...
  1. MySQLEventStoreクラスのappendが呼び出されたら、EventSerializerクラスのserializeメソッドを実行し、JSON文字列を取得します。
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
    def append(self, domain_event: DomainEvent) -> EventStore:
        print("appendを実行します!!")
+	payload = EventSerializer.instance().serialize(domain_event)
  1. イベントIDとタイプ名、JSON文字列指定でStoredEventを生成します。
    • イベントIDは、アプリケーション側で生成する or DBが自動生成した一意なシーケンス値を指定する
    • タイプ名には、対応するドメインイベントの実装クラスを指定する。この値はデシリアイズする際に利用します。
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
    def append(self, domain_event: DomainEvent) -> EventStore:
        print("appendを実行します!!")
	payload = EventSerializer.instance().serialize(domain_event)
+	event_id = # 生成する or DBから一意なシーケンス値を取得する
+	stored_event = StoredEvent(event_id, domain_event.type_name(), payload, domain_event.occurred_on)
  1. 生成されたStoredEventをデータベースに保存します。
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
    def append(self, domain_event: DomainEvent) -> EventStore:
        print("appendを実行します!!")
	payload = EventSerializer.instance().serialize(domain_event)
	event_id = // 生成する or DBから一意なシーケンス値を取得する
	stored_event = StoredEvent(event_id, domain_event.type_name(), payload, domain_event.occurred_on)
+	self.__mysql_client.insert(stored_event)

2.格納されたイベントの処理

WIP

  • 自コンテキスト内でのイベントを処理する場合
  • 他の境界づけられたコンテキストにイベントを転送して処理する場合
    • REST APIによるイベント転送
    • MQによるイベント転送

実装

ドメインイベントサブスクライバ(実装クラス)

application.application_service_life_cycle.py
class DomainEventSubscriberImpl(DomainEventSubscriber[DomainEvent]):
    def __init__(self, event_store: EventStore):
        self.__event_store = event_store

    def subscribed_to_event_type(self) -> type:
        return DomainEvent

    def handle_event(self, a_domain_event: DomainEvent) -> NoReturn:
        self.__event_store.append(a_domain_event)


class ApplicationServiceLifeCycle:
    @inject  # event_storeをDIする
    def __init__(self, event_store: EventStore):
        self.__event_store = event_store

    def listen(self):
        DomainEventPublisher\
	     .instance()\
	     .subscriber(DomainEventSubscriberImpl(self.__event_store))
application
// see https://github.com/VaughnVernon/IDDD_Samples/blob/master/iddd_agilepm/src/main/java/com/saasovation/agilepm/application/ApplicationServiceLifeCycle.java

イベントストア(インターフェース)

event.event_store.py
class EventStore(abc.ABC):
    @abc.abstractmethod
    def append(self, domain_event: DomainEvent) -> StoredEvent:
        pass
// see https://github.com/VaughnVernon/IDDD_Samples/blob/master/iddd_common/src/main/java/com/saasovation/common/event/EventStore.java

〇〇イベントストア(実装クラス)

port.adapter.persistence.event.mysql.mysql_event_store.py
class MySQLEventStore:
    def append(self, domain_event: DomainEvent) -> StoredEvent:
	event_id = # 生成する or DBから一意なシーケンス値を取得する
	# ファクトリメソッドであるofメソッドを用意
	# ofメソッドで、ドメインイベントをシリアライズ化して、StoredEventを生成するようにしました
	stored_event = StoredEvent.of(event_id, domain_event)
	self.__mysql_client.insert(stored_event)

イベントシリアライザー

event.event_serializer.py
class EventSerializer:
    def serialize(self, domain_event: DomainEvent) -> str:
        return base64.b64encode(pickle.dumps(domain_event)).decode("utf-8")

ストアドイベント

event.stored_event.py
@dataclass(init=False)
class StoredEvent:
    event_id: int
    type_name: str
    event_body: str
    occurred_on: datetime.datetime

    def __init__(self, event_id: int, type_name: str, event_body: str, occurred_on: datetime.datetime):
        super().__setattr__("event_id", event_id)
        super().__setattr__("type_name", type_name)
        super().__setattr__("event_body", event_body)
        super().__setattr__("occurred_on", occurred_on)

    @staticmethod
    def of(event_id: int, domain_event: DomainEvent) -> StoredEvent:
        payload = EventSerializer.instance().serialize(domain_event)
        return StoredEvent(event_id, domain_event.type_name(), payload, domain_event.occurred_on)

    def __hash__(self):
        return self.event_id + (1237 * 233)

DB

RDB

stored_events

カラム名 説明
event_id int 主キー
event_body text シリアライズされたイベント
occurred_on datetime イベント発生日時
type_name varchar(100) ドメインイベントの実装クラス名
DDL

CREATE TABLE stored_events (
event_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
event_body TEXT NOT NULL,
occurred_on DATETIME NOT NULL,
type_name VARCHAR NOT NULL
);

KVS

WIP

参考文献

Discussion