🍔

ドメインイベントによるイベント駆動の実装

2021/05/22に公開

プロローグ

膨れ上がる要件、複雑化していくAPI

システムを開発していると、どんどん要件が増え、一つのAPIの呼び出しで様々な処理を行う必要が出てくることがあります。
例えば予定を登録するAPIを実行したら、DBにデータを保存し、Googleカレンダーにもイベントを作成し、予定の開始時刻直前に通知を出すためのタスクを登録したり、同様に終了時刻直前に通知を出すためのタスクを登録したり・・・。

この様に1つのAPIに対する要求が増えてくると、プログラムが複雑になり、それぞれの処理のエラーハンドリングなどが増え制御が難しくなっていきます。
また、Googleカレンダーに連携するなど、外部サービスに依存する場合はそことの通信コストがかかり、レスポンスを返すのが遅くなったり、外部サービスがダウンしている場合はAPIも失敗することになり、システムの可用性が下がります。

非同期処理の必要性

処理をシンプルに保ち、可用性も損なわないようにするにはどうしたらよいでしょうか?
アプローチの一つとして、処理を分割し、非同期で実行するという手があると思います。

例えば、上記の例で言うと、Googleカレンダーへのイベント登録は本当にAPI呼び出し時に実行しなければならない(レスポンスを返す前に完了しなければならない)ことでしょうか?
予定の登録APIではDBへの登録にとどまり、レスポンスを返してしまった上で非同期でGoogleカレンダーへの連携処理を実行してみるようにするとどうでしょうか?

APIの実行時間からGoogleカレンダーとの通信時間が削減され、Googleカレンダーの稼働状況に関わらずAPIが成功するようになり可用性が向上します。仮にGoogleカレンダーがダウンしていたとしても、リトライし続ければ復帰した時点でイベントが作成され、結果的には整合性が取れます。また、リトライにかかる時間はAPIの実行に影響を与えません。つまり、ユーザーを待たせず、作業を成功させるためUXが向上します。

この様に処理を非同期で実行し、結果整合とすることで大きな処理の塊を分割しつつ、可用性を高める事ができます。
もちろん要件にもよりますが、意外とある程度の遅延は許容されます。

メッセージングサービスによるイベント駆動の導入

この様に、APIなどを実行したのを皮切りに別処理を駆動するようなスタイルをイベント駆動と言います。
では非同期で実行するには具体的にはどうすればよいでしょうか?

Javaであれば、スレッドを起動し対応するイベントハンドラを駆動させるということも出来ます。
しかし、GAEでアプリを作る場合はスレッドに制限があったり、同じインスタンスのリソースを使うことになり、重い処理が走る場合はリソースを専有してしまって他のリクエストに影響を与えたりと問題がありそうです。

そのため、Pub/Sub(あるいはCloudTasksなど)を使って新規リクエストとしてイベントを処理するようにします。そうすることで、これらの問題に対処することが出来ます。

そしてマイクロサービスへ

GAEの制限を回避する以外にも、メッセージングサービスを使う目的があります。
それはシステムのマイクロサービス化です。

システムが巨大化してくると、コードが大きくなり、更に開発メンバーも増えていきます。
コードは複雑度を増し、巨大なコードベースを複数のチーム、メンバーが操作することによる弊害も生まれます。

継続的に巨大なシステムを複数のチームやメンバーで開発していくにはマイクロサービス化が重要になってきます。
サービスを分けることによって、コードベースが分割され複数のチームでの開発を可能にします。また、稼働させるサーバーのスペックや構成をサービス単位で選択でき、個別にスケールさせていくことも出来ます。一つのサービスで障害が発生しても、他のサービスによって機能の一部を提供し続けることも可能になります。

このように巨大なシステムを開発していく上でマイクロサービス化は様々なメリットを持ちます。しかし、マイクロサービス化をすると当然サービス間でのやり取りが発生します。つまりそれはGoogleカレンダーと連携するのと同じような問題をはらみます。
他サービスに依存するAPIは通信コストや障害による機能の提供不能問題を抱えることになります。

そういった事態を避けるため、マイクロサービスではメッセージングによる非同期連携が必須となってきます。そのため、予めメッセージングによる非同期処理を実装しておくことで、マイクロサービス化をスムーズに出来る可能性が高まると思います。

ただし、何でもやればいいというわけではありません。初期段階や小さなプロジェクトで前もってマイクロサービス化を考慮してあれこれ策を弄するのが必ずしも正解とは限りません。
YAGNIの精神よろしく、必要のないものにコストを費やすのはコストオーバーになりかねません。
プロジェクト毎に相談しながらやりましょう。

ドメインイベントを用いたイベントの表現と実装

では実際にイベント駆動にするにはどんな実装をするとよいでしょうか?
APIを実行したときに、処理の中でPub/SubやCloud Tasksのクライアントライブラリを使ってメッセージやタスクを登録すれば非同期で処理を実行出来ます。

しかし、ビジネスロジック内に直接Pub/Subなどの技術的な関心事を組み込んでしまうことはコードが表現するドメイン知識を曇らせ、更に非同期処理を実現するためのコードの交換可能性を低下させます。

そこで、DDD(Domain Driven Design)の戦術的設計のパターンの一つであるドメインイベントを使ってイベントを表現します。
ドメインイベントとはドメイン内で起こった何かの出来事を表現するモデルです。例えば「タスクが登録された」といったようなものがイベントです。これにTaskRegisteredのような名前をつけてクラスとして扱う手法がドメインイベントパターンです。

POST /tasksというRESTのAPIを実行したら、DBにタスクを登録し、TaskRegisteredイベントを発行します。そのイベントの発生をアプリケーション層などのサブスクライバーが検知し、メッセージングサービスを使ってメッセージを登録します。
こうすることで、ビジネスロジック上ではイベントが発生したというドメイン知識を表現しつつ、技術的な関心事をドメイン層から切り離します。更にDIP(Dependency Inversion Principle)に則った設計をし、メッセージングサービスへのインターフェースを用意することでメッセージングの実現方法を交換可能にします。

ドメインイベントの名前
DDDでは言葉が重要な意味を持ちます。実際の会話や文章で登場する言葉をそのままモデルとして扱います。コード上でも会話でも統一された語彙を使うことで、コードとドメイン知識の結びつきが強くなります。このように統一された語彙の集まり(用語集)をDDDではユビキタス言語といいます。
ドメインイベントもユビキタス言語に則って命名される必要があります。上記では「タスクが登録された」でしたが、「タスクが予定された」のような表現を会話で使っている場合はTaskScheduledなどにし、言葉に名前を合わせていきます。

イベント駆動の構築

構成

今回はGAE/Javaでのイベント駆動の仕組みを構築していきます。
まず、以下に実装の全体図を示します。

※ 実装したコードはGitHubで公開しています

使用技術

項目名 使用技術
データベース Cloud SQL for MySQL
メッセージングシステム Cloud Pub/Sub
言語 Java11
実行環境 GAE/Java11

今回はドメインイベントを使ってイベントを表現していくため、DDDライクな設計にします。
また、全体的にクリーンアーキテクチャでレイヤーを切り分けて構築します。

レイヤーは以下の4つに分かれています

  • ドメイン層(Domain Layer)
    • ドメインモデルを定義し、ビジネスロジックを配置するレイヤー。ドメインイベントはここで定義、発生する。
  • アプリケーション層(Application Layer)
    • トランザクションの管理やユースケースの実行を行うレイヤー。ドメイン層で発生したドメインイベントを観測し、イベントの蓄積と配信を行う。
  • プレゼンテーション層(Presentaion Layer)
    • APIのエンドポイントを定義したり、メッセージングサービスからのPushリクエストの受け取り口を定義する。
  • インフラストラクチャ層(Infrastructure Layer)
    • 技術的な関心事を記載するレイヤー。DBやメッセージングサービスへアクセスするための実装を提供する。

以降の章で具体的な解説をしていきます。

シナリオ

イベント駆動の仕組みを構築するにあたって、GitHubのProjectsのようなシステムを例として考えてみましょう。GitHubのProjectsはProjectを作成し、Projectの中に複数のColumn、更にその中に複数のNoteを作成することが出来ます。
今回はこのGitHub Projectsを参考に、簡略化したシステムをサンプルとして実装します。
また、独自に以下のルールを追加してみます。

  • ColumnはProjectがアクティブじゃないと追加出来ない
  • NoteはProjectがアクティブじゃないと追加出来ない
  • NoteはCloumnがアクティブじゃないと追加出来ない
  • Noteが作成されたらGoogle TasksにTaskとしてコピーする

モデル

ドメイン層(Domain Layer)

ドメイン層では要となるドメインイベントの定義を行います。
また、アプリケーション層にドメインイベントを配信するための仕組みもこのレイヤーで提供します。

DomainEvent

まずはドメインイベントを定義してみましょう。
上記のシナリオでは以下のような要件がありました。

Noteが作成されたらGoogle TasksにTaskとしてコピーする

この、〜されたら、〜した時、などがイベントです。これをドメインモデルとして定義していきます。
ここではNoteCreatedというモデルとして定義します。
まずはドメインイベントを抽象的に扱えるように、基底となるインターフェースを定義しましょう。

public interface DomainEvent {
}

ここではインターフェースとしましたが、ドメインイベントに共通で持たせたい属性などがある場合は抽象クラスとしてもよいでしょう。
ではNoteCreatedを定義してみます。

public interface NoteEvent extends DomainEvent {
}

@Value
public class NoteCreated implements NoteEvent {

    ProjectId projectId;
    ColumnId columnId;
    NoteId noteId;

}

今回の例ではNoteに関するイベントを示すためのマーカーインターフェースを定義しています。
今の所この実装にあまり意味はないですが、抽象クラスとして共通項をもたせたり、抽象化した処理を行うのに使用したりも出来るでしょう。

NoteCreatedにはイベントの発生の元になったNoteのプロジェクトのID、カラムID、ノートIDを持つようにしています。

イベントには何を持たせればよいでしょうか?

今回の例ではタスクを作成するために、Noteのインスタンスが必要になります。上記の情報があれば、それを基にDBからNoteのインスタンスを復元することが出来ますが、それがオーバーヘッドでもあります。
イベントにNoteのすべての情報が入っていればコンシューマ側ではオーバーヘッド無しに情報を取得できます。このようにイベントにコンシューマが必要な情報を詰め込むテクニックをイベントエンリッチメントと呼びます。ただし、これはこれでモデルが変わったときにイベントのメンテナンスが大変になったりとデメリットもあります。
イベントに何を入れるべきかは要件やプロジェクトでどうすると良さそうか考えて設計しましょう。

次は定義したイベントをアプリケーション層に配信するための仕組みを見ていきましょう。

DomainEventPublisher、DomainEventSubscriber

ドメイン層は業務やビジネスのルールなどのビジネスロジックを表現、実装する場所です。◯◯をしたらXXが起こるというドメイン知識の表現はここで行いますが、ドメインイベントをメッセージングサービスに渡し、非同期処理を発火させる準備を行うのはアプリケーション層の仕事になります。

そこで、ドメイン層で発生したドメインイベントをアプリケーション層に通知する手段が必要になります。
しかし、ドメイン層からアプリケーション層のクラスを直接呼び出すのはClean Architecture的には内部から外部への依存になってしまうので、DIP的な実装にしたいところです。

そのため、ドメインイベントの発生を購読して受け取るためのインターフェースとしてDomainEventSubscriberをドメイン層で提供し、アプリケーション層で実装を提供します。
それをドメインイベントを発行するDomainEventPublisherに登録し、イベントの通知を行います。

サブスクライバは以下のように定義します。
イベントを受け取るためのメソッドが一つ定義されているだけのシンプルなインターフェースです。

public interface DomainEventSubscriber<T extends DomainEvent> {
    void handle(T event);
}

次はパブリッシャーを定義します。
パブリッシャーにはサブスクライバと購読するイベントを登録するためのsubscribe()メソッドと、ドメインイベントを発行するpublish()メソッドを定義します。

public class DomainEventPublisher {

    public <T extends DomainEvent> void subscribe(
            Class<T> subscribeTo,
            DomainEventSubscriber<T> subscriber
    ) {
        ・・・
    }

    public void publish(DomainEvent event) {
        ・・・
    }
    
}

時には複数箇所でイベントが発行される場合もあるでしょうし、イベントに対するサブスクライバも一つとは限りません。任意のイベントに複数のサブスクライバを登録出来るようにサブスクライバをドメインイベントの型をキーにリストで持てるようにします。
スレッド単位(リクエスト単位)でサブスクライバを持つようにThreadLocalに保持するようにします。(DIコンテナを利用してリクエストスコープなシングルトンインスタンスを作るのも一つの手だと思います。)

DomainEventPublisher.java
private static final ThreadLocal<Map<Class<? extends DomainEvent>, List<DomainEventSubscriber<?>>>>
            SUBSCRIBERS = ThreadLocal.withInitial(HashMap::new);

public <T extends DomainEvent> void subscribe(
        Class<T> subscribeTo,
        DomainEventSubscriber<T> subscriber
) {
    // ドメインイベントの型をキーにサブスクライバを登録する
    List<DomainEventSubscriber<?>> domainEventSubscribers = SUBSCRIBERS.get()
           .computeIfAbsent(subscribeTo, key -> new ArrayList<>());
    domainEventSubscribers.add(subscriber);
}

@SuppressWarnings("unchecked")
public void publish(DomainEvent event) {
    // 発行されたドメインイベントの型を基に、当該イベントを購読しているサブスクライバに通知する。
    Class<? extends DomainEvent> key = event.getClass();
    List<DomainEventSubscriber<?>> subscribers = SUBSCRIBERS.get().get(key);

    if (subscribers == null) {
        return;
    }

    subscribers.forEach(subscriber -> ((DomainEventSubscriber<DomainEvent>) subscriber)
        .handle(event));
}

アプリケーション層、例えばUseCaseクラスなどから以下のようにしてDomainEventPublisherにサブスクライバを登録します。

publisher.subscribe(NoteCreated.class, event -> {
    // NoteCreatedイベントが発生した場合に呼び出される
});

コールバックは同一スレッドで呼び出され、基本的には同一トランザクション内で処理されることになります。
今回はこのコールバック内で非同期処理を実行するための処理を行いますが、それ以外にもイベントをトリガーに別の処理を行うことも当然できます。ただし、DDD的には同一トランザクション内で複数の集約に変更を加えるのはアンチパターンです。イベントを基に別のモデルにも変更を加えたい場合は今回実装するように非同期処理とし、結果整合とするのが推奨されています。

何故複数の集約に変更を加えるのがアンチパターンなのか?
もし同一トランザクション内で複数の集約に変更が加えられるとどうなるでしょうか?例えば上記のように集約Aの変更イベントをトリガーにして集約Bに変更を加えるような振る舞いを実行した場合、集約Aと集約Bは同一のトランザクション内で永続化されます。この時、別のリクエストによる集約Bへの変更が同時に発生した場合、排他制御によって集約Bの保存に失敗します。そしてそれは同一トランザクション内で処理された集約Aの変更も破棄され、ユーザーのリクエストは失敗に終わります。そのため、UXを下げることになります。
トランザクション、そして集約とはつまり整合性を取る単位です。集約Aと集約Bが強整合性を持たなければならない場合は一つの集約となるようにドメインモデルを見直しましょう。(ただし例外もあります。)

大抵の場合、Webサーバーはスレッドをプールして使いまわすようにしているでしょう。
そのため、ThreadLocalにサブスクライバを登録したままにしておくと、他のリクエストのときに多重にイベントのハンドリングが発生してしまうことになります。

これを回避するためにパブリッシャにサブスクライバを開放するメソッドを用意し、リクエスト開始時点などでリセットしてあげるようにします。
今回はServletを使って実装するため、Filterで処理するようにします。

public class DomainEventPublisher {
    
    ・・・

    public void reset() {
        SUBSCRIBERS.get().clear();
    }
    
}

@Singleton
@WebFilter(urlPatterns = "/*")
public class DomainEventResetFilter implements Filter {

    @Inject
    private DomainEventPublisher publisher;
    
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        publisher.reset();
        chain.doFilter(request, response);
    }
    ・・・
}

ドメインイベントを発行するのは誰か?

DomainEventPublisherを定義し、ドメインイベントをサブスクライバに通知出来るようになりました。
ではドメインイベントをパブリッシャに発行するコードはどこに書くのでしょうか?

IDDD(実践ドメイン駆動設計)ではドメインモデル内でイベントを作成し、パブリッシャへ発行するコードが紹介されています。ドメインイベントはドメインモデルの振る舞いによって発生するものです。なのでIDDDの実装は自然に思えます。

しかし、ドメインモデル内からイベントを発行するにはドメインモデルにパブリッシャを持たせなければなりません。出来ればドメインモデルにはモデルを表現するための属性以外をもたせたくはありませんし、ドメインモデルをテストするのにパブリッシャのインスタンスを用意するのも面倒です。

ドメインモデルの振る舞いは大きくCommandとQueryに分ける事ができます。Commandは変更を加える副作用のあるもの、Queryは副作用のない処理の結果を受け取るものです(CQRS(Command Query Responsibility Segregation)パターン)。Commandは内部状態を変更するだけで戻り値は返しません。

基本的にドメインイベントが表現するのは集約に対する変更(つまりCommandの結果)です。
今回はこのCommandの戻り値としてドメインイベントを返すようにします。こうすることでドメインモデル内でドメインイベントが生成されるように実装しつつ、パブリッシャを呼び出す責務をドメインモデルから外すことが出来ます。

戻り値でドメインイベントを返すようにしても、依然として誰が発行するのかは解決出来ていません。
実装方法は色々あると思いますが、今回はドメインサービスが発行する責務を持つようにします。ドメインサービスにパブリッシャをDIして使うのは非常に自然に書けます。

public class NoteService {

    private final DomainEventPublisher eventPublisher;
    ・・・
    
    @Inject
    public NoteService(
            DomainEventPublisher eventPublisher,
            ・・・
    ) {
        this.eventPublisher = eventPublisher;
        ・・・
    }

    public Note createNote(ProjectId projectId, ColumnId columnId, String description) {
        ・・・
        NoteCreated event = ・・・
        eventPublisher.publish(event);
        ・・・
    }

}

しかしドメインサービスの乱用はドメインモデル貧血症を起こすため、アンチパターンとされています。
また、ドメインイベントを受け取ってイベント発行するだけのコードを用意するのも面倒です。
今回はドメインイベントを購読するのは、ドメインモデルまたはドメインサービスを実行するUseCaseクラスに限るため、ドメインサービスを使ってイベント発行をするのはサービスを使ったほうが良いパターンの場合のみにし、それ以外は割り切ってUseCaseクラスで直接ドメインモデルからイベントを戻り値として受け取るようにしてしまいます。

イベントと結果の受け取り

ドメインイベントはCommand系メソッドの戻り値にすることにしましたが、NoteCreatedのようなイベントはどうしたらよいでしょうか?
newによるインスタンス生成ではイベントを返すことは出来ません。同様に、ファクトリメソッドを用意しても、作成したインスタンスを受け取る必要があるためイベントを戻り値に出来ません。

基本的にはCQRSで設計し、内部状態に変更を加えるものは戻り値を持たないようにしたいところですが、時には結果とともにイベントを発生させたくなることもあるでしょう。
特に新規作成イベントなどでは前述したようにインスタンスとイベントの受け取りが必要になります。
そのため、イベントと処理結果をタプルとして戻り値に出来るように入れ物を作ってあげることにします。

@Value
public class DomainResult<R, E extends DomainEvent> {
    R result;
    E event;
}

今回は複数のイベント発生などを想定していないのでイベントは一つにしましたが、複数発生させたい場合はListなどで定義すると良いでしょう。

前述したとおり、newではイベントを返せないので、Noteにはファクトリメソッドを用意し、その戻り値にDomainResultを指定します。

public final class Note extends ConcurrencyEntity {
    
    ・・・

    public static DomainResult<Note, NoteEvent> create(
            NoteId id,
            ProjectId projectId,
            ColumnId columnId,
            String description
    ) {
        Note note = new Note(id, projectId, columnId, description);
        NoteCreated event = new NoteCreated(projectId, columnId, id);
        return new DomainResult<>(note, event);
    }

    public Note(NoteId id, ProjectId projectId, ColumnId columnId, String description) {
        ・・・
    }
    ・・・
}

アプリケーション層(Application Layer)

アプリケーション層ではドメイン層から発行されるドメインイベントを観測し、メッセージングサービスへと登録する仕組みを構築していきます。

構成図からもわかるように、このレイヤーの仕事は少々複雑です。
確実に非同期処理を実行するために、トランザクションやメッセージの確立性を考慮した設計にする必要があります。

詳しく見ていきましょう。

トランザクショナルメッセージング

ドメインイベントが発生したら、それをトリガーに非同期処理を開始するために、メッセージングサービスにイベントをメッセージとして登録する必要があります。

しかし、ドメインイベントをそのままメッセージに変換してすぐにメッセージングサービスへ登録してしまってよいでしょうか?

メッセージングサービスへの登録が成功すると、メッセージングサービスは直ちにサブスクライバにメッセージの通知を行います。
もしメッセージングサービスへのメッセージ登録後に、アプリケーションでの何かしらの処理が失敗しロールバックされるとどうなるでしょうか?
イベントが発生するきっかけになった集約への変更は取り消され、そのイベントは発生しなかったことになります。しかし既にメッセージは送信されており、受け取った側は取り消されてしまったイベントに反応し誤作動を起こすかもしれません。

このような問題を回避し、信頼のおけるメッセージ送信を行うにはどうすればよいでしょうか?

ここではトランザクショナルメッセージングという手法を使います。
集約への変更の保存と同一のトランザクションでイベントを一旦DBに保存してしまいます。正常にDBがコミットされた後、メッセージングサービスに未送信のイベントを登録します。
そうすることで確実に集約の変更が反映されたあとで別処理が実行されるようになります。

メッセージの送信タイミングと方法

ここで一つ考えなければならないことがあります。
DBに保存されたイベントはいつ、どのように送信すればよいでしょう?

トランザクションが確立し、イベントが確実に保存された後で、未送信のイベントをDBから読み取りメッセージングサービスに登録する処理を実行させる必要があります。

方法はいくつか考えられます。

1つはバッチなどでDBをポーリングし、定期的に未送信のイベントを検知して送信する方法です。この方法はシンプルでわかりやすいですが、それなりの頻度でバッチを回す必要があるため、コストが高く付きます。

他にはDBのトランザクションログをテーリングさせる方法もあります(Transaction Log Tailing)。こちらはDBの仕組みを使って独自に検知する仕組みを構築する必要があるため、実装が大変です。

特にGAEでアプリケーションを構築している場合はポーリングはあまりうれしくありません。
イベントの送信自体は出来る限りリアルタイムに行っていきたいものです。そのため、高頻度でバッチを実行することになり、GAEのインスタンスが起動し続けることになってしまいます。これではGAEの旨味が消えてしまいます。出来ればイベントが発生したときにだけ動かしたいところです。

そこで今回はイベントをDBに保存しつつ、同時にメッセージングサービスへの送信も行ってしまい、無駄にインスタンスが起動し続けるのを回避します。

イベントリレーによる信頼性の確保

しかしながら、保存と送信を同時にやってしまったら、結局コミットに失敗した時にメッセージの信頼性がなくなってしまいます。それでは意味がありません。

そこで、一度プロクシをはさみます。
イベントをDBに保存したら、直接本来のイベントの送信先であるチャンネル(トピック)に送信するのではなく、一旦代理の送信先にメッセージを送ります。

そのプロクシがDBをチェックし、イベントが確立されていることを確認した後、メッセージングサービスに本来のチャンネルに送信させ、イベントを送信済みにマークします。もしイベントがDBに保存されていない場合はトランザクションが完了していないと判断しリトライをします。もし規定回数リトライしてもイベントを確認出来ない場合はコミットに失敗しイベントがロストしたと判断し、メッセージの送信を破棄します。

このようにすることで、ポーリングをすることなく、変更をDBに確実に反映した上でメッセージを送信することが出来ます。(※ トランザクションが完了するまでに時間を要する場合は、十分な回数と時間でリトライを設ける必要があります。)

メッセージの冪等性
上記の方法をとったとしても、"送信済みのマークをする"というのはDBに変更を加えるということに他ならず、結局メッセージを送信した後にコミットが失敗する可能性を排除出来ません。
その場合は多重にメッセージが送信されてしまう可能性があります。そのため、この手法はイベントの確立を保証するものではありますが、メッセージの多重送信に対応するためには受信側で冪等性を担保する必要があります。

トランザクショナルメッセージングの実装

上記の内容を踏まえて設計をしていきます。
アプリケーション層ではイベントのDBへの保存とメッセージングサービスへのメッセージの登録をします。

イベントはDBに保存する際にIDを発行するようにします。今回はDBでのシーケンシャルなIDにしましたが、発生日時などでも良いかもしれません。
イベントの追加と取り出しのメソッドを定義します。

public class EventQueue {
    public void push(DomainEvent event) { ・・・ }
    public <T extends DomainEvent> Optional<T> pop(long id) { ・・・ }
}

上述したように、イベントの保存と同時にプロクシへの送信を行う必要があります。
今回はこのEventQueue#pushでそれらを実行します。

DBへの保存とメッセージの送信は別な関心事なので、それぞれを担うクラスを定義しましょう。アプリケーション層のクラスから直接DBを操作したり、Pub/Subのクライアントライブラリを操作してしまってはアプリケーション層に技術的な関心事を組み込んでしまうことになります。そういった直接的な処理はインフラストラクチャ層に実装することにし、アプリケーション層ではそのためのインターフェース 群を用意します。

// イベントのためのDBにアクセスするためのインターフェース
public interface EventStore {
    <T extends DomainEvent> StoredEvent<T> save(StoredEvent<T> event);
    <T extends DomainEvent> Optional<StoredEvent<T>> fetchById(long id);
    void delete(StoredEvent<?> event);
}

// メッセージングサービスへアクセスするためのインターフェース
public interface EventDispatcher {
    void dispatchToProxy(long eventId);
    void dispatch(long eventId, DomainEvent event);
}

保存したイベントのIDを持つ必要があるので、イベントを保存するためのデータクラスとしてStoredEventクラスを定義しています。
EventDispatcherではプロクシへの送信用と本来のチャンネルへ送信する用のメソッドを定義しています。

これらの実装クラスをインフラストラクチャ層で提供します。


package dev.fumin.sample.eventdriven.infrastructure.event;

public class PubsubEventDispatcher implements EventDispatcher {
    ・・・
    @Override
    public void dispatchToProxy(long eventId) {
        // プロクシ用のPub/Subのトピックへpublishする
        ・・・
    }

    @Override
    public void dispatch(long eventId, DomainEvent event) {
        // イベントに対応するPub/Subのトピックへpublishする
        ・・・
    }
}
public class MySqlEventStore implements EventStore {
    ・・・
    @Override
    public <T extends DomainEvent> StoredEvent<T> save(StoredEvent<T> event) {
         // DBへ保存
         ・・・
    }

    @Override
    public <T extends DomainEvent> Optional<StoredEvent<T>> fetchById(long id) {
        // DBから取り出し
        ・・・
    }

    @Override
    public void delete(StoredEvent<?> event) {
        // 削除処理
        ・・・
    }
}

これらのクラスを使ってキューで追加と取り出しを実装していきます。
ここでは省略しますが、イベントのデータはjsonにシリアライズして保存しています。

public class EventQueue {

    private final EventStore store;
    private final EventDispatcher dispatcher;

    @Inject
    public EventQueue(EventStore store, EventDispatcher dispatcher) {
        this.store = store;
        this.dispatcher = dispatcher;
    }

    public void push(DomainEvent event) {
        StoredEvent<DomainEvent> storedEvent = new StoredEvent<>(event);
        // DBへ保存
        storedEvent = store.save(storedEvent);
        // 保存したイベントのIDをプロクシへ送信
        dispatcher.dispatchToProxy(storedEvent.getId()
                .orElseThrow(() -> new IllegalStateException("event id is null.")));
    }

    public <T extends DomainEvent> Optional<T> pop(long id) {
        return store.<T>fetchById(id).map(stored -> {
            // DBから削除することで、送信済みとして扱う。
            store.delete(stored);
            return stored.getEvent();
        });
    }

}

今回は送信済みのマークをする代わりに、DBから削除するようにしています。
データを残したい場合はフラグを立てるような実装にするとよいでしょう。
(※ Pub/Subなどは重複してpushしてくる可能性があるため、メッセージの送信が重複しないように送信済みとして扱うための仕組みが必要です。)

イベントの保存

では、上記の仕組みを利用して、アプリケーション層でのイベントの保存処理を実装してみます。
UseCaseクラスでDomainEventPublisherにサブスクライバを登録し、発生したイベントを保存します。

public class CreateNoteUseCase {

    private final NoteService noteService;
    private final DomainEventPublisher eventPublisher;
    private final EventQueue eventQueue;

    @Inject
    public CreateNoteUseCase(
        NoteService noteService,
        DomainEventPublisher eventPublisher,
        EventQueue eventQueue
    ) {
        this.noteService = noteService;
        this.eventPublisher = eventPublisher;
        this.eventQueue = eventQueue;
    }

    // @Transactionalアノテーションがついているメソッドは自動的にDBのトランザクションが開始します
    @Transactional
    public String handle(CreateNoteCommand command) {
        
        // 購読するイベントを指定し、指定したイベントが発生したらキューに入れる
        eventPublisher.subscribe(NoteCreated.class, eventQueue::push);
    
        ProjectId projectId = new ProjectId(command.getProjectId());
        ColumnId columnId = new ColumnId(command.getColumnId());
        Note note = noteService.createNote(projectId, columnId, command.getDescription());
        return note.getId().getValue();
    }
}

前述した通り、イベントは集約の保存と同一のトランザクション内で保存される必要があります。
UseCaseクラス、つまりアプリケーション層でトランザクションを制御することで同じトランザクション内で処理されるようにします。

@Transactionalアノテーションが付与されたメソッドが呼び出されると自動的にDBのトランザクションを開始するインターセプターが実装されています。これをGuiceのAOPサポートの機能を使ってバインドします。今回はDomaというライブラリを使ってDBにアクセスするようにしています。

public class DomaTransactionInterceptor implements MethodInterceptor {
    ・・・
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        TransactionManager tm = config.getTransactionManager();
        try {
            return tm.required(() -> {
                try {
                    return invocation.proceed();
                } catch (Throwable t) {
                    throw new RuntimeException(t);
                }
            });
        } catch (Exception e) {
            throw e.getCause();
        }
    }
}

public class InfrastructureModule extends ServletModule {

    @Override
    protected void configureServlets() {
        super.configureServlets();
        ・・・
        bindInterceptor(Matchers.any(), Matchers.annotatedWith(Transactional.class),
                new DomaTransactionInterceptor(DomaConfig.getInstance()));
    }
}

ボイラープレートの排除

上記のコードは非同期処理をするたびに記述しなければならないボイラープレートになります。ボイラープレートは退屈で面倒なので排除したいところです。

毎度必要な依存関係を注入してサブスクライブするのは面倒なので自動化してしまいましょう。先程のTransactionalに用いたAOP的なアプローチを使って簡略化出来るようにしてみます。

まずEnqueueEventアノテーションを用意します。このアノテーションでは購読するイベントを指定出来るようにします。

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EnqueueEvent {
    Class<? extends DomainEvent>[] value();
}

そして、このアノテーションが付与されているメソッドが呼び出されたら自動的に指定されたイベントを購読して保存するようなインターセプターを実装します。

public class EnqueueEventInterceptor implements MethodInterceptor {

    @Inject
    private EventQueue eventQueue;

    @Inject
    private DomainEventPublisher publisher;

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // アノテーションから購読するイベントを取得
        EnqueueEvent annotation = invocation.getMethod().getAnnotation(EnqueueEvent.class);
        EventCapture captor = new EventCapture();
        // 指定されたイベントの分だけサブスクライバを登録しイベントの購読を開始する
        Arrays.stream(annotation.value())
                .forEach(eventClass -> publisher.subscribe(eventClass, captor::handle));
        Object o = invocation.proceed();
        // 受け取ったイベントをすべてキューに入れる
        captor.pushAll(eventQueue);
        return o;
    }

    private static class EventCapture implements DomainEventSubscriber<DomainEvent> {

        private List<DomainEvent> events = new ArrayList<>();

        @Override
        public void handle(DomainEvent event) {
            events.add(event);
        }

        public void pushAll(EventQueue queue) {
            events.forEach(queue::push);
        }

    }

}

このようにすると、UseCaseでのイベント購読&保存が以下のように書けるようになります。

public class CreateNoteUseCase {

    private final NoteService noteService;

    @Inject
    public CreateNoteUseCase(NoteService noteService) {
        this.noteService = noteService;
    }

    @Transactional
    @EnqueueEvent(NoteCreated.class)
    public String handle(CreateNoteCommand command) {
        ProjectId projectId = new ProjectId(command.getProjectId());
        ColumnId columnId = new ColumnId(command.getColumnId());
        Note note = noteService.createNote(projectId, columnId, command.getDescription());
        return note.getId().getValue();
    }

}

イベントの購読と保存が宣言的になり可読性が向上します。

イベントリレーの実装

最後にイベントリレーを実装していきます。
このクラスの実装はシンプルです。プロクシにpushされたイベントのIDを使ってイベントキューからイベントを取り出し、存在すればメッセージングサービスに登録し、存在しなければエラーを出してリトライさせます。これにより、イベントが確立された後、メッセージが送信されることになります。

public class EventRelay {
    ・・・
    @Transactional
    public void relay(long eventId) {
        // キューからイベントを取り出す。DBから読み取れない場合は、未コミット又はロストしたと判断する
        DomainEvent event = queue.pop(eventId)
                .orElseThrow(() -> new IllegalStateException("Event is not yet ensured or lost."));
        // 存在する場合は本来のチャネルに向けて送信する
        dispatcher.dispatch(eventId, event);
    }
}

プレゼンテーション層(Presentation Layer)

プレゼンテーション層ではプロクシのためのメッセージの受け口を提供します。
また、イベントが確立されて本来のメッセージとして送信された場合の受け口も定義します。

メッセージの冪等性

Pub/SubやCloudTasksはat least oneを保証するサービスです。
これは一度は必ず送ってくれますが、一度だけ送ってくるとは限らないということです。
同じメッセージが複数回送信される可能性があるため、一度だけ処理するようにしなければなりません。
今回はメッセージが処理済みであることをDBに保存し、重複したメッセージが来た際にはメッセージを破棄するようにします。

まず消費したメッセージを保存するテーブルを作成します。

カラム名 概要
id イベントID 数値
receiver 受信者 文字列
received_at 受信日時 日時

このテーブルではイベントのIDと受信者で複合キーにしています。
同じメッセージを複数のエンドポイントが受け取る可能性があるためです。
また、IDはPub/SubのIDではなく、システムで発行しているイベントのIDを保存します。
プロクシがメッセージを送信したあと、DBへのコミットに失敗する可能性があります。そうするとプロクシのリトライ機構が再度メッセージをメッセージングサービスへ登録してしまい、Pub/Subでは新たに同じ内容に別のIDを割り当ててメッセージを送信します。
そのため、Pub/SubのメッセージIDをキーにしてしまうと重複判定ができず、複数回同じ内容が実行されてしまうことになります。

そこで、Pub/SubのメッセージにはイベントのIDを入れておき、イベントのIDをキーにすることでプロクシからの重複送信に対応しています。

処理済みのメッセージはどんどん溜まっていくので長く運用していると容量を圧迫する可能性があります。受信日時があれば、十分に古くなったものからバッチなどで削除したりバックアップをとったりできます。

テーブルを定義したらデータクラスとDBのアクセス用のインターフェースを用意します。

@Value
public class ConsumedEvent {
    long eventId;
    String receiver;
    Date receivedAt;
}

public interface ConsumedEventStore {
    boolean exists(long eventId, String receiver);
    void insert(ConsumedEvent event);
}

毎回メッセージのレシーバーでチェック処理を書くのは面倒なので、基底クラスを用意し一律で処理するようにします。

@Value
public class Event {

    long eventId;
    Message message;
    String subscription;

    public static class Message {

        @Getter
        private final String messageId;

        @Getter
        private final String data;

        @Getter
        private final Map<String, String> attributes = new HashMap<>();

        public Message(String messageId, String data) {
            this.messageId = messageId;
            this.data = data;
        }
    }
}

public abstract class EventReceiver extends HttpServlet {

    private final JsonParser parser = new JsonParser();

    @Inject
    private ConsumedEventStore consumedEventStore;

    @Override
    @Transactional
    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        // リクエストからJSON形式のボディを取得し、Eventクラスにパースする
        String body = req.getReader().lines().collect(Collectors.joining());
        JsonObject json = parser.parse(body).getAsJsonObject();
        JsonObject messageObj = json.get("message").getAsJsonObject();
        JsonObject attributesObj = messageObj.get("attributes").getAsJsonObject();

        Event.Message message = new Event.Message(
                messageObj.get("messageId").getAsString(),
                new String(Base64.getDecoder().decode(messageObj.get("data").getAsString()))
        );
        attributesObj.entrySet()
                .forEach(entry -> message.getAttributes()
                        .put(entry.getKey(), entry.getValue().getAsString()));

        long eventId = Long.parseLong(message.getAttributes().get("eventId"));
        Event event = new Event(eventId, message, json.get("subscription").getAsString());

        // 消費済みのイベントは無視する
        if (!consumedEventStore.exists(eventId, event.getSubscription())) {
            // Pub/Subから同一のメッセージが重複して送信される可能性があるので
            // 冪等性をもたせるために消費したイベントを保存する
            ConsumedEvent consumedEvent =
                    new ConsumedEvent(eventId, event.getSubscription(), new Date());
            consumedEventStore.insert(consumedEvent);
            onReceive(event);
        }

        resp.setStatus(HttpServletResponse.SC_OK);
    }

    // 未処理の場合、サブクラスに処理を委譲する
    protected abstract void onReceive(Event event);

}

これで各レシーバーはメッセージが未処理の場合のみ呼び出されるようになります。

メッセージの順序
メッセージングサービスは必ずしも順序通りにメッセージを送信するとは限りません。基本的には先に投げられたものから送信されますが、それはメッセージがサーバーで順々に消費されるというわけではありません。処理によっては順序が問題になることもあるでしょう。上記の対策は重複には対応できますが、順序の問題には対応できません。順序が重要な場合は別の対策を考える必要があります。

EventProxy

イベントの確立を保証するためにまずはプロクシを通す必要があります。
EventProxyの役目はプロクシのエンドポイントの提供と、pushされたメッセージからイベントIDを拾い出し、EventRelayに渡してあげることです。
先程のEventReceiverのサブクラスとして実装し、イベントのIDを取得します。

@Singleton
@WebServlet(value = "/event/proxy")
public class EventProxy extends EventReceiver {

    @Inject
    private EventRelay eventRelay;

    @Override
    protected void onReceive(Event event) {
        // イベントリレーに受信したイベントのIDを渡し、本来のチャンネルに送信させる。
        eventRelay.relay(event.getEventId());
    }

}

イベントレシーバー

あとはプロクシから送信されたメッセージを受信し、別処理を駆動するエンドポイントを実装すれば完了です。
EventProxy同様EventReceiverを継承し、必要なパラメータをUseCaseクラスに渡していきます。今回はイベント名 + Receiverとしましたが、同じイベントで複数の非同期処理を駆動する場合は、それとわかるような命名規約にすると良いと思います。

@Singleton
@WebServlet(value = "/event/receiver/note-created")
public class NoteCreatedReceiver extends EventReceiver {

    private final JsonParser jsonParser = new JsonParser();

    @Inject
    private CopyNoteToTaskUseCase useCase;

    @Override
    protected void onReceive(Event event) {
        // 受信したイベント情報から必要なデータを取得する
        JsonObject root = jsonParser.parse(event.getMessage().getData()).getAsJsonObject();
        String projectId = root.get("projectId").getAsJsonObject().get("value").getAsString();
        String noteId = root.get("noteId").getAsJsonObject().get("value").getAsString();
        // UseCaseクラスを呼び出し、後続の処理を開始する。
        useCase.handle(projectId, noteId);
    }

}

まとめ

以上がドメインイベントを使ったイベント駆動のアーキテクチャになります。
非同期での処理は様々なことを考慮する必要があり、複雑な実装が必要になりますが、このようなアーキテクチャにすることで処理が増えても一つ一つの処理の塊は小さく保つことが出来ます。
また、ドメインイベントを使うことで、ただただシステマチックにメッセージングを行うのではなく、ドメイン知識を曇らせることなくイベント駆動を実現できます。

上記の説明では要点のみをピックアップしているため、より詳細を追いたい方はソースコードを参照してください。

参考文献

Discussion