🛠️
【ドメイン駆動設計】ドメインイベントの実装方法 - 2.イベントストアへの格納
なにこれ
以下の記事の続きです。「2.イベントストア格納処理方式」の実装方法をまとめた記事です。
イベントストア
導入メリット
- 発行されたドメインイベントを「他の境界づけられたコンテキスト」に通知する際のキューとして使えること
- もちろん、自身のコンテキスト内で処理するためのキューとしても使える
- REST API経由でイベントストアに格納されたイベントをクライアントに提供できる
- 「ドメインエキスパートが気にかける、何かの出来事」を履歴として記録できる
- ドメインエキスパートや開発者があとで確認できる
- 政府から法的な監査の要請である出来事を記録する必要に導入することもある = 「政府が気にかける、何かの出来事」
- ログ出力などによる記録ではなく、"集約における振る舞いの結果"を記録
- データ分析や機械学習の学習データとして、イベントストアのデータを使える
- 例えば、zennの運営者)記事が作成されてから、Likeされるまでの時間はどのくらいか?を記事テーブルの作成日時とイベントストアの作成日時から計算できる
- イベントを使って、集約のインスタンスを再構築できる(イベントソーシングの一環)
1.イベントストアへの格納方法
この節では、発行されたイベントをイベントストアまで格納する方法を紹介します。格納されたイベントストアを「他の境界づけられたコンテキスト」に転送する方法については、次節で紹介します。
処理の流れ
サブスクライバでイベントを受信したら(handleEventが呼び出されたら)、次の流れでイベントストアまで格納されます。
- apppendメソッドを実行します。
- ドメインイベント指定でappendを実行します。
- イベントストアを保存するDBは会社・チームにより異なるため、アプリケーション側で
EventStore
を継承した実装クラスMySQLEventStoreをDIしておく。
event.event_store.py
class EventStore(abc.ABC):
@abc.abstractmethod
def append(self, domain_event: DomainEvent) -> EventStore:
pass
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
def append(self, domain_event: DomainEvent) -> EventStore:
print("appendを実行します!!")
# ...
- MySQLEventStoreクラスのappendが呼び出されたら、
EventSerializer
クラスのserialize
メソッドを実行し、JSON文字列を取得します。
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
def append(self, domain_event: DomainEvent) -> EventStore:
print("appendを実行します!!")
+ payload = EventSerializer.instance().serialize(domain_event)
- イベントIDとタイプ名、JSON文字列指定で
StoredEvent
を生成します。- イベントIDは、アプリケーション側で生成する or DBが自動生成した一意なシーケンス値を指定する
- タイプ名には、対応するドメインイベントの実装クラスを指定する。この値はデシリアイズする際に利用します。
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
def append(self, domain_event: DomainEvent) -> EventStore:
print("appendを実行します!!")
payload = EventSerializer.instance().serialize(domain_event)
+ event_id = # 生成する or DBから一意なシーケンス値を取得する
+ stored_event = StoredEvent(event_id, domain_event.type_name(), payload, domain_event.occurred_on)
- 生成された
StoredEvent
をデータベースに保存します。
port.adapter.persistence.mysql.myql_event_store.py
class MySQLEventStore(EventStore):
def append(self, domain_event: DomainEvent) -> EventStore:
print("appendを実行します!!")
payload = EventSerializer.instance().serialize(domain_event)
event_id = // 生成する or DBから一意なシーケンス値を取得する
stored_event = StoredEvent(event_id, domain_event.type_name(), payload, domain_event.occurred_on)
+ self.__mysql_client.insert(stored_event)
2.格納されたイベントの処理
WIP
- 自コンテキスト内でのイベントを処理する場合
- 他の境界づけられたコンテキストにイベントを転送して処理する場合
- REST APIによるイベント転送
- MQによるイベント転送
実装
ドメインイベントサブスクライバ(実装クラス)
application.application_service_life_cycle.py
class DomainEventSubscriberImpl(DomainEventSubscriber[DomainEvent]):
def __init__(self, event_store: EventStore):
self.__event_store = event_store
def subscribed_to_event_type(self) -> type:
return DomainEvent
def handle_event(self, a_domain_event: DomainEvent) -> NoReturn:
self.__event_store.append(a_domain_event)
class ApplicationServiceLifeCycle:
@inject # event_storeをDIする
def __init__(self, event_store: EventStore):
self.__event_store = event_store
def listen(self):
DomainEventPublisher\
.instance()\
.subscriber(DomainEventSubscriberImpl(self.__event_store))
application
// see https://github.com/VaughnVernon/IDDD_Samples/blob/master/iddd_agilepm/src/main/java/com/saasovation/agilepm/application/ApplicationServiceLifeCycle.java
イベントストア(インターフェース)
event.event_store.py
class EventStore(abc.ABC):
@abc.abstractmethod
def append(self, domain_event: DomainEvent) -> StoredEvent:
pass
// see https://github.com/VaughnVernon/IDDD_Samples/blob/master/iddd_common/src/main/java/com/saasovation/common/event/EventStore.java
〇〇イベントストア(実装クラス)
port.adapter.persistence.event.mysql.mysql_event_store.py
class MySQLEventStore:
def append(self, domain_event: DomainEvent) -> StoredEvent:
event_id = # 生成する or DBから一意なシーケンス値を取得する
# ファクトリメソッドであるofメソッドを用意
# ofメソッドで、ドメインイベントをシリアライズ化して、StoredEventを生成するようにしました
stored_event = StoredEvent.of(event_id, domain_event)
self.__mysql_client.insert(stored_event)
イベントシリアライザー
event.event_serializer.py
class EventSerializer:
def serialize(self, domain_event: DomainEvent) -> str:
return base64.b64encode(pickle.dumps(domain_event)).decode("utf-8")
ストアドイベント
event.stored_event.py
@dataclass(init=False)
class StoredEvent:
event_id: int
type_name: str
event_body: str
occurred_on: datetime.datetime
def __init__(self, event_id: int, type_name: str, event_body: str, occurred_on: datetime.datetime):
super().__setattr__("event_id", event_id)
super().__setattr__("type_name", type_name)
super().__setattr__("event_body", event_body)
super().__setattr__("occurred_on", occurred_on)
@staticmethod
def of(event_id: int, domain_event: DomainEvent) -> StoredEvent:
payload = EventSerializer.instance().serialize(domain_event)
return StoredEvent(event_id, domain_event.type_name(), payload, domain_event.occurred_on)
def __hash__(self):
return self.event_id + (1237 * 233)
DB
RDB
stored_events
カラム名 | 型 | 説明 |
---|---|---|
event_id |
int |
主キー |
event_body |
text |
シリアライズされたイベント |
occurred_on |
datetime |
イベント発生日時 |
type_name |
varchar(100) |
ドメインイベントの実装クラス名 |
DDL
CREATE TABLE stored_events
(
event_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
event_body TEXT NOT NULL,
occurred_on DATETIME NOT NULL,
type_name VARCHAR NOT NULL
);
KVS
WIP
Discussion