Zenn
✉️

AxonFramework 冪等性

2025/01/18に公開

なんで冪等性がいるの?

AxonFrameworkはCQRS/ESをベースに作られたフレームワークで、当然のようにマイクロサービスを前提に作られています。

マイクロサービス間の通信はトランザクションを利用した強い整合性をかけられないため、ネットワーク断が発生した場合はリトライ処理をする必要があります。

マイクロサービス = リトライ処理がいる = すべてのAPIを冪等性にしたほうがいい

です。2個連続でデータが作られちゃったりしちゃいます。

AxonFrameworkにおいての冪等性

EventHandlerはAt Least Oneなので最低一回のメッセージ到達を保証していますが、2回到達してしまう可能性があります。

CommandHandlerはAxonServerを使うとノード間通信になるため、内部の処理が成功してEventを発行したのにもかかわらず、呼び出し元には失敗として通知される可能性があります。

下の例では支払いは済んでいるのに30秒待機されたので、呼び出し元にはTimeOutエラーが通知されます。
脳死でリトライしたら2回支払いしちゃいますね。

kotlin
@Component
class PaymentInteractor(
    private val paymentService: PaymentService,
    private val eventGateway: EventGateway
){
    @CommandHandler
    fun handle(command: PaymentCcommand) {
        //外部の支払いサービスを使って支払いをする
        paymentService.handle(command.serviceId)

        //成功したらEventを発行する
        val envet = PaymentedEvent(command.serviceId)
        eventGateway.publish(event)

        //30秒待機
        Thread.sleep(30000)
    }
}

Commandの冪等性

下ごしらえ

まずは冪等性をチェックするための機構を作りましょう。

IDオブジェクトは適当に作ってください。 ULIDとかGUIDとか中身は何でもいいです。
今回はただのstringにしてます

IdempotencyId.kt
data class IdempotencyId(val value: String)
IdempotenceChecker.kt
class IdempotenceChecker {
    companion object {
        // ここはお好みで増やしたり減らしたり。
        // 多分100もあれば余裕で足りる
        private const val MAX_ID_COUNT = 100
    }

    private val idempotencyIds = LinkedHashSet<IdempotencyId>()

    fun register(idempotencyId: IdempotencyId) {
        if (idempotencyIds.size >= MAX_ID_COUNT) {
            val lastId = idempotencyIds.last()
            idempotencyIds.remove(lastId)
        }

        idempotencyIds.add(idempotencyId)
    }

    fun isIdempotent(idempotencyId: IdempotencyId): Boolean {
        return idempotencyIds.contains(idempotencyId)
    }
}

StockIdStockQuantity, IncreaseCountとかの値オブジェクト作ってますが、本質じゃないのでスキップ

Aggregate

本当だったらスナップショットを考慮してもうちょい改良する必要があるのですが、今回はわかりやすさ重視で見てほしいところしか書かないです。

StockAggregate.kt
// 在庫集約
@Aggregate
class StockAggregate() {
    @AggregateIdentifier
    private lateinit var id: StockId
    private lateinit var quantity: StockQuantity

    private var idempotenceChecker = IdempotenceChecker()

    @CommandHandler
    constructor(command: InternalCreateStockCommand) : this() {
        val event =
            StockCreatedEvent(
                id = command.id.value,
                productId = command.productId.value,
            )

        AggregateLifecycle.apply(event)
    }

    @CommandHandler
    fun handle(command: IncreaseStockCommand) {
        // 冪等性チェック
        if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
            return
        }
        // 在庫増やせる?
        if (quantity.canNotAdd(command.increaseCount)) {
            throw UseCaseException(IncreaseStockError.OutOfStock)
        }

        val increasedStockQuantity = quantity.add(command.increaseCount)

        val event =
            StockIncreasedEvent(
                id = command.id.value,
                increaseCount = command.increaseCount.value,
                idempotencyId = command.idempotencyId.value,
                increasedStockQuantity = increasedStockQuantity.value,
            )

        AggregateLifecycle.apply(event)
    }

    @CommandHandler
    fun handle(command: DecreaseStockCommand) {
        // 冪等性チェック
        if (idempotenceChecker.isIdempotent(command.idempotencyId)) {
            return
        }
        // 在庫減らせる?
        if (quantity.canNotSubtract(command.decreaseCount)) {
            throw UseCaseException(DecreaseStockError.InsufficientStock)
        }

        val decreasedStockQuantity = quantity.subtract(command.decreaseCount)

        val event =
            StockDecreasedEvent(
                id = command.id.value,
                decreaseCount = command.decreaseCount.value,
                idempotencyId = command.idempotencyId.value,
                decreasedStockQuantity = decreasedStockQuantity.value,
            )

        AggregateLifecycle.apply(event)
    }

    @CommandHandler
    fun handle(command: DeleteStockCommand) {
        val event =
            StockDeletedEvent(
                id = command.id.value,
            )

        AggregateLifecycle.apply(event)
    }

    @EventSourcingHandler
    fun on(event: StockCreatedEvent) {
        id = StockId(event.id)
        quantity = StockQuantity(0)
    }

    @EventSourcingHandler
    fun on(event: StockIncreasedEvent) {
        quantity = StockQuantity(event.increasedStockQuantity)
        idempotenceChecker.register(IdempotencyId(event.idempotencyId))
    }

    @EventSourcingHandler
    fun on(event: StockDecreasedEvent) {
        quantity = StockQuantity(event.decreasedStockQuantity)
        idempotenceChecker.register(IdempotencyId(event.idempotencyId))
    }

    @EventSourcingHandler
    fun on(event: StockDeletedEvent) {
        AggregateLifecycle.markDeleted()
    }
}

Eventの冪等性

Eventの構造

これは在庫を入庫したというイベントです。
単純に作るなら、IDとincreaseCountの二つで十分なのですが、この値を使って増減させるだけでは冪等性が確保できません。

なので、入庫した後の在庫数みたいな項目も追加します。

StockIncreasedEvent.kt
data class StockIncreasedEvent(
    override val id: String,
    val increaseCount: Int,
    //  ↓冪等性対策↓
    val idempotencyId: String,
    val increasedStockQuantity: Int,
) : StockEvent

EventHandler (射影)

ちなみにEventHandlerは冪等性は保証してないけど、Eventの順番は保証してます。
なので、入庫した後の在庫数で最終的なデータを更新すれば冪等性が担保できるわけです。

StockProjector.kt
    @EventHandler
    fun on(event: StockIncreasedEvent) {
        val entity = stockJpaRepository.findById(event.id).orElseThrow()

        val updatedEntity = entity.copy(quantity = event.increasedStockQuantity)

        stockJpaRepository.save(updatedEntity)
    }

※ Eventの中に冪等キーを持ってるので、 Event Consumer側でDBとかの仕組みを使って管理することも可能ですが、今回は例として簡単なやり方にしています。

個人的には Eventの中身はシンプルにするのが好きなので実際にやるなら DB側に冪等キーを持たせます。

Discussion

ログインするとコメントできます