🔄

KotlinとSpring BootでAtomikosを使って分散トランザクションを実践する

に公開

概要

複数のデータベースを使用するシステムでは、データの整合性を確保するために分散トランザクション管理が重要な課題となります。
この記事では以下の内容をざっくりと解説していきます。

  • 分散トランザクションの基本概念と課題
  • Atomikosとは何か、なぜ必要なのか
  • KotlinとSpring Boot 3を使ったAtomikosの実装手順
  • 具体的な実装例と動作検証

1. はじめに

1.1 分散トランザクションとは?

分散トランザクションとは、複数のデータソースにまたがるトランザクション処理のことです。
例えば、以下のような状況で必要となってきます。

  • 複数のデータベースにデータを書き込む必要がある場合
  • データベースとメッセージキューの両方に整合性を持って更新する場合
  • マイクロサービスアーキテクチャで複数のサービスをまたぐ処理を行う場合

通常のローカルトランザクションと異なり、分散トランザクションでは「すべてのリソースが成功するか、すべて失敗するか」という原子性を保証するために特別な仕組みが必要になります。

分散トランザクションのシーケンスフロー

以下は複数のデータベースに書き込む場合の分散トランザクションの流れを示したシーケンス図です。

1.2 Atomikos とは

Atomikosは、分散トランザクションを管理するためのJavaベースのトランザクションマネージャーです。特にXAトランザクション(2フェーズコミットプロトコルを使用した分散トランザクション)の管理を得意としています。

2フェーズコミットプロトコル

Atomikosの核となる技術が2フェーズコミットプロトコルです。これは複数のリソースにまたがるトランザクションの整合性を保証するための手法です。

特徴

  • XA対応の分散トランザクション

    • 複数のデータベース(PostgreSQL、MySQL、Oracleなど)やメッセージキューにまたがるトランザクションを統合的に管理
    • 2フェーズコミットプロトコルによる確実なトランザクション管理
  • Java EEアプリケーションに依存しない

    • Java SE環境やSpring Bootなどの軽量フレームワークでも使用可能
    • 重いアプリケーションサーバーを必要としない柔軟な運用
  • 軽量で使いやすい

    • JTA(Java Transaction API)を実装した標準的なインターフェース
    • Spring Bootとの統合が容易
    • 設定がシンプルで導入の障壁が低い

2. 環境構築

実際にAtomikosを使って分散トランザクション管理を実装する環境を構築していきます。

完全なソースコードはGithubで公開していますので、詳細はそちらをご参照ください。

2.1 使用技術スタック

技術 バージョン 用途
Kotlin 2.0.10 開発言語
Spring Boot 3.4.1 アプリケーションフレームワーク
Atomikos 6.0.0 分散トランザクション管理
PostgreSQL 16 データベース
Docker - 環境構築・実行環境

2.2 Docker Composeによる環境構築

アプリケーションとデータベースの2つのコンテナを用意します。

name: 'kotlin-distributed-transaction'

services:
  app:
    container_name: kotlin-distributed-transaction-app-container
    image: kotlin-distributed-transaction/app:dev
    build:
      context: .
      dockerfile: ./infra/docker/app/Dockerfile
      target: production
    depends_on:
      - db
    networks:
      - default
    ports:
      - '8080:8080'

  db:
    container_name: kotlin-distributed-transaction-db-container
    image: kotlin-distributed-transaction/db:dev
    build:
      context: .
      dockerfile: ./infra/docker/db/Dockerfile
    restart: always
    ports:
      - '5432:5432'
    environment:
      - POSTGRES_USER=${POSTGRES_DB_USER:-test}
      - POSTGRES_PASSWORD=${POSTGRES_DB_PASS:-test}
    volumes:
      - db-data-volume:/var/lib/postgresql/data
      - ./infra/docker/db/init:/docker-entrypoint-initdb.d/
      - ./infra/docker/db/config/postgresql.conf:/etc/postgresql/postgresql.conf
    networks:
      - default
    command: ["postgres", "-c", "config_file=/etc/postgresql/postgresql.conf"]

networks:
  default:
    name: kotlin-distributed-transaction

volumes:
  db-data-volume:
    name: kotlin-distributed-transaction-db-data
    driver: local

2.3 PostgreSQLの設定

XAトランザクションを有効にするには、PostgreSQLの設定を変更する必要があります。postgresql.confに以下の設定を追加しておきます。

listen_addresses = '*'
max_connections = 100
max_prepared_transactions = 100  # XAトランザクションに必要

💡 重要ポイント: max_prepared_transactionsはデフォルトで0(無効)になっています。XAトランザクションを使用するには必ず1以上の値を設定してください。この値は、同時に2フェーズコミットのPrepared状態にできるトランザクションの最大数を示します。

3. アプリケーションの構成

今回作成したアプリケーションの全体構成です。

app
 ├── build.gradle.kts
 └── src
     └── main
       ├── kotlin
       │ └── com
       │     └── sample
       │         └── transaction
       │             ├── TransactionApplication.kt
       │             ├── core
       │             │ └── domain
       │             │     └── repository  # リポジトリのインターフェース
       │             ├── infrastructure
       │             │ ├── db             # DB/TransactionManagerの設定
       │             │ └── repository     # 実装クラス
       │             ├── presentation
       │             │ └── controller     # APIコントローラー
       │             └── usecase          # ユースケース
       └── resources
           └── application.yaml

このアプリケーションでは、以下の4つのAPIエンドポイントを用意し、分散トランザクションの動作を検証します。

エンドポイント 説明
/sample/not-distributed-transaction 通常のトランザクション(正常系)
/sample/failed-not-distributed-transaction 通常のトランザクション(異常系)
/sample/distributed-transaction Atomikosによる分散トランザクション(正常系)
/sample/failed-distributed-transaction Atomikosによる分散トランザクション(異常系)

4. 依存関係の設定

分散トランザクションの実装に必要なライブラリをbuild.gradle.ktsに追加します。

dependencies {
    // Spring Boot
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-jdbc")
    
    // Atomikos
    implementation("com.atomikos:transactions-spring-boot3-starter:6.0.0")
    implementation("jakarta.transaction:jakarta.transaction-api:2.0.1")
    
    // Database
    implementation("org.postgresql:postgresql:42.7.4")
    
    // Kotlin
    implementation("org.jetbrains.kotlin:kotlin-reflect")
}

主要な依存関係の役割

  • transactions-spring-boot3-starter: Spring Boot 3と連携するAtomikosのスターター
  • jakarta.transaction-api: JTAトランザクションAPIの実装
  • postgresql: PostgreSQLのJDBCドライバ

5. Atomikosの設定

5.1 application.yamlの設定

spring:
  jta:
    enabled: true  # JTAトランザクションを有効化
  datasource:
    first:
      jdbcUrl: jdbc:postgresql://db:5432/first_db
      username: test
      password: test
      driverClassName: org.postgresql.Driver
    second:
      jdbcUrl: jdbc:postgresql://db:5432/second_db
      username: test
      password: test
      driverClassName: org.postgresql.Driver

logging:
  level:
    com:
      atomikos: DEBUG  # Atomikosのログを詳細に出力
    root: INFO

5.2 DataSourceとTransactionManagerの設定

DataSourceの設定

@Configuration
@EnableConfigurationProperties(DataSourceProperties::class)
class JdbcConfig(private val dataSourceProperties: DataSourceProperties) {
    // 通常のDataSource(非分散トランザクション用)
    @Bean("firstDataSource1")
    @ConfigurationProperties("spring.datasource.first")
    fun createFirstDataSource1(): DataSource = DataSourceBuilder.create().build()

    @Bean("secondDataSource1")
    @ConfigurationProperties("spring.datasource.second")
    fun createSecondDataSource1(): DataSource = DataSourceBuilder.create().build()

    // XA対応DataSource(分散トランザクション用)
    @Bean("firstDataSource2")
    fun createFirstDataSource2(): DataSource {
        return createXADataSource(
            "firstDataSource",
            dataSourceProperties.first.jdbcUrl,
            dataSourceProperties.first.username,
            dataSourceProperties.first.password,
        )
    }

    @Bean("secondDataSource2")
    fun createSecondDataSource2(): DataSource {
        return createXADataSource(
            "secondDataSource",
            dataSourceProperties.second.jdbcUrl,
            dataSourceProperties.second.username,
            dataSourceProperties.second.password,
        )
    }

    // XA対応DataSourceの作成ヘルパーメソッド
    private fun createXADataSource(resourceName: String, url: String, user: String, password: String): DataSource {
        // PostgreSQLのXA対応DataSourceを作成
        val pgxaDataSource = PGXADataSource().apply {
            setUrl(url)
            this.user = user
            this.password = password
        }

        // AtomikosのDataSourceラッパーを作成
        return AtomikosDataSourceBean().apply {
            uniqueResourceName = resourceName
            xaDataSource = pgxaDataSource
        }
    }
}

TransactionManagerの設定

@Configuration
class TransactionManagerConfig {
    // ユーザートランザクション(JTA用)
    @Bean
    fun userTransaction(): UserTransactionImp = UserTransactionImp()

    // トランザクションマネージャー(Atomikos)
    @Bean
    fun userTransactionManager(): UserTransactionManager {
        val userTransactionManager = UserTransactionManager()
        userTransactionManager.init()
        return userTransactionManager
    }

    // Spring用プラットフォームトランザクションマネージャー
    @Bean
    fun transactionManager(): PlatformTransactionManager =
        JtaTransactionManager(userTransaction(), userTransactionManager())
}

6. 実装例

ここでは、分散トランザクションの実装例と、通常のトランザクション実装との違いを見ていきます。

6.1 コントローラーの実装

@RestController
@RequestMapping("sample")
@CrossOrigin
class DistributedTransactionController(
    private val useCase: DistributedTransactionUseCase,
) {
    @GetMapping("/distributed-transaction")
    operator fun invoke() {
        useCase()
    }
}

6.2 ユースケースの実装

@Service
@Transactional  // JTAトランザクションを使用
class DistributedTransactionUseCase(
    private val repository: DistributedTransactionRepository,
) {
    operator fun invoke() {
        repository.register()
    }
}

6.3 リポジトリの実装

インターフェース

interface DistributedTransactionRepository {
    fun register()
}

実装クラス(分散トランザクション)

@Repository
class DistributedTransactionRepositoryImpl(
    @Qualifier("firstDataSource2")  // XA対応DataSource
    private val firstDataSource: DataSource,
    @Qualifier("secondDataSource2")  // XA対応DataSource
    private val secondDataSource: DataSource,
) : DistributedTransactionRepository {
    override fun register() {
        val ulid = ULID.randomULID()  // 一意のID生成

        // 1つ目のデータベースに挿入
        val firstDBJdbc = NamedParameterJdbcTemplate(firstDataSource)
        val firstDBSql = "INSERT INTO first_table(id) VALUES (:id)"
        val firstDBParams = mutableMapOf<String, Any>()
        firstDBParams["id"] = ulid

        firstDBJdbc.update(firstDBSql, firstDBParams)

        // 2つ目のデータベースに挿入
        val secondDBJdbc = NamedParameterJdbcTemplate(secondDataSource)
        val secondDBSql = "INSERT INTO second_table(id) VALUES (:id)"
        val secondDBParams = mutableMapOf<String, Any>()
        secondDBParams["id"] = ulid

        secondDBJdbc.update(secondDBSql, secondDBParams)
    }
}

実装クラス(異常系)

エラーを発生させるバージョンでは、次のように例外を発生させます。

override fun register() {
    val ulid = ULID.randomULID()

    // 1つ目のDBへの挿入
    val firstDBJdbc = NamedParameterJdbcTemplate(firstDataSource)
    val firstDBSql = "INSERT INTO first_table(id) VALUES (:id)"
    val firstDBParams = mutableMapOf<String, Any>()
    firstDBParams["id"] = ulid

    firstDBJdbc.update(firstDBSql, firstDBParams)

    // 意図的に例外を発生させる
    throw RuntimeException("error")

    // この処理は実行されない
    val secondDBJdbc = NamedParameterJdbcTemplate(secondDataSource)
    // ...
}

7. 動作検証

実装したAPIを使って動作検証を行いました。通常のトランザクションと分散トランザクションの違いを視覚的に確認してみます。

通常トランザクションと分散トランザクションの比較

7.1 通常トランザクション(非分散)の場合

シナリオ 結果
正常系 両方のDBに正常に書き込まれる
異常系 first_dbには書き込まれるが、second_dbには書き込まれない(データ不整合が発生

通常トランザクション(正常系)

通常トランザクションの正常系の確認をするため、curlなどを使いAPIを叩きます。

$ curl http://localhost:8080/sample/not-distributed-transaction

通常トランザクションが正常に完了するケースでは、シンプルな流れで処理が完結します。

  • 処理の流れ
    1. トランザクション作成
    2. データベース接続プールの初期化
    3. 処理実行(ログには表示されていない)
    4. コミット処理

ログの例

createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391267223700002
HikariPool-1 - Starting...
HikariPool-1 - Added connection org.postgresql.jdbc.PgConnection@6ed41df8
HikariPool-1 - Start completed.
HikariPool-2 - Starting...
HikariPool-2 - Added connection org.postgresql.jdbc.PgConnection@7a6e850f
HikariPool-2 - Start completed.
commit() done (by application) of transaction 172.20.0.3.tm174391267223700002

もちろん両方のデータベースには同じIDのデータが登録される結果となります。

first_db=# select * from first_table;
             id
----------------------------
 01JR4P3Y6KKNVPE6P1WT1MHG5Y
(1 row)

second_db=# select * from second_table;
             id
----------------------------
 01JR4P3Y6KKNVPE6P1WT1MHG5Y
(1 row)

通常トランザクション(異常系)

次に通常トランザクションの異常系の確認をするため、APIを実行します。

$ curl http://localhost:8080/sample/failed-not-distributed-transaction

エラーが発生した場合、トランザクションはロールバックされます。

  • 処理の流れ
    1. トランザクション作成
    2. エラー発生
    3. ロールバック処理(2回記録されている)
    4. 例外のスロー

ログの例

createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391287916300004
rollback() done of transaction 172.20.0.3.tm174391287916300004
rollback() done of transaction 172.20.0.3.tm174391287916300004
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException: error] with root cause

しかし、データベースの結果をみるとfirst_dbに書き込みした内容はロールバックされておらず整合性が崩れてしまっていることが確認できます。

first_db=# select * from first_table;
             id
----------------------------
 01JR4MXH1WCYTA7RXQEE6FTQHH
(1 row)

second_db=# select * from second_table;
 id
----
(0 rows)

7.2 分散トランザクション(Atomikos)の場合

シナリオ 結果
正常系 両方のDBに正常に書き込まれる
異常系 両方のDBともロールバックされ、一切書き込まれない(データ整合性が保たれる)

分散トランザクション(正常系)

では分散トランザクションの正常系を検証していきます。

$ curl http://localhost:8080/sample/distributed-transaction

分散トランザクションでは、複数のデータソースを一つのトランザクションとして扱います。正常系では2フェーズコミットが実行されます。

  • 処理の流れ
    1. トランザクション作成
    2. 第1データソース(firstDataSource)の処理
      • 接続取得と初期化
      • XAResourceとしての登録
      • SQL実行(INSERT)
    3. 第2データソース(secondDataSource)の処理
      • 接続取得と初期化
      • XAResourceとしての登録
      • SQL実行(INSERT)
    4. 各コネクションのクローズ処理
    5. 2フェーズコミット実行
      • Prepare段階(各リソースの準備確認)
      • Commit段階(確定処理)
    6. リソースのリフレッシュ

ログを確認すると、分散トランザクション時に2フェーズコミットが実行されていることが分かります。

createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391233628700001
firstDataSource: getConnection()...
firstDataSource: init...
[...]
addParticipant ( XAResourceTransaction: XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 ) for transaction 172.20.0.3.tm174391233628700001
XAResource.start ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 , XAResource.TMNOFLAGS ) on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
[...]
atomikosJdbcStatementProxy for vendor instance Pooled statement wrapping physical statement INSERT INTO first_table(id) VALUES (('01JR4MCYXGDG4R3K2F06016JS4')): calling proxied close
[...]
secondDataSource: getConnection()...
secondDataSource: init...
[...]
atomikosJdbcStatementProxy for vendor instance Pooled statement wrapping physical statement INSERT INTO second_table(id) VALUES (('01JR4MCYXGDG4R3K2F06016JS4')): calling proxied close
[...]
commit() done (by application) of transaction 172.20.0.3.tm174391233628700001
XAResource.prepare ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 ) returning OK on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
XAResource.prepare ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D32 ) returning OK on resource secondDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69e550fa
XAResource.commit ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D31 , false ) on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
XAResource.commit ( XID: 3137322E32302E302E332E746D313734333931323333363238373030303031:3137322E32302E302E332E746D32 , false ) on resource secondDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69e550fa
firstDataSource: refreshed XAResource
secondDataSource: refreshed XAResource

もちろん両方のデータベースには同じIDのデータが登録されています。

first_db=# select * from first_table;
             id             
----------------------------
01JR4MCYXGDG4R3K2F06016JS4
(1 row)

second_db=# select * from second_table;
             id             
----------------------------
 01JR4MCYXGDG4R3K2F06016JS4
(1 row)

分散トランザクション(異常系)

では分散トランザクションの異常系を検証していきます。

$ curl http://localhost:8080/sample/failed-distributed-transaction

分散トランザクションでエラーが発生した場合、参加しているすべてのリソースがロールバックされます。

  • 処理の流れ
    1. トランザクション作成
    2. 第1データソース(firstDataSource)の処理
      • 接続取得と初期化
      • XAResourceとしての登録
      • SQL実行(INSERT)
    3. エラー発生(第2データソース処理前)
    4. XAResourceのロールバック処理
    5. トランザクション全体のロールバック
    6. 例外のスロー

エラー時には次のようなログが出力されます。

createCompositeTransaction ( 10000 ): created new ROOT transaction with id 172.20.0.3.tm174391276233200003
firstDataSource: getConnection()...
firstDataSource: init...
atomikosJdbcConnectionProxy (state = sessionHandleState (1 context(s), isTerminated = false) for resource firstDataSource) for vendor instance Pooled connection wrapping physical connection org.postgresql.jdbc.PgConnection@3e04e4ab: calling proxied prepareStatement(INSERT INTO first_table(id) VALUES (?))
[...]
atomikosJdbcStatementProxy for vendor instance Pooled statement wrapping physical statement INSERT INTO first_table(id) VALUES (('01JR4MSYYX27T9B7NT9VA2W41A')): calling proxied close
[...]
XAResource.rollback ( XID: 3137322E32302E302E332E746D313734333931323736323333323030303033:3137322E32302E302E332E746D33 ) on resource firstDataSource represented by XAResource instance org.postgresql.xa.PGXAConnection@69cd357b
rollback() done of transaction 172.20.0.3.tm174391276233200003
rollback() done of transaction 172.20.0.3.tm174391276233200003
Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed: java.lang.RuntimeException: error] with root cause

両方のデータベースを確認するとfirst_dbの取得結果がないためロールバックがちゃんとされていることが確認できます。

first_db=# select * from first_table;
 id
----
(0 rows)

second_db=# select * from second_table;
 id
----
(0 rows)

8. まとめ

分散トランザクションの必要性

複数のデータベースやリソースを使用するシステムでは、データの整合性を保つために分散トランザクション管理が不可欠です。
従来の単一トランザクションでは、異なるリソース間での整合性を保証できません。

Atomikosの利点

  • Spring Boot 3との簡単な統合
  • 2フェーズコミットによる確実なトランザクション管理
  • アプリケーションサーバーを必要としない軽量な実装

注意点

  • パフォーマンスオーバーヘッド: 2フェーズコミットは通常のトランザクションより若干遅い
  • PostgreSQLの設定: max_prepared_transactionsの設定が必須
  • 適切なXA対応DataSourceの使用: 通常のDataSourceと使い分けることが重要

分散トランザクションは強力ですが、マイクロサービスやイベント駆動アーキテクチャでは、Saga パターンなどの最終的整合性アプローチも検討する価値があります。
用途に合わせた適切な選択が重要です。

9. おわりに

本記事では、分散トランザクションの基本から、Atomikosによる実装、注意点までを紹介しました。
今後のシステム設計・開発において、信頼性の高いトランザクション処理を実現するための参考になれば幸いです。
また、本記事で実装した完全なソースコードはGithubで公開していますので、ご参照ください。

参考資料

Discussion