🐙

脱データ不整合!? Workflow Engine「Temporal」の魅力 (2)

2024/11/04に公開

前回は、Temporal が一体どういうものかについて解説しました。

今回は、Temporalを用いた実装について書いていきたいと思います。使用する言語はGo言語です。以下を目標としてやっていきます。

  • docker-composeを用いた開発環境構築
  • Workflowのサンプル実装
  • Workflowのテストコードのサンプル実装

開発環境構築

Temporal サービス を担う docker-compose files を、公式が提供してくれています。なのでこれを clone して docker compose up するだけで動きます。

しかし、場合によっては、各プロジェクトごとに準備した compose.yml に統合したいケースもあると思います。今回は、RDBにMySQLを使う docker-compose-mysql.yml を参考に設定してみました。

services:
  db:
    image: mysql:8
    user: mysql
    ports:
      - 3306:3306
    environment:
      MYSQL_ALLOW_EMPTY_PASSWORD: "yes"
      MYSQL_USER: "app"
      MYSQL_PASSWORD: "app"
      MYSQL_DATABASE: "default"
      TZ: "UTC"
    volumes:
      - ./database/data:/var/lib/mysql
    healthcheck:
      test: "mysqladmin ping -h localhost"
      timeout: 20s
      retries: 10

  backend: # アプリケーションのバックエンド
    image: ubuntu:latest
    depends_on:
      db:
        condition: service_healthy
    working_dir: /app/src/backend
    command: bash -c 'pwd && make dev' # 適当な起動コマンド
    ports:
      - 8080:8080

  temporal: # Temporal Server
    depends_on:
      db:
        condition: service_healthy
    environment: # DBはbackendと共有
      - DB=mysql8
      - DB_PORT=3306
      - MYSQL_USER=root
      - MYSQL_PWD=
      - MYSQL_SEEDS=db
      - DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
    image: temporalio/auto-setup:1.25.0
    ports:
      - 7233:7233
    volumes:
	  # https://github.com/temporalio/docker-compose/tree/main/dynamicconfig をコピーして、
	  # ./temporal/dynamicconfig に配置する
      - ./temporal/dynamicconfig:/etc/temporal/config/dynamicconfig

  temporal-admin-tools: # コマンドラインベースのTemporal管理ツールを提供
    depends_on:
      - temporal
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TEMPORAL_CLI_ADDRESS=temporal:7233
    image: temporalio/admin-tools:1.25.0-tctl-1.18.1-cli-1.0.0
    stdin_open: true
    tty: true

  temporal-ui: # ブラウザでWorkflowの状態を確認できるページを提供
    depends_on:
      - temporal
    environment:
      - TEMPORAL_ADDRESS=temporal:7233
      - TEMPORAL_CORS_ORIGINS=http://localhost:7234
    image: temporalio/ui:2.26.2
    ports:
      - 7234:8080 # backendとポートが重複しないように

データベースコンテナは、アプリケーション と Temporal Server とで共有することにして、リソースの使用量を抑えました。本番環境では、保守性の観点から別に分けたほうがいいかもしれません。

なお、Temporal Server が参照している development-sql.yaml は以下のようになっていました。

limit.maxIDLength:
  - value: 255
    constraints: {}
system.forceSearchAttributesCacheRefreshOnRead:
  - value: true # Dev setup only. Please don't turn this on in production.
    constraints: {}

limit.maxIDLength は、Temporalで使用されるさまざまな識別子(Workflow ID、Workflow Type、Task Queue名など)の最大長を定義するもののようです。デフォルトではこの制限は1000文字です。しかし、MySQLやPostgreSQLは通常、varcharフィールドの長さをデフォルトで255文字に制限しています。この制限を加えることで、データベースシステムとの互換性が確保されるようです。Github Issue

system.forceSearchAttributesCacheRefreshOnRead は、trueに設定することで、作成された検索属性を即座に利用可能にできるようです。開発環境では、新しく作成された検索属性をキャッシュの更新を待たずにすぐに利用できるようになると便利だから、という理由のようです。

サンプル実装

さて、ここから具体的な実装に入っていきます。

まずは基本方針です。Workflowの実行リクエストをする側、Workflowを実行する側(Worker)とでは、ソースコードは分けることも可能ですが、今回は共有する方式を取りました。Railsとsidekiqのような関係ですね。

backend/workflow

workflow に関する処理は、backend/workflow パッケージにまとめました。
パッケージに含まれるファイルとしては以下のとおりです。

ファイル 説明
activity.go workflow で実行される activity を定義します。
client.go temporal に接続する client です。wokflow の実行リクエストや実行時のイベント送信はこの client 経由で行われます。
manager.go 各種 workflow を扱う構造体のインターフェースです。
sample.go Workflow の実装サンプルです。ひとつの Workflow のロジック、実行リクエスト、実行処理する worker の起動を担います。manager のインターフェースを備えており、activity と client を内包して実装されています。
catalog.go sample.go をはじめとする各種 Workflow をメンバーとして持つ構造体です。backend/workflow パッケージ外からは、この構造体を経由して Workflow の操作を行います。

workflow の実行リクエストや実行は、catalog 経由で manager のメソッドを叩くだけでよく、temporal は workflow パッケージ内に閉じるようにしています。

activity.go

package workflow

import (
	"context"
	"log/slog"
)

type ActivityExecutor interface {
	SampleOne(ctx context.Context, input SampleOneInput) (*SampleOneOutput, error)
	SampleTwo(ctx context.Context, input SampleTwoInput) (*SampleTwoOutput, error)
}

/**
 * workflowから呼び出されるアクティビティをメソッドとして持つ構造体
 * temporalに依存しない独立した実装となる.
 */
type ActivityClient struct {
	// dbやapiのclientが必要であればここに追加し
	// catalog,manager経由で受け取る
}

func NewActivityClient() ActivityExecutor {
	return &ActivityClient{}
}

type SampleOneInput struct {
	ID string
}

type SampleOneOutput struct {
	Message string
}

func (a *ActivityClient) SampleOne(ctx context.Context, input SampleOneInput) (*SampleOneOutput, error) {
	slog.Info("activity sample01", "input", input)
	return &SampleOneOutput{Message: "msg01"}, nil
}

type SampleTwoInput struct {
	ID string
}

type SampleTwoOutput struct {
	Message string
}

func (a *ActivityClient) SampleTwo(ctx context.Context, input SampleTwoInput) (*SampleTwoOutput, error) {
	slog.Info("activity sample02", "input", input)
	return &SampleTwoOutput{Message: "msg02"}, nil
}

client.go

package workflow

import (
	"github.com/spf13/viper"
	"github.com/stretchr/testify/mock"
	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/mocks"
)

func NewClient() (client.Client, error) {
	opts := client.Options{
		HostPort: viper.GetString("temporal_addr"),
	}
	c, err := client.Dial(opts)
	if err != nil {
		return nil, err
	}
	return c, nil
}

func NewMockClient() client.Client {
	c := &mocks.Client{}
	c.On("StartWorkflow", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
	return c
}

manager.go

package workflow

import (
	"go.temporal.io/sdk/workflow"
)

/**
 * Workflowを制御するManagerのインターフェース
 *
 * 実体はtemporalに依存する実装となるが、
 * ビジネスロジックと密結合しているため、workflowパッケージにまとめる。
 */
type manager[T, U any] interface {
	queueName() string                                 // Workflowを処理するワーカーのキュー名
	WorkflowName() string                              // Workflowの名前
	Workflow(ctx workflow.Context, input T) (U, error) // Workflowの定義
	RequestWorkflow(id string, input T) error          // Workflow実行のリクエスト
	RunWorker() error                                  // Workflowを処理するワーカーを起動
}

sample.go

package workflow

import (
	"context"
	"fmt"
	"time"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/temporal"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

// Workflowの入力パラメータ.
type SampleInput struct {
	ID string
}

// Workflowの出力パラメータ.
type SampleOutput struct {
	ID      string
	Message string
}

type SampleManager struct {
	client client.Client
}

func NewSampleManager(client client.Client) manager[SampleInput, SampleOutput] {
	return &SampleManager{
		client: client,
	}
}

func (m *SampleManager) queueName() string {
	return "sample-queue"
}

func (m *SampleManager) WorkflowName() string {
	return "SampleWorkflow"
}

func (m *SampleManager) Workflow(ctx workflow.Context, input SampleInput) (SampleOutput, error) {
	out := SampleOutput{
		ID: input.ID,
	}

	// RetryPolicyは、アクティビティが失敗した場合の自動的な再試行の処理方法を指定します。
	retrypolicy := &temporal.RetryPolicy{
		InitialInterval:        time.Second,
		BackoffCoefficient:     2.0,
		MaximumInterval:        100 * time.Second,
		MaximumAttempts:        10, // 0 is unlimited retries
		NonRetryableErrorTypes: []string{"SampleActivityError"},
	}

	options := workflow.ActivityOptions{
		// タイムアウトオプションは、アクティビティ関数を自動的にタイムアウトさせるタイミングを指定します。
		StartToCloseTimeout: time.Minute,
		// オプションでカスタマイズされたRetryPolicyを提供します。
		// Temporalはデフォルトで失敗したアクティビティを再試行します。
		RetryPolicy: retrypolicy,
	}

	// オプションを適用する
	ctx = workflow.WithActivityOptions(ctx, options)

	a := NewActivityClient()

	var msg1 SampleOneOutput
	err1 := workflow.ExecuteActivity(ctx, a.SampleOne, SampleOneInput(input)).Get(ctx, &msg1)

	if err1 != nil {
		var msg2 SampleTwoOutput
		err2 := workflow.ExecuteActivity(ctx, a.SampleTwo, SampleTwoInput(input)).Get(ctx, &msg2)

		if err2 != nil {
			return out, fmt.Errorf("sample02: failed to execute activity: %w", err2)
		}

		out.Message = msg2.Message
		return out, nil
	}

	out.Message = msg1.Message

	return out, nil
}

// SampleFlowの実行リクエストをする.
func (m *SampleManager) RequestWorkflow(id string, input SampleInput) error {
	_, err := m.client.ExecuteWorkflow(context.Background(), client.StartWorkflowOptions{
		ID:        id,
		TaskQueue: m.queueName(),
	}, m.WorkflowName(), input)

	if err != nil {
		return err
	}

	// 以下を実行すると、Workflowが終了するまでブロックされる(=同期処理になる)
	// var result SampleOutput
	// err = we.Get(context.Background(), &result)
	// if err != nil {
	// 	return err
	// }

	return nil
}

// SampleFlowのworkerを起動する.
func (m *SampleManager) RunWorker() error {
	a := NewActivityClient()

	w := worker.New(m.client, m.queueName(), worker.Options{})
	w.RegisterWorkflowWithOptions(m.Workflow, workflow.RegisterOptions{Name: m.WorkflowName()})
	w.RegisterActivity(a.SampleOne)
	w.RegisterActivity(a.SampleTwo)

	return w.Run(worker.InterruptCh())
}

catalog.go

package workflow

import (
	"go.temporal.io/sdk/client"
)

/**
 * Workflowを制御するManagerを保持する構造体
 * パッケージ外からはこの構造体を通じてWorkflowを利用する
 * temporalはworkflowパッケージ内に閉じるようにしている.
 */
type Catalog struct {
	Sample manager[SampleInput, SampleOutput]

	// 他のWorkflowを登録する場合はここに追加する
}

func NewCatalog(client client.Client) *Catalog {
	return &Catalog{
		Sample: NewSampleManager(client),

		// 他のWorkflowを登録する場合はここに追加する
	}
}

backend/xxx

上記実装を踏まえたうえで、Workflowの実行リクエストはこうしました。

今回は、WorkflowのIDを指定するようにしています。デフォルトの挙動では、同じIDを使用するとエラーするので二重実行防止が可能となります。また、このIDを使用して Workflow の状態も確認できるようです。なお、未指定でも Workflow の実行は可能なようで、そのときは Temporal が自動で Workflow に UUID を付与するようです。

wcl, err := workflow.NewClient()
if err != nil {
	return fmt.Errorf("failed to initialize workflow client: %w", err)
}
defer wcl.Close()
wc := workflow.NewCatalog(wcl)

// WorkflowのIDを指定することで、二重実行防止や実行状態確認に利用できる。
wid, err := uuid.NewV4()
if err != nil {
	fmt.Println("Error generating UUID:", err)
	return
}

err = wc.Sample.RequestWorkflow(wid, workflow.SampleInput{ID: "TEST-ID"})

backend/worker

workflowを監視実行するworkerの起動を行います。managerに含まれるworkflowはそれぞれ独立しているので、それらをゴルーチンを使って複数起動しています。

「実行頻度の高い特定のWorkflowだけを処理するworker」みたいなものを実装したい場合は、もうひと工夫必要そうです。

package worker

import (
	"fmt"
	"log/slog"

	"example.com/backend/workflow"
)

func Start() error {
	wcl, err := workflow.NewClient()
	if err != nil {
		return fmt.Errorf("failed to initialize workflow client: %w", err)
	}
	defer wcl.Close()
	wc := workflow.NewCatalog(wcl)

	return run(wc)
}

func run(wc *workflow.Catalog) error {
	errCh := make(chan error, 1)

	workflows := []struct {
		name string
		fn   func() error
	}{
		{wc.Sample.WorkflowName(), wc.Sample.RunWorker},

		// 他のワークフローはここに追加
	}

	for _, wf := range workflows {
		go func(name string, fn func() error) {
			err := fn()
			if err != nil {
				slog.Error("worker startup failed", "worker", name, "error", err)
				errCh <- fmt.Errorf("error occurred while starting %s: %w", name, err)
			}
		}(wf.name, wf.fn)
	}

	// メインゴルーチンでエラーを監視
	for {
		// debug
		slog.Info("start worker")

		err := <-errCh
		return err // エラーが発生した場合、即座に返す
	}
}

テストコード

Workflowのテストコードはこんな感じです。

package workflow

import (
	"errors"
	"testing"

	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/require"
	"go.temporal.io/sdk/testsuite"
)

func TestSampleWorkflow(t *testing.T) {
	// テストスイートとテスト実行環境のセットアップ
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()

	ac := NewActivityClient()
	sm := SampleManager{
		client: NewMockClient(),
	}

	// アクティビティの実装をモック
	// OnActivity はアクティビティの実装をモックするためのメソッド
	// - 第1引数はインターセプトされるアクティビティ
	// - 第2引数は temporal 独自の context なので、mock.Anything を指定する
	// - 第3引数はインターセプトされたアクティビティに渡される引数。この引数が渡された場合Returnで指定した値が返される。
	// Return はアクティビティの戻り値をモックするためのメソッド
	// - 第1引数はアクティビティの戻り値
	// - 第2引数はアクティビティのエラー
	// ref: https://docs.temporal.io/develop/go/testing-suite#mock-and-override-activities
	env.OnActivity(ac.SampleOne, mock.Anything, SampleOneInput{ID: "123"}).Return(&SampleOneOutput{Message: "msg01"}, nil)
	env.OnActivity(ac.SampleTwo, mock.Anything, SampleTwoInput{ID: "123"}).Return(&SampleTwoOutput{Message: "msg02"}, nil)

	// ワークフローの実行
	env.ExecuteWorkflow(sm.Workflow, SampleInput{ID: "123"})

	// ワークフローの実行結果を検証
	require.True(t, env.IsWorkflowCompleted())
	require.NoError(t, env.GetWorkflowError())

	// ワークフローの戻り値を検証
	var result SampleOutput
	require.NoError(t, env.GetWorkflowResult(&result))
	require.Equal(t, "msg01", result.Message)

	// アクティビティが期待される回数だけ実行されたかを検証
	// ref: https://github.com/temporalio/sdk-go/issues/1672#issuecomment-1893644524
	env.AssertActivityNumberOfCalls(t, "SampleOne", 1)
	env.AssertActivityNumberOfCalls(t, "SampleTwo", 0)
}

func TestSampleWorkflowSaga(t *testing.T) {
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()

	ac := NewActivityClient()
	sm := SampleManager{
		client: NewMockClient(),
	}

	// SampleOneがエラーするケース
	env.OnActivity(ac.SampleOne, mock.Anything, SampleOneInput{ID: "123"}).Return(nil, errors.New("error"))
	env.OnActivity(ac.SampleTwo, mock.Anything, SampleTwoInput{ID: "123"}).Return(&SampleTwoOutput{Message: "msg02"}, nil)

	// ワークフローの実行
	env.ExecuteWorkflow(sm.Workflow, SampleInput{ID: "123"})

	// ワークフローの実行結果を検証
	// 正常終了するべきなのでエラーはない
	require.True(t, env.IsWorkflowCompleted())
	require.NoError(t, env.GetWorkflowError())

	// ワークフローの戻り値を検証
	var result SampleOutput
	require.NoError(t, env.GetWorkflowResult(&result))
	require.Equal(t, "msg02", result.Message)

	env.AssertActivityNumberOfCalls(t, "SampleOne", 10) // エラーに伴うリトライを検証
	env.AssertActivityNumberOfCalls(t, "SampleTwo", 1)  // SampleOneがエラーしたのでSampleTwoが実行されるべき
}

func TestSampleWorkflowError(t *testing.T) {
	testSuite := &testsuite.WorkflowTestSuite{}
	env := testSuite.NewTestWorkflowEnvironment()

	ac := NewActivityClient()
	sm := SampleManager{
		client: NewMockClient(),
	}

	// すべてのアクティビティがエラーするケース
	env.OnActivity(ac.SampleOne, mock.Anything, SampleOneInput{ID: "123"}).Return(nil, errors.New("error1"))
	env.OnActivity(ac.SampleTwo, mock.Anything, SampleTwoInput{ID: "123"}).Return(nil, errors.New("error2"))

	// ワークフローの実行
	env.ExecuteWorkflow(sm.Workflow, SampleInput{ID: "123"})

	// ワークフローはエラーで返されるか検証
	require.True(t, env.IsWorkflowCompleted())
	require.Error(t, env.GetWorkflowError())

	// アクティビティの実行回数の検証
	env.AssertActivityNumberOfCalls(t, "SampleOne", 10)
	env.AssertActivityNumberOfCalls(t, "SampleTwo", 10)
}

まとめ

開発環境構築、サンプル実装、テストコードのサンプル実装に分けて、Temporalを用いたコードを書いてきました。改善点はありそうなのですが、実装しながらブラッシュアップしていければと思っています。

Discussion