Axon Framework の Deadline 機能 を使ってみた

に公開

はじめに

Axon Framework の Deadline 機能を使って、処理のタイムアウト処理を実装してみたので、その使い心地と注意事項、所感等をまとめます。

Deadline とは

Axon における Deadline とは、大きく以下の 2 つがあります。

  • Deadline Manager - 指定の時間経過後にコールバックを呼び出す
  • Event Scheduler - 指定の時間経過後にイベントを発行する

どちらも業務的な「期限」を表現するものであり、似たようなものですが、Deadline Manager は期限到来時に何らかの処理を動かしたい場合、Event Scheduler は直接イベントを発行したい場合に使うようです。

公式ドキュメントの例では、クレジットカード決済の完了確認は Deadline Manager、請求証の期限切れは Event Scheduler というようなニュアンスで書いてありましたが、おそらく以下のような解釈かと思います。

  • クレジットカード決済の完了確認
    • 数秒後に完了確認をして、必要ならイベント発行
    • 期限到来でチェック処理が挟まる、条件に応じてイベント発行するから、 Deadline Manager
  • 請求書の期限切れ
    • 数日後に確定で期限切れする
    • イベント発行のためのチェック処理が不要なため、Event Scheduler

実装の種類

Deadline Manager も Event Scheduler も、内部的に何らかのスケジューラーを使用して実現されます。公式では、以下のような実装が用意されています。

利用ライブラリ Deadline Manager Event Scheduler
Pure Java SimpleDeadlineManager SimpleEventSchduler
JobRunr JobRunrDeadlineManager JobRunrEventScheduler
db-scheduler DbSchedulerDeadlineManager DbSchedulerEventScheduler
Quartz QuartzDeadlineManager QuartzEventScheduler
Axon Server なし AxonServerEventScheduler

Pure Java の実装では ScheduledExecurotService が内部的に使われているため、複数のインスタンスを跨いだスケジューリングができず、再起動したらスケジュールが消えてしまいます。そのため、実質的にその他の実装が必須になってきます。

その他の実装は、スケジュールの永続化やインスタンスを跨いだスケジューリングに対応しているため、お好みで選んでよさそうです。

ちなみに私は db-scheduler がシンプルで使いやすくて好みです。

セットアップ方法

ここからは、実際に使用するためのセットアップ方法を記載します。
私の好みに合わせて db-scheduler の例で記載しますが、他のライブラリを使用する場合は、そのライブラリに合わせたセットアップが必要になります。

前提

  • Spring Boot 3.5.5
  • Axon 4.12.1

db-scheduler-spring-boot-starter の追加

pom.xml に、 db-scheduler-spring-boot-starter を追加します。

<dependency>
  <groupId>com.github.kagkarlsson</groupId>
  <artifactId>db-scheduler-spring-boot-starter</artifactId>
  <version>13.0.0</version>
</dependency>

ここでのバージョンは、 使用している Axon で指定されているバージョンにする必要があります。どのバージョンを指定する必要があるかは、Axonのpom.xml を参照してください。

db-scheduler 用のテーブル追加

db-scheduler が内部的に使用するテーブルを追加します。スキーマ定義は db-schedulerのリポジトリにある ため、db-scheduler のバージョン・利用する DB に合うものを調べます。

ここでは、v13.0.0 の PostgreSQL 向けのものを選択しました。

create table scheduled_tasks (
  task_name text not null,
  task_instance text not null,
  task_data bytea,
  execution_time timestamp with time zone not null,
  picked BOOLEAN not null,
  picked_by text,
  last_success timestamp with time zone,
  last_failure timestamp with time zone,
  consecutive_failures INT,
  last_heartbeat timestamp with time zone,
  version BIGINT not null,
  PRIMARY KEY (task_name, task_instance)
);

CREATE INDEX execution_time_idx ON scheduled_tasks (execution_time);
CREATE INDEX last_heartbeat_idx ON scheduled_tasks (last_heartbeat);

ちなみに、テーブル名をデフォルトから変更したい場合は、 application.properties に以下のように設定を追加すれば可能です。

db-scheduler.table-name=custom_scheduled_tasks

その他、可能な設定については 公式ドキュメント を参照してください。

利用例

サンプルアプリ

以下のようなシンプルなタスク管理の Aggregate を用意しました。

@Aggregate
public class TaskAggregate {
  @AggregateIdentifier
  private String id;
  private String name;
  private boolean completed;
  private String deleted;

  protected TaskAggregate() {
    // Required by Axon Framework
  }

  @CommandHandler
  public TaskAggregate(TaskCommand.CreateTaskCommand command) {
    AggregateLifecycle.apply(new TaskEvent.TaskCreatedEvent(command.taskId(), command.name()));
  }

  @CommandHandler
  public void handle(TaskCommand.UpdateTaskCommand command) {
    AggregateLifecycle.apply(new TaskEvent.TaskUpdatedEvent(command.taskId(), command.name()));
  }

  @CommandHandler
  public void handle(TaskCommand.CompleteTaskCommand command) {
    AggregateLifecycle.apply(new TaskEvent.TaskCompletedEvent(command.taskId()));
  }

  @CommandHandler
  public void handle(TaskCommand.DeleteTaskCommand command) {
    AggregateLifecycle.apply(new TaskEvent.TaskDeletedEvent(command.taskId()));
  }

  @EventSourcingHandler
  public void on(TaskEvent.TaskCreatedEvent event) {
    this.id = event.taskId();
    this.name = event.name();
    this.completed = false;
    this.deleted = false;
  }

  @EventSourcingHandler
  public void on(TaskEvent.TaskUpdatedEvent event) {
    this.name = event.name();
  }

  @EventSourcingHandler
  public void on(TaskEvent.TaskCompletedEvent event) {
    this.completed = true;
  }

  @EventSourcingHandler
  public void on(TaskEvent.TaskDeletedEvent event) {
    this.deleted = event.taskId();
  }
}

この TodoAggregate に対して、「完了してから一定時間経過したら自動削除する」処理を、Deadlines を使って実装してみます。

Saga で Deadline を使用する場合

以下のような Saga を用意して、 DeadlineManager を注入して、スケジュールを指定します。

@Saga
public class TaskAutoDeletionSaga {

  // (1)
  @Autowired
  private transient DeadlineManager deadlineManager;
  @Autowired
  private transient CommandGateway commandGateway;

  private String taskId;
  private String deadlineId;

  @StartSaga
  @SagaEventHandler(associationProperty = "taskId")
  public void on(TaskEvent.TaskCreatedEvent event) {
    this.taskId = event.taskId();
  }

  // (2)
  @SagaEventHandler(associationProperty = "taskId")
  public void on(TaskEvent.TaskCompletedEvent event) {
    // タスクが完了したら自動削除のデッドラインを設定
    this.deadlineId = this.deadlineManager.schedule(Duration.ofMinutes(5), "auto-deletion");
  }

  // (3)
  @DeadlineHandler(deadlineName = "auto-deletion")
  public void onAutoDeletion() {
    // 自動削除のデッドラインが到来したら、削除コマンドを投げる
    this.deadlineId = null;
    this.commandGateway.send(new TaskCommand.DeleteTaskCommand(this.taskId));
  }

  // (4)
  @EndSaga
  @SagaEventHandler(associationProperty = "taskId")
  public void on(TaskEvent.TaskDeletedEvent event) {
    // 削除時にデッドラインがスケジュールされていたらキャンセルする
    if (this.deadlineId != null) {
      this.deadlineManager.cancelSchedule("auto-deletion", this.deadlineId);
    }
  }

}

ポイントは以下の通りです。

  • (1) 使用する依存関係は @Autowired かつ transient にする
    • transient にしないと、Saga インスタンスの保存時に一緒に保存しようとして、エラーが起きます
  • (2) Deadline をトリガーしたいイベントハンドラ内で、 DeadlineManager.schedule() を呼び出す
    • スケジュールは期間 (Duration) でも日時 (Instant) でも指定可能
    • ここで取得した deadlineId は、キャンセルに備えてフィールドに保存しておく
  • (3) Deadline がトリガーされると @DeadlineHandler が呼び出されるため、必要な処理をする
    • ここでは、削除用のコマンドを投げる
  • (4) Deadline をキャンセルする場合は、 DeadlineManager.cancelSchedule() を呼び出す
    • ここでは、自動削除前に手動で削除された場合に、キャンセルするようにしている
    • トリガー済みの Deadline Id を指定してキャンセルするとエラーになるため、キャンセル要否はちゃんと設計する必要がある

Aggregate で Deadline を使用する場合

同じ処理を、今度は Aggregate の中で実装してみます。

@Aggregate
public class TaskAggregate {

  // (1)
  @Autowired
  private transient DeadlineManager deadlineManager;

  @AggregateIdentifier
  private String id;

  // 省略

  private String autoDeletionDeadlineId; // (2)

  // 省略

  @CommandHandler
  public void handle(TaskCommand.CompleteTaskCommand command) {
    // (3)
    var autoDeletionDeadlineId = this.deadlineManager.schedule(Duration.ofMinutes(5), "auto-deletion");
    AggregateLifecycle.apply(new TaskEvent.TaskCompletedEvent(command.taskId(), autoDeletionDeadlineId));
  }

  // 省略

  @EventSourcingHandler
  public void on(TaskEvent.TaskCompletedEvent event) {
    this.completed = true;
    // (4)
    this.autoDeletionDeadlineId = event.autoDeletionDeadlineId();
  }

  // (5)
  @DeadlineHandler(deadlineName = "auto-deletion")
  public void onAutoDeletion() {
    this.autoDeletionDeadlineId = null;
    AggregateLifecycle.apply(new TaskEvent.TaskDeletedEvent(this.id));
  }

  @EventSourcingHandler
  public void on(TaskEvent.TaskDeletedEvent event) {
    this.deleted = event.taskId();
    
    // (6)
    if (this.autoDeletionDeadlineId != null) {
      this.deadlineManager.cancelSchedule("auto-deletion", this.autoDeletionDeadlineId);
      this.autoDeletionDeadlineId = null;
    }
  }
}
  • (1) Saga の場合と同様、 @Autowired かつ transient なフィールドとして、 DeadlineManager を受け取るようにする
  • (2) Saga の場合と同様、 deadlineId をフィールドに保持できるようにする
  • (3) Saga とは違い、コマンドハンドラ内で DeadlineManager.schedule() を呼び出し、取得した deadlineId をイベントに含めるようにする
  • (4) イベントから deadlineId を取得し、 (2) のフィールドに保存する
  • (5) Deadline がトリガーされると @DeadlineHandler が呼び出されるため、必要な処理をする
    • Saga の場合とは違い、直接イベントを発行してよい
    • もちろん、条件によって異なるのであれば、任意の処理を挟んでOK
  • (6) Deadline が不要になったら、 DeadlineManager.cancelSchedule() を呼び出す

Saga と似ているところは多いものの、deadlineId をイベントに含めないといけない点、 @DeadlineHandler 内で直接イベントを発行してよい点などが違います。
状態はすべてイベントを元に復元する必要がある Aggregate の特徴が出ているように感じます。

使い分けのポイント

  • 単一の Aggregate に閉じる場合:Aggregate で使う
    • 今回の例ではこれで十分そう
  • 複数の Aggregate に跨って処理が必要な場合:Saga で使う
    • 決済が一定時間内に終わらなければ、商品の購入をキャンセルするなど
  • 単一の Aggregate に閉じるけど、可読性のためにスケジュール処理は切り出したい:Saga で使う
    • Aggregate の状態に応じて複数種類の期限を使い分ける場合は、Aggregate は状態管理に専念して、Saga 側で期限の管理をするほうがわかりやすそう

使用する際の注意事項

  • Axon の期待する db-scheduler のバージョンを合わせる
    • バージョン相違があると、エラーが起きる可能性あり
  • @DeadlineHandler 内ではそのスケジューラーをキャンセルしてはいけない
    • エラーになる
  • スケジュールされていない(完了済み含む)deadlineId をキャンセルしてはいけない
    • エラーになる
  • 起動の正確性はスケジューラーライブラリ次第
    • db-scheduler の場合は、10s ごとにスケジュールを起動するため、どうしてもタイムラグが発生する
  • Saga または Aggregate の内側でしか使えない

所感

Aggregate のステータス遷移に合わせてスケジュール処理をするのに非常に便利な機能だと感じました。業務処理にはなんらかの「期限」が必要になることが多く、その場合はアプリケーションレイヤーでなんとかすることが多いと思いますが、Axon に統合されていることで、ドメインの内側で対処できる点が良いです。機会があれば、積極的に使っていきたいです。

今回の例では DeadlineManager を使いましたが、今回の例のような場合は EventScheduler を使うほうがシンプルに実装できそうなので、今後試してみたいと思います。

参考サイト

Discussion