🌀

Vercel AI SDK + Temporal で Agent Loop を回す #LayerX_AI_Agent_ブログリレー

に公開


og:image 設定できると嬉しいな〜 と思いつつできないので、とりあえず先頭に配置されたアイキャッチ画像

LayerX AI Agent ブログリレー 22日目の記事です。

バクラク事業部 スタッフエンジニアの @izumin5210 です。
Platform Engineering 部 Enabling チームでいろいろなことをしています。

前回の記事で、AI Agent や Workflow を Durable に動かすことに利用できる Workflow Engine である Temporal について紹介しました。

https://zenn.dev/layerx/articles/b5f6cf6e47221e

今回は TypeScript + Vercel AI SDK で実装した AI Agent を Temporal 上で動かすための実装を解説します。


AI Agent を Temporal 上で動かすには?

Python の Pydantic AI では Pydantic AI 自身が、OpenAI Agents SDK Python では Temporal Workflow がそれぞれ Temporal Workflow 上で動かすための実装を公開しています。

https://ai.pydantic.dev/durable_execution/temporal/

https://temporal.io/blog/announcing-openai-agents-sdk-integration

TypeScript でも Mastra であれば Ingest という別の Workflow Engine での実行をサポートしており、 Temporal Workflow についても対応することが vNext Workflows リリース時のブログで表明されています。

https://mastra.ai/blog/vNext-workflows

しかし、TypeScript で Mastra などのフレームワークを使わず Vercel の AI SDKOpenAI Agents SDK TypeScript で AI Agent を実装し Temporal Workflow 上で動かしたい場合、ちょうどいい感じのパッケージなどは今のところは提供されておらず、自前で実装する必要があります。

本記事ではタイトルにあるように AI SDK を例に、Temporal Workflow 上で Durable な AI Agents を実装する方法を解説していきます。

おさらい: Vercel AI SDK と Agent Loop

いきなり Temporal Workflow と AI SDK を組み合わせる前に、まずは AI SDK と Agent Loop についておさらいします。
以下は AI SDK に1つの Tool を組み合わせ、天気を教えてくれる AI Agent の例です。

// AI SDK の "Next.js App Router Quickstart" のコード例から一部改変
// https://ai-sdk.dev/docs/getting-started/nextjs-app-router

const query = "今日の東京の天気は?"

const stream = streamText({
  model: openai('gpt-4o'),
  prompt: query,
  stopWhen: stepCountIs(5),
  tools: {
    weather: tool({
      description: 'Get the weather in a location (fahrenheit)',
      inputSchema: z.object({
        location: z.string().describe('The location to get the weather for'),
      }),
      execute: async ({ location }) => {
        const temperature = Math.round(Math.random() * (90 - 32) + 32);
        return {
          location,
          temperature,
        };
      },
    }),
  },
});

このコードを実行すると、内部的には以下のような挙動になるでしょう。
呼び出し元から見た挙動は「今日の天気は?という入力で71℉ですが返ってくる」だけですが、内部的には2回の API 呼び出し(LLM処理)とツール実行が含まれます。

今回の例は天気情報を適当に返しているのでレスポンスも一瞬ですが、実際のプロダクトでは API をコールするためもっと遅くなるはずです。また、tool 実行前後でより複雑なテキストを生成するケースでは tool 実行だけでなくその前後の LLM の処理でも時間がかかる可能性があります。そのような状態で、例えば後段の LLM の処理がネットワーク不調等で失敗するとどうなるでしょうか。毎回最初から実行しているとお金が無駄にかかりますし、レイテンシが増大するためユーザ体験としても悪くなります。

そこで、この一連の流れをワークフローと捉え、それぞれの LLM の処理や tool 実行を記録し、いずれかが一時的に失敗したときはそこから再開できるようにすることでワークフローが Durable に(耐久性が高く)なる…。 これが前回の記事でも紹介した Temporal Workflow 導入のモチベーションです。

AI SDK で実装した AI Agent を Temporal Workflow で動かす

先ほどの AI SDK のコード例を見てもらうとわかるとおりですが、個々の LLM 呼び出しや tool 実行自体は generateText 関数に隠蔽されています。 全体として Durable にするにはそれぞれの実行を Temporal Workflow でいうところの Activity とする必要があります。

// activities.ts
//
// Activity(これまでの記事中の表現だとタスクに相当)の実装

export async function getUserNameById(id: string): Promise<string> {
  const user = await repository.getUserById(id)
  return user.name
}

export async function greet(name: string): Promise<string> {
  return `👋 Hello, ${name}!`;
}
// workflow.ts
//
// Workflow の実装

import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';

const { getUserNameById, greet } = proxyActivities<typeof activities>();

// この Workflow 関数が durable に実行される
export async function sayHelloWorkflow({ userId }: { userId: string }): Promise<string> {
  // Activity がワーカープロセスに振り分けられ処理される
  const name = await getUserNameById(userId)
  return await greet(name);
}

TypeScript での Temporal Workflow 利用例 - 前回記事より引用

上記のコードでは Activity としたい(実行を記録したい)関数は Workflow から単純に呼び出されているだけなので自明ですが、 AI SDK では内部の LLM 呼び出しや tool 実行を Activity とする必要があります。これはどうすればいいでしょうか。

AI SDK の Agent Loop をツール実行の直前で止める

すごく単純な話ですが、 AI SDK がツールの実行の直前あるいは直後でループを中断してくれればそれでいいわけです。

  1. LLM 処理: 最初にどのツールを実行するかと、ツールに渡すパラメタを決定してループ中断
  2. ツール呼び出し: 決定したパラメタでツール実行
  3. LLM 処理: ツールの結果をコンテキストにのせてループ再開
  4. ...

以上のようなステップで分かれ、かつ各ステップが別の Temporal Activity で動作してくれれば、例えば3ステップ目の LLM 処理が失敗しても1, 2ステップの保存済みの結果を使って3ステップ目から再開できるはずです。

ではこの「ツールの実行で止める」というのをどうやって実現するかですが、Human-in-the-Loop(HITL) の実装が参考になります。 HITL は「(ユーザの許可が得られるまで) Loop を止める」ということで、やりたいことは実は似ています。 AI SDK の Cookbook には HITL の実装例が掲載されているので参考にできます。

https://ai-sdk.dev/cookbook/next/human-in-the-loop

このドキュメントにあるとおりで、 AI SDK ではツール定義から execute 関数を取り除くことでツール実行の手前でループを止めることができます。

const getWeatherTool = tool({
  description: "Get the weather in a location (fahrenheit)",
  inputSchema: z.object({
    location: z.string().describe("The location to get the weather for"),
  }),
  outputSchema: z.object({
    temperature: z.number().describe("The temperature in fahrenheit"),
  }),
  // execute 関数がないので、このツール実行手前で確実に Loop が止まる
});

// weather tool の execute 関数だったものは外に置いておく
async function getWeatherInformation({
  location,
}: InferToolInput<typeof getWeatherTool>) {
  const temperature = Math.round(Math.random() * (90 - 32) + 32);
  return {
    location,
    temperature,
  };
}

Agent Loop を自前で制御し、ツール実行結果を LLM に渡す

ループが中断されたらそこまでに生成された messages を確認し、ツール実行待ち状態があればツールを実行し、その結果を書き戻します。 そして、ツール実行結果が反映された messagesgenerateText に渡すことでループを再開できます

const tools = { weather: getWeatherTool };
export type MyUITools = InferUITools<typeof tools>;
export type MyUIMessage = UIMessage<unknown, {}, MyUITools>;

const messages: MyUIMessage[] = [
  {
    id: generateId(),
    role: "user",
    parts: [{ type: "text", text: "今日の東京の天気は?" }],
  },
];

let step = 0;
const maxSteps = 10;

while (step < maxSteps) {
  // LLM の処理
  const stream = streamText({
    model: openai("gpt-4o"),
    // 先頭で定義した `messages` にコンテキスト(メッセージ履歴)をためておき、 LLM に渡す
    messages,
    tools,
  });

  // stream からメッセージ吸い出し
  const newMsgs: MyUIMessage[] = [];
  for await (const msg of readUIMessageStream<MyUIMessage>({
    stream: stream.toUIMessageStream(),
  })) {
    if (newMsgs.at(-1)?.id === msg.id) {
      newMsgs[newMsgs.length - 1] = msg;
    } else {
      newMsgs.push(msg);
    }
  }

  /** 未処理の tool-result (LLM に渡してない tool 実行結果)があるか */
  let hasUnprocessedToolResults = false;

  // LLM の処理結果から、ツールの実行待ち状態のメッセージを探す
  for (const uiMsg of uiMessages) {
    const parts: typeof uiMsg.parts = [];
    for (const part of uiMsg.parts) {
      let newPart = part;
      switch (part.type) {
        case "tool-weather": {
          if (part.state === "input-available") {
            const output = await getWeatherInformation(content.input);
            // tool 実行結果を messages に反映(`tool-call` を `tool-result` で置き換え)
            newPart = {
              ...part,
              state: "output-available",
              output,
            }
            // 上記の tool-result を LLM に処理させたい
            hasUnprocessedToolResults = true;
          }
          break;
        }
      }
      parts.push(newPart);
    }
    messages.push({ ...uiMsg, parts });
  }

  // 未処理の tool-result がなければ終了, あればループを回す
  if (!hasUnprocessedToolResults) break
  step++;
}

(上記のコードは結構威圧感ある感じになってますが、実際は汎化・抽象化などによりもうちょっとまともなコードにするといいでしょう。)

AI SDK の Loop Control に関するドキュメントの最下部に自前でループを操作する例があり、これも参考にしています。というか上記の例も自前で Loop Control しているようなものなので…。

https://ai-sdk.dev/docs/agents/loop-control#implementing-a-manual-loop

LLM の処理, tool 実行を Temporal Activity にする

これで Temporal Acivity としたい(中断・再開可能にしたい) LLM の処理と tool の実行が分離できました。あとはこれらを関数に切り出し Activity として呼び出すようにするだけです。

// activities.ts

/** LLM でメッセージを処理させる Activity */
export async function processLLM(inputMessages: MyUIMessage[]) {
  const stream = streamText({
    model: openai("gpt-4o"),
    messages: convertToModelMessages(inputMessages),
    tools,
  });
  const newMsgs: MyUIMessage[] = [];
  for await (const msg of readUIMessageStream<MyUIMessage>({
    stream: stream.toUIMessageStream(),
  })) {
    if (newMsgs.at(-1)?.id === msg.id) {
      newMsgs[newMsgs.length - 1] = msg;
    } else {
      newMsgs.push(msg);
    }
  }
  return newMsgs;
}

/** ツール実行する Activity */
export async function executeToolCall(
  part: ToolUIPart<MyUITools>
): Promise<ToolUIPart<MyUITools>> {
  if (part.state !== "input-available") return part;
  switch (part.type) {
    case "tool-weather": {
      const output = await getWeatherInformation(part.input);
      return {
        ...part,
        state: "output-available",
        output,
      };
      break;
    }
    default: {
      throw new Error(`Unknown tool: ${part.type}`);
    }
  }
}
// workflows.ts

import { proxyActivities } from '@temporalio/workflow';
import type * as activities from './activities';

const { processLLM, executeToolCall } = proxyActivities<typeof activities>();

export async function runAgent() {
  const messages: MyUIMessage[] = [
    {
      id: generateId(),
      role: "user",
      parts: [{ type: "text", text: "今日の東京の天気は?" }],
    },
  ];

  let step = 0;
  const maxSteps = 10;

  while (step < maxSteps) {
    // LLM の処理を Temporal Activity として実行
    const uiMessages = await processLLM(messages);

    /** 未処理の tool-result があるか */
    let hasUnprocessedToolResults = false;

    // LLM の処理結果から、ツールの実行待ち状態のメッセージを探す
    for (const uiMsg of uiMessages) {
      const parts: typeof uiMsg.parts = [];
      for (const part of uiMsg.parts) {
        let newPart = part;
        if (part.type.startsWith("tool-")) {
          newPart = await executeToolCall(part as ToolUIPart<MyUITools>);
          hasUnprocessedToolResults = true;
        }
        parts.push(newPart);
      }
      messages.push({ ...uiMsg, parts });
    }

    // 未処理の tool-result がなければ終了
    if (!hasUnprocessedToolResults) break;
    step++;
  }

  // ...
}

この runAgent Workflow を実行しその様子を Web UI で見てみると、processLLMexecuteTool がそれぞれ独立した Activity として実行されている様子が見えるはずです。


Temporal の Web UI 上での実行結果。 ツール実行とその前後の LLM の処理が別 Activity になっているのがわかる。 わかりやすくするために getWeatherTool にはスリープを入れています。


各 Activity の入出力が Web UI 上で確認できる。

おわりに

TypeScript + AI SDK で実装した AI Agent の Agent Loop を Temporal Workflow として実装する例を紹介しました。一見するとちょっと威圧感ある実装ですが、一方で比較的汎化しやすい実装でもあるはずです。うまく薄い抽象化をかぶせることで一度書くだけで再利用しやすいコンポーネントにできるかもしれません。

「いやいや、とはいえコード複雑すぎやろ! Python(あるいはMastra)使お!」と思った方もいるでしょう。このあたりの基盤的な実装は今後おそらくコミュニティでも整備が進むはずで、もうちょっと待てば新しいソリューションが出るかもしれません。

今回は抽象化が少ない Vercel AI SDK を利用したからコード量が増えてるとも言えるし、逆に抽象化が少ないおかげでどんな要件でも柔軟に対応できているとも言えます。 AI Agent 周りはこの先どんな進化をしていくかは不透明なので、エコシステムもどう発展していくか・どういう抽象をかませるのが正解かもおそらく誰も見えていない状態です。 要するに、今うまいこと Bet できれば覇権が取れる可能性もあるということです。ワクワクしますね?

LayerX では AI Agent 時代のソフトウェアエンジニアリング・ Platform Engineering を共に作り上げていく仲間を募集しています!

LayerX

Discussion