脱データ不整合!? Workflow Engine「Temporal」の魅力
はじめに
分散システムを扱う際、エラーやクラッシュによる処理の中断はよくあるものかと思います。特に、EC サイトの支払い処理など、複数のステップを経て完了する重要なプロセスでは、障害が発生した場合にその進行状態を適切に管理しないと、取引の二重処理やデータの不整合といった問題が発生する危険性があります。復旧作業に骨を折った方も多いのではないでしょうか。
Temporal は、こういった課題を解決するために、処理の進行状況を自動的に追跡し、障害が発生してもその地点から再開できる仕組みを提供しています。複雑な復旧コードの実装や、手作業による復旧作業がなくなるのであれば、これは非常に魅力的ですね。
ここでは、Temporal が一体どういうものかについて解説していきたいと思います。
Temporal とは
Temporal は、分散システムでのワークフローを管理するためのオープンソースのワークフローエンジンです。Uber での実用的なニーズから生まれた Cadence の設計思想を継承しつつ、独立したプロジェクトとして成長したものとなります。
用語
ワークフロー(Workflow):複数のステップで構成される一連の処理の流れを定義したものです。たとえば、口座 A から口座 B への送金処理もワークフローとして表現できます。口座 A からの出金、口座 B への入金がステップとなります。
アクティビティ(Activity):アクティビティは、ステップの実体です。外部サービスの呼び出しやデータベース操作など、失敗しやすい具体的な処理をカプセル化したものです。
アーキテクチャ
Temporal のアーキテクチャはこのようになっています。
Temporal サービスは、ワークフローの状態を保持し何を実行するべきかの情報を提供します。
Client は、Temporal サービスにワークフローの実行リクエストを送ります。
Worker は、ワークフローの実行ロジックを持ちます。Temporal サービスをポーリングし、必要に応じてワークフローを実行します。
Temporal サービスはセルフホストも可能ですし、クラウドサービスとしても提供されているようです。
Client や Worker は、Temporal が提供している SDK を介して、gRPC 経由で Temporal サービスと通信します。SDK は「Go」「Java」「PHP」「Python」「TypeScript」「.NET」に対応しているようです。
さて、これだけ見るとメッセージキューイングしているだけのように見えます。サンプル実装を見てカラクリを紐解いていきましょう。
サンプル実装 - 概要
Run your first Temporal application with the Go SDK で紹介されている実装をみていきます。ソースコードは money-transfer-project-template-go にホストされています。
start/main.go
package main
import (
"context"
"log"
"go.temporal.io/sdk/client"
"money-transfer-project-template-go/app"
)
// @@@SNIPSTART money-transfer-project-template-go-start-workflow
func main() {
// プロセスごとに1回だけクライアントオブジェクトを作成します
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Temporalクライアントの作成に失敗しました:", err)
}
defer c.Close()
input := app.PaymentDetails{
SourceAccount: "85-150",
TargetAccount: "43-812",
Amount: 250,
ReferenceID: "12345",
}
options := client.StartWorkflowOptions{
ID: "pay-invoice-701",
TaskQueue: app.MoneyTransferTaskQueueName,
}
log.Printf("口座%sから口座%sへ%dの送金を開始します", input.SourceAccount, input.TargetAccount, input.Amount)
we, err := c.ExecuteWorkflow(context.Background(), options, app.MoneyTransfer, input)
if err != nil {
log.Fatalln("ワークフローの開始に失敗しました:", err)
}
log.Printf("ワークフローID: %s 実行ID: %s\n", we.GetID(), we.GetRunID())
var result string
err = we.Get(context.Background(), &result)
if err != nil {
log.Fatalln("ワークフロー結果の取得に失敗しました:", err)
}
log.Println(result)
}
// @@@SNIPEND
worker/main.go
package main
import (
"log"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"money-transfer-project-template-go/app"
)
// @@@SNIPSTART money-transfer-project-template-go-worker
func main() {
c, err := client.Dial(client.Options{})
if err != nil {
log.Fatalln("Temporalクライアントの作成に失敗しました。", err)
}
defer c.Close()
w := worker.New(c, app.MoneyTransferTaskQueueName, worker.Options{})
// このワーカーは、ワークフローとアクティビティの両方の関数をホストします。
w.RegisterWorkflow(app.MoneyTransfer)
w.RegisterActivity(app.Withdraw)
w.RegisterActivity(app.Deposit)
w.RegisterActivity(app.Refund)
// タスクキューのリッスンを開始します。
err = w.Run(worker.InterruptCh())
if err != nil {
log.Fatalln("ワーカーの起動に失敗しました", err)
}
}
// @@@SNIPEND
workflow.go
package app
import (
"fmt"
"time"
"go.temporal.io/sdk/temporal"
"go.temporal.io/sdk/workflow"
)
// @@@SNIPSTART money-transfer-project-template-go-workflow
func MoneyTransfer(ctx workflow.Context, input PaymentDetails) (string, error) {
// RetryPolicyは、アクティビティが失敗した場合に自動でリトライする方法を指定します。
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second, // 初回のリトライまでの待機時間
BackoffCoefficient: 2.0, // リトライ間隔の指数バックオフ係数
MaximumInterval: 100 * time.Second, // リトライの最大待機時間
MaximumAttempts: 500, // 0は無制限のリトライ回数
NonRetryableErrorTypes: []string{"InvalidAccountError", "InsufficientFundsError"}, // リトライしないエラータイプ
}
options := workflow.ActivityOptions{
// タイムアウトオプションは、アクティビティ関数が自動的にタイムアウトする時間を指定します。
StartToCloseTimeout: time.Minute,
// 必要に応じて、カスタマイズしたリトライポリシーを提供します。
// Temporalはデフォルトで失敗したアクティビティをリトライします。
RetryPolicy: retrypolicy,
}
// オプションを適用します。
ctx = workflow.WithActivityOptions(ctx, options)
// お金を引き出します。
var withdrawOutput string
withdrawErr := workflow.ExecuteActivity(ctx, Withdraw, input).Get(ctx, &withdrawOutput)
if withdrawErr != nil {
return "", withdrawErr
}
// お金を預けます。
var depositOutput string
depositErr := workflow.ExecuteActivity(ctx, Deposit, input).Get(ctx, &depositOutput)
if depositErr != nil {
// 預金に失敗した場合、元の口座にお金を戻します。
var result string
refundErr := workflow.ExecuteActivity(ctx, Refund, input).Get(ctx, &result)
if refundErr != nil {
return "",
fmt.Errorf("Deposit: %vへの預金に失敗しました: %v。%vへのお金の返金もできませんでした: %w",
input.TargetAccount, depositErr, input.SourceAccount, refundErr)
}
return "", fmt.Errorf("Deposit: %vへの預金に失敗しました: お金は%vに返金されました: %w",
input.TargetAccount, input.SourceAccount, depositErr)
}
result := fmt.Sprintf("振込完了(取引ID: %s, %s)", withdrawOutput, depositOutput)
return result, nil
}
// @@@SNIPEND
このワークフローでは、送金元からお金を出金し、送金先にお金を入金し、入金に失敗したときは送金元にお金を戻す、という処理を行っています。
まずワークフローの実行リクエストを見てみます。実行したいワークフローと、そのワークフローにわたす引数を指定して、SDK を介してリクエストを行っています。
we, err := c.ExecuteWorkflow(context.Background(), options, app.MoneyTransfer, input)
これにより、Temporal サービスにワークフローが登録され、タスクキューにエンキューされます。
次に Worker を見ていきます。対象となるワークフローやそのアクティビティを指定して、SDK 経由でタスクキューをポーリングする設定をしています。
w := worker.New(c, app.MoneyTransferTaskQueueName, worker.Options{})
w.RegisterWorkflow(app.MoneyTransfer)
w.RegisterActivity(app.Withdraw)
w.RegisterActivity(app.Deposit)
w.RegisterActivity(app.Refund)
err = w.Run(worker.InterruptCh())
タスクキューからデキューできたら、対象のワークフローを実行するわけですね。
サンプル実装 - 正常系
では、タスクキューに MoneyTransfer が流れてきたとします。Worker はそれを補足して、MoneyTransfer を実行します。
最初に実行されるアクティビティはこれです。SDK でラップして、アクティビティの実体となる関数(Withdraw)を実行しています。
withdrawErr := workflow.ExecuteActivity(ctx, Withdraw, input).Get(ctx, &withdrawOutput)
ここで、このアクティビティの実行イベントが、渡された引数や実行結果としての戻り値とともに、Temporal サービスに記録されます。
続いて、Deposit です。
depositErr := workflow.ExecuteActivity(ctx, Deposit, input).Get(ctx, &depositOutput)
ここでも同様に、SDK でラップして、Deposit を実行します。このアクティビティの実行イベントが、渡された引数や実行結果としての戻り値とともに、Temporal サービスに記録されます。
ワークフローがエラーを返さずに終了したら、そのイベントが Temporal サービスに記録されます。正常系はここで終了です。
サンプル実装 - 異常系(1)
次に、エラーした際にどのように復旧するのかについて見ていきましょう。Deposit でエラーが発生するパターンです。
Withdraw の実行までは正常系と同じで、その後 Deposit でエラーが発生します。そのときの挙動はどうなるでしょうか。
コードの上部でリトライポリシーを設定していました。このルールに従って、Deposit はリトライを行います。
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second, // 初回のリトライまでの待機時間
BackoffCoefficient: 2.0, // リトライ間隔の指数バックオフ係数
MaximumInterval: 100 * time.Second, // リトライの最大待機時間
MaximumAttempts: 500, // 0は無制限のリトライ回数
NonRetryableErrorTypes: []string{"InvalidAccountError", "InsufficientFundsError"}, // リトライしないエラータイプ
}
リトライ中、ワークフローの関数 MoneyTransfer は実行されたままです。
depositErr := workflow.ExecuteActivity(ctx, Deposit, input).Get(ctx, &depositOutput)
も実行されたままです。この ExecuteActivity の中で、成功するまでリトライが繰り返されます。もちろん、エラー発生に関しては、Temporal サービスにイベントとして登録されています。
ここでリトライを繰り返し、Deposit が成功したら、MoneyTransfer 関数に制御が戻ってきます。あとは正常系と同じ処理となります。
また、ここでリトライを繰り返し上限に達した場合でも、MoneyTransfer 関数に制御が戻ってきます。そのときは、depositErr に値が返ってくるので、その値をもとに不整合を解消する処理を実行します。サンプル実装では送金元にお金を戻す処理を実行していますね。
if depositErr != nil {
var result string
refundErr := workflow.ExecuteActivity(ctx, Refund, input).Get(ctx, &result)
// 略
}
アクティビティ単位の柔軟なリトライ設定を、宣言的に書けるのはいいですね。自前でどうにかする場合は、泥臭い実装をする必要があるかもしれません。
サンプル実装 - 異常系(2)
もう少し複雑なケースをみていきます。Deposit でエラーが発生しリトライを繰り返している途中に、デプロイ等の理由で worker プロセスが終了したとします。どうなるでしょうか。
Go SDK の場合は、TERM シグナルを待ち、それを受け取るとクリーンシャットダウンが実行されます。クリーンシャットダウンでは、新しいタスクのポーリングを止めて、実行中のアクティビティの完了を待ちます。なので、Deposit がエラーを返した時点で、一度 worker はプロセスを終了することになります。
その後、新しい Worker が起動し、再びポーリングを始めます。古い Worker が終了した時点では、Deposit を実行するアクティビティがタスクキューに入った状態となっています。なので、新しい worker は、そのタスクキューをポーリングし、Deposit アクティビティの実行を開始します。ワークフローを実行するわけではなさそうです。
さて、ここで Deposit がリトライ上限に達したとします。アクティビティは終了し、呼び出し元であったワークフローがタスクキューにセットされます。
Worker は再びワークフローをポーリングし、MoneyTransfer の実行を開始します。ここからが着目ポイントです。
MoneyTransfer の最初に実行されるのは、
withdrawErr := workflow.ExecuteActivity(ctx, Withdraw, input).Get(ctx, &withdrawOutput)
です。再び出金処理がされるのかと不安に思うかも知れませんが、そんなことはありません。過去に実行済のアクティビティであるため、Withdraw 自体の実行はスキップされます。代わりに Temporal サービスに保存されたイベントから、実行結果を再現して返します。
入金処理も同様です。
depositErr := workflow.ExecuteActivity(ctx, Deposit, input).Get(ctx, &depositOutput)
ここはエラーを返したので、エラーを返すということが再現されて、Deposit 自体の実行はスキップされます。
そして未実行だった
refundErr := workflow.ExecuteActivity(ctx, Refund, input).Get(ctx, &result)
のアクティビティが実行されて、ワークフローは終了します。
これはかなり強力ですね。ワークフローには数十分〜数日という寿命の長いものもありえます。それを実行途中で止めたとしても適切に再開できるというのは、大きな魅力ではないでしょうか。
まとめ
このように Temporal は、処理の進行状況を自動的に追跡し、障害が発生してもその地点から再開できる仕組みを提供しています。分散システムを扱う際、エラーやクラッシュによる処理の中断はよくあることで、そこからの復旧を考慮した実装を行うのは大変です。Temporal を活用して、より複雑なアプリケーションを容易に実現していければ最高ですね。
Discussion