📘

Mastra で作る AIエージェント(9) Mastraでの実際的なワークフローの作り方

に公開

Mastra で作るAI エージェント というシリーズの第9回です。


マルチエージェントのひとつ「ワークフロー」

前回から、複数のAIエージェントがタスクを分担・協業する「マルチエージェント」の章に突入し、1つ目の方法論として「ワークフロー」を確認しました。

もともと、AIエージェントの構成を「三国志」になぞらえて以下のように把握しました。

  • フロントに立つリーダーの劉備=エージェント:何をやるにしても軍師に相談
  • 天才軍師・諸葛孔明=LLM:劉備に何かと助言するが、決して自分が直接前面に出ない
  • 将軍たち=ツール:劉備に呼ばれて定型作業を遂行、RAG/API/MCPなどなど

マルチエージェントの1つ目は、エージェントを順次つないだ「ワークフロー」でした。

前回は「ワークフローとは何ぞや」で終わったので、今回は実際にMastraを使って実装していきます。

ワークフロー作成の全体フロー

「長文要約ワークフロー」を題材に、実際にMastraでワークフローを開発していく流れを示します。以下の手順で作成していきます。

  • Phase 1: 要件定義
  • Phase 2: 設計
  • Phase 3: 実装準備
  • Phase 4: 実装

Phase 1:要件定義

まず、何を達成したいワークフローなのかを明確にします。

  • どんな問題を解決するのか?
  • ワークフローの入力は何か?
  • ワークフローの出力は何か?
  • そのためにどんなビジネスロジックが必要か?

例えば、今回の長文要約ワークフローで考えると以下のようになります。

問題: 技術論文や長文記事を効率的に要約したい
入力: URL または テキスト
出力: 構造化された要約(タイトル、要点、キーワード)
ビジネスロジック:
  1. ドキュメント取得
  2. セクション分割
  3. 各セクションの要点抽出
  4. 意味的な構造化
  5. 最終要約生成

Phase 2:設計

2.1 プロセスの分解(ステップの洗い出し)

タスクを論理的なステップに分解します。分解するときには以下の分解の観点を意識します。

  1. 順序性:前のステップの結果が次のステップで必要か
  2. 独立性:並列実行できるステップはあるか
  3. 責務:各ステップは単一の責務を持つか
  4. 再利用性:他のワークフローでも使えるか

例えば、今回の長文要約ワークフローで考えると以下のようになります。

// ステップ案
1. fetchDocument     // ドキュメント取得
2. splitSections     // セクション分割
3. extractKeyPoints  // 各セクションから要点抽出(並列可能)
4. structurePoints   // 要点を意味的に構造化
5. generateSummary   // 最終要約生成

この例では、要件定義で洗い出した「ビジネスロジック」と今回の「ステップ」が1対1で対応していますが、必ずしも1対1に対応させる必要はありません。あくまで上記の「分解の観点」で考えるとよいと思います。例えば「ドキュメント取得」というビジネスロジックを以下の「URL検証ステップ」「ドキュメントフェッチステップ」「パース・正規化ステップ」の3ステップに分解することも考えられます。または「並列処理の制御(.parallel().foreach())」や「状態管理のための初期化・更新」、「エラーハンドリング・リトライ」といった実装上の制約でステップを設けることもあります。逆にパフォーマンスの理由で、関連する複数の処理を1つのステップにまとめることもあり得るでしょう。

2.2 制御フローの選択

各ステップをどう繋げるかを決定します。前回の記事にも書いた通り、主な制御フローは以下の通りです。

メソッド 用途 入力 出力
.then() 順次実行 T U
.parallel() 同じ入力に対して異なる処理 T { a: U, b: V }
.foreach() 配列の各要素に同じ処理 T[] U[]
.branch() 条件分岐 T { selectedStep: U }
.map() データ変換 T U

例えば、今回の長文要約ワークフローで考えると以下のようになります。

workflow
  .then(fetchDocument)      // 順次: ドキュメント取得
  .then(splitSections)      // 順次: セクション分割
  .foreach(extractKeyPoints, { concurrency: 3 })  // 並列: 各セクションから要点抽出
  .then(structurePoints)    // 順次: 構造化
  .then(generateSummary)    // 順次: 要約生成

2.3 データフローの設計(スキーマ設計)

各ステップの入出力スキーマを設計します。「ワークフロー全体の入出力」とか「各ステップの入出力」とかが入り乱れて若干混乱しがちですが、基本ルールと手順を抑えればそれほど難しい話ではありません。

基本ルール:

  1. 最初のステップの inputSchema = ワークフローの inputSchema
  2. 最後のステップの outputSchema = ワークフローの outputSchema
  3. 各ステップの outputSchema = 次のステップの inputSchema
    • 一致しない場合は .map() で変換

設計手順:

  1. ワークフロー全体の入出力を定義
  2. 各ステップの責務から必要な入出力を洗い出し
  3. スキーマの型を Zod で定義
  4. データ変換が必要な箇所を特定

例えば、今回の長文要約ワークフローで考えると以下のような手順で設計していくことになります。

import { z } from "zod";

// ----------------------------------------------------------------------
// 手順1:ワークフロー全体の入出力を定義
// ----------------------------------------------------------------------

// ワークフロー全体のインプット:URLを渡す
const workflowInput = z.object({
  url: z.string()
});

// ワークフロー全体のアウトプット:タイトル・要約・キーポイント・タグ
const workflowOutput = z.object({
  title: z.string(),
  summary: z.string(),
  keyPoints: z.array(z.string()),
  tags: z.array(z.string())
});

// ----------------------------------------------------------------------
// 手順2:各ステップの責務から必要な入出力を洗い出し
// 手順3:スキーマの型を Zod で定義
// ----------------------------------------------------------------------

// Step 1: fetchDocument
//  インプット:(ワークフローのインプットと同じなので割愛)
//  アウトプット:内容・メタデータ{タイトル・著者}
const step1Output = z.object({
  content: z.string(),
  metadata: z.object({
    title: z.string(),
    author: z.string().optional()
  })
});

// Step 2: splitSections
//  インプット:(Step 1 のアウトプットと同じなので割愛)
//  アウトプット:{セクションタイトル・内容}の配列
const step2Output = z.array(z.object({
  sectionTitle: z.string(),
  content: z.string()
}));

// Step 3: extractKeyPoints(foreach用)
//  インプット:(Step 2 のアウトプットの配列を分解したもの)
//  アウトプット:セクションタイトル・キーポイント
const step3Input = z.object({
  sectionTitle: z.string(),
  content: z.string()
});
const step3Output = z.object({
  sectionTitle: z.string(),
  keyPoints: z.array(z.string())
});

// ... 残りのステップも同様に定義

2.4 ステート管理の検討

複数ステップ間でデータを共有する必要がある場合、ワークフロー状態(ステート)を使用します。

ステートを使うケースの例は以下の通りです。

  • 進捗カウンターの追跡
  • 累積結果の保持
  • 全ステップで共有する設定値
  • 途中結果のキャッシュ

逆に以下のようなケースではステートを使わないのが原則です。

  • 次のステップにのみ渡すデータ(outputSchema で十分)
  • 一度しか使わないデータ

const stateSchema = z.object({
  processedCount: z.number(),
  startTime: z.date(),
  intermediateResults: z.array(z.any())
});

const step = createStep({
  stateSchema,
  execute: async ({ state, setState }) => {
    // 状態を読む
    const count = state.processedCount;

    // 状態を更新
    setState({
      ...state,
      processedCount: count + 1
    });
  }
});

2.5. ステップ入出力設計とステート設計の実践的ガイドライン

スキーマ設計の難しさ

上記の手順では、ワークフロー各ステップの入出力を設計してから、ステートの設計をしています。「基本はステップ入出力、明確な理由がある場合だけステート」ということですが、しかしそんなに簡単なものでしょうか。

問題1:設計変更による移動コスト

例えば、設計当初は「この属性は1回しか使わない」ということで、ステップ入出力に配置していたとします。しかし、後になって「他のステップでも必要になった」ということになった場合、ステートに移動が必要になり、リファクタリングコストが発生してしまいます。そもそも設計時点で「どの属性が複数ステップで使われるか」を完璧に予測するのは難しい。

問題2:参照先の煩雑さ

また、「この属性はステップ入力から、あの属性はステートから」のように参照先が2つに分かれ、実装時に煩雑に感じそうです。

// ステップの実装内で参照先が分かれる
execute: async ({ context, state }) => {
  const title = context.documentTitle;  // ステップ入力から
  const count = state.processedCount;   // ステートから
  const metadata = context.metadata;    // ステップ入力から
  const cache = state.intermediateResults; // ステートから
}

そうであれば、「すべての属性をステートに寄せてしまえ」と考えたくなります。

それでも「すべてステートに寄せる」のはアンチパターン

それでも、「すべてのデータをステートで管理する」のはアンチパターンです。なぜ問題なのでしょうか。

  1. データフローが不明瞭

    // ❌ 悪い例: ステップの依存関係が分からない
    const step1 = createStep({
      id: "step1",
      stateSchema: bigStateSchema,
      execute: async ({ state, setState }) => {
        // stateから何を使うかコードを読まないと分からない
        const data = state.someData;
        setState({ result: processData(data) });
      }
    });
    
  2. テスタビリティの低下

    // ❌ 悪い例: テストに巨大な状態オブジェクトが必要
    test("step1", () => {
      const hugeState = {
        prop1: ...,
        prop2: ...,
        // ... 50個の属性
      };
      // ステップが実際に使うのは2つだけなのに
    });
    
  3. 再利用性の低下

    // ❌ 悪い例: 他のワークフローで使えない
    // 特定のステート構造に依存してしまう
    const validateStep = createStep({
      stateSchema: specificWorkflowStateSchema,
      execute: async ({ state }) => {
        // このステップは特定のワークフローでしか使えない
      }
    });
    
  4. 型安全性の低下

    • すべてのステップが巨大な状態オブジェクト全体にアクセス可能
    • 本来アクセスすべきでないデータにもアクセスできてしまう
  5. 暗黙的な依存関係

    // ❌ 悪い例: ステップの順序依存が分からない
    workflow
      .then(step1)  // stateに何を設定?
      .then(step2)  // stateの何を期待?
      .then(step3)  // stateの何を使う?
    

設計時の予測テクニック

ということで、「基本はステップ入出力、明確な理由がある場合だけステート」という原則がよいです。完璧な予測は不可能ですが、以下の考え方で精度を上げられます。

  1. ステップ間の距離を考える
    • 隣接ステップ: 入出力で十分
    • 離れたステップ: ステートを検討
  2. データの性質を考える
    • 変換されていくデータ: 入出力
    • 不変の設定・メタデータ: ステート候補
  3. 類似ワークフローの経験を活かす
    • 過去の実装で共通して使われたデータは?
    • プロジェクトの典型的なパターンは?
  4. ドメイン知識を活かす
    • このビジネスプロセスで常に必要な情報は?
    • ログ・監査で全体的に記録すべきものは?

Phase 3:実装準備

Phase 3 の「実装準備」とPhase 4 の「実装」は、まとめて「実装」にしてもよかったのですが、ここではワークフローそのものを作る前のエージェントや共通スキーマを「実装準備」としています。

3.1 必要なエージェントとツールの洗い出し・実装

ワークフローから呼ばれるエージェントやツールを実装します。

ここは、シングルエージェントの部分で解説しているので割愛します。基本的には、ツール → エージェント の順番で実装していきます。

3.2 共通スキーマの定義

複数ステップで使用するスキーマは、別ファイルで管理します。

schemas/summary-workflow.ts:

import { z } from "zod";

export const documentSchema = z.object({
  content: z.string(),
  metadata: z.object({
    title: z.string(),
    author: z.string().optional()
  })
});

export const sectionSchema = z.object({
  sectionTitle: z.string(),
  content: z.string()
});

export const keyPointsSchema = z.object({
  sectionTitle: z.string(),
  keyPoints: z.array(z.string())
});

export const summaryOutputSchema = z.object({
  title: z.string(),
  summary: z.string(),
  keyPoints: z.array(z.string()),
  tags: z.array(z.string())
});

Phase 4:実装

いよいよワークフローの実装です。基本的には、ステップ → ワークフロー の順番で実装していきます。

4.1 ステップの実装

設計したステップを createStep で実装します。

実装例:

import { createStep } from "@mastra/core/workflows";
import { z } from "zod";
import { fetchDocumentTool } from "../tools/fetch-document";

// Step 1: ドキュメント取得
const fetchDocumentStep = createStep({
  id: "fetch-document",
  inputSchema: z.object({
    url: z.string()
  }),
  outputSchema: z.object({
    content: z.string(),
    metadata: z.object({
      title: z.string()
    })
  }),
  execute: async ({ inputData }) => {
    const result = await fetchDocumentTool.execute(inputData);
    return result;
  }
});

// Step 2: セクション分割
const splitSectionsStep = createStep({
  id: "split-sections",
  inputSchema: z.object({
    content: z.string()
  }),
  outputSchema: z.array(z.object({
    sectionTitle: z.string(),
    content: z.string()
  })),
  execute: async ({ inputData }) => {
    const { content } = inputData;
    // セクション分割ロジック
    const sections = content.split('\n\n').map((section, index) => ({
      sectionTitle: `Section${index + 1}`,
      content: section
    }));
    return sections;
  }
});

// Step 3: 要点抽出(エージェント使用)
const extractKeyPointsStep = createStep({
  id: "extract-key-points",
  inputSchema: z.object({
    sectionTitle: z.string(),
    content: z.string()
  }),
  outputSchema: z.object({
    sectionTitle: z.string(),
    keyPoints: z.array(z.string())
  }),
  execute: async ({ inputData, mastra }) => {
    const agent = mastra.getAgent("summary-agent");
    const response = await agent.generate(
      `以下のセクションから重要なポイントを3-5個抽出してください:\n\n${inputData.content}`
    );

    // レスポンスをパース(実際にはstructuredOutputを使うべき)
    const keyPoints = response.text.split('\n').filter(line => line.trim());

    return {
      sectionTitle: inputData.sectionTitle,
      keyPoints
    };
  }
});

エージェントをステップとして使用する場合:

import { summaryAgent } from "../agents/summary-agent";

// エージェントを直接ステップとして使用
const agentStep = createStep(summaryAgent, {
  structuredOutput: {
    schema: z.object({
      summary: z.string(),
      tags: z.array(z.string())
    })
  }
});

4.2 ワークフローの組み立て

ステップを組み合わせてワークフローを構築します。

組み立てのポイント:

  1. createWorkflow で開始
  2. .then().parallel().foreach() などで制御フロー構築
  3. .commit() で完成

実装例:

import { createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";

export const summaryWorkflow = createWorkflow({
  id: "summary-workflow",
  inputSchema: z.object({
    url: z.string()
  }),
  outputSchema: z.object({
    title: z.string(),
    summary: z.string(),
    keyPoints: z.array(z.string()),
    tags: z.array(z.string())
  })
})
  // 1. ドキュメント取得
  .then(fetchDocumentStep)

  // 2. メタデータを抽出しつつ、コンテンツを次へ
  .map(({ inputData }) => ({
    content: inputData.content,
    title: inputData.metadata.title
  }))

  // 3. セクション分割
  .then(splitSectionsStep)

  // 4. 各セクションから要点抽出(並列処理)
  .foreach(extractKeyPointsStep, { concurrency: 3 })

  // 5. 要点を統合
  .map(({ inputData, getStepResult }) => {
    const allKeyPoints = inputData.flatMap(section => section.keyPoints);
    const title = getStepResult(fetchDocumentStep).metadata.title;
    return { title, keyPoints: allKeyPoints };
  })

  // 6. 最終要約生成(エージェント使用)
  .then(generateFinalSummaryStep)

  .commit();

4.3 データマッピングの調整

ステップ間でスキーマが一致しない場合、.map() で変換します。

マッピングパターン:

// 1. inputData を使って前ステップの出力を変換
.map(({ inputData }) => ({
  newField: inputData.oldField.toUpperCase()
}))

// 2. getStepResult を使って特定ステップの出力を取得
.map(({ getStepResult }) => {
  const step1Result = getStepResult(step1);
  return { value: step1Result.data };
})

// 3. getInitData を使ってワークフロー初期入力を取得
.map(({ getInitData }) => {
  const initData = getInitData();
  return { original: initData.url };
})

// 4. mapVariable を使ってフィールドをリネーム
import { mapVariable } from "@mastra/core/workflows";

.map({
  renamedField: mapVariable({
    step: previousStep,
    path: "originalField"
  })
})

4.4 エラーハンドリングの追加

適切なエラーハンドリングを実装します。

基本的なエラーハンドリング:

const step = createStep({
  execute: async ({ inputData }) => {
    try {
      // 処理
      const result = await someOperation(inputData);
      return result;
    } catch (error) {
      // エラーログ
      console.error(`Error in step:`, error);
      throw new Error(`Failed to process:${error.message}`);
    }
  }
});

リトライロジック:

// createStep で リトライオプションを指定
const step = createStep(agent, {
  retries: 3  // 3回までリトライ
});

ループの中断:

const loopStep = createStep({
  execute: async ({ inputData, iterationCount }) => {
    // 最大イテレーション数チェック
    if (iterationCount >= 10) {
      throw new Error("Maximum iterations reached");
    }
    // 処理...
  }
});

workflow.dountil(loopStep, async ({ inputData }) => {
  return inputData.isDone;
});

いかがでしたでしょうか。実際にありそうなユースケースを例にとってワークフローを作る流れを追ってみました。次回は、マルチエージェントの2つ目「エージェント・スーパーバイザー」をご紹介します。

>> 次回 : (10) スーパーバイザーがエージェントたちを束ねる

Discussion