🔍

TemporalのDurableさを覗く #LayerX_AI_Agent_ブログリレー

に公開

LayerX AI Agent ブログリレー 32 日目の記事です。

バクラク事業部 @itkq です。Platform Engineering部 SRE チームで仕事をしています。以下の記事が公開されているように、AI Agentの文脈でTemporalをやっていっています。
https://zenn.dev/layerx/articles/b5f6cf6e47221e
https://zenn.dev/layerx/articles/18dabf30c0abf6

Temporalとは何であり何を解決したいのか、またTemporalを使ったアプリケーションの実装イメージについてはこれらを参照してください。

この記事は、Temporal Systemをざっくりと理解してから、実験を通してTemporalの "Durableさ" に触れます[1]

Durable Execution

Temporalについて調べると "Durable" という単語をよく見かけます。ドキュメントでは次のように説明されています。

Temporal is a Durable Execution Platform. Durable Execution ensures that your application behaves correctly despite adverse conditions by guaranteeing that it will run to completion. This shift simplifies the development process. If a failure or a crash happens, your business processes keep running seamlessly without interruptions. Developers shift their focus on business logic rather than infrastructure concerns and create applications that are inherently scalable and maintainable.

https://docs.temporal.io/evaluate/understanding-temporal#durable-execution

障害やクラッシュが発生するような状況でも正しくアプリケーションが動作することを保証することで、アプリケーションコードはロジックのみに集中できて嬉しい、ということのようです。具体的に理解するため、実際にいくつか障害シナリオを実践していくことにします。

Temporal System

前提として、Temporalのシステム面を解説していきます。Temporalに関わる非常にざっくりとした登場人物は以下です。

  • Client: Workflowに対する操作をする
  • Temporal Server: Workflowのコントロールプレーン
  • Worker: Workflowの中身を実行するデータプレーン

Temporal Serverは、次のサービス(Temporal Services)から構成されます。

  • Frontend gateway: RPCのゲートウェイ。認証/認可・レートリミット・ルーティングを行う
  • History subsystem: Events (Event History) の確定・永続化と mutable state の管理、timer のスケジューリング、Taskの生成とMatching へエンキュー
  • Matching subsystem: Task Queueを所有し、(Frontend 経由で) Workerのlong-pollに応じてTaskをdispatchする
  • Worker Service: 内部のバックグラウンド処理に利用

https://docs.temporal.io/temporal-service

これらの関係をざっくり図示したものが以下です(適宜Worker Serviceを省くなどしています)。

https://docs.temporal.io/temporal-service/temporal-server

用語の説明が続きます。

  • Event
    • Event Historyに追記するimmutableな出来事。e.g. WorkflowExecutionStarted, ActivityTaskScheduled, etc.
  • Event History
    • Eventを時系列で追記したSingle Source of Truth
  • State (mutable state)
    • 現在の実行状態の集約 (最新イベント番号等)。Historyから導出され、永続化されるが、Workerから直接書き換えない
  • Workflow runtime
    • Workerプロセス内でWorkflowコードを決定的に実行し、Historyのリプレイから現在状態に到達してCommandを生成する実行環境
  • Activity executors
    • Workerプロセス内でActivity (外部I/O・副作用) を実行する
  • Task Queue
    • Matching subsystemが管理する論理キュー。WorkerはFrontendをlong-pollし、FrontendがMatchingと連携してWorkflow/ActivityのTaskを配信する
  • Workflow Task
    • 「Workflowを進める1ステップ」の仕事単位。Workflow runtimeが取得するとHistoryをリプレイしてCommandを返却する
  • Activity Task
    • 「Activityの実行」の仕事単位。Activity executorsが実行し、Completed/Failed/TimedOutを返却する
  • Command
    • Workflow runtimeが「次に行う操作」としてサーバへ返す指示。e.g. Activity のスケジュール、Complete/Fail etc. Commandが確定するとEventになる
    • コマンド一覧: https://docs.temporal.io/references/commands

Temporalのキモ

これまで説明した内容から、Temporalにおいて重要な概念はWorkflowとActivityであり、次のようにまとめられます。

1. Workflow: HistoryからCommandを決めるだけなので常に決定的に動く

Event HistoryがSingle Source of Truthであり、Workflow runtimeは、Historyをリプレイすることで、副作用を持たないCommand (Activity をスケジュール、Complete など) を決定して返すのが仕事です。これにより、Workflowの実行は常に決定的になります。Workflow runtimeはCommandをFrontendに返却し、History subsystemはそのCommandをEventに確定 (Append) し、必要なTaskをTask Queueに積みます。

2. Activity: 副作用を集約し、At-least-once前提で冪等に実行する

副作用 (外部I/Oなど) はActivityとして記述し、Activity executorsに実行させます。Retry/Timeout/Heartbeat (後述) で回復性が提供されるため、At-least-onceになります。したがって、冪等性を担保する設計が求められます。

Durableさを覗く実験概要

次の 4 個の障害シナリオについて実験をします。

シナリオ 目的(確認ポイント)
1. Workflow (Timer) interrupted Serverが停止してもTimerは履歴に残り、再起動後に処理される
2. Activity failed 一時失敗があってもRetry Policyに従い自動回復
3. Activity timeout w/o Heartbeat 途中再開はできないが、タイムアウト後に最初からやり直される
4. Activity interrupted w/ Heartbeat Heartbeatタイムアウト → 途中から再開

まだ説明していない機能について先に説明します。

Timer

TimerはWorkflow内の「一定時間後(または指定時刻)に次のWorkflow Taskを発火させる」耐久的な遅延トリガーで、 sleep() などはServerにScheduleTimerとして登録されます。トリガーすると TimerFiredがEvent Historyに追記され、Workflow Taskがenqueueされるため、Server/Workerが停止したとしても再開できます。また時間の評価はServer時刻を基準とします。

https://docs.temporal.io/workflow-execution/timers-delays#timer

Retry Policy

Retry Policyは、一時的な失敗やActivityのタイムアウト (Heartbeartも含む) 発生時に、Serverが自動再試行するためのルールです。なおWorkflowはデフォルトで再試行しませんが設定可能です。バックオフ時はTaskが再スケジュールされHistoryに記録されます。

https://docs.temporal.io/encyclopedia/retry-policies

Start-To-Close Timeout

Timeoutにはいくつか種類がありますが、今回の実験ではStart-To-Close Timeoutのみ扱います。これはActivityの1回の試行の実行時間上限です。超過するとHistoryに ActivityTaskTimedOut(START_TO_CLOSE) が記録され、Retry Policyに従い再試行されます。

https://docs.temporal.io/encyclopedia/detecting-activity-failures#start-to-close-timeout

Activity Heartbeat

ActivityにはHeartbeatという機能があり、進捗 (再開ポイント) をサーバーに定期送信する仕組みがあります。これにより、Activityが中断されたとしても、途中位置から再開できます。この機能も実験で観察します。

https://docs.temporal.io/encyclopedia/detecting-activity-failures#activity-heartbeat

実験環境

  • Docker / Docker Compose
  • Node.js 18+ / TypeScript
git clone https://github.com/temporalio/docker-compose.git
cd docker-compose
# Postgresを選択
docker compose -f docker-compose-postgres.yml up -d
# Web UI: http://localhost:8080

実験コード

要点は以下です。

  • Workflow(決定的ロジック): durabilityExperiment
    • オプションで sleep(timerSec) → Timer
    • その後、TimeoutとHeartbeatを指定してActivityを呼ぶ
  • Activity(副作用側): processLargeList
    • ループ処理を行い、任意で Heartbeat を送る
    • 環境変数 FAIL_ATTEMPTS で 最初の N 試行だけ必ず失敗を注入。 STEP_MS で 1 ステップ時間を調整。また SLOW_FIRST_ATTEMPT_MS で1回目のみの遅延を注入
  • Client: client.ts
    • SCENARIO 環境変数で Timer秒数, Start-To-Close Timeout, Heartbeat Timeout, Heartbeat有無を切り替える
// package.json
{
  "name": "temporal-durability-lab",
  "type": "module",
  "scripts": {
    "worker": "node --loader ts-node/esm src/worker.ts",
    "start": "node --loader ts-node/esm src/client.ts"
  },
  "dependencies": {
    "@temporalio/client": "^1.13.1",
    "@temporalio/worker": "^1.13.1",
    "@temporalio/workflow": "^1.13.1"
  },
  "devDependencies": {
    "ts-node": "^10.9.2",
    "typescript": "^5.9.3"
  }
}
// tsconfig.json
{
  "compilerOptions": {
    "target": "ES2022",
    "module": "NodeNext",
    "moduleResolution": "NodeNext",
    "strict": true,
    "skipLibCheck": true
  },
  "ts-node": { "esm": true },
  "include": ["src"]
}
// src/workflows/index.ts
import { proxyActivities, sleep } from '@temporalio/workflow';

export interface Acts {
  processLargeList(args: { total: number; chunk: number; useHeartbeat: boolean }): Promise<number>;
}

/** 実験ごとに引数で Activity のタイムアウトなどを切替(決定的にOK) */
export async function durabilityExperiment(opts?: {
  timerSec?: number;                 // シナリオ1で利用
  startToCloseSec?: number;          // Activityの StartToClose
  heartbeatTimeoutSec?: number;      // Activityの HeartbeatTimeout(使わない時は undefined)
  useHeartbeat?: boolean;            // Activityが heartbeat() を呼ぶか
}): Promise<string> {
  const {
    timerSec = 0,
    startToCloseSec = 30,
    heartbeatTimeoutSec,
    useHeartbeat = true
  } = opts ?? {};

  if (timerSec > 0) {
    await sleep(`${timerSec} s`);    // Timer: ScheduleTimer → TimerFired
  }

  const acts = proxyActivities<Acts>({
    taskQueue: 'demo-queue',
    startToCloseTimeout: `${startToCloseSec} s`,
    heartbeatTimeout: heartbeatTimeoutSec ? `${heartbeatTimeoutSec} s` : undefined,
    retry: { initialInterval: '2 s', maximumInterval: '30 s', maximumAttempts: 10 }
  });

  const processed = await acts.processLargeList({ total: 5000, chunk: 200, useHeartbeat });
  return `OK processed=${processed}`;
}
// src/activities/index.ts
import { Context } from '@temporalio/activity';

const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms));

/** STEP_MS: 1ループの処理時間(ms) */
const STEP_MS = Number(process.env.STEP_MS ?? '3000');
/** FAIL_ATTEMPTS: 最初の N 試行は必ず失敗(attempt は 1 始まり) */
const FAIL_ATTEMPTS = Number(process.env.FAIL_ATTEMPTS ?? '0');
/** SLOW_FIRST_ATTEMPT_MS: 最初の試行だけ追加遅延 */
const SLOW_FIRST_ATTEMPT_MS = Number(process.env.SLOW_FIRST_ATTEMPT_MS ?? '0');

export async function processLargeList(
  { total, chunk, useHeartbeat }: { total: number; chunk: number; useHeartbeat: boolean }
): Promise<number> {
  const ctx = Context.current();
  if (ctx.info.attempt === 1 && SLOW_FIRST_ATTEMPT_MS > 0) {
    await sleep(SLOW_FIRST_ATTEMPT_MS); // 1回目だけわざと時間超過させる
  }

  // 決め打ちで失敗させる(リトライ挙動をデモしやすい)
  if (FAIL_ATTEMPTS > 0 && ctx.info.attempt <= FAIL_ATTEMPTS) {
    // 観測しやすいように短い遅延だけ入れてから失敗
    await sleep(Math.min(STEP_MS, 1000));
    throw new Error(`Injected failure (attempt=${ctx.info.attempt} <= FAIL_ATTEMPTS=${FAIL_ATTEMPTS})`);
  }

  // Heartbeat 再開用のオフセット(なければ 0)
  let i = (ctx.info.heartbeatDetails as number | undefined) ?? 0;

  for (; i < total; i += chunk) {
    await sleep(STEP_MS);
    if (useHeartbeat) {
      // 進捗を保存(キャンセル検知点でもある)
      ctx.heartbeat(i + chunk);
    }
  }
  return total;
}
// src/worker.ts
import { Worker } from '@temporalio/worker';
import * as activities from './activities/index.js';

const worker = await Worker.create({
  workflowsPath: new URL('./workflows', import.meta.url).pathname,
  activities,
  taskQueue: 'demo-queue'
});
console.log('Worker started on demo-queue');
await worker.run();
// src/client.ts
import { Client } from '@temporalio/client';

const scenario = process.env.SCENARIO ?? '1'; // "1"〜"4"
const client = new Client();

const common = {
  workflowId: `wf-${Date.now()}`,
  taskQueue: 'demo-queue'
} as const;

let args: Parameters<typeof client.workflow.start>[1]['args'];

switch (scenario) {
  case '1': // Workflow (Timer) interrupted
    args = [{ timerSec: 45, startToCloseSec: 20, heartbeatTimeoutSec: 10, useHeartbeat: true }];
    break;
  case '2': // Activity failed
    args = [{ timerSec: 0, startToCloseSec: 120, heartbeatTimeoutSec: undefined, useHeartbeat: false }];
    break;
  case '3': // Activity timeout (no heartbeat)
    args = [{ timerSec: 0, startToCloseSec: 15, heartbeatTimeoutSec: undefined, useHeartbeat: false }];
    break;
  case '4': // Activity interrupted w/ Heartbeat
    args = [{ timerSec: 0, startToCloseSec: 240, heartbeatTimeoutSec: 30, useHeartbeat: true }];
    break;
  default:
    throw new Error(`Unknown SCENARIO=${scenario}`);
}

const handle = await client.workflow.start('durabilityExperiment', { ...common, args });
console.log('Started:', handle.workflowId, handle.firstExecutionRunId);
const res = await handle.result();
console.log('Result:', res);

実験1. Workflow (Timer) interrupted

STEP_MS=500 npm run worker
# Workflow開始
SCENARIO=1 npm run start
# Timer待機中にTemporal Serverを停止し再起動
docker compose stop temporal && sleep 50 && docker compose start temporal

Serverを落とす前の時点でHistoryの最後のEventはTimerStartedでした。

Server再起動後は、正常系と同じフローでWorkflowが完了しました。

Server/Worker停止中に期日を過ぎたTimerは、Server復帰後にServer側でTimerFiredとして処理され、次のWorkflow Taskがスケジュールされます (Timerの評価はServer時刻基準)。正常系との差分は、Workflowが実際に停止していた時間のみです。

実験2. Activity failed

SCENARIO=2 npm run start
FAIL_ATTEMPTS=2 STEP_MS=2000 npm run worker

わざと2回失敗させ、3回目の試行で成功しています。

リトライポリシー(デフォルト)は以下です。

Temporalは履歴の肥大化を避けるため、Activityのリトライ中はActivityTaskStartedを毎回記録しません[2]。最終的に完了(または最終失敗・最終タイムアウト)した時点で、その試行番号 (attempt) を持つ 1 つの ActivityTaskStarted イベントだけが履歴に追記されます。したがって、3回目で成功した場合は ActivityTaskStarted (attempt=3) が1件だけ見えるのが正常です。リトライの途中経過はServerでmutable stateで試行カウントを増やしながら管理され、必要に応じて再試行がスケジュールされます。

実験3. Activity timeout

SLOW_FIRST_ATTEMPT_MS=10000 STEP_MS=500 npm run worker
SCENARIO=3 npm run start

1回目だけわざと遅延させ、StartToClose Timeoutが発生します。

実験2と同様に再試行され、2回目の実行は成功しています。

実験4. Activity interrupted with Heartbeat

STEP_MS=4000 npm run worker
SCENARIO=4 npm run start
# 途中でworkerをkillし、heartbeatTimeout待つ
# その後再起動
STEP_MS=4000 npm run worker # restart

Heartbeatが記録されてからWorkerをkillします。するとHeartbeatが [200] の段階で、Heartbeat timeoutが発生します。

その後Workerを再起動することで、Heartbeatは順調に更新され、最終的に正常終了しました。

まとめ

この記事では、Temporalのシステム構成を俯瞰し、Timer, Retry, Timeout, Heartbeatを用いた小さな実験で "Durableさ"を確認しました。

WorkflowとActivityを正しく使い分け実装すれば、障害や瞬断があってもHistoryやStateから回復できます。バクラク事業部でもAI Agentのタスク基盤として活用を進めており、今後のPlatform整備にも取り組んでいきます。引き続き知見を共有していきます。

脚注
  1. もはやAI Agentと関係ある…?と自分でも思っていますが、ブログリレー大臣のGoはでているのでご容赦ください ↩︎

  2. https://community.temporal.io/t/when-does-temporal-write-the-activitytaskstarted-event-into-workflow-history/6162 ↩︎

LayerX

Discussion