🧩

トランザクションの実装について考える

に公開

この記事はGMOペパボ エンジニア Advent Calendar 2025 - Adventar12日目の記事です。
昨日はてつをさんの「Upstash Redisを触ってみた」でした。
REST APIでアクセスできる、TCP接続が使えない環境でも利用可能なサーバーレス向けRedisサービスの紹介記事でグローバルレプリケーションもできることやダッシュボードが見やすくセットアップも簡単という使ってみたくなる紹介でした。

12日目はトランザクションの実装パターンのお話しです。


1. 何が課題?

昔ながらのモノリシックアーキテクチャでは、1つのデータベースのACIDトランザクションで整合性が完結していました。開発者は BEGIN → 処理 → COMMITROLLBACK に任せておけばよかったのです。

BEGIN;
  UPDATE orders SET status = 'confirmed' WHERE id = 123;
  UPDATE inventory SET stock = stock - 1 WHERE product_id = 456;
  INSERT INTO payments (order_id, amount) VALUES (123, 5000);
COMMIT; -- 全部成功 or 全部ロールバック

しかし、マイクロサービス化や外部API連携が進むと、複数サービス/複数DBをまたぐ処理が必要になります。

問題が顕在化する具体例

ECサイトの注文処理を考えてみましょう。

この例で前提にしたいのは「マイクロサービスとして独立している」状態です。各サービスは独立してデプロイされ、データもサービスごとに所有します(他サービスのテーブルを直接更新しません)。そのため、たとえ同じDB製品を使っていても、サービスをまたいだ更新を1つのACIDトランザクションにまとめるのは基本できません。

1. 注文サービス(注文DB) → 注文を作成
2. 在庫サービス(在庫DB) → 在庫を減らす
3. 決済サービス(外部API) → クレジットカード課金
4. 配送サービス(配送DB) → 配送手配

この一連の処理で、ステップ2までは成功したがステップ3で失敗した場合どうなるでしょうか?

  • 注文は作成されている
  • 在庫は減っている
  • 決済は失敗
  • 配送手配はされていない

単一DBなら ROLLBACK ですべて元に戻りますが、分散環境では在庫だけが減った状態で残ってしまいます。この「部分的な成功」が最も厄介なシナリオです!

なお、これはマイクロサービスに限った話ではなく、モノリスでも外部API(例: 決済)をまたぐ時点で同様に「一括で COMMITROLLBACK できない」問題が出てきます。

これを解決するために生まれたのが、分散トランザクションのパターンです。


2. 分散トランザクションへのアプローチを整理

複数サービス・複数DBをまたぐ整合性を扱うための代表的な方式を整理します。

2.1. 2 フェーズコミット (2PC: Two-Phase Commit)

仕組み:

  1. 準備フェーズ: コーディネーターが全参加ノードに「コミットできる?」を確認
  2. コミットフェーズ: 全員がYesなら一斉にCommit、誰かがNoなら全員Abort

メリット:

  • 分散環境でも原子的コミット/ロールバックを保証
  • 強い整合性を実現できる

デメリット:

  • 準備フェーズ中、全ノードがリソースをロック → スループット低下
  • コーディネーターが単一障害点になる
  • NoSQLや外部APIでは適用しづらい(XAなどトランザクション参加が必要)

2.2. TCC (Try-Confirm-Cancel)

仕組み:

  • Try: リソースの予約・仮押さえ
  • Confirm: 予約の確定
  • Cancel: 予約の取り消し

メリット:

  • 各操作が明示的な3ステップAPIとして実装される
  • ビジネスロジックに合致しやすい

デメリット:

  • 各サービスがTry/Confirm/Cancelの3つのAPIを実装する必要がある
  • 予約状態の管理が複雑

2.3. Saga パターン

仕組み:

  • ローカルトランザクションの連鎖
  • 失敗時は補償トランザクションで整合性を回復

メリット:

  • 長時間ロックが発生しない
  • NoSQL・外部API・異種DBに対応可能
  • スケーラビリティが高い

デメリット:

  • 補償処理の実装が必要
  • 実行中は一時的に不整合な状態が存在する

2.4. アウトボックスパターン + イベント駆動

仕組み:

  1. アプリは業務データの更新と同時に、Outboxテーブルへ「送るべきイベント(=キュー投入相当)」を INSERT する
  2. これらを同一DBトランザクションCOMMIT し、DB更新とイベント記録を原子的に確定させる
  3. 別プロセス(ポーリングまたはCDC)がOutboxを読み取り、メッセージブローカーへ非同期に配信する(成功したら送信済みとして更新)
  4. 受信側はイベントを順に処理して下流の状態を追従させ、最終的な整合性を作る

メリット:

  • 「少なくとも1回配信」を実現しやすい(ただし重複配信は起こりうる)
  • メッセージロストを起こしにくくする
  • データベースとメッセージングの一貫性を保つ

デメリット:

  • 別プロセス(ポーリングまたはCDC)が必要
  • 遅延が発生する可能性
  • 重複配信に備え、コンシューマ側の冪等性が必要

これらのパターンにはそれぞれトレードオフがあり、ユースケースに応じて使い分ける必要があります。この記事では特に Saga パターン にフォーカスします!


3. 2PCをおさらい(強い整合性とトレードオフ)

3.1. 仕組み

2フェーズコミット(2PC)は、分散システムで全参加者が合意してコミットするプロトコルです。

フェーズ1: 準備(Prepare)

コーディネーター → 全参加ノード: 「コミットできる?」
各ノード → コーディネーター: 「Yes」または「No」
※ この時点で各ノードはリソースをロック

フェーズ2: コミット(Commit)

全員がYesの場合:
  コーディネーター → 全参加ノード: 「Commit!」
  各ノード: コミット実行 → ロック解放

誰かがNoまたはタイムアウトの場合:
  コーディネーター → 全参加ノード: 「Abort!」
  各ノード: ロールバック実行 → ロック解放

3.2. メリット

  • 強い整合性: 分散環境でも強い整合性を取りやすい
  • 原子性(全成功 or 全失敗): 全員が合意できたときだけコミットする
  • アプリ側の負担が少ない: 補償トランザクションを書かなくて良い

3.3. デメリット/限界

ブロッキング問題

準備フェーズから確定フェーズまで、全ノードがリソースをロックし続けるため、他のトランザクションがブロックされます。

時刻 T0: トランザクション開始
時刻 T1: Prepare → 全ノードがロック取得
時刻 T2: ネットワーク遅延...(この間、ロックは保持され続ける)
時刻 T5: やっとCommit到着 → ロック解放

→ T1〜T5の間、他のトランザクションは待たされる

パフォーマンスオーバーヘッド

2PCは、参加者との往復通信・ログ・ロック保持などが増えるため、単一DBのトランザクションと比べて遅くなりやすいです。

単一障害点

コーディネーターが落ちると、進行中のトランザクションが「CommitかAbortか」判断できず、復旧までブロックしやすいです。

準備フェーズ完了 → 全ノードがロック保持
コーディネーターがクラッシュ!
→ 全ノードは「CommitかAbortか」がわからずロック保持したまま待ち続ける

技術的制約

  • XAなど、トランザクションへの「参加」を提供するリソースに限られる
  • 外部APIや一部のマネージドサービスは参加できず、適用しづらい

デッドロックリスク

ロック保持時間が長くなりやすく、競合やデッドロックが起きる可能性が高まりがちです。

3.4. 適用シナリオ

2PCが適しているのは:

シナリオ 理由
銀行振込などの金融トランザクション 絶対に全成功or全失敗が必要
閉じた環境・安定したネットワーク ネットワーク分断のリスクが低い
少量・低頻度のトランザクション ロックの影響が小さい

逆に、マイクロサービス・マルチDB・クラウド環境では可用性面でフィットしないケースが多いです。


4. Sagaパターンを見ていく

4.1. 基本の考え方

Sagaパターンは、巨大な分散トランザクションを各サービスで完結する小さなローカルトランザクションに分解します。

キーコンセプト:

  • 各サービスは独立してローカルDBをコミット
  • 成功したら次のサービスにイベント/メッセージを渡す
  • 失敗したら補償トランザクション(compensating transaction)で整合性を回復
  • 強い整合性は諦め、最終的な一貫性(eventual consistency) を目指す
成功パターン:
  注文作成 → 在庫予約 → 決済 → 配送準備 → 完了

失敗パターン(決済で失敗):
  注文作成 → 在庫予約 → 決済(失敗)
  → 補償: 在庫解放 → 注文キャンセル

4.2. 2つの進め方(Choreography / Orchestration)

Sagaには Choreography(振り付け型)Orchestration(指揮者型) の2つの実装方式があります。

Choreography(コレオグラフィ)

特徴:

  • 各サービスがイベントを購読し、次のイベントを発行
  • 中央コーディネーターなし
  • イベント駆動で疎結合

実装イメージ:

注文サービス: 注文作成 → 「OrderCreatedイベント」発行
  ↓
在庫サービス: OrderCreatedを購読 → 在庫予約 → 「InventoryReservedイベント」発行
  ↓
決済サービス: InventoryReservedを購読 → 決済実行
  成功: 「PaymentCompletedイベント」発行
  失敗: 「PaymentFailedイベント」発行
  ↓
在庫サービス: PaymentFailedを購読 → 在庫解放(補償)

メリット:

  • 中央の単一障害点(オーケストレーター)がない
  • 各サービスが独立して動作
  • 疎結合でスケーラブル

デメリット:

  • ビジネスプロセス全体の流れが分散し見えづらい
  • デバッグや監視が難しい(「今どこまで進んでいる?」が追いづらい)
  • サービス間の循環参照が発生しやすい
  • タイムアウトや再試行をグローバルに実装しづらい

適用シーン:

  • シンプルで小規模なワークフロー
  • サービス間の依存が少ない
  • イベント駆動アーキテクチャが既に確立している

Orchestration(オーケストレーション)

特徴:

  • 中央のオーケストレーターがワークフロー全体を管理
  • 各ステップを順に呼び出し、失敗時は補償を指揮
  • ステートマシンやワークフローエンジンで実装

実装イメージ:

オーケストレーター(ステートマシン):
  1. 在庫サービス.Reserve() を呼び出し
  2. 成功 → 決済サービス.Charge() を呼び出し
  3. 成功 → 配送サービス.Prepare() を呼び出し
  4. 成功 → 完了

  失敗時:
    - 補償を逆順で実行
    - 決済サービス.Refund()
    - 在庫サービス.Release()

メリット:

  • ワークフローが明示的で理解しやすい
  • デバッグ・監視・追跡が容易
  • 各サービスは他サービスのイベントを気にしなくて良い
  • 複雑なビジネスロジックに対応可能

デメリット:

  • オーケストレーターが単一障害点になりうる(冗長化で緩和は可能)
  • オーケストレーター自体が複雑化する可能性

適用シーン:

  • 複雑で大規模なワークフロー
  • 可観測性・保守性を重視
  • ビジネスロジックの変更が頻繁

4.3. 比較表

観点 Choreography Orchestration
制御方式 分散(各サービスが自律) 中央集権(オーケストレーターが指揮)
可視性 低い(フローが見えづらい) 高い(ステートマシンで明示)
デバッグ 難しい 容易
単一障害点 中央の単一障害点はなし ありうる(オーケストレーター)
スケーラビリティ 高い オーケストレーター設計次第
適用規模 シンプル・小規模 複雑・大規模

4.4. どういうときにSagaが効く?

  • マルチDB/NoSQL/外部サービス混在の環境では2PCが使いにくく、Sagaが現実的な選択肢になりやすい
  • 各ステップがローカルトランザクションなので長時間ロックが起きにくく、スケールさせやすい
  • メッセージ/イベント駆動と相性がよく、非同期・疎結合・耐障害性を両立しやすい
  • クラウド環境で一般的な「部分停止」「ネットワーク分断」を前提にした設計と親和性が高い

5. Orchestration 方式での実装例と注意点

ここからは、Orchestration方式での実装を深掘りします!

5.1. 基本的な実装パターン(Go言語)

まずは、シンプルなOrchestrationの実装例を見てみましょう。

package saga

import (
    "context"
    "fmt"
    "log"
)

// ===== Saga ステップの定義 =====
type Step struct {
    Name        string
    Execute     func(ctx context.Context, orderID string) error
    Compensate  func(ctx context.Context, orderID string) error
}

// ===== Saga オーケストレーター =====
type SagaOrchestrator struct {
    steps          []Step
    completedSteps []int
}

func NewSagaOrchestrator() *SagaOrchestrator {
    return &SagaOrchestrator{
        steps:          []Step{},
        completedSteps: []int{},
    }
}

func (s *SagaOrchestrator) AddStep(step Step) {
    s.steps = append(s.steps, step)
}

func (s *SagaOrchestrator) Execute(ctx context.Context, orderID string) error {
    // 各ステップを順次実行
    for i, step := range s.steps {
        log.Printf("[Saga] 実行中: %s (注文ID: %s)", step.Name, orderID)

        if err := step.Execute(ctx, orderID); err != nil {
            log.Printf("[Saga] ステップ失敗: %s - エラー: %v", step.Name, err)

            // 補償トランザクションを逆順で実行
            s.compensate(ctx, orderID)
            return fmt.Errorf("Saga失敗 at %s: %w", step.Name, err)
        }

        s.completedSteps = append(s.completedSteps, i)
        log.Printf("[Saga] 完了: %s", step.Name)
    }

    log.Printf("[Saga] 成功: 注文ID %s", orderID)
    return nil
}

func (s *SagaOrchestrator) compensate(ctx context.Context, orderID string) {
    log.Printf("[Saga] 補償トランザクション開始: 注文ID %s", orderID)

    // 完了したステップを逆順で補償
    for i := len(s.completedSteps) - 1; i >= 0; i-- {
        stepIndex := s.completedSteps[i]
        step := s.steps[stepIndex]

        log.Printf("[Saga] 補償中: %s", step.Name)

        if err := step.Compensate(ctx, orderID); err != nil {
            // 補償失敗はログに記録し、継続
            log.Printf("[Saga] 補償失敗: %s - エラー: %v (継続)", step.Name, err)
            // 本番環境ではアラート送信やDLQへの送信が必要
        } else {
            log.Printf("[Saga] 補償完了: %s", step.Name)
        }
    }
}

5.2. 各サービスの実装例

次に、各サービスの実装を見てみましょう。

// ===== 在庫サービス =====
type InventoryService struct {
    db *sql.DB
}

func (i *InventoryService) Reserve(ctx context.Context, orderID string) error {
    log.Printf("[在庫] 予約開始: %s", orderID)

    // 冪等性チェック: すでに予約済みならスキップ
    var status string
    err := i.db.QueryRow(
        "SELECT status FROM inventory_reservations WHERE order_id = ?",
        orderID,
    ).Scan(&status)

    if err == nil && status == "RESERVED" {
        log.Printf("[在庫] すでに予約済み: %s", orderID)
        return nil
    }

    // 在庫を予約
    _, err = i.db.ExecContext(ctx,
        `INSERT INTO inventory_reservations (order_id, product_id, quantity, status, created_at)
         VALUES (?, ?, ?, 'RESERVED', NOW())`,
        orderID, "product_123", 1,
    )

    if err != nil {
        return fmt.Errorf("在庫予約失敗: %w", err)
    }

    log.Printf("[在庫] 予約完了: %s", orderID)
    return nil
}

func (i *InventoryService) Release(ctx context.Context, orderID string) error {
    log.Printf("[在庫] 解放開始: %s", orderID)

    // 冪等性確保: すでに解放済みならスキップ
    var status string
    err := i.db.QueryRow(
        "SELECT status FROM inventory_reservations WHERE order_id = ?",
        orderID,
    ).Scan(&status)

    if err == sql.ErrNoRows || status == "RELEASED" {
        log.Printf("[在庫] すでに解放済みまたは存在しない: %s", orderID)
        return nil
    }

    // 在庫を解放
    _, err = i.db.ExecContext(ctx,
        "UPDATE inventory_reservations SET status='RELEASED', updated_at=NOW() WHERE order_id=? AND status='RESERVED'",
        orderID,
    )

    if err != nil {
        return fmt.Errorf("在庫解放失敗: %w", err)
    }

    log.Printf("[在庫] 解放完了: %s", orderID)
    return nil
}

// ===== 決済サービス =====
type PaymentService struct {
    db         *sql.DB
    paymentAPI *ExternalPaymentAPI
}

func (p *PaymentService) Charge(ctx context.Context, orderID string) error {
    log.Printf("[決済] 課金開始: %s", orderID)

    // 冪等性チェック: すでに課金済みならスキップ
    var status string
    err := p.db.QueryRow(
        "SELECT status FROM payments WHERE order_id = ?",
        orderID,
    ).Scan(&status)

    if err == nil && status == "CHARGED" {
        log.Printf("[決済] すでに課金済み: %s", orderID)
        return nil
    }

    // 外部決済API呼び出し(冪等性キー使用)
    idempotencyKey := fmt.Sprintf("order-%s-charge", orderID)
    err = p.paymentAPI.Charge(ctx, orderID, 5000, idempotencyKey)
    if err != nil {
        return fmt.Errorf("決済失敗: %w", err)
    }

    // DB に記録
    _, err = p.db.ExecContext(ctx,
        `INSERT INTO payments (order_id, amount, status, idempotency_key, created_at)
         VALUES (?, ?, 'CHARGED', ?, NOW())`,
        orderID, 5000, idempotencyKey,
    )

    if err != nil {
        return fmt.Errorf("決済記録失敗: %w", err)
    }

    log.Printf("[決済] 課金完了: %s", orderID)
    return nil
}

func (p *PaymentService) Refund(ctx context.Context, orderID string) error {
    log.Printf("[決済] 返金開始: %s", orderID)

    // 冪等性チェック: すでに返金済みならスキップ
    var status string
    err := p.db.QueryRow(
        "SELECT status FROM payments WHERE order_id = ?",
        orderID,
    ).Scan(&status)

    if err == sql.ErrNoRows {
        log.Printf("[決済] 課金記録がない: %s", orderID)
        return nil
    }

    if status == "REFUNDED" {
        log.Printf("[決済] すでに返金済み: %s", orderID)
        return nil
    }

    // 外部決済API呼び出し(冪等性キー使用)
    idempotencyKey := fmt.Sprintf("order-%s-refund", orderID)
    err = p.paymentAPI.Refund(ctx, orderID, idempotencyKey)
    if err != nil {
        return fmt.Errorf("返金失敗: %w", err)
    }

    // DB を更新
    _, err = p.db.ExecContext(ctx,
        "UPDATE payments SET status='REFUNDED', updated_at=NOW() WHERE order_id=? AND status='CHARGED'",
        orderID,
    )

    if err != nil {
        return fmt.Errorf("返金記録失敗: %w", err)
    }

    log.Printf("[決済] 返金完了: %s", orderID)
    return nil
}

// ===== 配送サービス =====
type ShippingService struct {
    db *sql.DB
}

func (s *ShippingService) Prepare(ctx context.Context, orderID string) error {
    log.Printf("[配送] 準備開始: %s", orderID)

    // 冪等性チェック
    var status string
    err := s.db.QueryRow(
        "SELECT status FROM shipping_orders WHERE order_id = ?",
        orderID,
    ).Scan(&status)

    if err == nil && status == "PREPARED" {
        log.Printf("[配送] すでに準備済み: %s", orderID)
        return nil
    }

    // 配送準備
    _, err = s.db.ExecContext(ctx,
        `INSERT INTO shipping_orders (order_id, status, created_at)
         VALUES (?, 'PREPARED', NOW())`,
        orderID,
    )

    if err != nil {
        return fmt.Errorf("配送準備失敗: %w", err)
    }

    log.Printf("[配送] 準備完了: %s", orderID)
    return nil
}

func (s *ShippingService) Cancel(ctx context.Context, orderID string) error {
    log.Printf("[配送] キャンセル開始: %s", orderID)

    // 冪等性チェック
    var status string
    err := s.db.QueryRow(
        "SELECT status FROM shipping_orders WHERE order_id = ?",
        orderID,
    ).Scan(&status)

    if err == sql.ErrNoRows || status == "CANCELLED" {
        log.Printf("[配送] すでにキャンセル済みまたは存在しない: %s", orderID)
        return nil
    }

    // 配送キャンセル
    _, err = s.db.ExecContext(ctx,
        "UPDATE shipping_orders SET status='CANCELLED', updated_at=NOW() WHERE order_id=? AND status='PREPARED'",
        orderID,
    )

    if err != nil {
        return fmt.Errorf("配送キャンセル失敗: %w", err)
    }

    log.Printf("[配送] キャンセル完了: %s", orderID)
    return nil
}

5.3. メイン処理

func ProcessOrder(ctx context.Context, orderID string) error {
    inventory := &InventoryService{db: inventoryDB}
    payment := &PaymentService{db: paymentDB, paymentAPI: paymentAPI}
    shipping := &ShippingService{db: shippingDB}

    saga := NewSagaOrchestrator()

    // ステップ1: 在庫予約
    saga.AddStep(Step{
        Name:       "在庫予約",
        Execute:    inventory.Reserve,
        Compensate: inventory.Release,
    })

    // ステップ2: 決済
    saga.AddStep(Step{
        Name:       "決済",
        Execute:    payment.Charge,
        Compensate: payment.Refund,
    })

    // ステップ3: 配送準備
    saga.AddStep(Step{
        Name:       "配送準備",
        Execute:    shipping.Prepare,
        Compensate: shipping.Cancel,
    })

    return saga.Execute(ctx, orderID)
}

5.4. 冪等性の重要性 — 「悪い例」と「良い例」

補償処理の冪等性は、Saga成功の必須条件です。具体例で見てみましょう。

悪い例(冪等性がない)

func (i *InventoryService) Reserve(ctx context.Context, orderID string, productID string, quantity int) error {
    // 状態チェックなし!
    // リトライや重複実行が起きると、在庫が二重に減る可能性があります。
    _, err := i.db.ExecContext(
        ctx,
        "UPDATE inventory SET stock = stock - ? WHERE product_id = ?",
        quantity, productID,
    )
    return err
}

問題点:

  • ネットワークエラーでリトライすると、同じ注文に対して在庫が2回減る
  • すでに予約済みかどうかチェックしていない
  • 在庫不足(stock < quantity)を考慮していない

良い例(冪等性あり)

func (i *InventoryService) Reserve(ctx context.Context, orderID string, productID string, quantity int) error {
    // 例: inventory_reservations に UNIQUE(order_id, product_id) を張る前提
    tx, err := i.db.BeginTx(ctx, nil)
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 1. 予約レコードを作って「この注文の予約処理は1回だけ」にする
    //    すでに存在する(=予約済み/処理済み)なら成功として扱う
    _, err = tx.ExecContext(
        ctx,
        "INSERT INTO inventory_reservations (order_id, product_id, quantity, status) VALUES (?, ?, ?, 'RESERVING')",
        orderID, productID, quantity,
    )
    if err != nil {
        // UNIQUE制約違反などは「すでに予約済み」とみなしてOK(冪等)
        return nil
    }

    // 2. 在庫が足りるときだけ減らす(条件付きUPDATE)
    r, err := tx.ExecContext(
        ctx,
        "UPDATE inventory SET stock = stock - ? WHERE product_id = ? AND stock >= ?",
        quantity, productID, quantity,
    )
    if err != nil {
        return err
    }
    affected, _ := r.RowsAffected()
    if affected == 0 {
        return fmt.Errorf("在庫不足: product_id=%s", productID)
    }

    // 3. 予約を確定(条件付きUPDATE)
    _, err = tx.ExecContext(
        ctx,
        "UPDATE inventory_reservations SET status='RESERVED' WHERE order_id=? AND product_id=? AND status='RESERVING'",
        orderID, productID,
    )
    if err != nil {
        return err
    }

    return tx.Commit()
}

ポイント:

  • 予約レコード(UNIQUE制約など)で「同じ注文の予約は1回だけ」にする
  • 在庫の減算は条件付きUPDATEでガードする(stock >= quantity)
  • すでに予約済みなら成功として扱う(冪等)

5.5. Sagaログの管理

Sagaの進行状況をDBに記録することで、障害時の復旧やデバッグが容易になります。
また、(特にOrchestration方式では)ログを永続化しておくことで、プロセスが落ちても saga_id / current_step / status から状態を復元し、未完了ステップの再開や補償の続きが可能になります(そのためにも各ステップ/補償の冪等性が重要です)。

// Sagaログテーブル
CREATE TABLE saga_logs (
  saga_id      VARCHAR(255) PRIMARY KEY,
  order_id     VARCHAR(255) NOT NULL,
  current_step INT NOT NULL,
  status       VARCHAR(50) NOT NULL, -- RUNNING, COMPLETED, COMPENSATING, FAILED
  error_message TEXT,
  created_at   TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  updated_at   TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  INDEX idx_order_id (order_id),
  INDEX idx_status (status)
);

// Sagaステップログテーブル
CREATE TABLE saga_step_logs (
  id           BIGINT PRIMARY KEY AUTO_INCREMENT,
  saga_id      VARCHAR(255) NOT NULL,
  step_name    VARCHAR(255) NOT NULL,
  step_index   INT NOT NULL,
  status       VARCHAR(50) NOT NULL, -- STARTED, COMPLETED, COMPENSATED, FAILED
  started_at   TIMESTAMP NOT NULL,
  completed_at TIMESTAMP NULL,
  error_message TEXT,
  FOREIGN KEY (saga_id) REFERENCES saga_logs(saga_id)
);
func (s *SagaOrchestrator) Execute(ctx context.Context, orderID string) error {
    sagaID := generateSagaID(orderID)

    // Sagaログを作成
    s.logSagaStart(sagaID, orderID)

    for i, step := range s.steps {
        // ステップ開始ログ
        s.logStepStart(sagaID, step.Name, i)

        if err := step.Execute(ctx, orderID); err != nil {
            // ステップ失敗ログ
            s.logStepFailed(sagaID, step.Name, i, err)
            s.logSagaStatus(sagaID, "COMPENSATING")

            // 補償実行
            s.compensate(ctx, orderID, sagaID)
            s.logSagaStatus(sagaID, "FAILED")

            return fmt.Errorf("Saga失敗 at %s: %w", step.Name, err)
        }

        // ステップ完了ログ
        s.logStepCompleted(sagaID, step.Name, i)
        s.completedSteps = append(s.completedSteps, i)
    }

    // Saga完了ログ
    s.logSagaStatus(sagaID, "COMPLETED")
    return nil
}

5.6. タイムアウトとリトライ

各ステップにタイムアウトとリトライを組み込みます。

func (s *Step) ExecuteWithRetry(ctx context.Context, orderID string, maxRetries int) error {
    var lastErr error

    for attempt := 1; attempt <= maxRetries; attempt++ {
        // タイムアウト付きコンテキスト
        execCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
        defer cancel()

        err := s.Execute(execCtx, orderID)
        if err == nil {
            return nil
        }

        lastErr = err
        log.Printf("リトライ %d/%d: %v", attempt, maxRetries, err)

        // Exponential backoff
        backoffDuration := time.Duration(math.Pow(2, float64(attempt))) * time.Second
        time.Sleep(backoffDuration)
    }

    return fmt.Errorf("最大リトライ回数超過: %w", lastErr)
}

6. 2PC と Saga、どちらを選ぶ?

シナリオや要件に応じて、強い整合性と可用性/スケーラビリティのどちらを優先するかを決めるのが現実的です。

シナリオ/要件 適する方式 理由
銀行振込など「絶対に全成功 or 全失敗」が必須 2PC や分散合意プロトコル 強い整合性が必須
ECの注文・在庫・予約など部分失敗からの補償を許容 Saga パターン 最終的整合性で十分
マイクロサービス+異種DB/NoSQL/スケール重視 Saga パターン 2PCが使えない・重い
高頻度・高並列・大規模トラフィック Saga 長時間ロックを避けやすい
外部API・サードパーティサービスとの連携 Saga XAインターフェース非対応

7. おわりに

マイクロサービス時代に「強いACIDで全部解決」は現実的ではなく、2PCも実装コードが複雑になりがちだったり難しい場面が増えてきました。

今回取り上げたSagaパターンは、最近ムームードメインでリリースされた Google Workspaceカード の申し込みシステムでも、Orchestration方式で採用しています。
ドメインの申し込みや決済、ドメインの取得処理、Googleへのリクエストなど多くの外部API連携が必要な実装でしたが、Sagaパターンを使うことで小さなトランザクションに分離し、全体の整合性を取れるようにしました。その結果、状態管理もしやすい構成にできました。

今後こういった分散トランザクションを扱うことも増えると思うので、この記事が誰かの役に立てば幸いです。

GMOペパボ株式会社

Discussion