Open16

mastra.ai 読解 | nextjs x hono x mastra といった構成でエージェントを作るメモ

hanzochanghanzochang

Mastraの主要概念と依存関係を俯瞰する

https://mastra.ai/docs

基本概念

Agent

  • AIエージェントの基本機能を提供
  • メモリ管理とツール呼び出しを実現
  • 会話履歴の保持と機能拡張を可能に
export const chefAgent = new Agent({
  name: "chef-agent",
  instructions:
    "You are Michel, a practical and experienced home chef" +
    "You help people cook with whatever ingredients they have available.",
  model: openai("gpt-4o-mini"),
});

Tools

  • エージェントの機能拡張を実現
  • 関数呼び出し機能を提供
  • カスタムツールの追加を可能に
// このツールはAgentに組み込まれる
export const stockPrices = createTool({
  id: "Get Stock Price",
  inputSchema: z.object({
    symbol: z.string(),
  }),
  description: `Fetches the last day's closing stock price for a given symbol`,
  execute: async ({ context: { symbol } }) => {
    console.log("Using tool to fetch stock price for", symbol);
    return {
      symbol,
      currentPrice: await getStockPrice(symbol),
    };
  },
});

// こんな感じに
export const stockAgent = new Agent<typeof tools>({
  name: "Stock Agent",
  instructions:
    "You are a helpful assistant that provides current stock prices. When asked about a stock, use the stock price tool to fetch the stock price.",
  model: openai("gpt-4o-mini"),
  tools: {
    stockPrices: tools.stockPrices, //ココ
  },
});

Workflow

  • グラフベースのワークフローエンジン
  • LLM呼び出しを確定的な方法で実行
  • 分岐や連鎖を含む制御フローを実現
myWorkflow
  .step(stepA)
    .then(stepB)
    .then(stepD)
  .after(stepA)
    .step(stepC)
    .then(stepE)
  .after([stepD, stepE])
    .step(stepF);

Memory

  • 会話履歴の保持と管理を実現
  • コンテキストベースの情報検索を提供
  • エージェントの状態管理を実現
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
 
const agent = new Agent({
  name: "Project Manager",
  instructions:
    "You are a project manager. You are responsible for managing the project and the team.",
  model: openai("gpt-4o-mini"),
});
 
await agent.stream("When will the project be completed?", {
  threadId: "project_123",
  resourceId: "user_123",
});

オプショナル

RAG

  • ドキュメント処理と埋め込み生成を提供
  • 複数のベクトルストアを統一的に扱えるAPIを実装
  • 関連チャンクの検索と知識の統合を可能に

Voice

  • テキスト読み上げと音声認識を提供
  • 音声対話機能を実装
  • 複数の音声プロバイダーとの連携を可能に

Evals

  • 出力の自動評価機能を提供
  • 毒性、バイアス、関連性などの評価を実現
  • カスタム評価指標の定義を可能に

MCP

  • (これあるのでかいね)
  • 複数のサーバーとの同時接続が可能
  • 標準入出力やSSEなど、様々な通信方式をサポート
  • 外部サービスとの統合を容易に実現
import { MCPConfiguration } from "@mastra/mcp";
import { Agent } from "@mastra/core/agent";
import { openai } from "@ai-sdk/openai";
 
const mcp = new MCPConfiguration({
  servers: {
    // stdio example
    sequential: {
      name: "sequential-thinking",
      server: {
        command: "npx",
        args: ["-y", "@modelcontextprotocol/server-sequential-thinking"],
      },
    },
    // SSE example
    weather: {
      url: new URL("http://localhost:8080/sse"),
      requestInit: {
        headers: {
          Authorization: "Bearer your-token",
        },
      },
    },
  },
});
hanzochanghanzochang

MastraはVercel製のAISDKに依存している

  • AI SDK にLLMモデルの参照(model routing)は依存
  • toolsも同様。toolsはStream処理にも関連してくる
hanzochanghanzochang

Mastra 単体でServe可能

Mastra uses Hono as its underlying HTTP server framework. When you build a Mastra application using mastra build, it generates a Hono-based HTTP server in the .mastra directory.

  • mastra単体で単独でサーバー起動可能
  • mastraサーバーにはHonoが使われている
import { Mastra } from '@mastra/core';
 
export const mastra = new Mastra({
  // Other configuration options
  serverMiddleware: [
    {
      handler: async (c, next) => {
        // Example: Add authentication check
        const authHeader = c.req.header('Authorization');
        if (!authHeader) {
          return new Response('Unauthorized', { status: 401 });
        }
        
        // Continue to the next middleware or route handler
        await next();
      },
      path: '/api/*', // Optional: defaults to '/api/*' if not specified
    },
    {
      handler: async (c, next) => {
        // Example: Add request logging
        console.log(`${c.req.method} ${c.req.url}`);
        await next();
      },
      // This middleware will apply to all routes since no path is specified
    }
  ]
});

https://mastra.ai/docs/deployment/server

hanzochanghanzochang

Next.jsに統合して利用する場合

2パターンのインストールガイドがある

  1. NextjsをAPIサーバー単独で利用する場合 | Separate Backend Integration
  2. NextjsでFrontとServerどちらも同居して利用する場合 | Direct Integration
    1. Server Actionsで利用するパターン
    2. API Routesで利用するパターン

*VibesCodingの場合2がおすすめ 人間による文脈伝達がしんどいので

Source

https://mastra.ai/docs/frameworks/01-next-js

hanzochanghanzochang

Streamを用いたResponseが可能

LLMの回答にでてくる、文字がリアルタイムにどんどん出てくるアレに対応

const stream = await myAgent.stream([
  { role: "user", content: "Tell me a story." },
]);
 
console.log("Agent:");
 
for await (const chunk of stream.textStream) {
  process.stdout.write(chunk);
}
Source

https://mastra.ai/docs/agents/00-overview#:~:text=%2C response.text)%3B-,Streaming%20responses,-For%20more%20real

hanzochanghanzochang

AgentのInputとOutputはZod利用可能

これはWorkflowにAgentを組み込んだ時に効果を発揮するだろう
型安全にINとOUTにデータを引き渡しできる

import { z } from "zod";
 
// Define the Zod schema
const schema = z.object({
  summary: z.string(),
  keywords: z.array(z.string()),
});
 
// Use the schema with the agent
const response = await myAgent.generate(
  [
    {
      role: "user",
      content:
        "Please provide a summary and keywords for the following text: ...",
    },
  ],
  {
    output: schema,
  },
);
 
console.log("Structured Output:", response.object);
Source

Using Zod

hanzochanghanzochang

Next.jsでのStreamの実装

Server

import { mastra } from '@/src/mastra';
 
export async function POST(req: Request) {
  const { messages } = await req.json();
  const myAgent = mastra.getAgent('weatherAgent');
  const stream = await myAgent.stream(messages);
 
  return stream.toDataStreamResponse();
}
  • stream.toDataStreamResponse() により Server-Sent Events にてクライアントに返却
    • Content-Type: text/plain; charset=utf-8 ではあるがSSEとのこと

SSEについては下記
https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation

Client

'use client';

import { useCompletion } from '@ai-sdk/react';

export default function Page() {
  const { completion, input, handleInputChange, handleSubmit } = useCompletion({
    streamProtocol: 'text',
  });

  return (
    <form onSubmit={handleSubmit}>
      <input name="prompt" value={input} onChange={handleInputChange} />
      <button type="submit">Submit</button>
      <div>{completion}</div>
    </form>
  );
}
  • Vercel AI SDKの useCompletion hookを用いることで受け取ったストリームを処理しレンダリングできる
hanzochanghanzochang

AI SDK Streamの中身

Stream例

f:{"messageId":"msg-P55Bgxd2L54SciutpqxZSjqP"}
9:{"toolCallId":"call_kGtM9Xq70JfvkQbXmIx2QYYk","toolName":"weatherTool","args":{"location":"Tokyo"}}
a:{"toolCallId":"call_kGtM9Xq70JfvkQbXmIx2QYYk","result":{"temperature":15.1,"feelsLike":13.1,"humidity":57,"windSpeed":7.4,"windGust":29.9,"conditions":"Clear sky","location":"Tokyo"}}
e:{"finishReason":"tool-calls","usage":{"promptTokens":181,"completionTokens":15},"isContinued":false}
f:{"messageId":"msg-SWWUe7t3jajdHgOyvcuPpXJt"}
0:"The"
0:" current"
0:" weather"
0:" in"
0:" Tokyo"
0:" is"
0:" clear"
0:" with"
0:" a"
0:" temperature"
0:" of"
0:" "
0:"15"
0:"."
0:"1"
0:"°C"
0:","
0:" though"
0:" it"
0:" feels"
0:" like"
0:" "
0:"13"
0:"."
0:"1"
0:"°C"
0:"."
0:" The"
0:" humidity"
0:" is"
0:" at"
0:" "
0:"57"
0:"%,"
0:" and"
0:" there's"
0:" a"
0:" wind"
0:" speed"
0:" of"
0:" "
0:"7"
0:"."
0:"4"
0:" km"
0:"/h"
0:" with"
0:" gust"
0:"s"
0:" up"
0:" to"
0:" "
0:"29"
0:"."
0:"9"
0:" km"
0:"/h"
0:"."
e:{"finishReason":"stop","usage":{"promptTokens":246,"completionTokens":60},"isContinued":false}
d:{"finishReason":"stop","usage":{"promptTokens":427,"completionTokens":75}}

f:json

f:{"messageId":"msg-..."}
  • message開始。メッセージのIDを通知。通常この後にツールコールやレスポンスが続く。

9:json

9:{"toolCallId":"call_...","toolName":"weatherTool","args":{"location":"Tokyo"}}
  • ツール呼び出し開始。
  • 9: は SDK 内部的に「ツール呼び出し準備ができたよ」というイベント。

a:json

a:{"toolCallId":"call_...","result":{...}}
  • ツールの結果が返ってきた(たとえば、天気情報)。
  • これはバックエンドでtoolNameに対応する実処理が呼ばれて返ってきたレスポンス。

e:json

e:{"finishReason":"tool-calls", ...}
  • ツール呼び出しが終わったタイミング。
  • finishReasonがtool-callsというのは、LLM側が「ツール呼び出しで一旦ストップしてるよ」という意味。

0: が連続してる部分

0:"The"
0:" current"
0:" weather"
  • 本文ストリーミング
  • チャットの返答をトークン単位(もしくは単語単位)で分割して送っている

最後の e: と d:

  • e: メッセージのストリームが正常に終わったという意味(finishReason: stop)
  • d: → 会話全体の終了情報(トークン数や完了理由などを含む、ラップアップ)
hanzochanghanzochang

現在Agentが何をやっているかをリアルタイムに示したい

今どんな作業をしているかを示すと、Agenticな演出になる。
進捗状況がすぐにわかり待ち時間のストレスを軽減できる。

やりかた

Agenticに現在作業して いることを示したい場合 AI SDKの useChat() を用い、streamを受信し onToolCall を用いて現在の状況を示すといい

  const { messages, input, handleInputChange, handleSubmit, isLoading } = useChat({
    api: '/api/weather',
    onToolCall: (toolCallPayload) => {
      const toolName = toolCallPayload.toolCall.toolName
      const args = toolCallPayload.toolCall.args as { location: string }
      if (toolName === 'weatherTool') {
        setToolStatus(`🔧 ${toolName}${args.location}の天気を取得中...`)
      } else {
        setToolStatus(`🌦️ 天気情報を取得中...`)
      }
    },
    onFinish: () => {
      setToolStatus('')
    },
  })

onToolCallはStreamの下記のJsonが来た時のcallbackとなっている

9:{"toolCallId":"call_...","toolName":"weatherTool","args":{"location":"Tokyo"}}
hanzochanghanzochang

Next.jsのHonoにmastraのstreamを組み込む場合

私はよくApiを、next.jsのmiddlewareにhonoを組み込んで利用している。

nextjsのapiのhonoのroutingにmastraのstream組み込みたい。
とした時に下記の様にやるとhonoの恩恵を受けながらも簡単に実現できる。

import { Hono } from 'hono'
import { mastra } from '@/server/mastra'

export const getWeather = new Hono()

const route = getWeather.post('/', async (c) => {
  try {
    const body = await c.req.json()
    const { messages } = weatherRequestSchema.parse(body)

    const agent = mastra.getAgent('weatherAgent')
    const lastMessage = messages[messages.length - 1]
    const city = lastMessage.content
    const stream = await agent.stream(`What's the weather like in ${city}?`)
   // ここがhonoを介したstreamの実装
    return c.newResponse(stream.toDataStreamResponse().body, {
      headers: {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        Connection: 'keep-alive',
      },
    })
  } catch (error) {
    console.error('API Error:', error)
    return c.json(
      {
        error: {
          message: '天気情報の取得に失敗しました',
          details: error instanceof Error ? error.message : undefined,
        },
      },
      500,
    )
  }
})

export type GetWeatherRoute = typeof route

もともと stream.toDataStreamResponse() でstreamは提供されているので、
honoが提供する streamSSE は用いず、newResponse にてresponseを作成する。
honoの c(Context) も維持される。

mastraが利用するAI SDKでもおそらく同じ実装でいけるはず。

hanzochanghanzochang

mastra devサーバーを起動

pnpm mastra dev -d server/mastra     

ディレクトリは -d で指定可能 デフォルトは src/mastra

hanzochanghanzochang

デバッグツールでは対話型でテスト可能

デバッグツールで、作ったagentと対話型で実行チェックできる

hanzochanghanzochang

Agentからworkflowは呼び出せるのか?

workflowがagentを呼び出す、という構造だと思っていたが、agentがworkflowを呼び出すといったこともできるっぽい?  現在調べている。少なくともagentの引数でworkflowは取れないが、tool内部で呼び出すことは可能か調べている。

https://mastra.ai/docs/agents/00-overview

hanzochanghanzochang

toolとしてworkflowを定義し、agentでtoolを使えば良い模様。

import { createTool } from '@mastra/core'
import { z } from 'zod'
import { mastra } from '..'

export const sourcingTool = createTool({
  id: 'search-tool',
  description: '検索する',
  inputSchema: z.object({
    input: z.string().describe('検索ワード'),
  }),

  outputSchema: z.string(),
  execute: async ({ context }) => {
    const sourcingWorkflow = mastra.getWorkflow('searchWorkflow')
    const run = sourcingWorkflow.createRun()
    await run.start({
      triggerData: {
        input: context.input,
      },
    })

    return 'success'
  },
})

hanzochanghanzochang

中断での対話式のワークフロー

suspend() を入れることにより一時中断し、ユーザーにパラメータの再入力を求めることができる

    console.log('Low confidence in generated content, suspending for human guidance', { guidance })

    // If confidence is low, suspend for human guidance
    if (!guidance) {
      // only suspend if no guidance is provided
      await suspend()
      return undefined
    }
https://mastra.ai/examples/workflows/suspend-and-resume