⚙️

Kubernatesクラスタでの構築 - Workflow Engine「Temporal」(4)

2025/02/12に公開

前回は、Temporalで使用するMySQLのセットアップ方法を紹介しました。今回はセットアップしたMySQLを前提として、KubernatesクラスタにTemporalサービスを構築する手順を紹介したいと思います。

Temporalの構成要素

構築をはじめる前に、まずはTemporalを構成する要素について整理したいと思います。図に書き出すと以下のようになります。

Temporalのアーキテクチャ

Temporalサーバー

Temporal サーバーは、FrontendHistoryMatchingWorker の4つの独立してスケーリング可能なサービスで構成されています。これらのサービスは連携して Temporal サービスの中核を形成します。それぞれのコンポーネントと Temporal サーバーの関係について説明します。

temporal-frontend

Frontend サービスは Temporal サーバーのゲートウェイとして機能します。このサービスは、すべての受信コールに対するレート制限ルーティング、および認証を担当します。

  • 役割:
    • ネームスペース操作
    • 外部イベント処理
    • ワーカーポーリング
    • 可視性リクエスト処理
    • CLI 操作処理
  • 他サービスとの連携: Frontend サービスは、すべての他サービス(History、Matching、Worker)およびデータベースと通信します。

temporal-history

History サービスは、Workflow 実行の状態を保持するために重要な役割を果たします。

  • 役割:
    • Workflow 実行状態を Workflow History に保存する。
    • Workflow 実行が進行可能な場合、更新された履歴を Task Queue にタスクとして追加する。
  • 他サービスとの連携: Matching サービスおよびデータベースと通信します。

temporal-matching

Matching サービスは、タスクのディスパッチ用に Task Queue を管理します。

  • 役割:
    • ワーカーとタスクのマッチングを行う。
    • 新しいタスクを適切なキューにルーティングする。
  • 他サービスとの連携: Frontend サービス、History サービス、およびデータベースと通信します。

temporal-worker

Worker サービスは、バックグラウンド処理を行います。なお、これはTemporalサーバーの内部で動作するWorkerで、アプリケーションのWorkerとは異なるものなので混同しないように注意しましょう。

  • 役割:
    • レプリケーションキューの処理。
    • システム Workflow の実行。
  • 他サービスとの連携: 主に Frontend サービスと通信します。

補足

  • 詳細は What is a Temporal Service? をご覧ください。
  • なお、開発環境で使用している temporalio/auto-setup の Docker イメージには、FrontendHistoryMatchingWorker のすべてのサービスが単一プロセスで動作するように含まれています。

temporal-admin-tools

Temporal を管理・操作するための各種ツールを含むサービスです。Temporal CLI が含まれており、ワークフローの開始、ワークフロー履歴の確認、ネームスペースの管理などの操作を行うことができます。また、temporal-sql-tool も含まれており、データベースのスキーマ管理を行うこともできます。

temporal-webui

ワークフロー実行の可視化を提供するサービスです。主にデバッグ用途で使用され、ワークフロー実行の状態やメタデータを確認できます。

詳細:Temporal Web UI

temporal-webuiの画面サンプル

Helm チャート を用いた Kubernates クラスタへの反映

Temporalには helm-charts が提供されています。今回はこのチャートを用いて構築することにしましょう。

helmfileは環境ごとの設定ができるため、staging環境の設定を書いていきましょう。まずは、起点となるhelmfile.yamlからです。ここで指定するチャートのバージョンとTemporalのバージョンの対応は、index.yamlから確認出来ます。

# helmfile.yaml
environments:
  staging:
---
repositories:
  - name: temporal
    url: https://go.temporal.io/helm-charts

releases:
  - name: temporal
    chart: temporal/temporal
    version: 0.52.0 # APP_VERSION:1.25.2 に対応しているチャートのバージョン
    values:
      - "temporal-base.yaml"
      - "temporal-{{ .Environment.Name }}.yaml"
    namespace: my-app-{{ .Environment.Name }}

次に、temporal-base.yamlを書いていきます。valuesのデフォルト値は、values.yamlとなっています。こちらを参考に設定していきましょう。

なお、replicaCountやresourcesの値はいったん最低限動くであろう値を設定しています。本番稼働にあたっては、負荷の状況を見て適切にチューニングする必要がありそうです。

# temporal-base.yaml

# temporalサーバー全般の設定
# デフォルトでは 'temporal-frontend.<namespace>.svc.cluster.local:7233' で
# クラスター内のgRPC通信を受け付ける
server:
  config:
    persistence:
      # ワークフローの実行履歴やタスクキューなど、
      # Temporalの中核的なデータを保存するために使用
      default:
        driver: "sql"
        sql:
          driver: "mysql8"
          port: 3306
          database: temporal
          user: temporal_app
          # "temporal-db-secret"というSecretのpasswordキーの値を自動で参照。
          # ここにDBへの接続パスワードを設定しておく。
          existingSecret: temporal-db-secret
          maxConns: 20
          maxConnLifetime: "1h"
          tls:
            enabled: true
            caData: /etc/temporal/global-bundle.pem
      # ワークフローの検索や可視化のためのデータを保存
      # 大規模な環境や高度な検索機能が必要な場合は、Elasticsearchの使用を検討する
      visibility:
        driver: "sql"
        sql:
          driver: "mysql8"
          port: 3306
          database: temporal_visibility
          user: temporal_app
          # "temporal-db-secret"というSecretのvisibility-passwordキーの値を自動で参照。
          # ここにDBへの接続パスワードを設定しておく。
          maxConns: 20
          maxConnLifetime: "1h"
          tls:
            enabled: true
            caData: /etc/temporal/global-bundle.pem
  additionalVolumeMounts:
    - name: rds-ca-certs
      mountPath: "/etc/temporal/global-bundle.pem"
      subPath: "global-bundle.pem"
      readOnly: true
  additionalVolumes:
    - name: rds-ca-certs
      configMap:
        name: rds-ca-certs
        items:
          - key: global-bundle.pem
            path: global-bundle.pem

  # temporal-frontendの設定
  frontend:
    replicaCount: 1
    resources: # TODO: 本番環境用の適切な設定値を検討
      limits:
        cpu: 100m
        memory: 128Mi
      requests:
        cpu: 10m
        memory: 64Mi

  # temporal-historyの設定
  history:
    replicaCount: 1
    resources: # TODO: 本番環境用の適切な設定値を検討
      limits:
        cpu: 100m
        # OOMKilledになるため、256Miにしている
        memory: 256Mi
      requests:
        cpu: 10m
        memory: 64Mi

  # temporal-matchingの設定
  matching:
    replicaCount: 1
    resources: # TODO: 本番環境用の適切な設定値を検討
      limits:
        cpu: 100m
        memory: 128Mi
      requests:
        cpu: 10m
        memory: 64Mi

  # temporal-workerの設定
  worker:
    replicaCount: 1
    resources: # TODO: 本番環境用の適切な設定値を検討
      limits:
        cpu: 100m
        memory: 128Mi
      requests:
        cpu: 10m
        memory: 64Mi

# temporal-webuiの設定
web:
  ingress:
    enabled: true
    className: nginx
    annotations:
      nginx.ingress.kubernetes.io/proxy-body-size: 15m
      # TODO: IPやOAuth等によるアクセス制御を追記
      # デバッグ用のツールなので、パブリックにアクセスさせない
  resources: # TODO: 本番環境用の適切な設定値を検討
    limits:
      cpu: 100m
      memory: 128Mi
    requests:
      cpu: 10m
      memory: 64Mi

admintools:
  resources: # TODO: 本番環境用の適切な設定値を検討
    limits:
      cpu: 100m
      memory: 128Mi
    requests:
      cpu: 10m
      memory: 64Mi

# MySQLのみ使用する
# 他のデータストアは無効化
mysql:
  enabled: true
cassandra:
  enabled: false
postgresql:
  enabled: false
prometheus:
  enabled: false
grafana:
  enabled: false
elasticsearch:
  enabled: false

# 事故防止のため、DBスキーマの設定は自動で行わない
# DBスキーマの管理はadmintoolsコンテナに入って、temporal-sql-toolコマンドで行う
# これが有効の場合、temporal-schema Jobが作成され、そのJOBの中でスキーマの設定処理が行われる。
schema:
  createDatabase:
    enabled: false
  setup:
    enabled: false
  update:
    enabled: false

次に、temporal-staging.yamlを書いていきます。データベースやwebuiのエンドポイントなど、環境によって異なる値を設定していきます。

# temporal-staging.yaml
server:
  config:
    persistence:
      default:
        sql:
          host: xxxx.xxxx.ap-northeast-1.rds.amazonaws.com
      visibility:
        sql:
          host: xxxx.xxxx.ap-northeast-1.rds.amazonaws.com
web:
  ingress:
    hosts:
      - temporal-webui.example.com
    tls:
      - secretName: xxxxx
        hosts:
          - temporal-webui.example.com

最後に、helmfileを使ってマニフェストを生成します。

# マニフェストを生成
helmfile --environment staging --file helmfile.yaml template > helmfile-manifests.yaml

Namespaceの設定

Temporalには、Namespaceというワークフローの実行環境を論理的に分離する仕組みがあります。すべてのワークフローはいずれかのNamespaceに所属します。そのため事前にNamespaceを作成しておく必要があります。

temporal-admin-tools を用いた管理

Temporalでは、temporal-admin-toolsポッドを使用してNamespaceを管理できます。

なお、temporal-admin-toolsは、TemporalサーバーのAPI経由でNamespaceの設定を行っています。TemporalサーバーのAPIのエンドポイントは、valuesの初期値で設定されているため、基本的には明示的に設定する必要はありません。

以下に手順を示します。

# admin-toolsポッドへのアクセス
kubectl exec -it <admin-tools-pod-name> -- /bin/bash

# "my-ns"というNamespaceを作成
temporal operator namespace create --namespace my-ns

# Namespaceの一覧表示
temporal operator namespace list

# Namespace設定の確認
temporal operator namespace describe --namespace my-ns

# my-nsにデータ保持期間を設定
# デフォルト値は72h(=3日)
# 保持期間は1日以上にする必要あり
temporal operator namespace update --retention 168h0m0s my-ns

※ データ保持期限について

retentionの値は、クローズされた Workflow のデータを保存する期間を決定し、その期間が終了するとデータが削除されます。ただし、実行中の Workflow には影響しません。また、長期保存が必要な場合は、アーカイブ機能を活用できます。

Temporal のアーカイブ機能を使用するには、アーカイブをセットアップし、有効化する必要があります。アーカイブにはストレージプロバイダーを設定する必要があります。以下のいずれかを選択できます:

  • Amazon S3
  • Google Cloud Storage
  • ローカルファイルシステム

詳しくは、Self-hosted Archival setup をご覧ください。

Go SDK を用いた管理

Go SDKを用いてNamespaceを作成することも可能です。以下にサンプルコードを示します。

package main

import (
	"context"
	"fmt"
	"log/slog"
	"time"

	"go.temporal.io/api/workflowservice/v1"
	"go.temporal.io/sdk/client"
	"google.golang.org/protobuf/types/known/durationpb"
)

var TemporalAddr = "example.com:7233"
var TemporalNamespace = "my-ns"
var TemporalRetention = "720h" // 30日

// Namespace設定用のclient
func newNamespaceClient() (client.NamespaceClient, error) {
	c, err := client.NewNamespaceClient(client.Options{HostPort: TemporalAddr})
	if err != nil {
		return nil, fmt.Errorf("failed to create client: %w", err)
	}
	return c, nil
}

// Namespaceの設定
func SetupNamespace() error {
	ns := TemporalNamespace
	retention, err := time.ParseDuration(TemporalRetention)
	if err != nil {
		return fmt.Errorf(" namespace の有効期間のパースに失敗しました: %w", err)
	}

	c, err := newNamespaceClient()
	if err != nil {
		return fmt.Errorf(" namespace Client の初期化に失敗しました: %w", err)
	}
	defer c.Close()

	// Namespace の存在を確認
	_, err = c.Describe(context.Background(), ns)
	if err == nil {
		slog.Info("temporal namespace already exists", "namespace", ns)
		return nil
	}

	// Namespace の作成
	err = c.Register(context.Background(), &workflowservice.RegisterNamespaceRequest{
		Namespace:                        ns,
		WorkflowExecutionRetentionPeriod: durationpb.New(retention),
	})
	if err != nil {
		return fmt.Errorf("failed to register temporal namespace %s: %w", ns, err)
	}
	slog.Info("temporal namespace created", "namespace", ns)

	// ドキュメントによると、Namespaceは利用できるまで最大15秒かかる
	// ref: https://docs.temporal.io/namespaces#registration
	// そのため、15秒待つ
	time.Sleep(15 * time.Second)
	return nil
}

// 指定のNamespaceを使う場合のクライアントの設定
func NewClient() (client.Client, error) {
	opts := client.Options{
		HostPort:  TemporalAddr,
		Namespace: TemporalNamespace,
	}
	c, err := client.Dial(opts)
	if err != nil {
		return nil, err
	}
	return c, nil
}

まとめ

この記事では、Temporalの主要なコンポーネント(Frontend、History、Matching、Worker)の役割と関係性について説明し、Helm チャートを使用してKubernetesクラスタにTemporalをデプロイする方法を紹介しました。また、Namespaceの作成と管理について、temporal-admin-toolsとGo SDKの両方のアプローチを解説しました。

Workflow Engine 「Temporal」の記事一覧

Discussion