KotlinとSpring BootでAtomikosを使って分散トランザクションを実践する
概要
複数のデータベースを使用するシステムでは、データの整合性を確保するために分散トランザクション管理が重要な課題となります。
この記事では以下の内容をざっくりと解説していきます。
- 分散トランザクションの基本概念と課題
- Atomikosとは何か、なぜ必要なのか
- KotlinとSpring Boot 3を使ったAtomikosの実装手順
- 具体的な実装例と動作検証
1. はじめに
1.1 分散トランザクションとは?
分散トランザクションとは、複数のデータソースにまたがるトランザクション処理のことです。
例えば、以下のような状況で必要となってきます。
- 複数のデータベースにデータを書き込む必要がある場合
- データベースとメッセージキューの両方に整合性を持って更新する場合
- マイクロサービスアーキテクチャで複数のサービスをまたぐ処理を行う場合
通常のローカルトランザクションと異なり、分散トランザクションでは「すべてのリソースが成功するか、すべて失敗するか」という原子性を保証するために特別な仕組みが必要になります。
分散トランザクションのシーケンスフロー
以下は複数のデータベースに書き込む場合の分散トランザクションの流れを示したシーケンス図です。
1.2 Atomikos とは
Atomikosは、分散トランザクションを管理するためのJavaベースのトランザクションマネージャーです。特にXAトランザクション(2フェーズコミットプロトコルを使用した分散トランザクション)の管理を得意としています。
2フェーズコミットプロトコル
Atomikosの核となる技術が2フェーズコミットプロトコルです。これは複数のリソースにまたがるトランザクションの整合性を保証するための手法です。
特徴
-
XA対応の分散トランザクション
- 複数のデータベース(PostgreSQL、MySQL、Oracleなど)やメッセージキューにまたがるトランザクションを統合的に管理
- 2フェーズコミットプロトコルによる確実なトランザクション管理
-
Java EEアプリケーションに依存しない
- Java SE環境やSpring Bootなどの軽量フレームワークでも使用可能
- 重いアプリケーションサーバーを必要としない柔軟な運用
-
軽量で使いやすい
- JTA(Java Transaction API)を実装した標準的なインターフェース
- Spring Bootとの統合が容易
- 設定がシンプルで導入の障壁が低い
2. 環境構築
実際にAtomikosを使って分散トランザクション管理を実装する環境を構築していきます。
完全なソースコードはGithubで公開していますので、詳細はそちらをご参照ください。
2.1 使用技術スタック
技術 | バージョン | 用途 |
---|---|---|
Kotlin | 2.0.10 | 開発言語 |
Spring Boot | 3.4.1 | アプリケーションフレームワーク |
Atomikos | 6.0.0 | 分散トランザクション管理 |
PostgreSQL | 16 | データベース |
Docker | - | 環境構築・実行環境 |
2.2 Docker Composeによる環境構築
アプリケーションとデータベースの2つのコンテナを用意します。
name: 'kotlin-distributed-transaction'
services:
app:
container_name: kotlin-distributed-transaction-app-container
image: kotlin-distributed-transaction/app:dev
build:
context: .
dockerfile: ./infra/docker/app/Dockerfile
target: production
depends_on:
- db
networks:
- default
ports:
- '8080:8080'
db:
container_name: kotlin-distributed-transaction-db-container
image: kotlin-distributed-transaction/db:dev
build:
context: .
dockerfile: ./infra/docker/db/Dockerfile
restart: always
ports:
- '5432:5432'
environment:
- POSTGRES_USER=${POSTGRES_DB_USER:-test}
- POSTGRES_PASSWORD=${POSTGRES_DB_PASS:-test}
volumes:
- db-data-volume:/var/lib/postgresql/data
- ./infra/docker/db/init:/docker-entrypoint-initdb.d/
- ./infra/docker/db/config/postgresql.conf:/etc/postgresql/postgresql.conf
networks:
- default
command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]
networks:
default:
name: kotlin-distributed-transaction
volumes:
db-data-volume:
name: kotlin-distributed-transaction-db-data
driver: local
2.3 PostgreSQLの設定
XAトランザクションを有効にするには、PostgreSQLの設定を変更する必要があります。postgresql.conf
に以下の設定を追加しておきます。
listen_addresses = '*'
max_connections = 100
max_prepared_transactions = 100 # XAトランザクションに必要
💡 重要ポイント:
max_prepared_transactions
はデフォルトで0(無効)になっています。XAトランザクションを使用するには必ず1以上の値を設定してください。この値は、同時に2フェーズコミットのPrepared状態にできるトランザクションの最大数を示します。
3. アプリケーションの構成
今回作成したアプリケーションの全体構成です。
app
├── build.gradle.kts
└── src
└── main
├── kotlin
│ └── com
│ └── sample
│ └── transaction
│ ├── TransactionApplication.kt
│ ├── core
│ │ └── domain
│ │ └── repository # リポジトリのインターフェース
│ ├── infrastructure
│ │ ├── db # DB/TransactionManagerの設定
│ │ └── repository # 実装クラス
│ ├── presentation
│ │ └── controller # APIコントローラー
│ └── usecase # ユースケース
└── resources
└── application.yaml
このアプリケーションでは、以下の4つのAPIエンドポイントを用意し、分散トランザクションの動作を検証します。
エンドポイント | 説明 |
---|---|
/sample/not-distributed-transaction |
通常のトランザクション(正常系) |
/sample/failed-not-distributed-transaction |
通常のトランザクション(異常系) |
/sample/distributed-transaction |
Atomikosによる分散トランザクション(正常系) |
/sample/failed-distributed-transaction |
Atomikosによる分散トランザクション(異常系) |
4. 依存関係の設定
分散トランザクションの実装に必要なライブラリをbuild.gradle.kts
に追加します。
dependencies {
// Spring Boot
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-jdbc")
// Atomikos
implementation("com.atomikos:transactions-spring-boot3-starter:6.0.0")
implementation("jakarta.transaction:jakarta.transaction-api:2.0.1")
// Database
implementation("org.postgresql:postgresql:42.7.4")
// Kotlin
implementation("org.jetbrains.kotlin:kotlin-reflect")
}
主要な依存関係の役割
- transactions-spring-boot3-starter: Spring Boot 3と連携するAtomikosのスターター
- jakarta.transaction-api: JTAトランザクションAPIの実装
- postgresql: PostgreSQLのJDBCドライバ
5. Atomikosの設定
5.1 application.yamlの設定
spring:
jta:
enabled: true # JTAトランザクションを有効化
datasource:
first:
jdbcUrl: jdbc:postgresql://db:5432/first_db
username: test
password: test
driverClassName: org.postgresql.Driver
second:
jdbcUrl: jdbc:postgresql://db:5432/second_db
username: test
password: test
driverClassName: org.postgresql.Driver
logging:
level:
com:
atomikos: DEBUG # Atomikosのログを詳細に出力
root: INFO
5.2 DataSourceとTransactionManagerの設定
DataSourceの設定
@Configuration
@EnableConfigurationProperties(DataSourceProperties::class)
class JdbcConfig(private val dataSourceProperties: DataSourceProperties) {
// 通常のDataSource(非分散トランザクション用)
@Bean("firstDataSource1")
@ConfigurationProperties("spring.datasource.first")
fun createFirstDataSource1(): DataSource = DataSourceBuilder.create().build()
@Bean("secondDataSource1")
@ConfigurationProperties("spring.datasource.second")
fun createSecondDataSource1(): DataSource = DataSourceBuilder.create().build()
// XA対応DataSource(分散トランザクション用)
@Bean("firstDataSource2")
fun createFirstDataSource2(): DataSource {
return createXADataSource(
"firstDataSource",
dataSourceProperties.first.jdbcUrl,
dataSourceProperties.first.username,
dataSourceProperties.first.password,
)
}
@Bean("secondDataSource2")
fun createSecondDataSource2(): DataSource {
return createXADataSource(
"secondDataSource",
dataSourceProperties.second.jdbcUrl,
dataSourceProperties.second.username,
dataSourceProperties.second.password,
)
}
// XA対応DataSourceの作成ヘルパーメソッド
private fun createXADataSource(resourceName: String, url: String, user: String, password: String): DataSource {
// PostgreSQLのXA対応DataSourceを作成
val pgxaDataSource = PGXADataSource().apply {
setUrl(url)
this.user = user
this.password = password
}
// AtomikosのDataSourceラッパーを作成
return AtomikosDataSourceBean().apply {
uniqueResourceName = resourceName
xaDataSource = pgxaDataSource
}
}
}
TransactionManagerの設定
@Configuration
class TransactionManagerConfig {
// ユーザートランザクション(JTA用)
@Bean
fun userTransaction(): UserTransactionImp = UserTransactionImp()
// トランザクションマネージャー(Atomikos)
@Bean
fun userTransactionManager(): UserTransactionManager {
val userTransactionManager = UserTransactionManager()
userTransactionManager.init()
return userTransactionManager
}
// Spring用プラットフォームトランザクションマネージャー
@Bean
fun transactionManager(): PlatformTransactionManager =
JtaTransactionManager(userTransaction(), userTransactionManager())
}
6. 実装例
ここでは、分散トランザクションの実装例と、通常のトランザクション実装との違いを見ていきます。
6.1 コントローラーの実装
@RestController
@RequestMapping("sample")
@CrossOrigin
class DistributedTransactionController(
private val useCase: DistributedTransactionUseCase,
) {
@GetMapping("/distributed-transaction")
operator fun invoke() {
useCase()
}
}
6.2 ユースケースの実装
@Service
@Transactional // JTAトランザクションを使用
class DistributedTransactionUseCase(
private val repository: DistributedTransactionRepository,
) {
operator fun invoke() {
repository.register()
}
}
6.3 リポジトリの実装
インターフェース
interface DistributedTransactionRepository {
fun register()
}
実装クラス(分散トランザクション)
@Repository
class DistributedTransactionRepositoryImpl(
@Qualifier("firstDataSource2") // XA対応DataSource
private val firstDataSource: DataSource,
@Qualifier("secondDataSource2") // XA対応DataSource
private val secondDataSource: DataSource,
) : DistributedTransactionRepository {
override fun register() {
val ulid = ULID.randomULID() // 一意のID生成
// 1つ目のデータベースに挿入
val firstDBJdbc = NamedParameterJdbcTemplate(firstDataSource)
val firstDBSql = "INSERT INTO first_table(id) VALUES (:id)"
val firstDBParams = mutableMapOf<String, Any>()
firstDBParams["id"] = ulid
firstDBJdbc.update(firstDBSql, firstDBParams)
// 2つ目のデータベースに挿入
val secondDBJdbc = NamedParameterJdbcTemplate(secondDataSource)
val secondDBSql = "INSERT INTO second_table(id) VALUES (:id)"
val secondDBParams = mutableMapOf<String, Any>()
secondDBParams["id"] = ulid
secondDBJdbc.update(secondDBSql, secondDBParams)
}
}
実装クラス(異常系)
エラーを発生させるバージョンでは、次のように例外を発生させます。
override fun register() {
val ulid = ULID.randomULID()
// 1つ目のDBへの挿入
val firstDBJdbc = NamedParameterJdbcTemplate(firstDataSource)
val firstDBSql = "INSERT INTO first_table(id) VALUES (:id)"
val firstDBParams = mutableMapOf<String, Any>()
firstDBParams["id"] = ulid
firstDBJdbc.update(firstDBSql, firstDBParams)
// 意図的に例外を発生させる
throw RuntimeException("error")
// この処理は実行されない
val secondDBJdbc = NamedParameterJdbcTemplate(secondDataSource)
// ...
}
7. 動作検証
実装したAPIを使って動作検証を行いました。通常のトランザクションと分散トランザクションの違いを視覚的に確認してみます。
通常トランザクションと分散トランザクションの比較
7.1 通常トランザクション(非分散)の場合
シナリオ | 結果 |
---|---|
正常系 | 両方のDBに正常に書き込まれる |
異常系 | first_dbには書き込まれるが、second_dbには書き込まれない(データ不整合が発生) |
通常トランザクション(正常系)
通常トランザクションの正常系の確認をするため、curlなどを使いAPIを叩きます。
$ curl http://localhost:8080/sample/not-distributed-transaction
通常トランザクションが正常に完了するケースでは、シンプルな流れで処理が完結します。
- 処理の流れ
- トランザクション作成
- データベース接続プールの初期化
- 処理実行(ログには表示されていない)
- コミット処理
ログの例
createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391267223700002
HikariPool-1 - Starting...
HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@6ed41df8
HikariPool-1 - Start completed.
HikariPool-2 - Starting...
HikariPool-2 - Added connection org.postgresql.jdbc.PgConnection@7a6e850f
HikariPool-2 - Start completed.
commit() done (by application) of transaction 172.20.0.3.tm174391267223700002
もちろん両方のデータベースには同じIDのデータが登録される結果となります。
first_db=# select * from first_table;
id
----------------------------
01JR4P3Y6KKNVPE6P1WT1MHG5Y
(1 row)
second_db=# select * from second_table;
id
----------------------------
01JR4P3Y6KKNVPE6P1WT1MHG5Y
(1 row)
通常トランザクション(異常系)
次に通常トランザクションの異常系の確認をするため、APIを実行します。
$ curl http://localhost:8080/sample/failed-not-distributed-transaction
エラーが発生した場合、トランザクションはロールバックされます。
- 処理の流れ
- トランザクション作成
- エラー発生
- ロールバック処理(2回記録されている)
- 例外のスロー
ログの例
createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391287916300004
rollback() done of transaction 172.20.0.3.tm174391287916300004
rollback() done of transaction 172.20.0.3.tm174391287916300004
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException: error] with root cause
しかし、データベースの結果をみるとfirst_db
に書き込みした内容はロールバックされておらず整合性が崩れてしまっていることが確認できます。
first_db=# select * from first_table;
id
----------------------------
01JR4MXH1WCYTA7RXQEE6FTQHH
(1 row)
second_db=# select * from second_table;
id
----
(0 rows)
7.2 分散トランザクション(Atomikos)の場合
シナリオ | 結果 |
---|---|
正常系 | 両方のDBに正常に書き込まれる |
異常系 | 両方のDBともロールバックされ、一切書き込まれない(データ整合性が保たれる) |
分散トランザクション(正常系)
では分散トランザクションの正常系を検証していきます。
$ curl http://localhost:8080/sample/distributed-transaction
分散トランザクションでは、複数のデータソースを一つのトランザクションとして扱います。正常系では2フェーズコミットが実行されます。
- 処理の流れ
- トランザクション作成
- 第1データソース(firstDataSource)の処理
- 接続取得と初期化
- XAResourceとしての登録
- SQL実行(INSERT)
- 第2データソース(secondDataSource)の処理
- 接続取得と初期化
- XAResourceとしての登録
- SQL実行(INSERT)
- 各コネクションのクローズ処理
- 2フェーズコミット実行
- Prepare段階(各リソースの準備確認)
- Commit段階(確定処理)
- リソースのリフレッシュ
ログを確認すると、分散トランザクション時に2フェーズコミットが実行されていることが分かります。
createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391233628700001
firstDataSource: getConnection()...
firstDataSource: init...
[...]
addParticipant ( XAResourceTransaction: XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 ) for transaction 172.20.0.3.tm174391233628700001
XAResource.start ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 , XAResource.TMNOFLAGS ) on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
[...]
atomikosJdbcStatementProxy for vendor instance Pooled statement wrapping physical statement INSERT INTO first_table(id) VALUES (('01JR4MCYXGDG4R3K2F06016JS4')): calling proxied close
[...]
secondDataSource: getConnection()...
secondDataSource: init...
[...]
atomikosJdbcStatementProxy for vendor instance Pooled statement wrapping physical statement INSERT INTO second_table(id) VALUES (('01JR4MCYXGDG4R3K2F06016JS4')): calling proxied close
[...]
commit() done (by application) of transaction 172.20.0.3.tm174391233628700001
XAResource.prepare ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 ) returning OK on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
XAResource.prepare ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D32 ) returning OK on resource secondDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69e550fa
XAResource.commit ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 , false ) on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
XAResource.commit ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D32 , false ) on resource secondDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69e550fa
firstDataSource: refreshed XAResource
secondDataSource: refreshed XAResource
もちろん両方のデータベースには同じIDのデータが登録されています。
first_db=# select * from first_table;
id
----------------------------
01JR4MCYXGDG4R3K2F06016JS4
(1 row)
second_db=# select * from second_table;
id
----------------------------
01JR4MCYXGDG4R3K2F06016JS4
(1 row)
分散トランザクション(異常系)
では分散トランザクションの異常系を検証していきます。
$ curl http://localhost:8080/sample/failed-distributed-transaction
分散トランザクションでエラーが発生した場合、参加しているすべてのリソースがロールバックされます。
- 処理の流れ
- トランザクション作成
- 第1データソース(firstDataSource)の処理
- 接続取得と初期化
- XAResourceとしての登録
- SQL実行(INSERT)
- エラー発生(第2データソース処理前)
- XAResourceのロールバック処理
- トランザクション全体のロールバック
- 例外のスロー
エラー時には次のようなログが出力されます。
createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391276233200003
firstDataSource: getConnection()...
firstDataSource: init...
atomikosJdbcConnectionProxy (state = sessionHandleState (1 context(s), isTerminated = false) for resource firstDataSource) for vendor instance Pooled connection wrapping physical connection org.postgresql.jdbc.PgConnection@3e04e4ab: calling proxied prepareStatement(INSERT INTO first_table(id) VALUES (?))
[...]
atomikosJdbcStatementProxy for vendor instance Pooled statement wrapping physical statement INSERT INTO first_table(id) VALUES (('01JR4MSYYX27T9B7NT9VA2W41A')): calling proxied close
[...]
XAResource.rollback ( XID: 3137322E32302E302E332E746D313734333931323736323333323030303033:3137322E32302E302E332E746D33 ) on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
rollback() done of transaction 172.20.0.3.tm174391276233200003
rollback() done of transaction 172.20.0.3.tm174391276233200003
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException: error] with root cause
両方のデータベースを確認するとfirst_db
の取得結果がないためロールバックがちゃんとされていることが確認できます。
first_db=# select * from first_table;
id
----
(0 rows)
second_db=# select * from second_table;
id
----
(0 rows)
8. まとめ
分散トランザクションの必要性
複数のデータベースやリソースを使用するシステムでは、データの整合性を保つために分散トランザクション管理が不可欠です。
従来の単一トランザクションでは、異なるリソース間での整合性を保証できません。
Atomikosの利点
- Spring Boot 3との簡単な統合
- 2フェーズコミットによる確実なトランザクション管理
- アプリケーションサーバーを必要としない軽量な実装
注意点
- パフォーマンスオーバーヘッド: 2フェーズコミットは通常のトランザクションより若干遅い
- PostgreSQLの設定:
max_prepared_transactions
の設定が必須 - 適切なXA対応DataSourceの使用: 通常のDataSourceと使い分けることが重要
分散トランザクションは強力ですが、マイクロサービスやイベント駆動アーキテクチャでは、Saga パターンなどの最終的整合性アプローチも検討する価値があります。
用途に合わせた適切な選択が重要です。
9. おわりに
本記事では、分散トランザクションの基本から、Atomikosによる実装、注意点までを紹介しました。
今後のシステム設計・開発において、信頼性の高いトランザクション処理を実現するための参考になれば幸いです。
また、本記事で実装した完全なソースコードはGithubで公開していますので、ご参照ください。
Discussion