Closed5

mastraを検証する

TSTS

なんかmastraのワークフローがうまくいかないを考える

使用するもの

  • Next.js
  • Mastra
TSTS

一旦シンプルなワークフローを作成

simple-workflow.ts
import { createStep, createWorkflow } from "@mastra/core";
import z from "zod";

export const simpleWorkflow = createWorkflow({
    id: "simple-workflow",
    description: "A simple workflow for site analysis with report generation",
    inputSchema: z.object({}),
    outputSchema: z.object({
        status: z.string(),
        description: z.string(),
    }),
})
.then(createStep({
    id: "first-step",
    description: "This is the first step of the workflow",
    inputSchema: z.object({}),
    outputSchema: z.object({
        status: z.string(),
    }),
    execute: async () => {
        // return status after 1.5minutes
        await new Promise(resolve => setTimeout(resolve, 90 * 1000));
        return { status: "success" };
    }
}))
.then(createStep({
    id: "second-step",
    description: "This is the second step of the workflow",
    inputSchema: z.object({
        status: z.string(),
    }),
    outputSchema: z.object({
        status: z.string(),
        description: z.string(),
    }),
    execute: async () => {
        // return status after 1.5minutes
        await new Promise(resolve => setTimeout(resolve, 90 * 1000));
        return { status: "success", description: "Second step completed" };
    }
}))
.then(createStep({
    id: "final-step",
    description: "This is the final step of the workflow",
    inputSchema: z.object({
        status: z.string(),
        description: z.string(),
    }),
    outputSchema: z.object({
        status: z.string(),
        description: z.string(),
    }),
    execute: async () => {
        // return final status after 1.5minutes
        await new Promise(resolve => setTimeout(resolve, 90 * 1000));
        return { status: "completed", description: "Workflow completed successfully" };
    }
}))
.commit();
TSTS

ルートハンドラーで実行

route.ts
const { inputData = {} } = await request.json();

// Mastra Dev Server の workflows エンドポイントを使用してsimple-workflowを実行
const mastraResponse = await fetch("http://localhost:4111/api/workflows/simpleWorkflow/start-async", {
    method: "POST",
    headers: {
        "Content-Type": "application/json",
    },
    body: JSON.stringify({
        inputData,
    }),
});

結果

タイムアウトで終了

TSTS

mastra/clientで実行

mastra-client.ts
import { MastraClient } from "@mastra/client-js";

export const mastraClient = new MastraClient({
  baseUrl: "http://localhost:4111/",
  retries: 3,
  backoffMs: 300,
  maxBackoffMs: 5000,
});

startAsyncで実行

const startWorkflowViaClient = async () => {
    try {
        // MastraClientをインポート
        const { mastraClient } = await import('../../lib/mastra-client');
        
        // ワークフローインスタンスを取得
        const workflow = mastraClient.getWorkflow("simpleWorkflow");
        
        // 実行インスタンスを作成
        const run = await workflow.createRun();
        setRunId(run.runId);
        setStatus('running');

        // ワークフローウォッチを開始(リアルタイム監視)
        workflow.watch({ runId: run.runId }, (record) => {
        console.log('Workflow progress:', record);
        
        // ワークフロー状態の更新
        if (record.payload?.workflowState) {
            const workflowState = record.payload.workflowState;
            setStatus(workflowState.status);
            
            // 完了時の処理
            if (workflowState.status === 'success' || workflowState.status === 'failed') {
            setIsRunning(false);
            setResult({
                status: workflowState.status,
                steps: workflowState.steps,
                result: workflowState.status === 'success' ? workflowState.steps : undefined
            } as ClientWorkflowResult);
            }
        }
        });

        // ワークフローを非同期で開始
        const startResult = await workflow.startAsync({
        runId: run.runId,
        inputData: {},
        });

        console.log('Client execution result:', startResult);
        setResult(startResult as ClientWorkflowResult);
        setStatus(startResult.status);
        setIsRunning(false);

    } catch (error) {
        console.error('Client execution error:', error);
        throw new Error(error instanceof Error ? error.message : 'クライアント実行でエラーが発生しました');
    }
};

結果

3分ごとにフローが4回自動実行された

startで実行

page
const startWorkflowViaClientWatch = async () => {
    try {
      // MastraClientをインポート
      const { mastraClient } = await import('../../lib/mastra-client');
      
      // ワークフローインスタンスを取得
      const workflow = mastraClient.getWorkflow("simpleWorkflow");
      
      // 実行インスタンスを作成
      const run = await workflow.createRun();
      setRunId(run.runId);
      setStatus('running');

      // ワークフロー監視を開始(start前に設定)
      workflow.watch({ runId: run.runId }, (record) => {
        console.log('Workflow watch record:', record);
        
        if (record.payload?.workflowState) {
          const workflowState = record.payload.workflowState;
          setStatus(workflowState.status);
          
          // 完了状態をチェック
          if (workflowState.status === 'success' || workflowState.status === 'failed') {
            setIsRunning(false);
            setResult({
              status: workflowState.status,
              result: workflowState.steps
            } as ClientWorkflowResult);
          }
        }
        
        if (record.payload?.currentStep) {
          console.log('Current step:', record.payload.currentStep);
        }
      });

      // start メソッドでワークフローを開始(非同期、完了を待たない)
      const startResult = await workflow.start({
        runId: run.runId,
        inputData: {},
      });

      console.log('Client watch start result:', startResult);
      // startResultは開始確認のみなので、ここでは結果を設定しない
      // watchコールバックで実際の完了を待つ

    } catch (error) {
      console.error('Client watch execution error:', error);
      throw new Error(error instanceof Error ? error.message : 'クライアント監視実行でエラーが発生しました');
    }
};

結果

正常完了

TSTS

なんとなくの結果

長時間実行のワークフローをクライアントから呼び出すときにはstartを使え!
(ドキュメントの読み込み理解が甘かったとしか言えないですね笑)

長時間実行のワークフローにおける実行方法比較マトリクス

下のマトリクスはGitHub Copilotでまとめてもらった

項目 APIルート + startAsync APIルート + start Client + startAsync Client + start
実行時間制限 ❌ 10秒〜15分(ホスティング依存) ✅ 制限なし ❌ ブラウザタブ依存 ✅ 制限なし
タイムアウトリスク 🔴 高い(4.5分で確実にタイムアウト) 🟢 低い 🔴 高い(長時間実行で失敗) 🟢 低い
レスポンス性 ❌ ブロッキング(完了まで待機) ✅ 即座にレスポンス ❌ ブロッキング ✅ 即座にレスポンス
進捗監視 ❌ 不可 ✅ ポーリングで可能 ❌ 不可 ✅ watch/ポーリングで可能
エラーハンドリング 🟡 基本的 ✅ 詳細可能 🟡 基本的 ✅ 詳細可能
実装複雑度 🟢 シンプル 🟡 中程度(ポーリング必要) 🟢 シンプル 🟡 中程度
ネットワーク効率 🟡 1回のリクエスト ❌ 複数リクエスト 🟡 1回のリクエスト 🟡 watch=WebSocket/ポーリング=HTTP
ブラウザ依存度 🟢 低い(サーバー実行) 🟢 低い(サーバー実行) 🔴 高い(タブ閉じると中断) 🔴 高い(タブ閉じると中断)
このスクラップは1ヶ月前にクローズされました