AxonFrameworkで冪等性を担保する方法
なぜ冪等性?
AxonFrameworkはCQRS/ESをベースに作られたフレームワークで、当然のようにマイクロサービスを前提に作られています。
マイクロサービス間の通信はトランザクションを利用した強い整合性をかけられないため、ネットワーク断が発生した場合はリトライ処理をする必要があります。
マイクロサービス = リトライ処理が必要 = すべてのAPIを冪等性にしたほうがいい
だと自分は思っています。
AxonFrameworkは冪等性を提供しない
残念ながらAxonFrameworkは冪等性を提供していません。
ここでは、冪等性が必要なのはシステムの要件によって変わるのでAxonが用意するのではなく、それはユーザーが実装するべきだ。みたいなことが書かれています。
ComandHandlerは冪等性を担保しない
CommandHandlerはAxonServerを使うとノード間通信になるため、内部の処理が成功してEventを発行したのにもかかわらず、呼び出し元には失敗として通知される可能性があります。
下の例では支払いは済んでいるのに30秒待機されたので、呼び出し元にはTimeOutエラーが通知されます。
脳死でリトライしたら2回支払いしちゃいますね。
@Component
class PaymentInteractor(
private val paymentService: PaymentService,
private val eventGateway: EventGateway
){
@CommandHandler
fun handle(command: PaymentCcommand) {
// 外部の支払いサービスを使う
paymentService.handle(command.serviceId)
// 成功したらEventを発行する
val envet = PaymentedEvent(command.serviceId)
eventGateway.publish(event)
// 30秒待機
Thread.sleep(30000)
}
}
EventHandlerは冪等性を担保しない
EventHandlerはAt Least One
なので最低一回のメッセージ到達を保証していますが、2回到達してしまう可能性があります。
下の例の場合だと、1回の取引のつもりが2回支払いをしてしまう可能性があります。
@Component
@ProcessingGroup(AccountProjectorEventProcessor.PROCESSOR_NAME)
// 銀行口座Projector
class AccountProjector(
private val repository: AccountJpaRepository,
) {
@EventHandler
fun on(
event: PaymentEvent,
@Timestamp timestamp: Instant,
) {
// 銀行口座のデータを取り出す
val entity = repository.findById(event.accountId)
// 残高を引く
entity.balance -= event.amount
// 永続化
repository.save(entity)
}
}
AxonDeveloper「通常、複数のシステムが関係する場合、「正確に1回」は不可能で、「少なくとも1回」が最良の配信保証となります。」
AxonFrameworkで冪等性を実装する方法
今回は商品の入庫
というユースケースを想定してCommand側とEvent側をどうやって実装すれば冪等性を担保できるのか解説していきます。
Commandの冪等性
下準備
まずは冪等性をチェックするための機構を作りましょう。
冪等性IDを表すオブジェクトは適当に作ってください。 ULIDとかGUIDとか中身は何でもいいです。
今回はただの例なので、簡単なStringにしてます
data class IdempotencyId(val value: String)
次に冪等性をチェックするサービスを作成します。
class IdempotenceChecker {
companion object {
// ここはお好みで増やしたり減らしたり。
// 多分100もあれば余裕で足りる
private const val MAX_ID_COUNT = 100
}
private val idempotencyIds = LinkedHashSet<IdempotencyId>()
// 冪等IDを登録する
fun register(idempotencyId: IdempotencyId) {
if (idempotencyIds.size >= MAX_ID_COUNT) {
val lastId = idempotencyIds.last()
idempotencyIds.remove(lastId)
}
idempotencyIds.add(idempotencyId)
}
// 冪等IDがすでに登録されているか?をチェック
fun isIdempotent(idempotencyId: IdempotencyId): Boolean {
return idempotencyIds.contains(idempotencyId)
}
}
Aggregate
// 在庫集約
@Aggregate
class StockAggregate() {
@AggregateIdentifier
private lateinit var id: StockId
private lateinit var quantity: StockQuantity
private var idempotenceChecker = IdempotenceChecker()
@CommandHandler
constructor(command: InternalCreateStockCommand) : this() {
val event =
StockCreatedEvent(
id = command.id.value,
productId = command.productId.value,
)
AggregateLifecycle.apply(event)
}
@CommandHandler
fun handle(command: IncreaseStockCommand) {
// 冪等性チェック
if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
return
}
// 在庫増やせる?
if (quantity.canNotAdd(command.increaseCount)) {
throw UseCaseException(IncreaseStockError.OutOfStock)
}
val increasedStockQuantity = quantity.add(command.increaseCount)
val event =
StockIncreasedEvent(
id = command.id.value,
increaseCount = command.increaseCount.value,
idempotencyId = command.idempotencyId.value,
increasedStockQuantity = increasedStockQuantity.value,
)
AggregateLifecycle.apply(event)
}
@CommandHandler
fun handle(command: DecreaseStockCommand) {
// 冪等性チェック
if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
return
}
// 在庫減らせる?
if (quantity.canNotSubtract(command.decreaseCount)) {
throw UseCaseException(DecreaseStockError.InsufficientStock)
}
val decreasedStockQuantity = quantity.subtract(command.decreaseCount)
val event =
StockDecreasedEvent(
id = command.id.value,
decreaseCount = command.decreaseCount.value,
idempotencyId = command.idempotencyId.value,
decreasedStockQuantity = decreasedStockQuantity.value,
)
AggregateLifecycle.apply(event)
}
@CommandHandler
fun handle(command: DeleteStockCommand) {
val event =
StockDeletedEvent(
id = command.id.value,
)
AggregateLifecycle.apply(event)
}
@EventSourcingHandler
fun on(event: StockCreatedEvent) {
id = StockId(event.id)
quantity = StockQuantity(0)
}
@EventSourcingHandler
fun on(event: StockIncreasedEvent) {
quantity = StockQuantity(event.increasedStockQuantity)
idempotenceChecker.register(IdempotencyId(event.idempotencyId))
}
@EventSourcingHandler
fun on(event: StockDecreasedEvent) {
quantity = StockQuantity(event.decreasedStockQuantity)
idempotenceChecker.register(IdempotencyId(event.idempotencyId))
}
@EventSourcingHandler
fun on(event: StockDeletedEvent) {
AggregateLifecycle.markDeleted()
}
}
StockId
、StockQuantity
,IncreaseCount
とかの値オブジェクトが出現しますが、本質じゃないのでスキップします。
Eventの冪等性
Eventの構造
これは在庫を入庫したというイベントです。
単純に作るなら、IDとincreaseCountの二つで十分なのですが、この値を使って増減させるだけでは冪等性が確保できません。
なので、入庫した後の在庫数みたいな項目も追加します。
data class StockIncreasedEvent(
override val id: String,
val increaseCount: Int,
// ↓冪等性対策↓
val idempotencyId: String,
val increasedStockQuantity: Int,
) : StockEvent
EventHandler (射影)
ちなみにEventHandlerは冪等性は保証してないけど、Eventの順番は保証してます。
なので、入庫した後の在庫数で最終的なデータを更新すれば冪等性が担保できるわけです。
@EventHandler
fun on(event: StockIncreasedEvent) {
val entity = stockJpaRepository.findById(event.id).orElseThrow()
val updatedEntity = entity.copy(quantity = event.increasedStockQuantity)
stockJpaRepository.save(updatedEntity)
}
※ Eventの中に冪等キーを持ってるので、 Event 購読側で冪等キーを管理することも可能です。その場合、 "入庫した後の在庫数" というプロパティをイベントに持つ必要がないのでイベントがすっきりします。
Discussion