🐷

AxonFramework コマンドの冪等性

2025/01/18に公開

冪等性

AxonFrameworkはCQRS/ESをベースに作られたフレームワークで、当然マイクロサービスにも対応できるように作られています。

マイクロサービス間はトランザクションなどを利用した強い整合性をかけられないため、ネットワーク断が発生した場合はリトライ処理などをする必要があります。

残念ながら冪等性がいるかいらないかユースケースによって異なるので、Axonはデフォルトで冪等性を実現する機能を提供していません。自分で作りましょう。

いろいろやり方はあるんですが今回は一番簡単なAggreggateの中で冪等性を検証する方法です。

やり方

下ごしらえ

まずは冪等性をチェックするための機構を作りましょう。

IDオブジェクトは適当に作ってください。 ULIDとかGUID中身は何でもいいです。
今回はただのstringにしてます

IdempotencyId.kt
data class IdempotencyId(val value: String)
IdempotenceChecker.kt
class IdempotenceChecker {
    companion object {
        // ここはお好みで増やしたり減らしたり。
        // 多分100もあれば余裕で足りる
        private const val MAX_ID_COUNT = 100
    }

    private val idempotencyIds = LinkedHashSet<IdempotencyId>()

    fun register(idempotencyId: IdempotencyId) {
        if (idempotencyIds.size >= MAX_ID_COUNT) {
            val lastId = idempotencyIds.last()
            idempotencyIds.remove(lastId)
        }

        idempotencyIds.add(idempotencyId)
    }

    fun isIdempotent(idempotencyId: IdempotencyId): Boolean {
        return idempotencyIds.contains(idempotencyId)
    }
}

Aggregate

本当だったらスナップショットを考慮してもうちょい改良する必要があるのですが、今回はわかりやすさ重視で見てほしいところしか書かないです。

// 在庫集約
@Aggregate
class StockAggregate() {
    @AggregateIdentifier
    private lateinit var id: StockId
    private lateinit var quantity: StockQuantity

    private var idempotenceChecker = IdempotenceChecker()

    @CommandHandler
    constructor(command: CreateStockCommand) : this() {
        val event =
            StockCreatedEvent(
                id = command.id.value,
                productId = command.productId.value
            )

        AggregateLifecycle.apply(event)
    }

    @CommandHandler
    fun handle(command: IncreaseStockCommand) : ActionCommandResult {
        // 冪等性チェック
        if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
            return ActionCommandResult.ok()
        }
        // 在庫増やせる?
        if (!quantity.canAdd(command.increaseCount)) {
            return ActionCommandResult.error(IncreaseStockError.OutOfStock.errorCode)
        }

        val event =
            StockIncreasedEvent(
                id = command.id.value,
                idempotencyId = command.idempotencyId.value,
                increaseCount = command.increaseCount.value,
            )

        AggregateLifecycle.apply(event)

        return ActionCommandResult.ok()
    }

    @CommandHandler
    fun handle(command: DecreaseStockCommand) : ActionCommandResult {
        // 冪等性チェック
        if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
            return ActionCommandResult.ok()
        }
        // 在庫減らせる?
        if (!quantity.canSubtract(command.decreaseCount)) {
            return ActionCommandResult.error(DecreaseStockError.InsufficientStock.errorCode)
        }

        val event = StockDecreasedEvent(
            id = command.id.value,
            idempotencyId = command.idempotencyId.value,
            decreaseCount = command.decreaseCount.value,
        )

        AggregateLifecycle.apply(event)

        return ActionCommandResult.ok()
    }

    @EventSourcingHandler
    fun on(event: StockCreatedEvent) {
        id = StockId(event.id)
        quantity = StockQuantity(0)
    }

    @EventSourcingHandler
    fun on(event: StockIncreasedEvent) {
        val increaseCount = StockQuantity(event.increaseCount)
        quantity = quantity.add(increaseCount)
    }

    @EventSourcingHandler
    fun on(event: StockDecreasedEvent) {
        val decreaseCount = StockQuantity(event.decreaseCount)
        quantity = quantity.subtract(decreaseCount)
    }
}

Discussion