Kubernatesクラスタでの構築 - Workflow Engine「Temporal」(4)
前回は、Temporalで使用するMySQLのセットアップ方法を紹介しました。今回はセットアップしたMySQLを前提として、KubernatesクラスタにTemporalサービスを構築する手順を紹介したいと思います。
Temporalの構成要素
構築をはじめる前に、まずはTemporalを構成する要素について整理したいと思います。図に書き出すと以下のようになります。
Temporalサーバー
Temporal サーバーは、Frontend、History、Matching、Worker の4つの独立してスケーリング可能なサービスで構成されています。これらのサービスは連携して Temporal サービスの中核を形成します。それぞれのコンポーネントと Temporal サーバーの関係について説明します。
temporal-frontend
Frontend サービスは Temporal サーバーのゲートウェイとして機能します。このサービスは、すべての受信コールに対するレート制限、ルーティング、および認証を担当します。
-
役割:
- ネームスペース操作
- 外部イベント処理
- ワーカーポーリング
- 可視性リクエスト処理
- CLI 操作処理
- 他サービスとの連携: Frontend サービスは、すべての他サービス(History、Matching、Worker)およびデータベースと通信します。
temporal-history
History サービスは、Workflow 実行の状態を保持するために重要な役割を果たします。
-
役割:
- Workflow 実行状態を Workflow History に保存する。
- Workflow 実行が進行可能な場合、更新された履歴を Task Queue にタスクとして追加する。
- 他サービスとの連携: Matching サービスおよびデータベースと通信します。
temporal-matching
Matching サービスは、タスクのディスパッチ用に Task Queue を管理します。
-
役割:
- ワーカーとタスクのマッチングを行う。
- 新しいタスクを適切なキューにルーティングする。
- 他サービスとの連携: Frontend サービス、History サービス、およびデータベースと通信します。
temporal-worker
Worker サービスは、バックグラウンド処理を行います。なお、これはTemporalサーバーの内部で動作するWorkerで、アプリケーションのWorkerとは異なるものなので混同しないように注意しましょう。
-
役割:
- レプリケーションキューの処理。
- システム Workflow の実行。
- 他サービスとの連携: 主に Frontend サービスと通信します。
補足
- 詳細は What is a Temporal Service? をご覧ください。
- なお、開発環境で使用している
temporalio/auto-setup
の Docker イメージには、Frontend、History、Matching、Worker のすべてのサービスが単一プロセスで動作するように含まれています。
temporal-admin-tools
Temporal を管理・操作するための各種ツールを含むサービスです。Temporal CLI が含まれており、ワークフローの開始、ワークフロー履歴の確認、ネームスペースの管理などの操作を行うことができます。また、temporal-sql-tool も含まれており、データベースのスキーマ管理を行うこともできます。
temporal-webui
ワークフロー実行の可視化を提供するサービスです。主にデバッグ用途で使用され、ワークフロー実行の状態やメタデータを確認できます。
Helm チャート を用いた Kubernates クラスタへの反映
Temporalには helm-charts が提供されています。今回はこのチャートを用いて構築することにしましょう。
helmfileは環境ごとの設定ができるため、staging環境の設定を書いていきましょう。まずは、起点となるhelmfile.yamlからです。ここで指定するチャートのバージョンとTemporalのバージョンの対応は、index.yamlから確認出来ます。
# helmfile.yaml
environments:
staging:
---
repositories:
- name: temporal
url: https://go.temporal.io/helm-charts
releases:
- name: temporal
chart: temporal/temporal
version: 0.52.0 # APP_VERSION:1.25.2 に対応しているチャートのバージョン
values:
- "temporal-base.yaml"
- "temporal-{{ .Environment.Name }}.yaml"
namespace: my-app-{{ .Environment.Name }}
次に、temporal-base.yamlを書いていきます。valuesのデフォルト値は、values.yamlとなっています。こちらを参考に設定していきましょう。
なお、replicaCountやresourcesの値はいったん最低限動くであろう値を設定しています。本番稼働にあたっては、負荷の状況を見て適切にチューニングする必要がありそうです。
# temporal-base.yaml
# temporalサーバー全般の設定
# デフォルトでは 'temporal-frontend.<namespace>.svc.cluster.local:7233' で
# クラスター内のgRPC通信を受け付ける
server:
config:
persistence:
# ワークフローの実行履歴やタスクキューなど、
# Temporalの中核的なデータを保存するために使用
default:
driver: "sql"
sql:
driver: "mysql8"
port: 3306
database: temporal
user: temporal_app
# "temporal-db-secret"というSecretのpasswordキーの値を自動で参照。
# ここにDBへの接続パスワードを設定しておく。
existingSecret: temporal-db-secret
maxConns: 20
maxConnLifetime: "1h"
tls:
enabled: true
caData: /etc/temporal/global-bundle.pem
# ワークフローの検索や可視化のためのデータを保存
# 大規模な環境や高度な検索機能が必要な場合は、Elasticsearchの使用を検討する
visibility:
driver: "sql"
sql:
driver: "mysql8"
port: 3306
database: temporal_visibility
user: temporal_app
# "temporal-db-secret"というSecretのvisibility-passwordキーの値を自動で参照。
# ここにDBへの接続パスワードを設定しておく。
maxConns: 20
maxConnLifetime: "1h"
tls:
enabled: true
caData: /etc/temporal/global-bundle.pem
additionalVolumeMounts:
- name: rds-ca-certs
mountPath: "/etc/temporal/global-bundle.pem"
subPath: "global-bundle.pem"
readOnly: true
additionalVolumes:
- name: rds-ca-certs
configMap:
name: rds-ca-certs
items:
- key: global-bundle.pem
path: global-bundle.pem
# temporal-frontendの設定
frontend:
replicaCount: 1
resources: # TODO: 本番環境用の適切な設定値を検討
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
# temporal-historyの設定
history:
replicaCount: 1
resources: # TODO: 本番環境用の適切な設定値を検討
limits:
cpu: 100m
# OOMKilledになるため、256Miにしている
memory: 256Mi
requests:
cpu: 10m
memory: 64Mi
# temporal-matchingの設定
matching:
replicaCount: 1
resources: # TODO: 本番環境用の適切な設定値を検討
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
# temporal-workerの設定
worker:
replicaCount: 1
resources: # TODO: 本番環境用の適切な設定値を検討
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
# temporal-webuiの設定
web:
ingress:
enabled: true
className: nginx
annotations:
nginx.ingress.kubernetes.io/proxy-body-size: 15m
# TODO: IPやOAuth等によるアクセス制御を追記
# デバッグ用のツールなので、パブリックにアクセスさせない
resources: # TODO: 本番環境用の適切な設定値を検討
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
admintools:
resources: # TODO: 本番環境用の適切な設定値を検討
limits:
cpu: 100m
memory: 128Mi
requests:
cpu: 10m
memory: 64Mi
# MySQLのみ使用する
# 他のデータストアは無効化
mysql:
enabled: true
cassandra:
enabled: false
postgresql:
enabled: false
prometheus:
enabled: false
grafana:
enabled: false
elasticsearch:
enabled: false
# 事故防止のため、DBスキーマの設定は自動で行わない
# DBスキーマの管理はadmintoolsコンテナに入って、temporal-sql-toolコマンドで行う
# これが有効の場合、temporal-schema Jobが作成され、そのJOBの中でスキーマの設定処理が行われる。
schema:
createDatabase:
enabled: false
setup:
enabled: false
update:
enabled: false
次に、temporal-staging.yamlを書いていきます。データベースやwebuiのエンドポイントなど、環境によって異なる値を設定していきます。
# temporal-staging.yaml
server:
config:
persistence:
default:
sql:
host: xxxx.xxxx.ap-northeast-1.rds.amazonaws.com
visibility:
sql:
host: xxxx.xxxx.ap-northeast-1.rds.amazonaws.com
web:
ingress:
hosts:
- temporal-webui.example.com
tls:
- secretName: xxxxx
hosts:
- temporal-webui.example.com
最後に、helmfileを使ってマニフェストを生成します。
# マニフェストを生成
helmfile --environment staging --file helmfile.yaml template > helmfile-manifests.yaml
Namespaceの設定
Temporalには、Namespaceというワークフローの実行環境を論理的に分離する仕組みがあります。すべてのワークフローはいずれかのNamespaceに所属します。そのため事前にNamespaceを作成しておく必要があります。
temporal-admin-tools を用いた管理
Temporalでは、temporal-admin-toolsポッドを使用してNamespaceを管理できます。
なお、temporal-admin-toolsは、TemporalサーバーのAPI経由でNamespaceの設定を行っています。TemporalサーバーのAPIのエンドポイントは、valuesの初期値で設定されているため、基本的には明示的に設定する必要はありません。
以下に手順を示します。
# admin-toolsポッドへのアクセス
kubectl exec -it <admin-tools-pod-name> -- /bin/bash
# "my-ns"というNamespaceを作成
temporal operator namespace create --namespace my-ns
# Namespaceの一覧表示
temporal operator namespace list
# Namespace設定の確認
temporal operator namespace describe --namespace my-ns
# my-nsにデータ保持期間を設定
# デフォルト値は72h(=3日)
# 保持期間は1日以上にする必要あり
temporal operator namespace update --retention 168h0m0s my-ns
※ データ保持期限について
retentionの値は、クローズされた Workflow のデータを保存する期間を決定し、その期間が終了するとデータが削除されます。ただし、実行中の Workflow には影響しません。また、長期保存が必要な場合は、アーカイブ機能を活用できます。
Temporal のアーカイブ機能を使用するには、アーカイブをセットアップし、有効化する必要があります。アーカイブにはストレージプロバイダーを設定する必要があります。以下のいずれかを選択できます:
- Amazon S3
- Google Cloud Storage
- ローカルファイルシステム
詳しくは、Self-hosted Archival setup をご覧ください。
Go SDK を用いた管理
Go SDKを用いてNamespaceを作成することも可能です。以下にサンプルコードを示します。
package main
import (
"context"
"fmt"
"log/slog"
"time"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/sdk/client"
"google.golang.org/protobuf/types/known/durationpb"
)
var TemporalAddr = "example.com:7233"
var TemporalNamespace = "my-ns"
var TemporalRetention = "720h" // 30日
// Namespace設定用のclient
func newNamespaceClient() (client.NamespaceClient, error) {
c, err := client.NewNamespaceClient(client.Options{HostPort: TemporalAddr})
if err != nil {
return nil, fmt.Errorf("failed to create client: %w", err)
}
return c, nil
}
// Namespaceの設定
func SetupNamespace() error {
ns := TemporalNamespace
retention, err := time.ParseDuration(TemporalRetention)
if err != nil {
return fmt.Errorf(" namespace の有効期間のパースに失敗しました: %w", err)
}
c, err := newNamespaceClient()
if err != nil {
return fmt.Errorf(" namespace Client の初期化に失敗しました: %w", err)
}
defer c.Close()
// Namespace の存在を確認
_, err = c.Describe(context.Background(), ns)
if err == nil {
slog.Info("temporal namespace already exists", "namespace", ns)
return nil
}
// Namespace の作成
err = c.Register(context.Background(), &workflowservice.RegisterNamespaceRequest{
Namespace: ns,
WorkflowExecutionRetentionPeriod: durationpb.New(retention),
})
if err != nil {
return fmt.Errorf("failed to register temporal namespace %s: %w", ns, err)
}
slog.Info("temporal namespace created", "namespace", ns)
// ドキュメントによると、Namespaceは利用できるまで最大15秒かかる
// ref: https://docs.temporal.io/namespaces#registration
// そのため、15秒待つ
time.Sleep(15 * time.Second)
return nil
}
// 指定のNamespaceを使う場合のクライアントの設定
func NewClient() (client.Client, error) {
opts := client.Options{
HostPort: TemporalAddr,
Namespace: TemporalNamespace,
}
c, err := client.Dial(opts)
if err != nil {
return nil, err
}
return c, nil
}
まとめ
この記事では、Temporalの主要なコンポーネント(Frontend、History、Matching、Worker)の役割と関係性について説明し、Helm チャートを使用してKubernetesクラスタにTemporalをデプロイする方法を紹介しました。また、Namespaceの作成と管理について、temporal-admin-toolsとGo SDKの両方のアプローチを解説しました。
Discussion