🐷
AxonFramework コマンドの冪等性
冪等性
AxonFrameworkはCQRS/ESをベースに作られたフレームワークで、当然マイクロサービスにも対応できるように作られています。
マイクロサービス間はトランザクションなどを利用した強い整合性をかけられないため、ネットワーク断が発生した場合はリトライ処理などをする必要があります。
残念ながら冪等性がいるかいらないかユースケースによって異なるので、Axonはデフォルトで冪等性を実現する機能を提供していません。自分で作りましょう。
いろいろやり方はあるんですが今回は一番簡単なAggreggateの中で冪等性を検証する方法です。
やり方
下ごしらえ
まずは冪等性をチェックするための機構を作りましょう。
IDオブジェクトは適当に作ってください。 ULIDとかGUID中身は何でもいいです。
今回はただのstringにしてます
IdempotencyId.kt
data class IdempotencyId(val value: String)
IdempotenceChecker.kt
class IdempotenceChecker {
companion object {
// ここはお好みで増やしたり減らしたり。
// 多分100もあれば余裕で足りる
private const val MAX_ID_COUNT = 100
}
private val idempotencyIds = LinkedHashSet<IdempotencyId>()
fun register(idempotencyId: IdempotencyId) {
if (idempotencyIds.size >= MAX_ID_COUNT) {
val lastId = idempotencyIds.last()
idempotencyIds.remove(lastId)
}
idempotencyIds.add(idempotencyId)
}
fun isIdempotent(idempotencyId: IdempotencyId): Boolean {
return idempotencyIds.contains(idempotencyId)
}
}
Aggregate
本当だったらスナップショットを考慮してもうちょい改良する必要があるのですが、今回はわかりやすさ重視で見てほしいところしか書かないです。
// 在庫集約
@Aggregate
class StockAggregate() {
@AggregateIdentifier
private lateinit var id: StockId
private lateinit var quantity: StockQuantity
private var idempotenceChecker = IdempotenceChecker()
@CommandHandler
constructor(command: CreateStockCommand) : this() {
val event =
StockCreatedEvent(
id = command.id.value,
productId = command.productId.value
)
AggregateLifecycle.apply(event)
}
@CommandHandler
fun handle(command: IncreaseStockCommand) : ActionCommandResult {
// 冪等性チェック
if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
return ActionCommandResult.ok()
}
// 在庫増やせる?
if (!quantity.canAdd(command.increaseCount)) {
return ActionCommandResult.error(IncreaseStockError.OutOfStock.errorCode)
}
val event =
StockIncreasedEvent(
id = command.id.value,
idempotencyId = command.idempotencyId.value,
increaseCount = command.increaseCount.value,
)
AggregateLifecycle.apply(event)
return ActionCommandResult.ok()
}
@CommandHandler
fun handle(command: DecreaseStockCommand) : ActionCommandResult {
// 冪等性チェック
if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
return ActionCommandResult.ok()
}
// 在庫減らせる?
if (!quantity.canSubtract(command.decreaseCount)) {
return ActionCommandResult.error(DecreaseStockError.InsufficientStock.errorCode)
}
val event = StockDecreasedEvent(
id = command.id.value,
idempotencyId = command.idempotencyId.value,
decreaseCount = command.decreaseCount.value,
)
AggregateLifecycle.apply(event)
return ActionCommandResult.ok()
}
@EventSourcingHandler
fun on(event: StockCreatedEvent) {
id = StockId(event.id)
quantity = StockQuantity(0)
}
@EventSourcingHandler
fun on(event: StockIncreasedEvent) {
val increaseCount = StockQuantity(event.increaseCount)
quantity = quantity.add(increaseCount)
}
@EventSourcingHandler
fun on(event: StockDecreasedEvent) {
val decreaseCount = StockQuantity(event.decreaseCount)
quantity = quantity.subtract(decreaseCount)
}
}
Discussion