Temporal Workflowで学ぶ:冪等性・補償アクション・リトライ設計の実践
はじめに
分散システムやマイクロサービスの普及により、ワークフローの信頼性や一貫性がますます重要になっています。
本記事では、Temporal Workflowを活用した堅牢な分散処理設計のポイントを、冪等性や補償アクション(Saga)を中心に解説します。
Temporalとは
Temporalは分散システムで利用されるワークフローエンジンです。
マイクロサービスアーキテクチャにおける複数サービス間の
分散トランザクション構築時に、処理の状態管理や障害時の復旧を自動化してくれます。
従来の分散処理では、以下のような課題がありました:
- 処理の途中でサーバーがクラッシュした場合の復旧
- 長時間実行される処理の状態管理
- 複数サービス間での処理順序の制御
- 失敗時のリトライやタイムアウト処理
Temporalを使うことで、これらの複雑な制御を開発者が意識せずに済み、ビジネスロジックの実装に集中できます。特に分散トランザクションにおいては、処理の可視性と信頼性を大幅に向上させることが可能です。
Temporalの主要な実行単位
1. Workflow
処理全体のオーケストレーションを担い、状態を持ちます。
WorkflowからChildWorkflowを起動し、階層化した処理も実現できます。
2. Activity
実際のビジネスロジックを実行します。
分散トランザクションでは、外部システムへのAPIコールや副作用のある処理を担当します。
他にもTaskやQueueなどの概念がありますが、本記事では重要度が低いため省略します。
今回取り扱うケース
今回はサンプルとして、ホテルの予約システムを扱います。
ホテルの予約が入ったタイミングで、予約システムは以下の処理を実行します:
- 宿泊用ルームの確保
- ディナー食材の確保
- 駐車場の確保
予約システムは以下のような流れで動作します:
前提
- 設計の細部にはあまりこだわりません。今回はTemporalをどう組み込めるかに焦点を当てます。
- 実装はGoで行います。
- 本来はAPIやWorkerで処理を繋ぎますが、今回はそこまでは行いません。
分散トランザクションで意識すべき点は?
今回のようなホテル予約システムの分散トランザクションを実装する際、いくつかの重要な課題に直面します。これらを理解することで、なぜTemporalのようなワークフローエンジンが必要なのかが見えてきます。
まず一つは、複数サービスを呼び出す中で一部のサービスへのリクエストが失敗することです。
今回のケースでは、以下のような状況が考えられます。
- ルーム確保 → 成功
- ディナー食材確保 → 成功
- 駐車場確保 → 失敗(満車になってしまった)
この場合、予約としては失敗です。駐車場がないことになります。
また、ネットワーク障害や、送り元が誤って予約リクエストを多重送信する場合も考えられます。
この場合、同じユーザーが2つ以上の予約を持ててしまうと失敗です。同じリクエストが何度送信されても同じ結果になるよう設計する、これが分散トランザクションで検討すべき課題の一つである冪等性の考慮です。
さらに、外部APIを経由して処理を行う場合、対向システムの状態も影響します。
一時的な負荷でリクエストを捌けないケースや、リクエストパラメータが正しくても対向システムが処理できない場合もあり得ます。
この場合、リトライすべきか、またはリトライしないべきかの判断も必要です。
即時リトライか、一定期間空けてリトライするかも考慮が必要です。
これらの課題解消をサポートしてくれるのがTemporalです。
ただし、銀の弾丸ではなく、ユースケースに沿った設計や実装が必要です。
実装
作ったもの
まずワークフロー全体のコードは以下の通りです。
ロガー等は今回の話と関係がないため省略しています。
func HotelBookingSaga(ctx workflow.Context, request BookingRequest) (*BookingResult, error) {
// リトライポリシーの設定
retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Minute,
MaximumAttempts: 3,
NonRetryableErrorTypes: []string{
"*activities.BusinessError",
"*activities.ValidationError",
},
}
// アクティビティオプションの設定
activityOptions := workflow.ActivityOptions{
StartToCloseTimeout: 30 * time.Second,
RetryPolicy: retryPolicy,
}
ctx = workflow.WithActivityOptions(ctx, activityOptions)
// 結果の初期化
result := &BookingResult{
Success: false,
BookingID: request.BookingID,
Compensations: []string{},
}
// Sagaパターンでの補償処理管理
var compensations Compensations
// Step 1: ホテルルーム予約
hotelRequest := activities.HotelBookingRequest{
BookingID: request.BookingID,
UserID: request.UserID,
HotelID: request.Hotel.HotelID,
}
var hotelResult activities.HotelBookingResult
err := workflow.ExecuteActivity(ctx, activities.HotelRoomBookingActivity, hotelRequest).Get(ctx, &hotelResult)
if err != nil {
result.Message = fmt.Sprintf("ホテルルーム予約に失敗: %s", err.Error())
return result, nil
}
result.HotelResult = &hotelResult
// 補償アクティビティの追加
compensations.AddCompensation(activities.CompensateHotelRoomActivity)
// Step 2: ディナー食材予約
dinnerRequest := activities.DinnerBookingRequest{
BookingID: request.BookingID,
UserID: request.UserID,
MenuType: request.Dinner.MenuType,
}
var dinnerResult activities.DinnerBookingResult
err = workflow.ExecuteActivity(ctx, activities.DinnerFoodBookingActivity, dinnerRequest).Get(ctx, &dinnerResult)
if err != nil {
result.Message = fmt.Sprintf("ディナー食材予約に失敗: %s", err.Error())
// 補償処理を実行
compensations.Compensate(ctx, false) // 順次実行
return result, nil
}
result.DinnerResult = &dinnerResult
// 補償アクティビティの追加
compensations.AddCompensation(activities.CompensateDinnerFoodActivity)
// Step 3: 駐車場予約
parkingRequest := activities.ParkingBookingRequest{
BookingID: request.BookingID,
UserID: request.UserID,
SpaceType: request.Parking.SpaceType,
}
var parkingResult activities.ParkingBookingResult
err = workflow.ExecuteActivity(ctx, activities.ParkingBookingActivity, parkingRequest).Get(ctx, &parkingResult)
if err != nil {
result.Message = fmt.Sprintf("駐車場予約に失敗: %s", err.Error())
// 補償処理を実行
compensations.Compensate(ctx, false) // 順次実行
return result, nil
}
result.ParkingResult = &parkingResult
// 補償アクティビティの追加
compensations.AddCompensation(activities.CompensateParkingActivity)
// 全て成功した場合
result.Success = true
result.Message = "ホテル予約が正常に完了しました"
return result, nil
}
基本的な流れは以下の通りです。
-
リトライポリシーの設定: アクティビティが失敗した際の自動リトライを制御します。ビジネスエラー(
BusinessError
、ValidationError
)はリトライ対象外とし、一時的なシステム障害のみリトライします。- ここで定義されたエラーはリトライしても成功しない、という観点です。
-
順次アクティビティ実行: ホテルルーム → ディナー食材 → 駐車場の順で各予約アクティビティを実行。各ステップが成功するたびに、対応する補償アクティビティを
compensations
に追加します。 - Sagaパターンによる補償管理: 途中でエラーが発生した場合、それまでに成功したアクティビティの補償処理を逆順で実行し、システム全体の整合性を保ちます。
-
エラーハンドリング: 各ステップでエラーが発生すると、補償処理を実行してワークフローを終了します。ワークフロー自体はエラーではなく
result.Success = false
で失敗を表現します。
なぜSagaパターン?
複数サービスを使う中で、システムが毎回正常に動作する保証はありません。
むしろ異常が発生することの方が自然です。
今回の例でいうと、以下のようなケースが発生します。
- ホテルの部屋自体は予約できた
- ただし、食材を抑えるシステムがダウンしていて確保ができない
- この場合、予約システムとしては成功とみなしてはいけません
- かつ、先ほど抑えたホテルの部屋を開放させる必要があります
図にすると以下の通りです。
このようにいずれかのシステムで正常に終了できなかった場合に、整合性を保つため、
Sagaパターンを組み込んでいます。
ただし、補償アクションが必ず成功するとは限りません。
そのため、補償アクション自体もリトライの考慮が必要です。
例としては、以下のようなケースです。
上記例の流れ:
- 部屋の予約は完了した
- 食材の確保に失敗した
- そのため、予約を失敗させる必要がある
- 整合性を保つため、部屋の予約をキャンセルする必要がある
- 部屋のキャンセルを行おうとしたが失敗した(例:部屋予約システムの高負荷など)
- 数秒待機したのち、部屋のキャンセルがリトライ実行されて今度は成功した
- 予約自体を失敗として、処理を終了できた
アクティビティの実装:エラー分類と冪等性
ワークフローから呼び出される各アクティビティでは、リトライポリシーを適切に機能させるため、エラーの分類と冪等性の実装が重要です。
ディナー食材予約アクティビティを例に、実装のポイントを説明します。
func (a *DinnerActivity) BookDinner(ctx context.Context, req DinnerBookingRequest) (*DinnerBookingResult, error) {
// バリデーション
if err := req.Validate(); err != nil {
return nil, err // BusinessError(リトライ対象外)
}
// 冪等性チェック(既に処理済みかどうか)
if cached, exists := dinnerCache[req.BookingID]; exists {
a.logger.Info("既に処理済みの予約リクエスト", "BookingID", req.BookingID)
return cached, nil
}
// エラー分類によるリトライ制御
switch req.BookingID {
case "booking-system-error":
// リトライ対象:一時的なシステム障害
err := NewServerError("外部システムで障害が発生しました", "SYSTEM_ERROR")
return nil, err
case "booking-out-of-stock":
// 非リトライ:ビジネスロジックエラー
err := NewBusinessError("指定されたメニューの食材が在庫不足です", "OUT_OF_STOCK")
return nil, err
default:
// 正常な予約処理
result := &DinnerBookingResult{
Success: true,
ResourceID: "food-123",
Message: "ディナー食材予約が完了しました",
}
// キャッシュに保存(冪等性保証)
dinnerCache[req.BookingID] = result
return result, nil
}
}
実装のポイント:
-
エラー分類:
BusinessError
(在庫不足など)は非リトライ、ServerError
(システム障害など)はリトライ対象 - 冪等性保証: BookingIDをキーとするキャッシュで、同じリクエストの重複実行を防止
- 明確な責務: アクティビティはビジネスロジックに集中し、リトライ制御はワークフローが担当
この実装により、ワークフローで設定したNonRetryableErrorTypes
にBusinessError
を指定することで、在庫不足のような解決不可能なエラーは即座に失敗とし、一時的なシステム障害のみリトライするという制御が実現できます。
リトライポリシーの実際の効果
設定したリトライポリシーは以下のように動作します:
retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second, // 初回リトライまで1秒待機
BackoffCoefficient: 2.0, // 待機時間を2倍ずつ増加
MaximumInterval: time.Minute, // 最大待機時間は1分
MaximumAttempts: 3, // 最大3回まで試行
}
具体的な動作例(ServerErrorが発生した場合):
- 1回目: 即座に実行 → 失敗
- 2回目: 1秒待機後に実行 → 失敗
- 3回目: 2秒待機後に実行 → 成功(または最終失敗)
この指数バックオフにより、一時的な高負荷状況でもシステムに過度な負担をかけることなく、適切な間隔でリトライを行えます。MaximumInterval
を設定することで、待機時間が無制限に伸びることも防げます。
BusinessErrorの場合はNonRetryableErrorTypes
に指定されているため、即座に失敗として処理され、無駄なリトライによるリソース消費を避けられます。
補償アクティビティの実装
補償処理も同様に冪等性を考慮して実装されています:
func (a *HotelActivity) CompensateHotel(ctx context.Context, bookingID string, resourceID string) (*CompensationResult, error) {
// 冪等性チェック(既に補償済みかどうか)
if cached, exists := hotelCompensationCache[bookingID]; exists {
a.logger.Info("既に補償済みの予約", "BookingID", bookingID)
return cached, nil
}
// 実際の補償処理(ホテル予約のキャンセル)
a.logger.Info("ホテルルーム予約をキャンセルしました", "BookingID", bookingID, "ResourceID", resourceID)
result := &CompensationResult{
Success: true,
Message: "ホテルルーム予約の補償処理が完了しました",
}
// キャッシュに保存(冪等性保証)
hotelCompensationCache[bookingID] = result
return result, nil
}
補償処理でも冪等性を保つことで、リトライ時に重複した補償処理(二重キャンセルなど)を防げます。
ハマったところなど
- エラーハンドリングについて
activityからworkflowにエラーを返します。
ただし、この時にactivityで返したエラーはそのままworkflowに返却されるわけではなく、
Temporalを経由するため、エラーがTemporalのアプリケーションエラーにラップされます。
概念を理解するのが難しかったというより、エラーを適切に抜き出すのに少し手間取りました。
Temporalのアプリケーションエラーを抜き出す仕組みがあると嬉しかったです。
- DLQ(Dead Letter Queue)の設計について
今回は触れていません。
Sagaパターンを組み込んだことで、冪等性を保つように補償アクティビティを実行できるようになりました。
成功したアクティビティに対して、他のAPIが失敗した場合に取り消すようにしています。
ただし、理論上この補償アクションが失敗し続けることも十分あり得ます。
膨大なリトライ設定があっても、全て失敗すれば上限に達します。
設計上、補償アクションが失敗して終わるのは許容できません。
だからといって、無限リトライも現実的ではありません。
その場合は、段階的なDLQを組み込むのが望ましいと考えています。
- AWS SQSのような外部Queueサービスにポーリングさせる
- ローカルストレージのような内部Queueに失敗したものを溜めておく
外部ストレージやローカルストレージにDLQを滞留させようとしても、失敗するリスクは常にあります。
ここについてはまだ最適解が出ていないので、今回は割愛します。
まとめ
今回はTemporal Workflowを使った分散トランザクションや
Sagaパターンについて解説しました。
Temporalを利用することで、自前で準備しないといけないものを減らし、分散トランザクションを
構築できるのは非常に助かります。
ただし、それでも完璧な構築を目指すのは難しく、改めて分散トランザクションの難しさも学ぶことができました。
今後もより良い活用方法を模索していきたいと思います。
参考にさせていただいたもの
Discussion