🦖

[Mastra] Agent呼び出しにstreamVNextとwatchを両方使う罠

に公開

概要

MastraでAgentをWorkflow等で呼び出すときに、基本的にはstreamVNextを利用すると思うのですが、resume等でユーザー側の操作を待つ場合、再開後の監視にはwatchを使うことになります。
その場合に、watchとstreamVNextのレスポンスの型が違うという事象があり、実装が使いまわせない罠があります。

違いについて

レスポンス形式

streamVNextはJSONオブジェクトが連結されて返されます。{...}{...}{...}
watchはJSON自体が文字列として連結されて返されます。"{...}{...}{...}"

JSON構造

streamVNextはイベントタイプごとに返却されます。

{
  "type": "workflow-step-start",
  "runId": "xxx",
  "from": "WORKFLOW",
  "payload": {
    "stepName": "create-script",
    "id": "create-script",
    "stepCallId": "xxx",
    "payload": {
      "pdfContent": "...",
      "structureName": "1"
    },
    "startedAt": 1759473774843,
    "status": "running"
  }
}

イベントタイプとは下記のようにワークフローの各ステップの到達時の情報です。

  • workflow-start: ワークフロー開始
  • workflow-step-start: ステップ開始
  • workflow-step-result: ステップ完了
  • workflow-step-suspended: ステップ一時停止
  • workflow-finish: ワークフロー完了/失敗

watchの場合には、基本的にこれまでのステップを含めた全ての情報を格納して返却されます。
そのため、suspendPayloadを見るだけでは、現在suspendされているかどうかは判断できません。resumePayloadやoutput、endedAtなども含めて比較し、新しく発生したsuspendであるかを判定する必要があります。

{
  "type": "watch",
  "payload": {
    "currentStep": {
      "id": "create-script",
      "payload": { ... },
      "startedAt": 1759473172166,
      "status": "running" | "success",
      "suspendPayload": {
        "reason": "...",
        "currentScript": { ... }
      },
      "suspendedAt": 1759474401302,
      "resumePayload": {
        "approved": true,
        "script": { ... }
      },
      "resumedAt": 1759474405055,
      "output": { ... },
      "endedAt": 1759474405189
    },
    "workflowState": {
      "status": "running" | "suspended" | "completed" | "failed",
      "steps": { ... }
    }
  },
  "eventTimestamp": 1759473228030,
  "runId": "xxxx"
}

まとめ

streamVNextにはresumeが発生してもstream eventが途切れない機構もあるのですが、処理の都合上いつでも再開できる様にしたく、resume時にはwatchを使う実装としていました。
streamVNextがrunIdを引数に持った再開処理に対応できると嬉しいのですが、、、

Contrea Tech Blog

Discussion