🚀

Kotlin × Spring WebFlux × jOOQなプロダクトにR2DBCを導入した話

2024/02/06に公開

こんにちは。株式会社マネーフォワードでバックエンドエンジニアをしているTaskです。
私達はマネーフォワードクラウド連結会計というプロダクトを担当しています。
このプロダクトでは主な技術スタックとしてKotlinとSpring Boot (Spring WebFlux) が用いられており、リアクティブシステムを意識して開発されています。
一方で、2年前の開発開始当初からDBドライバとしてJDBCが用いられており、DBアクセスのみブロッキング処理をしているという状況でした。

今回、2週間ほどかけてJDBCからR2DBCに移行したので、実装例やハマりポイントなどを紹介できればと思います。

利用ツールとバージョン

  • Kotlin: 1.9.22
  • Spring Boot: 3.2.2
  • Spring Framework: 6.1.3
  • jOOQ: 3.19.1
  • MySQL

なぜJDBCからR2DBCに移行する必要があるのか

Spring WebFluxとJDBCを利用する理由はチームによってまちまちだと思います。
私達のチームは、主に以下の理由でした。

  • 開発開始当初、Spring Frameworkに知見があるメンバーが少なく、R2DBCのキャッチアップまで手が回らなかった
  • 開発開始当初、jOOQがR2DBCを十分にサポートしていなかった
  • 開発開始当初、R2DBCのMySQLドライバが不安定だった

現在では上記の問題が解決されており(あるいは解決されつつあり)、反対にJDBCを使い続けるデメリットが目立つようになってきました。

  • IOブロッキングによるパフォーマンスの問題
  • トランザクションの問題

特にトランザクションの問題は実装方法に大きな影響を及ぼしていました。
Spring Frameworkは通常トランザクションの管理をThreadLocalを用いて行っているので、スレッドを切り替えて処理を行うSpring WebFluxでは専用の(= R2DBC用の)トランザクション管理を導入する必要があります。
したがって、トランザクションを適用するために、ThreadLocalを使える、つまりsuspend関数ではない通常の関数のみで構成されたクラスを作成し、そこに更新処理を詰め込む、という設計になっていました。

これらの問題を解決するために、今回はR2DBCの導入を決断しました。

導入

jOOQ のDSLContextで R2DBC を使えるようにします。

まずは依存ライブラリとしてR2DBCドライバを追加します。

build.gradle.kts
dependencies {
  runtimeOnly("io.asyncer:r2dbc-mysql:1.0.5")
  runtimeOnly("io.r2dbc:r2dbc-pool:1.0.1.RELEASE")
}

次に、Spring Boot用の設定を追加します。

application.yaml
r2dbc:
  url: r2dbc:mysql://...
  username: ...
  password: ...
  pool:
    enabled: true

R2DBCのファクトリであるConnectionFactoryがDIできるようになるので、それを利用してDSLContextのBeanを登録します。

@Configuration
class DSLContextConnection(private val connectionFactory: ConnectionFactory) {
    @Bean
    fun dslContext(): DSLContext = DSL.using(connectionFactory).dsl()
}

これにより、R2DBCが設定されたDSLContextをDIを通して利用できるようになりました。

クエリ

R2DBCが設定されたことで、DSLContextでは以下のようなブロッキングなAPIを利用できなくなります。

  • fetch / fetchOne
  • execute

ですので、ノンブロッキングなAPIを利用するように書き換えていく必要があります。
基本的には、jOOQの各クエリがReactorのPublisherを返すようになるので、それらを適宜コルーチンで扱えるように変換していきます

fetch / fetchOne

こちらは、selectFromでほぼ同じことができます

// JDBC
fun selectById(id: Long): SampleRecord? = dslContext.fetchOne(
    SAMPLE,
    SAMPLE.ID.eq(id),
)

fun selectByName(name: String): List<SampleRecord> = dslContext.fetch(
    SAMPLE,
    SAMPLE.NAME.eq(name),
)

// R2DBC
suspend fun selectById(id: Long): SampleRecord? = dslContext.selectFrom(SAMPLE)
    .where(SAMPLE.ID.eq(id))
    .awaitFirstOrNull()

// Listを返したい場合
suspend fun selectByName(name: String): List<SampleRecord> = Flux.from(
    dslContext.selectFrom(SAMPLE)
        .where(SAMPLE.NAME.eq(name))
)
    .collectList()
    .awaitSingle()

// Flowを返したい場合
fun selectByName(name: String): Flow<SampleRecord> = dslContext.selectFrom(SAMPLE)
    .where(SAMPLE.NAME.eq(name))
    .asFlow()

また、以下のような拡張関数を用意すると、コード変更を一括でできるので便利です。

suspend inline fun <reified R : TableRecord<*>, T : TableImpl<R>> DSLContext.nonBlockingFetchOne(
    tableImpl: T,
    vararg condition: Condition
): R? = this.select()
    .from(tableImpl)
    .where(*condition)
    .awaitFirstOrNull()

execute

前述した通り、jOOQで書いたクエリがPublisherを返すようになるので、executeの代わりにawaitXXX拡張関数を用いてコルーチンとして扱えるようにしていきます。

  • クエリが何も返さない、あるいは単一の要素のみを返す場合、awaitSingleOrNullを用います。
  • クエリが複数の要素を返す場合、Flux#collectLIstMono<List<T>を返したあと、awaitSingleListオブジェクトを取得します。Flowで取得したい場合はasFlowを用います。
  • クエリが返す値に興味がない場合(batch系のAPIなど)は、awaitLastで返り値を無視する、で大丈夫だと思います。

AOP

プロダクトによっては、AOPを用いて特定のメソッドの事前、事後で共通の処理を行うためにSpring AOPを利用していると思います。
また、そのAdvice内でDBアクセスすることもあります。

例:

@Aspect
@Component
class CheckAspect(private val dslContext: DSLContext) {
    @Before("execution(...)")
    fun checkSomething(jp: JoinPoint): Any? {
         dslContext.fetchOne(
            SAMPLE,
            SAMPLE.ID.eq(id),
        )?: throw InvalidSomethingException()
    }
}

問題点

JDBCとは異なり、R2DBCを用いる場合はsuspend関数を用いる必要があるのですが、通常の方法ではsuspend関数をAdviceとして用いることはできません。

NG例(実行時にエラーになる):

@Aspect
@Component
class CheckAspect(private val dslContext: DSLContext) {
    @Before("execution(...)")
    suspend fun checkSomething(jp: JoinPoint): Any? {
        dslContext.selectFrom(SAMPLE)
            .where(SAMPLE.ID.eq(id))
            .awaitFirstOrNull()?: throw InvalidSomethingException()
    }
}

これはSpring Frameworkがsuspend関数をJavaのリフレクションを用いて実行しようとするために発生します。
また、suspend関数を実行するためにはContinuation をAdviceの外から取得し、利用後に元の場所に戻す必要があるのですが、これは困難です。

解決策

このエラーを避けるために、@Before@Afterの代わりに@Aroundを用いて、JoinPointの実行に巻き込む方法を紹介します。

OK例:

@Aspect
@Component
class CheckAspect(private val dslContext: DSLContext) {
    @Around("execution(...)")
    suspend fun checkSomething(pjp: ProceedingJoinPoint) = mono {
        dslContext.selectFrom(SAMPLE)
            .where(SAMPLE.ID.eq(id))
            .awaitFirstOrNull()?: throw InvalidSomethingException()
    }.then(pjp.proceed() as Mono<*>)
}

Spring Framework 6.1 (Spring Boot 3.2)での修正により、JoinPointがsuspend関数のとき、Publisherとして実行されるようになったのを利用しています。
runBlockingを用いて実行する方法もありますが、お勧めしません)

Spring Boot 3.2未満の場合は違う方法をとる必要がありますので、こちらをご参照ください。

https://zenn.dev/t45k/articles/316e03b9992ab9

トランザクション

R2DBCとjOOQを組み合わせた時のトランザクション管理は少々特殊であり、通常のSpring Frameworkの@Transactionalは使えなくなります。
jOOQの公式ドキュメントでは、R2DBCでのトランザクション管理は以下のように紹介されています。

dslContext.transactionCoroutine { configuration -> 
    configuration.dsl().insertInto(...)
    configuration.dsl().insertInto(...)
} // 例外が投げられたらロールバックする

https://www.jooq.org/doc/latest/manual/sql-execution/transaction-management/
https://www.jooq.org/doc/latest/manual/sql-building/kotlin-sql-building/kotlin-coroutines/

つまり、あるDSLContextのオブジェクトに対して最初にtransactionCoroutineを実行し、そのラムダ内で取得できるDSLContextのオブジェクトに対するクエリがロールバック対象になります。

問題点

この方法はSpring FrameworkのようにDIを多様する実装方法とは相性が悪いです。
というのも、通常はRepositoryクラス等にDSLContextのオブジェクトを直接DIするので、トランザクション対象の時だけ利用するDSLContextを切り替えるのが困難だからです。

例:

class SampleRepository(private val dslContext: DSLContext) {
    suspend fun selectById(id:Long) = dslContext.selectFrom(...)
}

class SampleUseCase(private val dslContext: DSLContext, sampleRepository: SampleRepository) {
    suspend fun do() {
        dslContext.transactionCoroutine { config ->
            // SampleRepository#selectById を config.dsl() を用いて実行できない
        }
    }
}

一つの対策としては全てのリポジトリのメソッドの引数にDSLContextを持たせることですが、実装量が増える上に、リポジトリのインターフェースをいわゆるドメイン層においている場合、実装の詳細が入り込むことになります。

解決策

これを回避するために、今回はCoroutineContextDSLContextのオブジェクトを持たす、という方針を取りました。
CoroutineContextwithContextを用いることで切り替えることができます。
通常は処理スレッド(Dispatchers.IOなど)を切り替えるために用いられることが多いですが、CoutineContext内に任意のオブジェクトを格納することで、withContext内だけ利用できるグローバル変数的に利用することもできます。

まずは、CoroutineContextDSLContextのオブジェクトを保持するためのコードを追加します。
Spring WebFluxはReactorを用いて実装されているので、Reactorのコード内でも参照できるようにReactorContextにオブジェクトを格納します。

private const val KEY = "org.jooq.DSLContext"

fun CoroutineContext.addDSLContext(dslContext: DSLContext): CoroutineContext {
    val reactorContext = this[ReactorContext]
    return if (reactorContext == null) {
        this + ReactorContext(Context.of(KEY, dslContext))
    } else {
        this + reactorContext.context.put(KEY, dslContext).asCoroutineContext()
    }
}

fun CoroutineContext.getDSLContext(): DSLContext? =
    this[ReactorContext]?.context?.getOrEmpty<DSLContext>(KEY)?.getOrNull()

次に、Spring FrameworkのTransactionalOperatorのように、ラムダ内でトランザクションを適用できるような実装を追加します。

@Component
class TransactionCoroutineOperator(private val dslContext: DSLContext) {
    suspend fun <T> execute(block: suspend CoroutineScope.() -> T): T {
        val propagatedDSLContext = coroutineContext.getDSLContext()
        return if (propagatedDSLContext != null) {
            execute(propagatedDSLContext, block)
        } else {
            execute(dslContext, block)
        }
    }

    private suspend fun <T> execute(dslContext: DSLContext, block: suspend CoroutineScope.() -> T): T =
        dslContext.transactionCoroutine { config ->
            withContext(coroutineContext.addDSLContext(config.dsl()), block)
        }
}

このままでは、リポジトリでCoroutineContext内のDSLContextオブジェクトを利用できません。
そこで今回は、TransactionAwareDSLContextというDSLContextをラップしたクラスを用意しました。
各リポジトリにこのクラスをDIし、これを経由してDSLContextのオブジェクトを取得するようにしました。

@Component
class TransactionAwareDSLContext(private val dslContext: DSLContext) {
    suspend fun get(): DSLContext = coroutineContext.getDSLContext() ?: this.dslContext
}

// リポジトリ実装
@Repository
class SampleRepository(private val dslContext: TransactionAwareDslContext) {
    // トランザクションが張られている(= CoroutineContext内にDSLContextオブジェクトがある)場合はそれを利用する
    // そうでない場合はTransactionAwareDSLContextにDIされているDSLContextオブジェクトを利用する
    suspend fun findById(id: Long): SampleRecord? = 
        dslContext.get().nonBlockingFetchOne(SAMPLE, SAMPLE.ID)
}

これにより、TransactionCoroutineOperator経由でトランザクションを実行することができます。

@Component
class UseCase(
    private val sampleRepository: SampleRepository,
    private val transactionCoroutineOperator: TransactionCoroutineOperator
) {
    fun exec(id: Long) {
        transactionCoroutineOperator.execute {
            sampleRepository.findById(id)
            ...
        }
    }
}

一方で、Spring Frameworkの@Transactionalのように、アノテーションを用いたトランザクション管理をしたい方もいると思います。
そこで、AOPを用いてそれと似たことをやる方法を紹介します。

@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionalCoroutine

@Aspect
@Component
class TransactionalCoroutineAspect(private val transactionCoroutineOperator: TransactionCoroutineOperator) {

    @Around("@annotation(com.moneyforward.consolidatedac.transaction.TransactionalCoroutine) && args(.., kotlin.coroutines.Continuation)")
    fun executeAnnotatedMethodInTransaction(joinPoint: ProceedingJoinPoint): Mono<*> = mono {
        transactionCoroutineOperator.execute {
            (joinPoint.proceed() as Mono<*>).awaitSingleOrNull()
        }
    }

    @Around("@within(com.moneyforward.consolidatedac.transaction.TransactionalCoroutine) && args(.., kotlin.coroutines.Continuation)")
    fun executeMethodWithinAnnotatedClassInTransaction(joinPoint: ProceedingJoinPoint): Mono<*> = mono {
        transactionCoroutineOperator.execute {
            (joinPoint.proceed() as Mono<*>).awaitSingleOrNull()
        }
    }
}

これにより、@TransactionalCoroutineを適用したクラスおよびメソッドにトランザクションを適用できます。

@Component
class UseCase(
    private val sampleRepository: SampleRepository,
    private val transactionCoroutineOperator: TransactionCoroutineOperator
) {
    fun exec(id: Long) {
        transactionCoroutineOperator.execute {
            sampleRepository.findById(id)
            ...
        }
    }
}

ネストしたトランザクションの扱いについては、こちらをご覧ください。
https://blog.jooq.org/nested-transactions-in-jooq/

ハマりポイント

batchに空リストを利用できない

jOOQではDSLContext#batchメソッドが用意されており、これにinsertやupdateクエリのリストを与えることでJDBCのバッチ操作を行えます。
また、空リストが与えられた時、何もしない、という処理になります。

R2DBCも同様にバッチ操作を行えるのですが、JDBCと異なりR2DBCの仕様であるSPIでは、クエリを与えずにバッチ操作を行なった時の動作は未定義となっています。
https://r2dbc.io/spec/1.0.0.RELEASE/spec/html/#batches.executing

どういう動作になるかはRDBMSで異なるのですが、MySQLを利用している場合、例外が投げられます。
ですので、batchメソッドを利用する場合、事前にクエリのリストが空でないことを確認するか、ArrowのNonEmptyListを利用するといった対応が必要になります。

R2DBCが名前解決に失敗する

JDBCと違いR2DBCにはNettyが使われています。
JDBCでは成功していた名前解決がNettyでは失敗する場合があるので、注意が必要です。
私達の場合、CI用のDBのURLに_が使われていたのが原因で名前解決に失敗しました。
(これはJavaの仕様のようです。)

DatadogのAPMからDB呼び出しの時間が取れない

現状、dd-trace-javaはR2DBCをサポートしていません。
https://github.com/DataDog/dd-trace-java/issues/4673

気長に待ちましょう。

最後に

ここまで読んでいただきありがとうございました。
今後R2DBCを導入する人の助けになれば幸いです。

マネーフォワード大阪開発拠点では常にメンバーを募集しています。
興味がある方は、ぜひカジュアル面談から応募していただければと思います。

https://hrmos.co/pages/moneyforward/jobs?category=1666323214451404802

Discussion