Mastra で作る AIエージェント(9) Mastraでの実際的なワークフローの作り方
Mastra で作るAI エージェント というシリーズの第9回です。
マルチエージェントのひとつ「ワークフロー」
前回から、複数のAIエージェントがタスクを分担・協業する「マルチエージェント」の章に突入し、1つ目の方法論として「ワークフロー」を確認しました。
もともと、AIエージェントの構成を「三国志」になぞらえて以下のように把握しました。
- フロントに立つリーダーの劉備=エージェント:何をやるにしても軍師に相談
- 天才軍師・諸葛孔明=LLM:劉備に何かと助言するが、決して自分が直接前面に出ない
- 将軍たち=ツール:劉備に呼ばれて定型作業を遂行、RAG/API/MCPなどなど

マルチエージェントの1つ目は、エージェントを順次つないだ「ワークフロー」でした。

前回は「ワークフローとは何ぞや」で終わったので、今回は実際にMastraを使って実装していきます。
ワークフロー作成の全体フロー
「長文要約ワークフロー」を題材に、実際にMastraでワークフローを開発していく流れを示します。以下の手順で作成していきます。
- Phase 1: 要件定義
- Phase 2: 設計
- Phase 3: 実装準備
- Phase 4: 実装
Phase 1:要件定義
まず、何を達成したいワークフローなのかを明確にします。
- どんな問題を解決するのか?
- ワークフローの入力は何か?
- ワークフローの出力は何か?
- そのためにどんなビジネスロジックが必要か?
例えば、今回の長文要約ワークフローで考えると以下のようになります。
問題: 技術論文や長文記事を効率的に要約したい
入力: URL または テキスト
出力: 構造化された要約(タイトル、要点、キーワード)
ビジネスロジック:
1. ドキュメント取得
2. セクション分割
3. 各セクションの要点抽出
4. 意味的な構造化
5. 最終要約生成
Phase 2:設計
2.1 プロセスの分解(ステップの洗い出し)
タスクを論理的なステップに分解します。分解するときには以下の分解の観点を意識します。
- 順序性:前のステップの結果が次のステップで必要か
- 独立性:並列実行できるステップはあるか
- 責務:各ステップは単一の責務を持つか
- 再利用性:他のワークフローでも使えるか
例えば、今回の長文要約ワークフローで考えると以下のようになります。
// ステップ案
1. fetchDocument // ドキュメント取得
2. splitSections // セクション分割
3. extractKeyPoints // 各セクションから要点抽出(並列可能)
4. structurePoints // 要点を意味的に構造化
5. generateSummary // 最終要約生成
この例では、要件定義で洗い出した「ビジネスロジック」と今回の「ステップ」が1対1で対応していますが、必ずしも1対1に対応させる必要はありません。あくまで上記の「分解の観点」で考えるとよいと思います。例えば「ドキュメント取得」というビジネスロジックを以下の「URL検証ステップ」「ドキュメントフェッチステップ」「パース・正規化ステップ」の3ステップに分解することも考えられます。または「並列処理の制御(.parallel(), .foreach())」や「状態管理のための初期化・更新」、「エラーハンドリング・リトライ」といった実装上の制約でステップを設けることもあります。逆にパフォーマンスの理由で、関連する複数の処理を1つのステップにまとめることもあり得るでしょう。
2.2 制御フローの選択
各ステップをどう繋げるかを決定します。前回の記事にも書いた通り、主な制御フローは以下の通りです。
| メソッド | 用途 | 入力 | 出力 |
|---|---|---|---|
.then() |
順次実行 | T |
U |
.parallel() |
同じ入力に対して異なる処理 | T |
{ a: U, b: V } |
.foreach() |
配列の各要素に同じ処理 | T[] |
U[] |
.branch() |
条件分岐 | T |
{ selectedStep: U } |
.map() |
データ変換 | T |
U |
例えば、今回の長文要約ワークフローで考えると以下のようになります。
workflow
.then(fetchDocument) // 順次: ドキュメント取得
.then(splitSections) // 順次: セクション分割
.foreach(extractKeyPoints, { concurrency: 3 }) // 並列: 各セクションから要点抽出
.then(structurePoints) // 順次: 構造化
.then(generateSummary) // 順次: 要約生成
2.3 データフローの設計(スキーマ設計)
各ステップの入出力スキーマを設計します。「ワークフロー全体の入出力」とか「各ステップの入出力」とかが入り乱れて若干混乱しがちですが、基本ルールと手順を抑えればそれほど難しい話ではありません。
基本ルール:
- 最初のステップの
inputSchema= ワークフローのinputSchema - 最後のステップの
outputSchema= ワークフローのoutputSchema - 各ステップの
outputSchema= 次のステップのinputSchema- 一致しない場合は
.map()で変換
- 一致しない場合は
設計手順:
- ワークフロー全体の入出力を定義
- 各ステップの責務から必要な入出力を洗い出し
- スキーマの型を Zod で定義
- データ変換が必要な箇所を特定
例えば、今回の長文要約ワークフローで考えると以下のような手順で設計していくことになります。
import { z } from "zod";
// ----------------------------------------------------------------------
// 手順1:ワークフロー全体の入出力を定義
// ----------------------------------------------------------------------
// ワークフロー全体のインプット:URLを渡す
const workflowInput = z.object({
url: z.string()
});
// ワークフロー全体のアウトプット:タイトル・要約・キーポイント・タグ
const workflowOutput = z.object({
title: z.string(),
summary: z.string(),
keyPoints: z.array(z.string()),
tags: z.array(z.string())
});
// ----------------------------------------------------------------------
// 手順2:各ステップの責務から必要な入出力を洗い出し
// 手順3:スキーマの型を Zod で定義
// ----------------------------------------------------------------------
// Step 1: fetchDocument
// インプット:(ワークフローのインプットと同じなので割愛)
// アウトプット:内容・メタデータ{タイトル・著者}
const step1Output = z.object({
content: z.string(),
metadata: z.object({
title: z.string(),
author: z.string().optional()
})
});
// Step 2: splitSections
// インプット:(Step 1 のアウトプットと同じなので割愛)
// アウトプット:{セクションタイトル・内容}の配列
const step2Output = z.array(z.object({
sectionTitle: z.string(),
content: z.string()
}));
// Step 3: extractKeyPoints(foreach用)
// インプット:(Step 2 のアウトプットの配列を分解したもの)
// アウトプット:セクションタイトル・キーポイント
const step3Input = z.object({
sectionTitle: z.string(),
content: z.string()
});
const step3Output = z.object({
sectionTitle: z.string(),
keyPoints: z.array(z.string())
});
// ... 残りのステップも同様に定義
2.4 ステート管理の検討
複数ステップ間でデータを共有する必要がある場合、ワークフロー状態(ステート)を使用します。
ステートを使うケースの例は以下の通りです。
- 進捗カウンターの追跡
- 累積結果の保持
- 全ステップで共有する設定値
- 途中結果のキャッシュ
逆に以下のようなケースではステートを使わないのが原則です。
- 次のステップにのみ渡すデータ(outputSchema で十分)
- 一度しか使わないデータ
例
const stateSchema = z.object({
processedCount: z.number(),
startTime: z.date(),
intermediateResults: z.array(z.any())
});
const step = createStep({
stateSchema,
execute: async ({ state, setState }) => {
// 状態を読む
const count = state.processedCount;
// 状態を更新
setState({
...state,
processedCount: count + 1
});
}
});
2.5. ステップ入出力設計とステート設計の実践的ガイドライン
スキーマ設計の難しさ
上記の手順では、ワークフロー各ステップの入出力を設計してから、ステートの設計をしています。「基本はステップ入出力、明確な理由がある場合だけステート」ということですが、しかしそんなに簡単なものでしょうか。
問題1:設計変更による移動コスト
例えば、設計当初は「この属性は1回しか使わない」ということで、ステップ入出力に配置していたとします。しかし、後になって「他のステップでも必要になった」ということになった場合、ステートに移動が必要になり、リファクタリングコストが発生してしまいます。そもそも設計時点で「どの属性が複数ステップで使われるか」を完璧に予測するのは難しい。
問題2:参照先の煩雑さ
また、「この属性はステップ入力から、あの属性はステートから」のように参照先が2つに分かれ、実装時に煩雑に感じそうです。
// ステップの実装内で参照先が分かれる
execute: async ({ context, state }) => {
const title = context.documentTitle; // ステップ入力から
const count = state.processedCount; // ステートから
const metadata = context.metadata; // ステップ入力から
const cache = state.intermediateResults; // ステートから
}
そうであれば、「すべての属性をステートに寄せてしまえ」と考えたくなります。
それでも「すべてステートに寄せる」のはアンチパターン
それでも、「すべてのデータをステートで管理する」のはアンチパターンです。なぜ問題なのでしょうか。
-
データフローが不明瞭
// ❌ 悪い例: ステップの依存関係が分からない const step1 = createStep({ id: "step1", stateSchema: bigStateSchema, execute: async ({ state, setState }) => { // stateから何を使うかコードを読まないと分からない const data = state.someData; setState({ result: processData(data) }); } }); -
テスタビリティの低下
// ❌ 悪い例: テストに巨大な状態オブジェクトが必要 test("step1", () => { const hugeState = { prop1: ..., prop2: ..., // ... 50個の属性 }; // ステップが実際に使うのは2つだけなのに }); -
再利用性の低下
// ❌ 悪い例: 他のワークフローで使えない // 特定のステート構造に依存してしまう const validateStep = createStep({ stateSchema: specificWorkflowStateSchema, execute: async ({ state }) => { // このステップは特定のワークフローでしか使えない } }); -
型安全性の低下
- すべてのステップが巨大な状態オブジェクト全体にアクセス可能
- 本来アクセスすべきでないデータにもアクセスできてしまう
-
暗黙的な依存関係
// ❌ 悪い例: ステップの順序依存が分からない workflow .then(step1) // stateに何を設定? .then(step2) // stateの何を期待? .then(step3) // stateの何を使う?
設計時の予測テクニック
ということで、「基本はステップ入出力、明確な理由がある場合だけステート」という原則がよいです。完璧な予測は不可能ですが、以下の考え方で精度を上げられます。
-
ステップ間の距離を考える
- 隣接ステップ: 入出力で十分
- 離れたステップ: ステートを検討
-
データの性質を考える
- 変換されていくデータ: 入出力
- 不変の設定・メタデータ: ステート候補
-
類似ワークフローの経験を活かす
- 過去の実装で共通して使われたデータは?
- プロジェクトの典型的なパターンは?
-
ドメイン知識を活かす
- このビジネスプロセスで常に必要な情報は?
- ログ・監査で全体的に記録すべきものは?
Phase 3:実装準備
Phase 3 の「実装準備」とPhase 4 の「実装」は、まとめて「実装」にしてもよかったのですが、ここではワークフローそのものを作る前のエージェントや共通スキーマを「実装準備」としています。
3.1 必要なエージェントとツールの洗い出し・実装
ワークフローから呼ばれるエージェントやツールを実装します。
ここは、シングルエージェントの部分で解説しているので割愛します。基本的には、ツール → エージェント の順番で実装していきます。
3.2 共通スキーマの定義
複数ステップで使用するスキーマは、別ファイルで管理します。
schemas/summary-workflow.ts:
import { z } from "zod";
export const documentSchema = z.object({
content: z.string(),
metadata: z.object({
title: z.string(),
author: z.string().optional()
})
});
export const sectionSchema = z.object({
sectionTitle: z.string(),
content: z.string()
});
export const keyPointsSchema = z.object({
sectionTitle: z.string(),
keyPoints: z.array(z.string())
});
export const summaryOutputSchema = z.object({
title: z.string(),
summary: z.string(),
keyPoints: z.array(z.string()),
tags: z.array(z.string())
});
Phase 4:実装
いよいよワークフローの実装です。基本的には、ステップ → ワークフロー の順番で実装していきます。
4.1 ステップの実装
設計したステップを createStep で実装します。
実装例:
import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
import { fetchDocumentTool } from "../tools/fetch-document";
// Step 1: ドキュメント取得
const fetchDocumentStep = createStep({
id: "fetch-document",
inputSchema: z.object({
url: z.string()
}),
outputSchema: z.object({
content: z.string(),
metadata: z.object({
title: z.string()
})
}),
execute: async ({ inputData }) => {
const result = await fetchDocumentTool.execute(inputData);
return result;
}
});
// Step 2: セクション分割
const splitSectionsStep = createStep({
id: "split-sections",
inputSchema: z.object({
content: z.string()
}),
outputSchema: z.array(z.object({
sectionTitle: z.string(),
content: z.string()
})),
execute: async ({ inputData }) => {
const { content } = inputData;
// セクション分割ロジック
const sections = content.split('\n\n').map((section, index) => ({
sectionTitle: `Section${index + 1}`,
content: section
}));
return sections;
}
});
// Step 3: 要点抽出(エージェント使用)
const extractKeyPointsStep = createStep({
id: "extract-key-points",
inputSchema: z.object({
sectionTitle: z.string(),
content: z.string()
}),
outputSchema: z.object({
sectionTitle: z.string(),
keyPoints: z.array(z.string())
}),
execute: async ({ inputData, mastra }) => {
const agent = mastra.getAgent("summary-agent");
const response = await agent.generate(
`以下のセクションから重要なポイントを3-5個抽出してください:\n\n${inputData.content}`
);
// レスポンスをパース(実際にはstructuredOutputを使うべき)
const keyPoints = response.text.split('\n').filter(line => line.trim());
return {
sectionTitle: inputData.sectionTitle,
keyPoints
};
}
});
エージェントをステップとして使用する場合:
import { summaryAgent } from "../agents/summary-agent";
// エージェントを直接ステップとして使用
const agentStep = createStep(summaryAgent, {
structuredOutput: {
schema: z.object({
summary: z.string(),
tags: z.array(z.string())
})
}
});
4.2 ワークフローの組み立て
ステップを組み合わせてワークフローを構築します。
組み立てのポイント:
-
createWorkflowで開始 -
.then(),.parallel(),.foreach()などで制御フロー構築 -
.commit()で完成
実装例:
import { createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
export const summaryWorkflow = createWorkflow({
id: "summary-workflow",
inputSchema: z.object({
url: z.string()
}),
outputSchema: z.object({
title: z.string(),
summary: z.string(),
keyPoints: z.array(z.string()),
tags: z.array(z.string())
})
})
// 1. ドキュメント取得
.then(fetchDocumentStep)
// 2. メタデータを抽出しつつ、コンテンツを次へ
.map(({ inputData }) => ({
content: inputData.content,
title: inputData.metadata.title
}))
// 3. セクション分割
.then(splitSectionsStep)
// 4. 各セクションから要点抽出(並列処理)
.foreach(extractKeyPointsStep, { concurrency: 3 })
// 5. 要点を統合
.map(({ inputData, getStepResult }) => {
const allKeyPoints = inputData.flatMap(section => section.keyPoints);
const title = getStepResult(fetchDocumentStep).metadata.title;
return { title, keyPoints: allKeyPoints };
})
// 6. 最終要約生成(エージェント使用)
.then(generateFinalSummaryStep)
.commit();
4.3 データマッピングの調整
ステップ間でスキーマが一致しない場合、.map() で変換します。
マッピングパターン:
// 1. inputData を使って前ステップの出力を変換
.map(({ inputData }) => ({
newField: inputData.oldField.toUpperCase()
}))
// 2. getStepResult を使って特定ステップの出力を取得
.map(({ getStepResult }) => {
const step1Result = getStepResult(step1);
return { value: step1Result.data };
})
// 3. getInitData を使ってワークフロー初期入力を取得
.map(({ getInitData }) => {
const initData = getInitData();
return { original: initData.url };
})
// 4. mapVariable を使ってフィールドをリネーム
import { mapVariable } from "@mastra/core/workflows";
.map({
renamedField: mapVariable({
step: previousStep,
path: "originalField"
})
})
4.4 エラーハンドリングの追加
適切なエラーハンドリングを実装します。
基本的なエラーハンドリング:
const step = createStep({
execute: async ({ inputData }) => {
try {
// 処理
const result = await someOperation(inputData);
return result;
} catch (error) {
// エラーログ
console.error(`Error in step:`, error);
throw new Error(`Failed to process:${error.message}`);
}
}
});
リトライロジック:
// createStep で リトライオプションを指定
const step = createStep(agent, {
retries: 3 // 3回までリトライ
});
ループの中断:
const loopStep = createStep({
execute: async ({ inputData, iterationCount }) => {
// 最大イテレーション数チェック
if (iterationCount >= 10) {
throw new Error("Maximum iterations reached");
}
// 処理...
}
});
workflow.dountil(loopStep, async ({ inputData }) => {
return inputData.isDone;
});
いかがでしたでしょうか。実際にありそうなユースケースを例にとってワークフローを作る流れを追ってみました。次回は、マルチエージェントの2つ目「エージェント・スーパーバイザー」をご紹介します。
Discussion