🐯

Mastra WorkflowをHono経由でストリーミングしてフロントで受け取る方法

に公開

はじめに

Mastra は、AI エージェントを構築するための TypeScript フレームワークです。

https://mastra.ai/

Mastra には、複数の AI エージェントの実行を組み合わせる Workflows という機能があります。

今回は、これを手軽に実行するために、Hono の API として Workflows を実行し、各イベントの結果をストリームにてフロントエンド側に送るコードを実装します。

今回作成したコードは下記にあります。

https://github.com/hirokisakabe/mastra-workflow-stream-to-clientslide-sample

前提

  • Next.js の Route Handlers に Hono を組み込んでいます。
    • 私の趣味です。
    • 今回 Next.js 固有の機能は使いません。
  • Mastra も Next.js に組み込んでいます。
    • Next.js, Hono, Mastra が同一アプリとして動作しています。

実装

バックエンドの実装

まずは、バックエンドから実装しましょう。

Mastra のサンプル Workflow (weatherWorkflow) を使います。

mastraオブジェクトから Workflows を取得して、API 上で実行するだけです。

const run = await mastra.getWorkflow("weatherWorkflow").createRunAsync();

通常実行する場合は次のようにしますが

await run.start({ inputData: {...} });

ストリームで返して欲しい場合は下記のようにします。

await run.stream({ inputData: {...} })

さて、Hono にはストリーム処理のヘルパー関数があるので、こちらと組み合わせましょう。

https://hono.dev/docs/helpers/streaming

Mastra のストリームを for-of で取り出して、 Hono のストリームに書き込みます。

import { stream } from "hono/streaming";

// <中略>

app.get("/stream", async (c) => {
  const run = await mastra.getWorkflow("weatherWorkflow").createRunAsync();

  const { stream: workflowStream } = run.stream({
    inputData: { city: "tokyo" },
  });

  return stream(c, async (stream) => {
    for await (const chunk of workflowStream) {
      await stream.write(JSON.stringify(chunk) + "\n"); // NDJSON
    }
  });
});

フロントエンドの実装

さて、次はフロントエンドです。

レスポンスボディからgetReaderを介してチャンクごとにデータを取得します。

const res = await honoClient.api.stream.$get();

const reader = res.body.getReader();

Hono RPC を使ってもストリームには型がつかないので、zod でスキーマ定義 & バリデーションして、型をつけます。

// see https://mastra.ai/ja/reference/workflows/run-methods/stream
const streamEventTypeSchema = z.enum([
  "start",
  "step-start",
  "tool-call",
  "tool-call-streaming-start",
  "tool-call-delta",
  "step-result",
  "step-finish",
  "finish",
]);

const streamEventSchema = z.object({
  type: streamEventTypeSchema,
  payload: z.any().optional(),
});

const parsed = streamEventSchema.safeParse(...);

全景はこんな感じです。

async function getStream() {
  const res = await honoClient.api.stream.$get();

  if (!res.body) {
    throw new Error("No response body");
  }

  const reader = res.body.getReader();
  const decoder = new TextDecoder();

  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();

    if (done) {
      break;
    }

    buffer += decoder.decode(value, { stream: true });

    const lines = buffer.split("\n");

    // 最後の要素は未完かもしれないので残す
    buffer = lines.pop() ?? "";

    for (const line of lines) {
      if (!line) {
        continue;
      }

      const parsed = streamEventSchema.safeParse(JSON.parse(line));

      if (!parsed.success) {
        console.error("Invalid chunk:", parsed.error, line);
        continue;
      }

      const chunk = parsed.data; // chunkはstreamEventSchemaに従う

      // ここで各イベントの処理を行う
    }
  }

  if (buffer.trim()) {
    const parsed = streamEventSchema.safeParse(JSON.parse(buffer));

    if (!parsed.success) {
      console.error("Invalid final chunk:", parsed.error, buffer);
      return;
    }

    const chunk = parsed.data; // chunkはstreamEventSchemaに従う

    // ここで各イベントの処理を行う
  }
}

動作の様子

上記を組み合わせて、実際にフロントエンドで、Workflows のイベントを逐次表示してみました。

実行結果の全体

おわりに

まとめ

今回の手法を用いれば、Mastra の Workflows の進捗状況のデータを、フロントエンドまで簡単に渡すことができました!

簡単にワークフローの進捗状況をフロントエンドで表示したいときには、活用していきたいです!

注意点

  • Workflows のデータが全てフロントに渡されてしまうので、セキュリティ観点で問題がないか考慮する必要があります。
  • 今回の実装だと Workflows はアプリケーションサーバで実行される前提です。Workflows が重い処理の場合、別環境で実行する必要があるかもです。
  • 今回の実装だと通信が中断された場合、復旧ができません。復旧できるようにする場合、ポーリングパターンを採用する必要があるかもです。

ちょっとした感想

  • AI SDKで一部実装を簡易化できないか検討しましたが、ざっと見た限りだと、利用できそうなものはありませんでした。ただし、今後便利な SDK として発展していくと思うので、watch しておきたいです。
  • Mastra のドキュメントは英語版がおすすめです。日本語版は LLM で機械的に訳したのか、読みにくい & 情報が古いように見えました。

参考

https://mastra.ai/en/reference/workflows/run-methods/stream

https://hono.dev/docs/helpers/streaming

Discussion