Implementing Event-Driven Architecture with Domain Events


Prologue

Growing Requirements and Increasingly Complex APIs

As you develop a system, requirements often grow, and it may become necessary to perform various processes within a single API call.
For example, when an API to register a schedule is executed, you might need to save data to the DB, create an event in Google Calendar, register a task to issue a notification just before the start time, and register another task for a notification just before the end time...

As the requirements for a single API grow like this, the program becomes complex and the error handling for each step multiplies, making the processing difficult to control.
Furthermore, when depending on external services such as Google Calendar, communication costs are incurred, slowing down responses. If an external service is down, the API will also fail, reducing system availability.

The Necessity of Asynchronous Processing

How can we keep processing simple without compromising availability?
One approach is to split the processing and execute it asynchronously.

For instance, in the example above, is the Google Calendar event registration something that must absolutely be executed during the API call (must be completed before returning a response)?
What if the schedule registration API only performs the DB registration, returns the response, and then executes the Google Calendar integration asynchronously?

The communication time with Google Calendar is removed from the API execution time, and the API succeeds regardless of Google Calendar's status, improving availability. Even if Google Calendar is down, as long as retries continue, the event will be created once it recovers, eventually achieving consistency. Furthermore, the time taken for retries does not affect the API execution. In other words, UX is improved because the user is not kept waiting and the operation succeeds.

In this way, by executing processes asynchronously and aiming for eventual consistency, you can break down large chunks of processing while increasing availability.
Of course, it depends on the requirements, but a certain amount of delay is often acceptable.

Introduction of Event-Driven Architecture via Messaging Services

This style, where executing an API etc. triggers another process, is called "event-driven."
So, specifically, how should we "execute asynchronously"?

In Java, you could start a thread and trigger the corresponding event handler.
However, when building an app on GAE, there are restrictions on threads, and since you would be using the same instance's resources, heavy processing could monopolize resources and affect other requests.

Therefore, we use Pub/Sub (or Cloud Tasks, etc.) to process events as new requests. This allows us to address these issues.

And onto Microservices

In addition to avoiding GAE limitations, there is another purpose for using messaging services: the transition to microservices.

As a system grows, the codebase becomes larger, and the number of development members increases. The code gains complexity, and drawbacks emerge from multiple teams and members operating on a single, massive codebase.

Microservices are crucial for the continuous development of large systems by multiple teams and members. By separating services, the codebase is divided, enabling development across multiple teams. Furthermore, you can select server specifications and configurations on a service-by-service basis, allowing each to scale individually. Even if a failure occurs in one service, it becomes possible to continue providing parts of the functionality through other services.

In this way, microservices offer various benefits for developing large systems. However, microservices naturally involve interaction between services. This carries the same types of problems as the Google Calendar integration mentioned earlier. APIs that depend on other services will face issues like communication costs and the inability to provide functionality due to failures.

To avoid such situations, asynchronous integration via messaging becomes essential in microservices. Therefore, by implementing asynchronous processing through messaging in advance, the possibility of transitioning smoothly to microservices increases.

However, this is not something to apply across the board. In early stages or small projects, laying out elaborate tactics in anticipation of microservices is not necessarily the right answer. In the spirit of YAGNI (You Ain't Gonna Need It), spending resources on things that are not yet needed leads to excess cost. Weigh the trade-offs and decide on a per-project basis.

Representing and Implementing Events Using Domain Events

So, what kind of implementation should we use to actually make the system event-driven? If you call the client libraries for Pub/Sub or Cloud Tasks directly from the API's processing, you can execute work asynchronously.

However, embedding technical concerns such as Pub/Sub directly into business logic obscures the domain knowledge expressed by the code and reduces the replaceability of the code for achieving asynchronous processing.

Therefore, we represent events using "Domain Events," one of the tactical design patterns in DDD (Domain Driven Design). A Domain Event is a model that represents something that happened within the domain. For example, something like "a task has been registered" is an event. The technique of naming this something like TaskRegistered and treating it as a class is the Domain Event pattern.

When a REST API like POST /tasks is executed, the task is registered in the DB, and a TaskRegistered event is issued. A subscriber, such as in the application layer, detects the occurrence of that event and registers a message using a messaging service. By doing this, the business logic expresses the domain knowledge that an event has occurred while separating technical concerns from the domain layer. Furthermore, by designing according to the DIP (Dependency Inversion Principle) and providing an interface for the messaging service, the method of implementing messaging becomes replaceable.

Name of the Domain Event
In DDD, "language" holds significant meaning. Words that appear in actual conversations or documents are treated directly as models. By using a unified vocabulary in both code and conversation, the link between code and domain knowledge becomes stronger. This collection of unified vocabulary (glossary) is called "Ubiquitous Language" in DDD.
Domain Events must also be named according to the Ubiquitous Language. In the example above, it was "a task was registered," but if expressions like "a task was scheduled" are used in conversation, use TaskScheduled etc., aligning the names with the language.

Building an Event-Driven Mechanism

Configuration

In this section, we will build an event-driven mechanism in GAE/Java.
First, the following diagram shows the overall implementation.

* The implemented code is published on GitHub

Technologies Used

  • Database: Cloud SQL for MySQL
  • Messaging system: Cloud Pub/Sub
  • Language: Java 11
  • Execution environment: GAE/Java 11

Since we are representing events using domain events this time, we will adopt a DDD-like design.
Additionally, the system is built by separating layers based on Clean Architecture overall.

The layers are divided into the following four:

  • Domain Layer
    • The layer that defines domain models and places business logic. Domain events are defined and occur here.
  • Application Layer
    • The layer that manages transactions and executes use cases. It observes domain events occurring in the domain layer and handles the storage and delivery of events.
  • Presentation Layer
    • Defines API endpoints and entry points for Push requests from messaging services.
  • Infrastructure Layer
    • The layer describing technical concerns. It provides implementations for accessing the database and messaging services.

Concrete explanations will be provided in the following chapters.

Scenario

To build the event-driven mechanism, let's consider a system like GitHub Projects as an example. In GitHub Projects, you can create a Project, multiple Columns within that Project, and further, multiple Notes within those Columns.
In this sample, we will implement a simplified system based on GitHub Projects. We will also add the following original rules:

  • A Column cannot be added unless the Project is active
  • A Note cannot be added unless the Project is active
  • A Note cannot be added unless the Column is active
  • When a Note is created, copy it to Google Tasks as a task

Models

Domain Layer

In the domain layer, we define the core domain events. We also provide a mechanism in this layer to deliver domain events to the application layer.

DomainEvent

First, let's define a domain event. In the scenario above, there was the following requirement:

When a Note is created, copy it to Google Tasks as a task

These "when something was done" or "when something happened" occurrences are events. We define these as domain models. Here, we define it as a model named NoteCreated.
First, let's define a base interface so we can handle domain events abstractly.

public interface DomainEvent {
}

Although we used an interface here, you could use an abstract class if there are attributes you want domain events to share in common. Now, let's define NoteCreated.

public interface NoteEvent extends DomainEvent {
}

@Value
public class NoteCreated implements NoteEvent {

    ProjectId projectId;
    ColumnId columnId;
    NoteId noteId;

}

In this example, we define a marker interface to indicate events related to Notes. While this implementation doesn't have much meaning right now, it could be used to provide common properties as an abstract class or to perform abstracted processing.

NoteCreated is designed to hold the Project ID, Column ID, and Note ID of the Note that was the source of the event.

What should be included in an event?

In this example, a Note instance is needed to create a task. With the information above, you can restore the Note instance from the DB based on those IDs, but that incurs overhead. If the event contains all the Note's information, the consumer can retrieve the information without overhead. This technique of packing information needed by the consumer into the event is called Event Enrichment. However, this has its own disadvantages, such as making event maintenance difficult when the model changes. Design what should be included in an event by considering the requirements and what works best for the project.
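As a hedged illustration of Event Enrichment (the names here are hypothetical, not taken from the sample repository), an enriched variant of NoteCreated might carry the Note's content in addition to the IDs, so the consumer can act without a round trip to the DB:

```java
// Hypothetical enriched event: carries the Note's content in addition to the
// IDs, so the consumer does not need to re-read the Note from the DB.
public class NoteCreatedEnriched {
    private final String projectId;
    private final String columnId;
    private final String noteId;
    private final String description; // the enrichment: data the consumer needs

    public NoteCreatedEnriched(String projectId, String columnId,
                               String noteId, String description) {
        this.projectId = projectId;
        this.columnId = columnId;
        this.noteId = noteId;
        this.description = description;
    }

    public String getProjectId() { return projectId; }
    public String getColumnId() { return columnId; }
    public String getNoteId() { return noteId; }
    public String getDescription() { return description; }
}
```

The trade-off is exactly the one described above: every consumer-facing field added to the event is one more thing to keep in sync when the Note model changes.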

DomainEventPublisher, DomainEventSubscriber

The domain layer is where business logic, such as operations and business rules, is expressed and implemented. While the expression of domain knowledge—such as "if XX is done, YY happens"—occurs here, it is the responsibility of the application layer to prepare for triggering asynchronous processing by passing domain events to a messaging service.

Therefore, we need a way to notify the application layer of domain events that occurred in the domain layer. However, directly calling application layer classes from the domain layer would result in a dependency from the internal to the external from a Clean Architecture perspective, so we want to use an implementation based on the Dependency Inversion Principle (DIP).

As such, we provide an interface in the domain layer called DomainEventSubscriber for subscribing to and receiving domain events, and provide its implementation in the application layer. This is registered with the DomainEventPublisher, which issues the domain events to perform notifications.

The subscriber is defined as follows.
It is a simple interface that defines just a single method for receiving events.

public interface DomainEventSubscriber<T extends DomainEvent> {
    void handle(T event);
}

Next, we define the publisher.
The publisher has a subscribe() method for registering a subscriber along with the event it subscribes to, and a publish() method for issuing domain events.

public class DomainEventPublisher {

    public <T extends DomainEvent> void subscribe(
            Class<T> subscribeTo,
            DomainEventSubscriber<T> subscriber
    ) {
        ・・・
    }

    public void publish(DomainEvent event) {
        ・・・
    }
    
}

Events may be published from multiple places, and a given event may have more than one subscriber. To allow multiple subscribers to be registered for any event, we hold subscribers in lists keyed by the domain event type.
These subscribers are held in a ThreadLocal so that they are scoped per thread (i.e., per request). (Using a DI container to create a request-scoped singleton instance would also be a valid approach.)

DomainEventPublisher.java
private static final ThreadLocal<Map<Class<? extends DomainEvent>, List<DomainEventSubscriber<?>>>>
            SUBSCRIBERS = ThreadLocal.withInitial(HashMap::new);

public <T extends DomainEvent> void subscribe(
        Class<T> subscribeTo,
        DomainEventSubscriber<T> subscriber
) {
    // Register subscriber using the domain event type as a key
    List<DomainEventSubscriber<?>> domainEventSubscribers = SUBSCRIBERS.get()
           .computeIfAbsent(subscribeTo, key -> new ArrayList<>());
    domainEventSubscribers.add(subscriber);
}

@SuppressWarnings("unchecked")
public void publish(DomainEvent event) {
    // Notify subscribers who are watching for this event based on the issued domain event type.
    Class<? extends DomainEvent> key = event.getClass();
    List<DomainEventSubscriber<?>> subscribers = SUBSCRIBERS.get().get(key);

    if (subscribers == null) {
        return;
    }

    subscribers.forEach(subscriber -> ((DomainEventSubscriber<DomainEvent>) subscriber)
        .handle(event));
}

Subscribers are registered with the DomainEventPublisher from the application layer, for example, from a UseCase class, as follows:

publisher.subscribe(NoteCreated.class, event -> {
    // Called when a NoteCreated event occurs
});

Callbacks are invoked on the same thread and are basically processed within the same transaction.
In this case, we perform processing to execute asynchronous tasks within this callback, but you can naturally use the event as a trigger for other processes as well. However, in DDD, modifying multiple aggregates within the same transaction is considered an anti-pattern. If you want to make changes to another model based on an event, it is recommended to treat it as an asynchronous process and aim for eventual consistency, as we are implementing here.

Why is it an anti-pattern to modify multiple aggregates?
What happens if changes are made to multiple aggregates within the same transaction? For example, if you execute a behavior that triggers a change in Aggregate B based on a change event in Aggregate A, both Aggregate A and Aggregate B are persisted within the same transaction. At this point, if a change to Aggregate B from another request occurs simultaneously, the save for Aggregate B will fail due to concurrency control. This would also cause the change to Aggregate A, processed within the same transaction, to be discarded, resulting in a failed user request. Therefore, it degrades the UX.
A transaction, and thus an aggregate, is a unit of consistency. If Aggregate A and Aggregate B must have strong consistency, rethink the domain model so they become a single aggregate. (However, there are exceptions.)

Usually, a web server pools and reuses threads.
Therefore, if subscribers are left registered in the ThreadLocal, duplicate event handling will occur during other requests.

To avoid this, we provide a method in the publisher to release subscribers and reset them at the start of a request, etc.
Since we are implementing using Servlets this time, we will handle this with a Filter.

public class DomainEventPublisher {
    
    ・・・

    public void reset() {
        SUBSCRIBERS.get().clear();
    }
    
}

@Singleton
@WebFilter(urlPatterns = "/*")
public class DomainEventResetFilter implements Filter {

    @Inject
    private DomainEventPublisher publisher;
    
    @Override
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {
        publisher.reset();
        chain.doFilter(request, response);
    }
    ・・・
}

Who Issues Domain Events?

We have defined the DomainEventPublisher and are now able to notify subscribers of domain events.
But where should the code that issues domain events to the publisher be written?

In IDDD (Implementing Domain-Driven Design), code that creates events within the domain model and issues them to the publisher is introduced. Domain events are generated by the behavior of the domain model. Thus, the IDDD implementation seems natural.

However, to issue events from within the domain model, the domain model must hold a publisher. Ideally, we want the domain model to hold nothing other than the attributes representing the model, and it is also cumbersome to prepare a publisher instance just to test the domain model.

The behavior of a domain model can be broadly categorized into Commands and Queries. A Command is something with side effects that applies changes, while a Query is something that receives the result of a process without side effects (the CQRS (Command Query Responsibility Segregation) pattern). A Command only changes the internal state and does not return a value.

Basically, domain events represent changes to an aggregate (i.e., the result of a Command).
In this implementation, we will return a domain event as the return value of a Command. By doing this, we can implement it so that domain events are generated within the domain model while moving the responsibility for calling the publisher outside of the domain model.
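As a minimal sketch of this idea (the event name NoteDescriptionChanged and the simplified types here are hypothetical), the Command mutates state and returns the event describing what happened, leaving publication to the caller:

```java
// Simplified sketch: a Command that returns its domain event instead of
// publishing it, so the model holds no reference to a publisher.
public class Note {
    private String description;

    public Note(String description) {
        this.description = description;
    }

    // Command: applies the change and returns the event as its result.
    public NoteDescriptionChanged changeDescription(String newDescription) {
        this.description = newDescription;
        return new NoteDescriptionChanged(newDescription);
    }

    public String getDescription() { return description; }
}

class NoteDescriptionChanged {
    private final String newDescription;

    NoteDescriptionChanged(String newDescription) {
        this.newDescription = newDescription;
    }

    public String getNewDescription() { return newDescription; }
}
```

The caller (a UseCase or a domain service) receives the event and hands it to the publisher, keeping infrastructure concerns out of the model and making the model trivial to test.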

Even if we return domain events as return values, the issue of who actually issues them remains unresolved.
While there are various implementation methods, this time we will give the domain service the responsibility of issuing them. Injecting and using the publisher in a domain service can be written very naturally.

public class NoteService {

    private final DomainEventPublisher eventPublisher;
    ・・・
    
    @Inject
    public NoteService(
            DomainEventPublisher eventPublisher,
            ・・・
    ) {
        this.eventPublisher = eventPublisher;
        ・・・
    }

    public Note createNote(ProjectId projectId, ColumnId columnId, String description) {
        ・・・
        NoteCreated event = ・・・
        eventPublisher.publish(event);
        ・・・
    }

}

However, overusing domain services is considered an anti-pattern because it can cause "Anemic Domain Model" syndrome.
Also, it is tedious to prepare code just for receiving a domain event and issuing it.
In this case, since the ones subscribing to domain events are limited to UseCase classes that execute domain models or domain services, we will use domain services for event issuance only when that pattern is better; otherwise, we will simply have the UseCase class receive the event directly from the domain model as a return value.

Receiving Events and Results

We decided to make domain events the return value of Command-style methods, but what should we do for events like NoteCreated?
A constructor invoked via new cannot return an event. Similarly, even if you provide a factory method, the created instance must be its return value, so the event cannot be.

Basically, we want to design using CQRS so that operations that modify internal state do not have return values, but sometimes you will want to generate an event along with a result.
In particular, for "newly created" events, as mentioned earlier, it becomes necessary to receive both the instance and the event.
Therefore, we will create a container so that the event and the processing result can be returned as a tuple.

@Value
public class DomainResult<R, E extends DomainEvent> {
    R result;
    E event;
}

Since we are not assuming the occurrence of multiple events in this example, we have defined only one event, but if you want to generate multiple, you could define it with a List, etc.
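If multiple events are possible, a List-based variant could look like the following (a hypothetical sketch, written without Lombok's @Value to keep it self-contained):

```java
import java.util.List;

interface DomainEvent {
}

// Hypothetical variant of DomainResult that carries any number of events.
public class DomainResultWithEvents<R> {
    private final R result;
    private final List<DomainEvent> events;

    public DomainResultWithEvents(R result, List<DomainEvent> events) {
        this.result = result;
        this.events = events;
    }

    public R getResult() { return result; }
    public List<DomainEvent> getEvents() { return events; }
}
```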

As mentioned before, new cannot return an event, so we provide a factory method in Note and specify DomainResult as its return value.

public final class Note extends ConcurrencyEntity {
    
    ・・・

    public static DomainResult<Note, NoteEvent> create(
            NoteId id,
            ProjectId projectId,
            ColumnId columnId,
            String description
    ) {
        Note note = new Note(id, projectId, columnId, description);
        NoteCreated event = new NoteCreated(projectId, columnId, id);
        return new DomainResult<>(note, event);
    }

    public Note(NoteId id, ProjectId projectId, ColumnId columnId, String description) {
        ・・・
    }
    ・・・
}

Application Layer

In the application layer, we build the mechanism to observe domain events issued from the domain layer and register them with the messaging service.

As seen in the configuration diagram, the work of this layer is somewhat complex. To ensure that asynchronous processing is executed reliably, the design must consider transactionality and message persistence.

Let's look into the details.

Transactional Messaging

When a domain event occurs, we need to register the event as a message in the messaging service to trigger asynchronous processing.

However, is it okay to convert the domain event into a message and register it with the messaging service immediately?

Once registration with the messaging service succeeds, the messaging service immediately notifies the subscribers of the message. If a failure occurs in any part of the application processing after the message is registered, and the transaction is rolled back, what happens? The changes to the aggregate that triggered the event are undone, and it's as if the event never happened. Yet, the message has already been sent, and the receiving side might react to an event that was cancelled, leading to malfunctions.

How can we avoid such problems and achieve reliable message transmission?

Here, we use a technique called Transactional Messaging. We save the event to the DB within the same transaction as the persistence of changes to the aggregate. After the DB transaction is successfully committed, we register the unsent events with the messaging service. This ensures that the other processing is executed only after the changes to the aggregate have been reliably reflected.
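The invariant can be sketched with in-memory stand-ins (the names are hypothetical; real code would use DB tables and a transaction manager): the aggregate write and the event write either both commit or both roll back.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of transactional messaging: the aggregate change and
// the event row are written in the same "transaction", so a failure before
// commit leaves neither behind. In-memory lists stand in for DB tables.
public class TransactionalOutboxSketch {
    final List<String> noteTable = new ArrayList<>();  // aggregate table
    final List<String> eventTable = new ArrayList<>(); // event (outbox) table

    public void createNote(String note, boolean failBeforeCommit) {
        // Take snapshots so we can simulate a rollback.
        List<String> noteSnapshot = new ArrayList<>(noteTable);
        List<String> eventSnapshot = new ArrayList<>(eventTable);
        try {
            noteTable.add(note);                   // persist the aggregate change
            eventTable.add("NoteCreated:" + note); // persist the event, same tx
            if (failBeforeCommit) {
                throw new IllegalStateException("commit failed");
            }
        } catch (IllegalStateException e) {
            // Rollback: restore both tables to their pre-transaction state.
            noteTable.clear();
            noteTable.addAll(noteSnapshot);
            eventTable.clear();
            eventTable.addAll(eventSnapshot);
        }
    }
}
```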

Message Timing and Methods

One thing we must consider here: when and how should events saved in the DB be sent?

We need to execute a process that reads unsent events from the DB and registers them with the messaging service after the transaction is established and the event is reliably saved.

Several methods can be considered.

One is to poll the DB with a batch process or similar to detect and send unsent events periodically. This method is simple and easy to understand, but it incurs high costs because the batch must be run at a significant frequency.

Another is to tail the DB transaction logs (Transaction Log Tailing). This is difficult to implement because it requires building a unique detection mechanism using DB internals.

Polling is particularly unwelcome when building applications on GAE. We want to send events in as close to real-time as possible. To do that, batches would need to be executed at high frequency, causing GAE instances to remain active continuously. This negates the benefits of GAE. Ideally, we want the system to run only when an event occurs.

Therefore, in this case, we save the event to the DB and simultaneously attempt to send it to the messaging service, thereby avoiding the unnecessary continuous running of instances.

Ensuring Reliability with Event Relay

However, if we save and send simultaneously, the reliability of the message is lost if the commit fails. That would be meaningless.

So, we insert a proxy in between. When an event is saved to the DB, it is not sent directly to the final channel (topic) for the event; instead, the message is sent to an interim proxy destination.

That proxy checks the DB, and after confirming that the event has been established (persisted), it has the messaging service send it to the original channel and marks the event as sent. If the event is not found in the DB, it judges that the transaction is not yet complete and retries. If it cannot confirm the event even after a specified number of retries, it determines that the commit failed and the event was lost, and it discards the message transmission.

By doing this, we can send messages after ensuring changes are reflected in the DB without polling. (*If the transaction takes time to complete, you need to provide retries with sufficient frequency and duration.)
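The proxy's decision logic can be sketched as follows (names are hypothetical; the lookup function stands in for EventStore#fetchById, and the actual forwarding and backoff between retries are elided):

```java
import java.util.Optional;
import java.util.function.LongFunction;

// Sketch of the relay proxy: forward the event only once it is visible in
// the DB; if it never appears within maxAttempts, treat the commit as
// failed and discard the message.
public class EventRelayProxy {
    private final LongFunction<Optional<String>> fetchEventById; // DB lookup
    private final int maxAttempts;

    public EventRelayProxy(LongFunction<Optional<String>> fetchEventById, int maxAttempts) {
        this.fetchEventById = fetchEventById;
        this.maxAttempts = maxAttempts;
    }

    // Returns the payload forwarded to the real topic, or empty if discarded.
    public Optional<String> relay(long eventId) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<String> stored = fetchEventById.apply(eventId);
            if (stored.isPresent()) {
                return stored; // event is committed: forward and mark as sent
            }
            // In practice: let the messaging service redeliver after a delay,
            // so slow transactions get enough time to commit.
        }
        return Optional.empty(); // commit presumably failed: drop the message
    }
}
```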

Message Idempotency
Even with this method, "marking as sent" is nothing more than making a change to the DB, so we cannot completely eliminate the possibility that the commit fails after the message is sent. In that case, the message might be sent multiple times. Therefore, while this method guarantees the establishment of the event, the receiving side must ensure idempotency to handle duplicate message delivery.
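A hedged sketch of the consumer-side guard: in production the set of handled IDs would be a DB table checked in the same transaction as the side effect, so the in-memory Set here is purely illustrative.

```java
import java.util.HashSet;
import java.util.Set;

// Idempotent receiver: remember which event IDs were already handled and
// silently drop duplicate deliveries.
public class IdempotentReceiver {
    private final Set<Long> handledEventIds = new HashSet<>();

    // Returns true if the handler ran, false if this was a duplicate.
    public boolean receive(long eventId, Runnable handler) {
        if (!handledEventIds.add(eventId)) {
            return false; // already processed: ignore the redelivery
        }
        handler.run();
        return true;
    }
}
```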

Implementing Transactional Messaging

Based on the points discussed above, we will proceed with the design. In the application layer, we will save events to the DB and register messages with the messaging service.

When saving an event to the DB, an ID is issued. In this example, we used a sequential ID from the DB, but something like the occurrence timestamp might also work. We define methods for adding and retrieving events.

public class EventQueue {
    public void push(DomainEvent event) { ・・・ }
    public <T extends DomainEvent> Optional<T> pop(long id) { ・・・ }
}

As mentioned above, it is necessary to send the message to the proxy at the same time as saving the event. We execute these actions within this EventQueue#push method.

Since saving to the DB and sending messages are separate concerns, let's define classes responsible for each. Directly operating the DB or Pub/Sub client libraries from application layer classes would embed technical concerns into the application layer. We will implement such direct processing in the infrastructure layer and prepare interfaces for them in the application layer.

// Interface for accessing the database for events
public interface EventStore {
    <T extends DomainEvent> StoredEvent<T> save(StoredEvent<T> event);
    <T extends DomainEvent> Optional<StoredEvent<T>> fetchById(long id);
    void delete(StoredEvent<?> event);
}

// Interface for accessing the messaging service
public interface EventDispatcher {
    void dispatchToProxy(long eventId);
    void dispatch(long eventId, DomainEvent event);
}

We defined the StoredEvent class as a data class for saving events, as we need to hold the ID of the saved event. In EventDispatcher, we defined methods for sending to the proxy and for sending to the original channel.

We provide implementation classes for these in the infrastructure layer.


package dev.fumin.sample.eventdriven.infrastructure.event;

public class PubsubEventDispatcher implements EventDispatcher {
    ・・・
    @Override
    public void dispatchToProxy(long eventId) {
        // Publish to the Pub/Sub topic for the proxy
        ・・・
    }

    @Override
    public void dispatch(long eventId, DomainEvent event) {
        // Publish to the Pub/Sub topic corresponding to the event
        ・・・
    }
}
public class MySqlEventStore implements EventStore {
    ・・・
    @Override
    public <T extends DomainEvent> StoredEvent<T> save(StoredEvent<T> event) {
         // Save to DB
         ・・・
    }

    @Override
    public <T extends DomainEvent> Optional<StoredEvent<T>> fetchById(long id) {
        // Retrieve from DB
        ・・・
    }

    @Override
    public void delete(StoredEvent<?> event) {
        // Deletion process
        ・・・
    }
}

We will implement addition and retrieval in the queue using these classes. Although omitted here, the event data is serialized into JSON for storage.

public class EventQueue {

    private final EventStore store;
    private final EventDispatcher dispatcher;

    @Inject
    public EventQueue(EventStore store, EventDispatcher dispatcher) {
        this.store = store;
        this.dispatcher = dispatcher;
    }

    public void push(DomainEvent event) {
        StoredEvent<DomainEvent> storedEvent = new StoredEvent<>(event);
        // Save to DB
        storedEvent = store.save(storedEvent);
        // Send the saved event ID to the proxy
        dispatcher.dispatchToProxy(storedEvent.getId()
                .orElseThrow(() -> new IllegalStateException("event id is null.")));
    }

    public <T extends DomainEvent> Optional<T> pop(long id) {
        return store.<T>fetchById(id).map(stored -> {
            // Treat as "sent" by deleting from the DB.
            store.delete(stored);
            return stored.getEvent();
        });
    }

}

In this case, instead of marking it as "sent," we delete it from the DB. If you want to keep the data, you should implement it by setting a flag. (*Since services like Pub/Sub might push duplicate messages, a mechanism to treat messages as "sent" is necessary to avoid redundant transmissions.)

Saving Events

Now, let's use the mechanism described above to implement the event storage process in the application layer.
In the UseCase class, we register a subscriber with the DomainEventPublisher and save the occurring events.

public class CreateNoteUseCase {

    private final NoteService noteService;
    private final DomainEventPublisher eventPublisher;
    private final EventQueue eventQueue;

    @Inject
    public CreateNoteUseCase(
        NoteService noteService,
        DomainEventPublisher eventPublisher,
        EventQueue eventQueue
    ) {
        this.noteService = noteService;
        this.eventPublisher = eventPublisher;
        this.eventQueue = eventQueue;
    }

    // Methods with the @Transactional annotation automatically start a DB transaction
    @Transactional
    public String handle(CreateNoteCommand command) {
        
        // Specify the event to subscribe to and put it in the queue when it occurs
        eventPublisher.subscribe(NoteCreated.class, eventQueue::push);
    
        ProjectId projectId = new ProjectId(command.getProjectId());
        ColumnId columnId = new ColumnId(command.getColumnId());
        Note note = noteService.createNote(projectId, columnId, command.getDescription());
        return note.getId().getValue();
    }
}

As mentioned earlier, events must be saved within the same transaction as the persistence of the aggregate.
By controlling the transaction in the UseCase class—the application layer—we ensure they are processed within the same transaction.

An interceptor is implemented that automatically starts a DB transaction when a method annotated with @Transactional is called. This is bound using Guice's AOP support. In this example, we use a library called Doma for DB access.

public class DomaTransactionInterceptor implements MethodInterceptor {
    ・・・
    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        TransactionManager tm = config.getTransactionManager();
        try {
            return tm.required(() -> {
                try {
                    return invocation.proceed();
                } catch (Throwable t) {
                    throw new RuntimeException(t);
                }
            });
        } catch (Exception e) {
            throw e.getCause();
        }
    }
}

public class InfrastructureModule extends ServletModule {

    @Override
    protected void configureServlets() {
        super.configureServlets();
        ・・・
        bindInterceptor(Matchers.any(), Matchers.annotatedWith(Transactional.class),
                new DomaTransactionInterceptor(DomaConfig.getInstance()));
    }
}

Eliminating Boilerplate

The code above becomes boilerplate that must be written every time you perform asynchronous processing. Since boilerplate is tedious, we want to eliminate it.

Injecting dependencies and subscribing every time is cumbersome, so let's automate it. We will try to simplify this using an AOP-based approach similar to the one used for @Transactional.

First, we prepare an @EnqueueEvent annotation. This annotation will allow us to specify the events to be subscribed to.

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface EnqueueEvent {
    Class<? extends DomainEvent>[] value();
}

Then, we implement an interceptor that automatically subscribes to and saves the specified events whenever a method annotated with this is called.

public class EnqueueEventInterceptor implements MethodInterceptor {

    @Inject
    private EventQueue eventQueue;

    @Inject
    private DomainEventPublisher publisher;

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        // Get the events to subscribe to from the annotation
        EnqueueEvent annotation = invocation.getMethod().getAnnotation(EnqueueEvent.class);
        EventCapture captor = new EventCapture();
        // Register subscribers for each specified event and start subscribing
        Arrays.stream(annotation.value())
                .forEach(eventClass -> publisher.subscribe(eventClass, captor::handle));
        Object o = invocation.proceed();
        // Put all received events into the queue
        captor.pushAll(eventQueue);
        return o;
    }

    private static class EventCapture implements DomainEventSubscriber<DomainEvent> {

        private List<DomainEvent> events = new ArrayList<>();

        @Override
        public void handle(DomainEvent event) {
            events.add(event);
        }

        public void pushAll(EventQueue queue) {
            events.forEach(queue::push);
        }

    }

}

By doing this, event subscription and storage in the UseCase can be written as follows:

public class CreateNoteUseCase {

    private final NoteService noteService;

    @Inject
    public CreateNoteUseCase(NoteService noteService) {
        this.noteService = noteService;
    }

    @Transactional
    @EnqueueEvent(NoteCreated.class)
    public String handle(CreateNoteCommand command) {
        ProjectId projectId = new ProjectId(command.getProjectId());
        ColumnId columnId = new ColumnId(command.getColumnId());
        Note note = noteService.createNote(projectId, columnId, command.getDescription());
        return note.getId().getValue();
    }

}

Event subscription and storage become declarative, improving readability.
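One piece of wiring is not shown above: like the transaction interceptor, EnqueueEventInterceptor must be bound in the Guice module. A minimal sketch, assuming the InfrastructureModule shown earlier; requestInjection is needed because this interceptor relies on field injection:

```java
public class InfrastructureModule extends ServletModule {

    @Override
    protected void configureServlets() {
        super.configureServlets();
        ・・・
        // Hypothetical binding for @EnqueueEvent, mirroring the @Transactional setup.
        EnqueueEventInterceptor enqueueEventInterceptor = new EnqueueEventInterceptor();
        // The interceptor is created outside the injector, so inject its @Inject fields explicitly.
        requestInjection(enqueueEventInterceptor);
        bindInterceptor(Matchers.any(), Matchers.annotatedWith(EnqueueEvent.class),
                enqueueEventInterceptor);
    }
}
```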

Implementing the Event Relay

Finally, we will implement the event relay.
The implementation of this class is simple. It looks up the event in the event queue by the event ID that was delivered to the proxy; if the event exists, it dispatches it to the messaging service, and if it does not, it throws an error to trigger a retry. This ensures that the message is sent only after the event has been established (committed).

public class EventRelay {
    ・・・
    @Transactional
    public void relay(long eventId) {
        // Retrieve the event from the queue. If it cannot be read from the DB, assume it is either uncommitted or lost.
        DomainEvent event = queue.pop(eventId)
                .orElseThrow(() -> new IllegalStateException("Event is not yet ensured or lost."));
        // If it exists, dispatch it to the original channel.
        dispatcher.dispatch(eventId, event);
    }
}

Presentation Layer

The presentation layer provides the entry point for messages addressed to the proxy.
It also defines the entry points that receive the original messages sent out once an event has been established.

Message Idempotency

Services like Pub/Sub and Cloud Tasks guarantee at-least-once delivery: a message is delivered at least once, but not necessarily exactly once.
Since the same message may be delivered multiple times, we must ensure it is processed only once.
In this implementation, we will save the fact that a message has been processed into the DB and discard any duplicate messages that arrive.

First, let's create a table to store the consumed messages.

| Column Name | Description | Type |
| --- | --- | --- |
| id | Event ID | Numeric |
| receiver | Receiver | String |
| received_at | Reception Date/Time | DateTime |

In this table, we use a composite key consisting of the event ID and the receiver.
This is because multiple endpoints might receive the same message.
Also, for the ID, we save the event ID issued by our system rather than the Pub/Sub ID.
After the proxy sends a message, the DB commit that records the send might fail. In that case, the proxy's retry mechanism re-registers the message with the messaging service, and Pub/Sub assigns a new message ID even though the content is identical.
If we used the Pub/Sub message ID as the key, we could not detect the duplicate, and the same content would be executed multiple times.

By including the event ID in the Pub/Sub message and using that event ID as the key, we can handle duplicate transmissions from the proxy.
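The effect of keying on the (event ID, receiver) pair can be illustrated with a minimal in-memory sketch. The class name here is made up for illustration; the real implementation below persists the pair to the DB via ConsumedEventStore:

```java
import java.util.HashSet;
import java.util.Set;

public class DedupeSketch {

    // In-memory stand-in for the consumed-events table described above.
    private final Set<String> consumed = new HashSet<>();

    // Returns true only the first time a given (eventId, receiver) pair is seen;
    // redelivered messages with the same event ID are discarded.
    public boolean processOnce(long eventId, String receiver) {
        return consumed.add(eventId + ":" + receiver);
    }

    public static void main(String[] args) {
        DedupeSketch sketch = new DedupeSketch();
        System.out.println(sketch.processOnce(1L, "proxy"));        // true: first delivery, processed
        System.out.println(sketch.processOnce(1L, "proxy"));        // false: redelivery, discarded
        System.out.println(sketch.processOnce(1L, "note-created")); // true: same event, different receiver
    }
}
```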

As processed messages accumulate over long-term operation, they may consume significant storage capacity. With the reception timestamp, you can use batch processes to delete or back up sufficiently old records.
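As a sketch of that cleanup (the class name and the 30-day retention period are arbitrary choices for illustration; a real batch would issue a DELETE against the table filtered by received_at):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Collectors;

public class ConsumedEventCleanupSketch {

    record Row(long eventId, String receiver, Instant receivedAt) {}

    // Keep only rows at or after the cutoff; everything older is eligible
    // for deletion (or backup) by the batch process.
    static List<Row> retain(List<Row> rows, Instant cutoff) {
        return rows.stream()
                .filter(row -> !row.receivedAt().isBefore(cutoff))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-31T00:00:00Z");
        List<Row> rows = List.of(
                new Row(1L, "proxy", now.minus(90, ChronoUnit.DAYS)),
                new Row(2L, "proxy", now.minus(1, ChronoUnit.DAYS)));
        // With a 30-day retention, only the recent row survives.
        System.out.println(retain(rows, now.minus(30, ChronoUnit.DAYS)).size()); // 1
    }
}
```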

Once the table is defined, we prepare the data class and the interface for DB access.

@Value
public class ConsumedEvent {
    long eventId;
    String receiver;
    Date receivedAt;
}

public interface ConsumedEventStore {
    boolean exists(long eventId, String receiver);
    void insert(ConsumedEvent event);
}

Writing check logic for every message receiver is tedious, so we provide a base class to handle this uniformly.

@Value
public class Event {

    long eventId;
    Message message;
    String subscription;

    public static class Message {

        @Getter
        private final String messageId;

        @Getter
        private final String data;

        @Getter
        private final Map<String, String> attributes = new HashMap<>();

        public Message(String messageId, String data) {
            this.messageId = messageId;
            this.data = data;
        }
    }
}

public abstract class EventReceiver extends HttpServlet {

    private final JsonParser parser = new JsonParser();

    @Inject
    private ConsumedEventStore consumedEventStore;

    @Override
    @Transactional
    public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        // Get the JSON-formatted body from the request and parse it into the Event class
        String body = req.getReader().lines().collect(Collectors.joining());
        JsonObject json = parser.parse(body).getAsJsonObject();
        JsonObject messageObj = json.get("message").getAsJsonObject();
        JsonObject attributesObj = messageObj.get("attributes").getAsJsonObject();

        Event.Message message = new Event.Message(
                messageObj.get("messageId").getAsString(),
                new String(Base64.getDecoder().decode(messageObj.get("data").getAsString()))
        );
        attributesObj.entrySet()
                .forEach(entry -> message.getAttributes()
                        .put(entry.getKey(), entry.getValue().getAsString()));

        long eventId = Long.parseLong(message.getAttributes().get("eventId"));
        Event event = new Event(eventId, message, json.get("subscription").getAsString());

        // Ignore events that have already been consumed
        if (!consumedEventStore.exists(eventId, event.getSubscription())) {
            // Since there's a possibility of duplicate messages being sent from Pub/Sub,
            // we save the consumed event to ensure idempotency.
            ConsumedEvent consumedEvent =
                    new ConsumedEvent(eventId, event.getSubscription(), new Date());
            consumedEventStore.insert(consumedEvent);
            onReceive(event);
        }

        resp.setStatus(HttpServletResponse.SC_OK);
    }

    // Delegate processing to the subclass if not yet processed
    protected abstract void onReceive(Event event);

}

With this, each receiver will be called only when the message is unprocessed.

Message Ordering

Messaging services do not necessarily deliver messages in order. While messages are generally sent in the order they were submitted, this doesn't mean they will be consumed sequentially by the server. Depending on the process, ordering might become an issue. While the measures above address duplicates, they do not address ordering issues. If order is critical, separate measures need to be considered.
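If order does matter for a particular process, one common measure (not part of this implementation, shown here only as an illustration) is to carry a monotonically increasing sequence number in each event and discard messages that arrive after a newer one has already been applied:

```java
public class StaleMessageGuard {

    // Highest sequence number applied so far.
    private long lastApplied = 0;

    // Apply a message only if it is newer than everything seen before;
    // late arrivals with a smaller sequence number are dropped.
    public synchronized boolean apply(long sequence) {
        if (sequence <= lastApplied) {
            return false; // stale (or duplicate): ignore
        }
        lastApplied = sequence;
        return true;
    }

    public static void main(String[] args) {
        StaleMessageGuard guard = new StaleMessageGuard();
        System.out.println(guard.apply(1L)); // true
        System.out.println(guard.apply(3L)); // true: newer state wins
        System.out.println(guard.apply(2L)); // false: arrived out of order, discarded
    }
}
```

Note that this last-writer-wins guard silently drops intermediate updates, so it only fits events that carry the full state rather than deltas.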

EventProxy

To ensure the establishment of events, it is necessary to pass them through a proxy first.
The role of EventProxy is to provide an endpoint for the proxy, extract the event ID from the pushed message, and pass it to the EventRelay.
It is implemented as a subclass of the previously mentioned EventReceiver to retrieve the event ID.

@Singleton
@WebServlet(value = "/event/proxy")
public class EventProxy extends EventReceiver {

    @Inject
    private EventRelay eventRelay;

    @Override
    protected void onReceive(Event event) {
        // Pass the received event ID to the event relay and have it send it to the original channel.
        eventRelay.relay(event.getEventId());
    }

}

Event Receiver

The final step is to implement an endpoint that receives the message sent from the proxy and triggers the subsequent process.
Like EventProxy, it inherits from EventReceiver and passes the necessary parameters to the UseCase class. In this case, I used the naming convention Event Name + Receiver, but if you are triggering multiple asynchronous processes with the same event, it would be a good idea to use a naming convention that makes that clear.

@Singleton
@WebServlet(value = "/event/receiver/note-created")
public class NoteCreatedReceiver extends EventReceiver {

    private final JsonParser jsonParser = new JsonParser();

    @Inject
    private CopyNoteToTaskUseCase useCase;

    @Override
    protected void onReceive(Event event) {
        // Retrieve the necessary data from the received event information
        JsonObject root = jsonParser.parse(event.getMessage().getData()).getAsJsonObject();
        String projectId = root.get("projectId").getAsJsonObject().get("value").getAsString();
        String noteId = root.get("noteId").getAsJsonObject().get("value").getAsString();
        // Call the UseCase class to start the subsequent process.
        useCase.handle(projectId, noteId);
    }

}

Summary

The above describes the event-driven architecture using domain events.
Asynchronous processing requires considering various factors and involves complex implementation, but adopting this architecture allows each processing block to remain small even as the number of processes increases.
Furthermore, by building on domain events rather than treating messaging as mere mechanical plumbing, you get an event-driven implementation that does not obscure domain knowledge.

Since the explanation above only highlights the key points, please refer to the source code if you would like to follow the details further.
