atama plus techblog
🧩

Mastraワークフローの型安全性と変更容易性を高める設計指針

に公開

はじめに

こんにちは、atamaplus の takei です。

最近、趣味や業務でエージェントフレームワークであるMastraワークフロー機能[1]を約 1 ヶ月ほど使ってみました。
実際に使ってみて非常に便利である一方で、適切に設計しないと型安全性や変更容易性(保守性)に課題が生じやすいと感じました。

特に以下のような課題に直面しました。

  • ステップ[2] の中で getInitData[3]runtimeContext[4] を使うと型推論が効かず、型安全でないコードになる
  • ステップがワークフロー定義に依存し、ワークフローの変更や他のワークフローでのステップの再利用が難しくなる
  • ステップ内でのエラー発生がインターフェース上に明示されず、エラーハンドリング漏れの原因となる

このような課題に対して、ステップはすべての依存関係と副作用を入出力スキーマで明示し、ワークフロー層でワークフロー固有の処理(データ変換や依存の解決)を行うという設計方針を採用しました。

つまり、ステップを他のステップや実行時の値に依存しない「純粋な処理単位」として設計し、ワークフローから分離することで、以下のような利点を得られました。

  • ステップ 単体での 型安全性 の確保
  • ワークフロー構造の変更(順序変更・差し替え・並列化など)に対する柔軟性の向上
  • 異なるワークフロー間での ステップの再利用性 の向上

これにより、ステップが型安全かつ再利用可能なワークフローを構築できるようになりました。

この記事では、型安全で変更しやすい Mastra ワークフロー設計を実現するための設計指針と、実際のコード例を紹介します。

Mastra ワークフローの基本

まずは、Mastra ワークフローの基本的な構成要素について簡単に説明します。
詳細については 公式ドキュメント を参照してください。

主な構成要素

ワークフローは、複数のタスク(ステップ)を定められた順序で実行し、特定の目的を達成するための処理フローです。

Step(ステップ)

ワークフロー内の最小の実行単位であり、入力を受け取って処理を行い、出力を返します。

const analyzeTextStep = createStep({
  id: "analyzeText",
  // 入力
  inputSchema: z.object({
    text: z.string(),
  }),
  // 出力
  outputSchema: z.object({
    sentiment: z.string(),
    keywords: z.array(z.string()),
  }),
  // 処理
  execute: async ({ inputData }) => {
    // LLMを使ってテキスト分析
    const result = await analyzeSentiment(inputData.text);
    return {
      sentiment: result.sentiment,
      keywords: result.keywords,
    };
  },
});

Workflow(ワークフロー)

複数の ステップ を組み合わせて処理フローを定義します。
各ステップは、前のステップの出力を入力として受け取ります。

const textProcessingWorkflow = createWorkflow({
  id: "textProcessing",
  // ワークフローの入力
  inputSchema: z.object({
    text: z.string(),
  }),
  // ワークフローの出力
  outputSchema: z.object({
    sentiment: z.string(),
    keywords: z.array(z.string()),
  }),
})
  // ステップの実行順序を定義
  .then(analyzeTextStep)
  .then(generateSummaryStep)
  .commit();

Mastra ワークフローの課題

ここから実際にワークフローを実装する中で直面した課題について、具体的なコード例とともに紹介します。

型安全性の課題

Mastra のワークフローは柔軟にステップを構成できますが、その一方で型推論が効かないケースがあり、型安全性が損なわれることがあります。

初期値にアクセスする getInitData の戻り値が型安全でない

ワークフローの初期値にアクセスするためには getInitData を使用できます。

しかし、この関数の戻り値は any 型のため、型安全ではありません

const generateReportWorkflow = createWorkflow({
  id: "generateReport",
  inputSchema: z.object({
    userId: z.string(),
    reportType: z.string(),
  }),
})
  .then(fetchUserDataStep)
  .then(analyzeDataStep)
  .then(
    createStep({
      id: "formatReport",
      outputSchema: z.object({ report: z.string() }),
      execute: async ({ inputData, getInitData }) => {
        // 初期値を取得するためにgetInitDataを使用
        const { reportType } = getInitData(); // any型になってしまう

        // analyzeDataStepの結果を使用
        const analysisResult = inputData;

        // reportTypeの型が不明なので、実行時エラーの可能性
        return {
          report: formatReportByType(analysisResult, reportType),
        };
      },
    })
  )
  .commit();

any 型になってしまう理由は、ステップがワークフローとは独立しており、getInitData の戻り値の型が実行時にしか確定しないためです。
(この例のようにステップをワークフロー内で定義していて initData が明確な場合でも型推論が効かないのは少し残念ですが、今後の改善に期待しています。)

getInitData<Type>()のように型引数をつけて戻り値に型を与えることは可能ですが、ランタイムで得られる実際の値との一致は保証されません。また、複数のワークフローで同じステップを再利用する場合、初期値がワークフローごとに異なることも多いため、型引数の使い回しも現実的ではありません。

実行時コンテキスト(runtimeContext)も型が保証されない

Mastra ではRuntime Contextという外部から実行時に値を注入できる柔軟な仕組みが提供されています。

ワークフローで RuntimeContext にアクセスするためには、runtimeContextexecute の引数で受け取ります。

ただし、runtimeContext の型は any であるため、型安全なコードを書くのが難しいです。

const workflow = createWorkflow({
  id: "processData",
  inputSchema: z.object({
    text: z.string(),
  }),
})
  .then(
    createStep({
      id: "fetchData",
      outputSchema: z.object({ data: z.string() }),
      execute: async ({ inputData, runtimeContext }) => {
        // runtimeContextから値を取得するが、型がanyになる
        const apiKey = runtimeContext.get("apiKey"); // any
        const environment = runtimeContext.get("environment"); // any

        // 型が不明なため、実行時エラーの可能性
        const data = await fetchWithApiKey(apiKey, {
          env: environment,
          text: inputData.text,
        });

        return { data };
      },
    })
  )
  .commit();

// ワークフロー実行時
const run = workflow.createRun();
const runtimeContext = new RuntimeContext();
runtimeContext.set("apiKey", process.env.API_KEY);
runtimeContext.set("environment", "production");

const result = await run.start({ runtimeContext });

これらの機能もgetInitDataと同様に、型引数をつけることは可能ですが、実際の値との一致は保証されないため型安全ではありません。

ステップ間の依存による再利用性・変更容易性の課題

型安全性とは別に、ステップ間の依存関係が強くなると、再利用性や変更容易性に課題が生じます。

特に getStepResult を使って他のステップの出力に依存すると、以下のような問題が発生します

  • ステップが他のステップに強く依存するため、単体での再利用が難しくなる
  • ワークフロー内の変更(ステップの順序変更、別のステップとの入れ替え、並列化など)が難しくなる

getStepResult によってステップ間の依存が強くなる

getStepResult を使うと、任意の前ステップの出力を参照できます。

const step1Result = getStepResult(step1); // 結果を取得したいステップを引数に渡す

しかし、この関数はワークフロー内でのステップの定義順に対する型検証が行われないため、依存関係の整合性が型では保証されません
引数に渡すステップがワークフロー内で先に定義されていなくても型エラーにはならず、定義が誤っている場合実行時にエラーになります。

const complexWorkflow = createWorkflow({...})
  .then(step1)
  .then(createStep({
    id: "step2",
    outputSchema: z.object({ result: z.string() }),
    execute: async ({ inputData, getStepResult }) => {

      /**
       * step3の結果を参照している
       *
       * step3はこのステップよりも後に定義されていて参照できないはずだが型エラーにならない
       * 実行時にエラーになる
       **/
      const step3Result = getStepResult(step3);

      return {
        result: combineResults(step3Result.data, inputData)
      };
    }
  }))
  .then(step3)
  .then(step4)
  .commit();

解決策: ステップの依存をすべて入力スキーマに明示する

これらの問題を解決するには、ステップを入力のみに依存する関数として設計するのが効果的でした。

ステップは他のステップやワークフローの実行時依存の値(初期値・コンテキストなど)に依存しないように設計し、ワークフロー固有の処理や依存関係の解決はワークフロー層で担うようにします。

基本的な考え方

  1. ステップは inputSchema で定義された入力のみに依存する
  2. 必要なデータはすべて input として明示的に渡す
  3. ステップ間の依存関係やデータ整形は、ワークフロー層でマッピング する

Before/After の比較

実際のコードで比較してみます。

Before: 依存が入力スキーマに明示されていないステップ

const generateSummaryStep = createStep({
  id: "generateSummary",
  inputSchema: z.object({
    keywords: z.array(z.string()),
  }),
  outputSchema: z.object({
    summary: z.string(),
  }),
  execute: async ({ inputData, getInitData, getStepResult }) => {
    // ❌ 全ての依存が入力スキーマに明示されていない
    // 3つの異なる場所からデータを取得
    const originalText = getInitData().text; // any
    const sentiment = getStepResult(analyzeStep).sentiment; // analyzeStepをワークフローで事前に実行していないとランタイムエラー
    const keywords = inputData.keywords;

    return {
      summary: await generateSummaryWithContext(
        originalText,
        sentiment,
        keywords
      ),
    };
  },
});

入力スキーマを見ただけでは全ての依存が表現されておらず、このステップの呼び出しが成功するかはこのステップを呼び出すワークフローの定義に依存します。

After: 依存がすべてスキーマに明示されたステップ

const generateSummaryStep = createStep({
    id: "generateSummary",
  inputSchema: z.object({
      originalText: z.string(),
    sentiment: z.string(),
    keywords: z.array(z.string()),
  }),
  outputSchema: z.object({
      summary: z.string(),
  }),
  execute: async ({ inputData }) => {
    // ✅ 全ての依存が入力スキーマに明示されている
    // inputDataのみに依存
    const { originalText, sentiment, keywords } = inputData;

    return {
      summary: await generateSummaryWithContext(
        originalText,
        sentiment,
        keywords
      ),
    };
  },
});

// ワークフロー層でステップに必要なデータをマッピング
const improvedWorkflow = createWorkflow({
  id: "textProcessing",
  inputSchema: z.object({
    text: z.string(),
  }),
});

improvedWorkflow
  .then(analyzeStep)
  .then(generateSummaryStep)
  .map(async ({ inputData, getInitData }) => {
    // ワークフロー層で必要なデータを集約
    // NOTE: 型引数による注釈なので安全ではないが、ワークフロー定義内でワークフローの型を使っているので型がズレる可能性は低い
    const initData = getInitData<typeof improvedWorkflow>();
    return {
      originalText: initData.text,
      sentiment: inputData.sentiment,
      keywords: inputData.keywords,
    };
  })
  .then(generateSummaryStep)
  .commit();

ステップの入力スキーマに全ての依存を表現して、ワークフロー側でステップに必要なデータをマッピングしてステップに渡しています。(マッピングにはmapを使用しています)
ステップの入力スキーマに全ての依存が表現されているため、ワークフロー側の呼び出し方によってステップが失敗することはありません

この設計のメリット

この設計によって以下のメリットがあります。

1. 型安全性

型安全にならない getInitDataruntimeContext を使用しないことで、ステップ内のロジックを完全に型安全に保つことができます。

// すべての入出力が型定義されている
const generateSummaryStep = createStep({
  id: "generateSummary",
  inputSchema: z.object({
    originalText: z.string(),
    sentiment: z.string(),
    keywords: z.array(z.string()),
  }),
  outputSchema: z.object({
    summary: z.string(),
  }),
  execute: async ({ inputData }) => {
    // inputDataは型安全になっている
    // inputData.originalText: string
    // inputData.sentiment: string
    // inputData.keywords: string[]
    const { originalText, sentiment, keywords } = inputData;

    return {
      summary: await generateSummaryWithContext(
        originalText,
        sentiment,
        keywords
      ),
    };
  },
});

2. 明確な依存関係による高い再利用性と変更容易性

ステップが input のみに依存し、他のステップや実行時コンテキストに依存しない 「純粋な処理単位」として設計されている場合、以下のようなメリットがあります

  • 入力さえ揃えば、他のワークフローでもそのまま再利用できる
  • ワークフロー構造の変更(ステップの順序変更や差し替え、並列化など)が容易になる
    • 一方で getStepResult を使って他のステップに依存している場合、依存先のステップの順序を変更しなくても型エラーが発生しなため、変更必要性に気づきにくく、保守性が低下する

たとえば以下の translateStep は、入力と出力が明確に定義されており、他の要素に依存していないため、どのワークフローでも再利用可能です。

// このステップは完全に独立しており、どのワークフローでも使える
const translateStep = createStep({
  id: "translate",
  inputSchema: z.object({
    text: z.string(),
    targetLanguage: z.string(),
  }),
  outputSchema: z.object({
    translatedText: z.string(),
  }),
  execute: async ({ inputData }) => {
    return {
      translatedText: await translate(inputData.text, inputData.targetLanguage),
    };
  },
});

// ワークフローA での使用
workflowA
  .map(async ({ inputData }) => ({
    text: inputData.content,
    targetLanguage: "ja",
  }))
  .then(translateStep);

// ワークフローB での使用(同じステップを再利用)
workflowB
  .map(async ({ inputData }) => ({
    text: inputData.message,
    targetLanguage: inputData.lang,
  }))
  .then(translateStep);

また、ワークフローを定義する際にステップの入出力が一致していない場合は型エラーとなるため、ワークフロー自体の定義も型安全になり、変更や追加がしやすくなります
このように、ステップの独立性を高めることで、ステップの再利用性だけでなくワークフロー全体の変更容易性や保守性も大きく向上します。

エラーハンドリングの課題と型安全な対処方法

ここまでに、型安全性の問題やステップ間の依存による再利用性の低下について紹介しました。

さらに、Mastra のワークフローにはエラーハンドリングにも課題があります。

Mastra におけるエラーハンドリング方法と課題

Mastra では、ステップ内でエラーを throw すると、ワークフロー全体の実行結果が失敗として返され、その際に発生したエラー情報も含まれます。(公式ドキュメント)

const run = await workflow.createRunAsync();
const result = await run.start();

// result.status: "failed"
// result.error: Error

このとき、result.errorにはエラーの情報が含まれています。
ただし、ステップ内でカスタムエラークラスを throw しても、カスタムエラークラスの情報は失われ、呼び出し側には通常の Error オブジェクトとしてしか返ってきません。

そのため、発生したエラーの種類ごとに処理を分岐させたい場合には、throw を使った方法は適していません。(Error メッセージからエラーの種類を判断することは可能ですが、保守性の面で課題があります。)

さらに、そもそもステップがエラーを返す可能性があるかどうかが、型定義(インターフェース)上に表現されていないため、以下のような課題が生じます。

  1. エラーが発生するかコードをよく読まないと分からない: ステップのインターフェースにエラー情報が含まれていないため、呼び出し側での事前判断が困難です。
  2. エラーハンドリングの漏れが発生しやすい: ステップを別のワークフローで再利用する際に、エラー処理の考慮漏れが起きやすく、再利用性や保守性を損ないます。

解決策: 出力にエラーを明示する

以下のステップで上記課題を解決できます。

1. ステップ側でエラーを出力に含める

ステップの出力スキーマにエラーを含めることで、ステップの中でエラーが発生することを明示的に表現できます
その際、成功時とエラー発生時のスキーマを判別可能なユニオン型を使って定義することで、その後のハンドリングが楽になります。

const improvedRiskyStep = createStep({
  id: "riskyOperation",
  inputSchema: z.object({
    data: z.string(),
  }),
  // 判別可能なユニオン型を使って成功時と失敗時の出力スキーマを定義
  outputSchema: z.discriminatedUnion("status", [
    z.object({
      status: z.literal("success"),
      result: z.string(),
    }),
    // エラーが発生した場合の出力スキーマを定義
    z.object({
      status: z.literal("error"),
      error: z.object({
        type: z.enum(["CUSTOM_ERROR1"]),
        message: z.string(),
      }),
    }),
  ]),
  execute: async ({ inputData }) => {
    try {
      const result = await riskyOperation(inputData.data);
      // 成功時の処理
      return {
        status: "success",
        result,
      };
    } catch (error) {
      // エラー時の処理
      return {
        status: "error",
        error: {
          type: "CUSTOM_ERROR1",
          message: error.message || "An unexpected error occurred",
        },
      };
    }
  },
});

このように z.discriminatedUnion("status", [...]) を使うことで、status の値によって型を判別できる構造になります。

2. ワークフロー層で適切に処理する

1 で作成したステップでエラーが発生した場合は、ワークフロー層でそのエラーをハンドリングします。
また、ワークフロー自体がエラーを返す可能性があることを型として明示するために、出力スキーマにエラーケースを含めます。

エラーをキャッチした際にはbailを使うことで早期終了させることができます。

const robustWorkflow = createWorkflow({
  id: "robustProcessing",
  inputSchema: z.object({
    data: z.string()
  }),
  outputSchema: z.discriminatedUnion("status", [
    z.object({
      status: z.literal("success"),
      finalResult: z.string()
    }),
    // エラー時の出力スキーマを定義
    z.object({
      status: z.literal("error"),
      error: z.object({
        type: z.enum(["CUSTOM_ERROR1", "CUSTOM_ERROR2"]),
        message: z.string()
      })
    })
  ])
})
  .then(improvedRiskyStep)
  .then(createStep({
    id: "checkAndProcess",
    inputSchema: ...,
    outputSchema: z.object({
      data: z.string()
    }),
    execute: async ({ inputData, bail }) => {
      // エラーチェック
      if (inputData.status === "error") {
        // エラー時はbailを使って早期終了。ワークフローの結果としてここで返した値が返される
        return bail({
          status: "error",
          error: {
            type: "CUSTOM_ERROR1",
            ...inputData.error
          }
        });
      }

      // 成功時の処理を続行
      const processed = await processData(inputData.result);
      return {
        data: processed
      };
    }
  }))
  .then(finalStep)
  .commit();

3. 呼び出し側で型に基づいてハンドリング

1,2 のステップによってエラーが指定した形式で返ってくるので、ワークフロー呼び出し側でエラーをハンドリングできます。

const run = await workflow.createRunAsync();
const result = await run.start();

// ワークフロー全体の結果としてはbailしたのでsuccessになる: https://mastra.ai/en/docs/workflows/overview#run-workflow-results
if (result.status === "success") {
  const workflowResult = result.result; // bailした値がここに入る
  // workflowResult.status: "error";
  // workflowResult.error: { type: "CUSTOM_ERROR1", message: "An unexpected error occurred" }
}

この設計のメリット

この方法によって以下のメリットがあります。ただし、コードの記述量は増えます。

  1. エラーハンドリング漏れを防げる:
    エラーの可能性が型として明示されるため、すべてのケースに対して漏れなく処理を実装できます。
    これにより、ステップを複数のワークフローで再利用する際にも、エラーハンドリング漏れによるバグを防止しやすくなります。

  2. エラー情報を保持できる
    ステップ内で throw した場合に失われてしまう、エラークラスや詳細なエラー情報を保持したまま扱うことができます。

まとめ

この記事では、Mastra ワークフローを型安全かつ変更に強い設計にするための工夫を紹介しました。

重要なのは、ステップの依存関係と副作用を全て入出力スキーマに明示した関数として設計することです。
この設計方針により、以下のメリットが得られます。

  • 型安全性の向上
    ステップ内で getInitDataruntimeContext に依存しないことで、型が明示された安全なコードが書けます

  • ワークフローの変更容易性の向上
    ステップが他のステップに依存しないため、ステップの順序変更・差し替え・並列化などのワークフローの構造変更がしやすくなります

  • ステップの再利用性の向上
    ステップが入力のみに依存するため、同じステップを他のワークフローでもそのまま再利用可能です

  • エラーハンドリングの明示と漏れ防止
    エラーをスキーマ上で表現することで、処理漏れを防ぎつつ、エラー情報を安全に扱うことができます。

これらの設計方針を取り入れることで、Mastra で保守性が高く、再利用しやすいワークフローを構築できました。

Mastra のワークフローで「型安全性」や「ワークフローの変更のしやすさ」に課題を感じている方、あるいはこれから Mastra を使ってみようという方は、ぜひこのアプローチを参考にしてみて下さい。

脚注
  1. タスクの連鎖呼び出しを可能にする機能 ↩︎

  2. ワークフローを構成する個々のタスク ↩︎

  3. ワークフローの初期値を取得するための関数 ↩︎

  4. ワークフロー実行時に値を注入するための機能 ↩︎

atama plus techblog
atama plus techblog

Discussion