🐯
Mastra WorkflowをHono経由でストリーミングしてフロントで受け取る方法
はじめに
Mastra は、AI エージェントを構築するための TypeScript フレームワークです。
Mastra には、複数の AI エージェントの実行を組み合わせる Workflows という機能があります。
今回は、これを手軽に実行するために、Hono の API として Workflows を実行し、各イベントの結果をストリームにてフロントエンド側に送るコードを実装します。
今回作成したコードは下記にあります。
前提
- Next.js の Route Handlers に Hono を組み込んでいます。
- 私の趣味です。
- 今回 Next.js 固有の機能は使いません。
- Mastra も Next.js に組み込んでいます。
- Next.js, Hono, Mastra が同一アプリとして動作しています。
実装
バックエンドの実装
まずは、バックエンドから実装しましょう。
Mastra のサンプル Workflow (weatherWorkflow
) を使います。
mastra
オブジェクトから Workflows を取得して、API 上で実行するだけです。
const run = await mastra.getWorkflow("weatherWorkflow").createRunAsync();
通常実行する場合は次のようにしますが
await run.start({ inputData: {...} });
ストリームで返して欲しい場合は下記のようにします。
await run.stream({ inputData: {...} })
さて、Hono にはストリーム処理のヘルパー関数があるので、こちらと組み合わせましょう。
Mastra のストリームを for-of で取り出して、 Hono のストリームに書き込みます。
import { stream } from "hono/streaming";
// <中略>
app.get("/stream", async (c) => {
const run = await mastra.getWorkflow("weatherWorkflow").createRunAsync();
const { stream: workflowStream } = run.stream({
inputData: { city: "tokyo" },
});
return stream(c, async (stream) => {
for await (const chunk of workflowStream) {
await stream.write(JSON.stringify(chunk) + "\n"); // NDJSON
}
});
});
フロントエンドの実装
さて、次はフロントエンドです。
レスポンスボディからgetReader
を介してチャンクごとにデータを取得します。
const res = await honoClient.api.stream.$get();
const reader = res.body.getReader();
Hono RPC を使ってもストリームには型がつかないので、zod でスキーマ定義 & バリデーションして、型をつけます。
// see https://mastra.ai/ja/reference/workflows/run-methods/stream
const streamEventTypeSchema = z.enum([
"start",
"step-start",
"tool-call",
"tool-call-streaming-start",
"tool-call-delta",
"step-result",
"step-finish",
"finish",
]);
const streamEventSchema = z.object({
type: streamEventTypeSchema,
payload: z.any().optional(),
});
const parsed = streamEventSchema.safeParse(...);
全景はこんな感じです。
async function getStream() {
const res = await honoClient.api.stream.$get();
if (!res.body) {
throw new Error("No response body");
}
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read();
if (done) {
break;
}
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split("\n");
// 最後の要素は未完かもしれないので残す
buffer = lines.pop() ?? "";
for (const line of lines) {
if (!line) {
continue;
}
const parsed = streamEventSchema.safeParse(JSON.parse(line));
if (!parsed.success) {
console.error("Invalid chunk:", parsed.error, line);
continue;
}
const chunk = parsed.data; // chunkはstreamEventSchemaに従う
// ここで各イベントの処理を行う
}
}
if (buffer.trim()) {
const parsed = streamEventSchema.safeParse(JSON.parse(buffer));
if (!parsed.success) {
console.error("Invalid final chunk:", parsed.error, buffer);
return;
}
const chunk = parsed.data; // chunkはstreamEventSchemaに従う
// ここで各イベントの処理を行う
}
}
動作の様子
上記を組み合わせて、実際にフロントエンドで、Workflows のイベントを逐次表示してみました。
実行結果の全体
おわりに
まとめ
今回の手法を用いれば、Mastra の Workflows の進捗状況のデータを、フロントエンドまで簡単に渡すことができました!
簡単にワークフローの進捗状況をフロントエンドで表示したいときには、活用していきたいです!
注意点
- Workflows のデータが全てフロントに渡されてしまうので、セキュリティ観点で問題がないか考慮する必要があります。
- 今回の実装だと Workflows はアプリケーションサーバで実行される前提です。Workflows が重い処理の場合、別環境で実行する必要があるかもです。
- 今回の実装だと通信が中断された場合、復旧ができません。復旧できるようにする場合、ポーリングパターンを採用する必要があるかもです。
ちょっとした感想
- AI SDKで一部実装を簡易化できないか検討しましたが、ざっと見た限りだと、利用できそうなものはありませんでした。ただし、今後便利な SDK として発展していくと思うので、watch しておきたいです。
- Mastra のドキュメントは英語版がおすすめです。日本語版は LLM で機械的に訳したのか、読みにくい & 情報が古いように見えました。
参考
Discussion