Kotlin × Spring WebFlux × jOOQなプロダクトにR2DBCを導入した話
こんにちは。株式会社マネーフォワードでバックエンドエンジニアをしているTaskです。
私達はマネーフォワードクラウド連結会計というプロダクトを担当しています。
このプロダクトでは主な技術スタックとしてKotlinとSpring Boot (Spring WebFlux) が用いられており、リアクティブシステムを意識して開発されています。
一方で、2年前の開発開始当初からDBドライバとしてJDBCが用いられており、DBアクセスのみブロッキング処理をしているという状況でした。
今回、2週間ほどかけてJDBCからR2DBCに移行したので、実装例やハマりポイントなどを紹介できればと思います。
利用ツールとバージョン
- Kotlin: 1.9.22
- Spring Boot: 3.2.2
- Spring Framework: 6.1.3
- jOOQ: 3.19.1
- MySQL
なぜJDBCからR2DBCに移行する必要があるのか
Spring WebFluxとJDBCを利用する理由はチームによってまちまちだと思います。
私達のチームは、主に以下の理由でした。
- 開発開始当初、Spring Frameworkに知見があるメンバーが少なく、R2DBCのキャッチアップまで手が回らなかった
- 開発開始当初、jOOQがR2DBCを十分にサポートしていなかった
- 開発開始当初、R2DBCのMySQLドライバが不安定だった
現在では上記の問題が解決されており(あるいは解決されつつあり)、反対にJDBCを使い続けるデメリットが目立つようになってきました。
- IOブロッキングによるパフォーマンスの問題
- トランザクションの問題
特にトランザクションの問題は実装方法に大きな影響を及ぼしていました。
Spring Frameworkは通常トランザクションの管理をThreadLocalを用いて行っているので、スレッドを切り替えて処理を行うSpring WebFluxでは専用の(= R2DBC用の)トランザクション管理を導入する必要があります。
したがって、トランザクションを適用するために、ThreadLocalを使える、つまりsuspend関数ではない通常の関数のみで構成されたクラスを作成し、そこに更新処理を詰め込む、という設計になっていました。
これらの問題を解決するために、今回はR2DBCの導入を決断しました。
導入
jOOQ のDSLContext
で R2DBC を使えるようにします。
まずは依存ライブラリとしてR2DBCドライバを追加します。
dependencies {
runtimeOnly("io.asyncer:r2dbc-mysql:1.0.5")
runtimeOnly("io.r2dbc:r2dbc-pool:1.0.1.RELEASE")
}
次に、Spring Boot用の設定を追加します。
r2dbc:
url: r2dbc:mysql://...
username: ...
password: ...
pool:
enabled: true
R2DBCのファクトリであるConnectionFactory
がDIできるようになるので、それを利用してDSLContext
のBeanを登録します。
@Configuration
class DSLContextConnection(private val connectionFactory: ConnectionFactory) {
@Bean
fun dslContext(): DSLContext = DSL.using(connectionFactory).dsl()
}
これにより、R2DBCが設定されたDSLContext
をDIを通して利用できるようになりました。
クエリ
R2DBCが設定されたことで、DSLContext
では以下のようなブロッキングなAPIを利用できなくなります。
-
fetch
/fetchOne
execute
ですので、ノンブロッキングなAPIを利用するように書き換えていく必要があります。
基本的には、jOOQの各クエリがReactorのPublisher
を返すようになるので、それらを適宜コルーチンで扱えるように変換していきます
fetch
/ fetchOne
こちらは、selectFrom
でほぼ同じことができます
// JDBC
fun selectById(id: Long): SampleRecord? = dslContext.fetchOne(
SAMPLE,
SAMPLE.ID.eq(id),
)
fun selectByName(name: String): List<SampleRecord> = dslContext.fetch(
SAMPLE,
SAMPLE.NAME.eq(name),
)
// R2DBC
suspend fun selectById(id: Long): SampleRecord? = dslContext.selectFrom(SAMPLE)
.where(SAMPLE.ID.eq(id))
.awaitFirstOrNull()
// Listを返したい場合
suspend fun selectByName(name: String): List<SampleRecord> = Flux.from(
dslContext.selectFrom(SAMPLE)
.where(SAMPLE.NAME.eq(name))
)
.collectList()
.awaitSingle()
// Flowを返したい場合
fun selectByName(name: String): Flow<SampleRecord> = dslContext.selectFrom(SAMPLE)
.where(SAMPLE.NAME.eq(name))
.asFlow()
また、以下のような拡張関数を用意すると、コード変更を一括でできるので便利です。
suspend inline fun <reified R : TableRecord<*>, T : TableImpl<R>> DSLContext.nonBlockingFetchOne(
tableImpl: T,
vararg condition: Condition
): R? = this.select()
.from(tableImpl)
.where(*condition)
.awaitFirstOrNull()
execute
前述した通り、jOOQで書いたクエリがPublisher
を返すようになるので、execute
の代わりにawaitXXX
拡張関数を用いてコルーチンとして扱えるようにしていきます。
- クエリが何も返さない、あるいは単一の要素のみを返す場合、
awaitSingleOrNull
を用います。 - クエリが複数の要素を返す場合、
Flux#collectLIst
でMono<List<T>
を返したあと、awaitSingle
でList
オブジェクトを取得します。Flow
で取得したい場合はasFlow
を用います。 - クエリが返す値に興味がない場合(
batch
系のAPIなど)は、awaitLast
で返り値を無視する、で大丈夫だと思います。
AOP
プロダクトによっては、AOPを用いて特定のメソッドの事前、事後で共通の処理を行うためにSpring AOPを利用していると思います。
また、そのAdvice内でDBアクセスすることもあります。
例:
@Aspect
@Component
class CheckAspect(private val dslContext: DSLContext) {
@Before("execution(...)")
fun checkSomething(jp: JoinPoint): Any? {
dslContext.fetchOne(
SAMPLE,
SAMPLE.ID.eq(id),
)?: throw InvalidSomethingException()
}
}
問題点
JDBCとは異なり、R2DBCを用いる場合はsuspend関数を用いる必要があるのですが、通常の方法ではsuspend関数をAdviceとして用いることはできません。
NG例(実行時にエラーになる):
@Aspect
@Component
class CheckAspect(private val dslContext: DSLContext) {
@Before("execution(...)")
suspend fun checkSomething(jp: JoinPoint): Any? {
dslContext.selectFrom(SAMPLE)
.where(SAMPLE.ID.eq(id))
.awaitFirstOrNull()?: throw InvalidSomethingException()
}
}
これはSpring Frameworkがsuspend関数をJavaのリフレクションを用いて実行しようとするために発生します。
また、suspend関数を実行するためにはContinuation
をAdviceの外から取得し、利用後に元の場所に戻す必要があるのですが、これは困難です。
解決策
このエラーを避けるために、@Before
や@After
の代わりに@Around
を用いて、JoinPointの実行に巻き込む方法を紹介します。
OK例:
@Aspect
@Component
class CheckAspect(private val dslContext: DSLContext) {
@Around("execution(...)")
suspend fun checkSomething(pjp: ProceedingJoinPoint) = mono {
dslContext.selectFrom(SAMPLE)
.where(SAMPLE.ID.eq(id))
.awaitFirstOrNull()?: throw InvalidSomethingException()
}.then(pjp.proceed() as Mono<*>)
}
Spring Framework 6.1 (Spring Boot 3.2)での修正により、JoinPointがsuspend関数のとき、Publisherとして実行されるようになったのを利用しています。
(runBlocking
を用いて実行する方法もありますが、お勧めしません)
Spring Boot 3.2未満の場合は違う方法をとる必要がありますので、こちらをご参照ください。
トランザクション
R2DBCとjOOQを組み合わせた時のトランザクション管理は少々特殊であり、通常のSpring Frameworkの@Transactional
は使えなくなります。
jOOQの公式ドキュメントでは、R2DBCでのトランザクション管理は以下のように紹介されています。
dslContext.transactionCoroutine { configuration ->
configuration.dsl().insertInto(...)
configuration.dsl().insertInto(...)
} // 例外が投げられたらロールバックする
つまり、あるDSLContext
のオブジェクトに対して最初にtransactionCoroutine
を実行し、そのラムダ内で取得できるDSLContext
のオブジェクトに対するクエリがロールバック対象になります。
問題点
この方法はSpring FrameworkのようにDIを多様する実装方法とは相性が悪いです。
というのも、通常はRepository
クラス等にDSLContext
のオブジェクトを直接DIするので、トランザクション対象の時だけ利用するDSLContext
を切り替えるのが困難だからです。
例:
class SampleRepository(private val dslContext: DSLContext) {
suspend fun selectById(id:Long) = dslContext.selectFrom(...)
}
class SampleUseCase(private val dslContext: DSLContext, sampleRepository: SampleRepository) {
suspend fun do() {
dslContext.transactionCoroutine { config ->
// SampleRepository#selectById を config.dsl() を用いて実行できない
}
}
}
一つの対策としては全てのリポジトリのメソッドの引数にDSLContext
を持たせることですが、実装量が増える上に、リポジトリのインターフェースをいわゆるドメイン層においている場合、実装の詳細が入り込むことになります。
解決策
これを回避するために、今回はCoroutineContext
にDSLContext
のオブジェクトを持たす、という方針を取りました。
CoroutineContext
はwithContext
を用いることで切り替えることができます。
通常は処理スレッド(Dispatchers.IO
など)を切り替えるために用いられることが多いですが、CoutineContext
内に任意のオブジェクトを格納することで、withContext
内だけ利用できるグローバル変数的に利用することもできます。
まずは、CoroutineContext
にDSLContext
のオブジェクトを保持するためのコードを追加します。
Spring WebFluxはReactorを用いて実装されているので、Reactorのコード内でも参照できるようにReactorContext
にオブジェクトを格納します。
private const val KEY = "org.jooq.DSLContext"
fun CoroutineContext.addDSLContext(dslContext: DSLContext): CoroutineContext {
val reactorContext = this[ReactorContext]
return if (reactorContext == null) {
this + ReactorContext(Context.of(KEY, dslContext))
} else {
this + reactorContext.context.put(KEY, dslContext).asCoroutineContext()
}
}
fun CoroutineContext.getDSLContext(): DSLContext? =
this[ReactorContext]?.context?.getOrEmpty<DSLContext>(KEY)?.getOrNull()
次に、Spring FrameworkのTransactionalOperator
のように、ラムダ内でトランザクションを適用できるような実装を追加します。
@Component
class TransactionCoroutineOperator(private val dslContext: DSLContext) {
suspend fun <T> execute(block: suspend CoroutineScope.() -> T): T {
val propagatedDSLContext = coroutineContext.getDSLContext()
return if (propagatedDSLContext != null) {
execute(propagatedDSLContext, block)
} else {
execute(dslContext, block)
}
}
private suspend fun <T> execute(dslContext: DSLContext, block: suspend CoroutineScope.() -> T): T =
dslContext.transactionCoroutine { config ->
withContext(coroutineContext.addDSLContext(config.dsl()), block)
}
}
このままでは、リポジトリでCoroutineContext
内のDSLContext
オブジェクトを利用できません。
そこで今回は、TransactionAwareDSLContext
というDSLContext
をラップしたクラスを用意しました。
各リポジトリにこのクラスをDIし、これを経由してDSLContext
のオブジェクトを取得するようにしました。
@Component
class TransactionAwareDSLContext(private val dslContext: DSLContext) {
suspend fun get(): DSLContext = coroutineContext.getDSLContext() ?: this.dslContext
}
// リポジトリ実装
@Repository
class SampleRepository(private val dslContext: TransactionAwareDslContext) {
// トランザクションが張られている(= CoroutineContext内にDSLContextオブジェクトがある)場合はそれを利用する
// そうでない場合はTransactionAwareDSLContextにDIされているDSLContextオブジェクトを利用する
suspend fun findById(id: Long): SampleRecord? =
dslContext.get().nonBlockingFetchOne(SAMPLE, SAMPLE.ID)
}
これにより、TransactionCoroutineOperator
経由でトランザクションを実行することができます。
@Component
class UseCase(
private val sampleRepository: SampleRepository,
private val transactionCoroutineOperator: TransactionCoroutineOperator
) {
fun exec(id: Long) {
transactionCoroutineOperator.execute {
sampleRepository.findById(id)
...
}
}
}
一方で、Spring Frameworkの@Transactional
のように、アノテーションを用いたトランザクション管理をしたい方もいると思います。
そこで、AOPを用いてそれと似たことをやる方法を紹介します。
@Target(AnnotationTarget.FUNCTION, AnnotationTarget.CLASS)
@Retention(AnnotationRetention.RUNTIME)
annotation class TransactionalCoroutine
@Aspect
@Component
class TransactionalCoroutineAspect(private val transactionCoroutineOperator: TransactionCoroutineOperator) {
@Around("@annotation(com.moneyforward.consolidatedac.transaction.TransactionalCoroutine) && args(.., kotlin.coroutines.Continuation)")
fun executeAnnotatedMethodInTransaction(joinPoint: ProceedingJoinPoint): Mono<*> = mono {
transactionCoroutineOperator.execute {
(joinPoint.proceed() as Mono<*>).awaitSingleOrNull()
}
}
@Around("@within(com.moneyforward.consolidatedac.transaction.TransactionalCoroutine) && args(.., kotlin.coroutines.Continuation)")
fun executeMethodWithinAnnotatedClassInTransaction(joinPoint: ProceedingJoinPoint): Mono<*> = mono {
transactionCoroutineOperator.execute {
(joinPoint.proceed() as Mono<*>).awaitSingleOrNull()
}
}
}
これにより、@TransactionalCoroutine
を適用したクラスおよびメソッドにトランザクションを適用できます。
@Component
class UseCase(
private val sampleRepository: SampleRepository,
private val transactionCoroutineOperator: TransactionCoroutineOperator
) {
fun exec(id: Long) {
transactionCoroutineOperator.execute {
sampleRepository.findById(id)
...
}
}
}
ネストしたトランザクションの扱いについては、こちらをご覧ください。
ハマりポイント
batchに空リストを利用できない
jOOQではDSLContext#batch
メソッドが用意されており、これにinsertやupdateクエリのリストを与えることでJDBCのバッチ操作を行えます。
また、空リストが与えられた時、何もしない、という処理になります。
R2DBCも同様にバッチ操作を行えるのですが、JDBCと異なりR2DBCの仕様であるSPIでは、クエリを与えずにバッチ操作を行なった時の動作は未定義となっています。
どういう動作になるかはRDBMSで異なるのですが、MySQLを利用している場合、例外が投げられます。
ですので、batchメソッドを利用する場合、事前にクエリのリストが空でないことを確認するか、ArrowのNonEmptyList
を利用するといった対応が必要になります。
R2DBCが名前解決に失敗する
JDBCと違いR2DBCにはNettyが使われています。
JDBCでは成功していた名前解決がNettyでは失敗する場合があるので、注意が必要です。
私達の場合、CI用のDBのURLに_
が使われていたのが原因で名前解決に失敗しました。
(これはJavaの仕様のようです。)
DatadogのAPMからDB呼び出しの時間が取れない
現状、dd-trace-javaはR2DBCをサポートしていません。
気長に待ちましょう。
最後に
ここまで読んでいただきありがとうございました。
今後R2DBCを導入する人の助けになれば幸いです。
マネーフォワード大阪開発拠点では常にメンバーを募集しています。
興味がある方は、ぜひカジュアル面談から応募していただければと思います。
Discussion