Axon Framework の Deadline 機能 を使ってみた
はじめに
Axon Framework の Deadline 機能を使って、処理のタイムアウト処理を実装してみたので、その使い心地と注意事項、所感等をまとめます。
Deadline とは
Axon における Deadline とは、大きく以下の 2 つがあります。
- Deadline Manager - 指定の時間経過後にコールバックを呼び出す
- Event Scheduler - 指定の時間経過後にイベントを発行する
どちらも業務的な「期限」を表現するものであり、似たようなものですが、Deadline Manager は期限到来時に何らかの処理を動かしたい場合、Event Scheduler は直接イベントを発行したい場合に使うようです。
公式ドキュメントの例では、クレジットカード決済の完了確認は Deadline Manager、請求証の期限切れは Event Scheduler というようなニュアンスで書いてありましたが、おそらく以下のような解釈かと思います。
- クレジットカード決済の完了確認
- 数秒後に完了確認をして、必要ならイベント発行
- 期限到来でチェック処理が挟まる、条件に応じてイベント発行するから、 Deadline Manager
- 請求書の期限切れ
- 数日後に確定で期限切れする
- イベント発行のためのチェック処理が不要なため、Event Scheduler
実装の種類
Deadline Manager も Event Scheduler も、内部的に何らかのスケジューラーを使用して実現されます。公式では、以下のような実装が用意されています。
利用ライブラリ | Deadline Manager | Event Scheduler |
---|---|---|
Pure Java | SimpleDeadlineManager |
SimpleEventSchduler |
JobRunr | JobRunrDeadlineManager |
JobRunrEventScheduler |
db-scheduler | DbSchedulerDeadlineManager |
DbSchedulerEventScheduler |
Quartz | QuartzDeadlineManager |
QuartzEventScheduler |
Axon Server | なし | AxonServerEventScheduler |
Pure Java の実装では ScheduledExecurotService
が内部的に使われているため、複数のインスタンスを跨いだスケジューリングができず、再起動したらスケジュールが消えてしまいます。そのため、実質的にその他の実装が必須になってきます。
その他の実装は、スケジュールの永続化やインスタンスを跨いだスケジューリングに対応しているため、お好みで選んでよさそうです。
ちなみに私は db-scheduler がシンプルで使いやすくて好みです。
セットアップ方法
ここからは、実際に使用するためのセットアップ方法を記載します。
私の好みに合わせて db-scheduler の例で記載しますが、他のライブラリを使用する場合は、そのライブラリに合わせたセットアップが必要になります。
前提
- Spring Boot 3.5.5
- Axon 4.12.1
db-scheduler-spring-boot-starter の追加
pom.xml
に、 db-scheduler-spring-boot-starter
を追加します。
<dependency>
<groupId>com.github.kagkarlsson</groupId>
<artifactId>db-scheduler-spring-boot-starter</artifactId>
<version>13.0.0</version>
</dependency>
ここでのバージョンは、 使用している Axon で指定されているバージョンにする必要があります。どのバージョンを指定する必要があるかは、Axonのpom.xml を参照してください。
db-scheduler 用のテーブル追加
db-scheduler が内部的に使用するテーブルを追加します。スキーマ定義は db-schedulerのリポジトリにある ため、db-scheduler のバージョン・利用する DB に合うものを調べます。
ここでは、v13.0.0 の PostgreSQL 向けのものを選択しました。
create table scheduled_tasks (
task_name text not null,
task_instance text not null,
task_data bytea,
execution_time timestamp with time zone not null,
picked BOOLEAN not null,
picked_by text,
last_success timestamp with time zone,
last_failure timestamp with time zone,
consecutive_failures INT,
last_heartbeat timestamp with time zone,
version BIGINT not null,
PRIMARY KEY (task_name, task_instance)
);
CREATE INDEX execution_time_idx ON scheduled_tasks (execution_time);
CREATE INDEX last_heartbeat_idx ON scheduled_tasks (last_heartbeat);
ちなみに、テーブル名をデフォルトから変更したい場合は、 application.properties
に以下のように設定を追加すれば可能です。
db-scheduler.table-name=custom_scheduled_tasks
その他、可能な設定については 公式ドキュメント を参照してください。
利用例
サンプルアプリ
以下のようなシンプルなタスク管理の Aggregate を用意しました。
@Aggregate
public class TaskAggregate {
@AggregateIdentifier
private String id;
private String name;
private boolean completed;
private String deleted;
protected TaskAggregate() {
// Required by Axon Framework
}
@CommandHandler
public TaskAggregate(TaskCommand.CreateTaskCommand command) {
AggregateLifecycle.apply(new TaskEvent.TaskCreatedEvent(command.taskId(), command.name()));
}
@CommandHandler
public void handle(TaskCommand.UpdateTaskCommand command) {
AggregateLifecycle.apply(new TaskEvent.TaskUpdatedEvent(command.taskId(), command.name()));
}
@CommandHandler
public void handle(TaskCommand.CompleteTaskCommand command) {
AggregateLifecycle.apply(new TaskEvent.TaskCompletedEvent(command.taskId()));
}
@CommandHandler
public void handle(TaskCommand.DeleteTaskCommand command) {
AggregateLifecycle.apply(new TaskEvent.TaskDeletedEvent(command.taskId()));
}
@EventSourcingHandler
public void on(TaskEvent.TaskCreatedEvent event) {
this.id = event.taskId();
this.name = event.name();
this.completed = false;
this.deleted = false;
}
@EventSourcingHandler
public void on(TaskEvent.TaskUpdatedEvent event) {
this.name = event.name();
}
@EventSourcingHandler
public void on(TaskEvent.TaskCompletedEvent event) {
this.completed = true;
}
@EventSourcingHandler
public void on(TaskEvent.TaskDeletedEvent event) {
this.deleted = event.taskId();
}
}
この TodoAggregate
に対して、「完了してから一定時間経過したら自動削除する」処理を、Deadlines を使って実装してみます。
Saga で Deadline を使用する場合
以下のような Saga を用意して、 DeadlineManager
を注入して、スケジュールを指定します。
@Saga
public class TaskAutoDeletionSaga {
// (1)
@Autowired
private transient DeadlineManager deadlineManager;
@Autowired
private transient CommandGateway commandGateway;
private String taskId;
private String deadlineId;
@StartSaga
@SagaEventHandler(associationProperty = "taskId")
public void on(TaskEvent.TaskCreatedEvent event) {
this.taskId = event.taskId();
}
// (2)
@SagaEventHandler(associationProperty = "taskId")
public void on(TaskEvent.TaskCompletedEvent event) {
// タスクが完了したら自動削除のデッドラインを設定
this.deadlineId = this.deadlineManager.schedule(Duration.ofMinutes(5), "auto-deletion");
}
// (3)
@DeadlineHandler(deadlineName = "auto-deletion")
public void onAutoDeletion() {
// 自動削除のデッドラインが到来したら、削除コマンドを投げる
this.deadlineId = null;
this.commandGateway.send(new TaskCommand.DeleteTaskCommand(this.taskId));
}
// (4)
@EndSaga
@SagaEventHandler(associationProperty = "taskId")
public void on(TaskEvent.TaskDeletedEvent event) {
// 削除時にデッドラインがスケジュールされていたらキャンセルする
if (this.deadlineId != null) {
this.deadlineManager.cancelSchedule("auto-deletion", this.deadlineId);
}
}
}
ポイントは以下の通りです。
- (1) 使用する依存関係は
@Autowired
かつtransient
にする-
transient
にしないと、Saga インスタンスの保存時に一緒に保存しようとして、エラーが起きます
-
- (2) Deadline をトリガーしたいイベントハンドラ内で、
DeadlineManager.schedule()
を呼び出す- スケジュールは期間 (
Duration
) でも日時 (Instant
) でも指定可能 - ここで取得した deadlineId は、キャンセルに備えてフィールドに保存しておく
- スケジュールは期間 (
- (3) Deadline がトリガーされると
@DeadlineHandler
が呼び出されるため、必要な処理をする- ここでは、削除用のコマンドを投げる
- (4) Deadline をキャンセルする場合は、
DeadlineManager.cancelSchedule()
を呼び出す- ここでは、自動削除前に手動で削除された場合に、キャンセルするようにしている
- トリガー済みの Deadline Id を指定してキャンセルするとエラーになるため、キャンセル要否はちゃんと設計する必要がある
Aggregate で Deadline を使用する場合
同じ処理を、今度は Aggregate の中で実装してみます。
@Aggregate
public class TaskAggregate {
// (1)
@Autowired
private transient DeadlineManager deadlineManager;
@AggregateIdentifier
private String id;
// 省略
private String autoDeletionDeadlineId; // (2)
// 省略
@CommandHandler
public void handle(TaskCommand.CompleteTaskCommand command) {
// (3)
var autoDeletionDeadlineId = this.deadlineManager.schedule(Duration.ofMinutes(5), "auto-deletion");
AggregateLifecycle.apply(new TaskEvent.TaskCompletedEvent(command.taskId(), autoDeletionDeadlineId));
}
// 省略
@EventSourcingHandler
public void on(TaskEvent.TaskCompletedEvent event) {
this.completed = true;
// (4)
this.autoDeletionDeadlineId = event.autoDeletionDeadlineId();
}
// (5)
@DeadlineHandler(deadlineName = "auto-deletion")
public void onAutoDeletion() {
this.autoDeletionDeadlineId = null;
AggregateLifecycle.apply(new TaskEvent.TaskDeletedEvent(this.id));
}
@EventSourcingHandler
public void on(TaskEvent.TaskDeletedEvent event) {
this.deleted = event.taskId();
// (6)
if (this.autoDeletionDeadlineId != null) {
this.deadlineManager.cancelSchedule("auto-deletion", this.autoDeletionDeadlineId);
this.autoDeletionDeadlineId = null;
}
}
}
- (1) Saga の場合と同様、
@Autowired
かつtransient
なフィールドとして、DeadlineManager
を受け取るようにする - (2) Saga の場合と同様、 deadlineId をフィールドに保持できるようにする
- (3) Saga とは違い、コマンドハンドラ内で
DeadlineManager.schedule()
を呼び出し、取得した deadlineId をイベントに含めるようにする - (4) イベントから deadlineId を取得し、 (2) のフィールドに保存する
- (5) Deadline がトリガーされると
@DeadlineHandler
が呼び出されるため、必要な処理をする- Saga の場合とは違い、直接イベントを発行してよい
- もちろん、条件によって異なるのであれば、任意の処理を挟んでOK
- (6) Deadline が不要になったら、
DeadlineManager.cancelSchedule()
を呼び出す
Saga と似ているところは多いものの、deadlineId をイベントに含めないといけない点、 @DeadlineHandler
内で直接イベントを発行してよい点などが違います。
状態はすべてイベントを元に復元する必要がある Aggregate の特徴が出ているように感じます。
使い分けのポイント
- 単一の Aggregate に閉じる場合:Aggregate で使う
- 今回の例ではこれで十分そう
- 複数の Aggregate に跨って処理が必要な場合:Saga で使う
- 決済が一定時間内に終わらなければ、商品の購入をキャンセルするなど
- 単一の Aggregate に閉じるけど、可読性のためにスケジュール処理は切り出したい:Saga で使う
- Aggregate の状態に応じて複数種類の期限を使い分ける場合は、Aggregate は状態管理に専念して、Saga 側で期限の管理をするほうがわかりやすそう
使用する際の注意事項
- Axon の期待する db-scheduler のバージョンを合わせる
- バージョン相違があると、エラーが起きる可能性あり
-
@DeadlineHandler
内ではそのスケジューラーをキャンセルしてはいけない- エラーになる
- スケジュールされていない(完了済み含む)deadlineId をキャンセルしてはいけない
- エラーになる
- 起動の正確性はスケジューラーライブラリ次第
- db-scheduler の場合は、10s ごとにスケジュールを起動するため、どうしてもタイムラグが発生する
- Saga または Aggregate の内側でしか使えない
所感
Aggregate のステータス遷移に合わせてスケジュール処理をするのに非常に便利な機能だと感じました。業務処理にはなんらかの「期限」が必要になることが多く、その場合はアプリケーションレイヤーでなんとかすることが多いと思いますが、Axon に統合されていることで、ドメインの内側で対処できる点が良いです。機会があれば、積極的に使っていきたいです。
今回の例では DeadlineManager
を使いましたが、今回の例のような場合は EventScheduler
を使うほうがシンプルに実装できそうなので、今後試してみたいと思います。
Discussion