🪠

Mastraのサンプルワークフローの続きをやっていく~ワークフローを外から呼びだすまで

に公開

Mastra、やってますか!

AI、なんかわからんがやってみたいですよね。
ITバブルのころはなんかわからんがテキストサイトやってみたい、という人が多かったので、だいたい同じ匂いを感じてます。
でも、AIを"やる"っていうのが難しくて、Difyをちょっと触って諦めてました…。
RAGとかちょこちょこ触ったのですが、全然思ったものにはならなさそうなんですよね。普通にLLMのエージェントを使う以上のものにすることは難しい。それくらい、課金するような商品モデルは完成されたものを提供されています。
特に、外部に出せない情報を扱うようなAIサービスを展開しようとすると、ファインチューニングなど根本からAIをチューニングしないとだめなのかな~でもそれ難しいうえに簡単に変更できなさそうだな~とか。
そっちはそっちで何かやり方あるのかもしれませんが、一旦勉強しなおそうといろいろ見てて、Langchainって面白そうだな…と思っていた矢先、Mastraの記事を見てびっくりしました。
めっちゃ簡単そう!何も知らなくても解説で提示しているようなソースで十分理解できる。これはセンス良さそう。
というわけで、全部Mastraに賭けるか…と試してみました。

チュートリアルその先へ。

Mastraの入門記事は私ができたので大体の人は探して読めばできると思います。
要はエージェント、ツール、ワークフローの3つを作るようなフレームワークで、特に最終系は決まっていないので、ワークフローを使うエージェントを作ってもいいですし、エージェントのような形式ではなく、決まった入力から、間をAIがこねて最終出力をするワークフローを作ってもいい、のだと思います。サッカーくらい自由なフレームワークですね。

今回の記事ではチュートリアルその先へということで、最初にインストールすると用意されているWeatherAgentというエージェントと、weatherToolというツール、WeatherWorkflowというワークフロー、これらは結構ざっくりした状態ですので、これの改善をベースにMastra全体の理解を深めようという感じです。

今回の着地

テスト用簡易Expressサーバ

サンプルとして存在する「天気を元にアクティビティを提案する」ワークフローに、
「場所のあいまいな日本語入力が可能」な実装の変更をして、「それを外部のサービスから呼び出す」テストをします。
MastraはAPIとして出力されますが、これをどう使うかの部分まではあまり書かれていません。
ゼロから作るのは大変そうですが、実際に呼び出し部分までの使い方を学ぶことで、私のような初学者でももっと気軽にMastraを試せるようになるというねらいがこの記事の目的です。

Mastraのチュートリアルの動きを確かめるまで。

現在(2025/05)のビルドに近いインストールから動作確認までは下記の記事がいいと思います。
https://qiita.com/youtoy/items/0911983e548a14533943

公式も参考になりますね。
サンプルなしのインストールをしてもこの内容をなぞればエージェントとツールは結局同じものができます。ただ、公式はもしかして情報が古いのか?Workflowがはないんですが……
https://mastra.ai/ja/docs/getting-started/installation

さて、ツールは動きました。エージェントも動きました。ワークフローも動きました。
エージェントを動かすことでツールを使用して情報を得ることもわかりました。
ワークフローはソースを見ると、せっかく天気を調べるtoolがあるのにtoolsの天気を使わずハードコーディングしているのが謎ですが、まぁそのへんはやっぱりまだまだ出来立てホヤホヤ感ですね。もしかしたらtoolを使うような変更を加えることもチュートリアルなのかもしれません。

WeatherWorkflowの問題点

しかし、toolでもworkflowでも、Mastraのdev画面でテストしようとして、場所をいろいろ入力するとなーんかエラーになるんですね。
私の環境ではこのエラーの理由は二つありました。
ひとつは、nodeのfetchがうまくいかないという問題です。検索したらほかにも同じ方がいたので、もしかしたら最新nodejsが悪いのかもしれません。
ふたつめは、これは与えられた都市名をそのままgeocodingAPIに飛ばそうとしていて、日本語入力だとだめなんですね。

修正計画

ひとつめのエラーに対処するために、AIに聞いてなんかipv4の使用とかいろいろやってみたけどうまくいかなかったので、axiosを使うことにしました。やっぱいいよな、axios。頼りになるぜ、axios。
ふたつめのエラーについては、これはせっかくのAIツールだからAIに解決させたいですよね。

最初はプロンプトに「適当な入力はそのへんの市区町村でよろしく」って書いてたのですが、私がこのとき使っていたローカルのPCで動かすQwen3-8Bは地理についてはクソバカで「河内長野は大阪にはありません!奈良県です!」とか言って喧嘩をしてしまってもう話にならないしなかなか困っていました。
その後、いろいろ試すと、mistral-small3っていうのが応答の時間と内容のバランスがよかったので最終的にはollamaで動かすエージェントのLLMモデルはそちらになったのですが、確かに、どれだけ賢くてもモデルが生まれたより後にできた施設など、どちらにせよ知らない土地もあるでしょう。
というわけでWorkflowに入れたユーザーの入力をAPIに渡せる形に変換するためのステップを設定してみました。

まずは全体の構成となるWorkflowの定義でステップを作って…
(あ、言い忘れていましたが7日間の出力はローカルLLMには重過ぎるので特定日のみを指定する機能もつけました。その入力項目としてdateIndexが増えています。)

weatherActivity.ts
const weatherWorkflow = new Workflow({
  name: 'weatherWorkflow', //WorkflowをAPIでGETしたときにnameが出るのでidと同じにしておくとわかりやすい
  triggerSchema: z.object({
    city: z.string().describe('The city to get the weather for'),
    dateIndex: z.union([
      z.string().transform(val => val === '' ? 0 : parseInt(val, 10)),
      z.number()
    ]).optional().default(0).describe('天気予報の日付インデックス(0=今日, 1=明日, など)'),
  }),
})
  .step(translateCity)
  .then(fetchWeather)
  .then(planActivities);

  weatherWorkflow.commit();

export { weatherWorkflow };

そして都市名のチェックにはBrave searchを使ってみました。
これは共通Web検索ツールとしてtoolsに作ってもいい気がするんですが、
とりあえず動くか見たかったのでべた書きしてます。

まずはAgent

weatherActivity.ts
#入力された都市をAPIに入る形にするためのAgent
const translationAgent = new Agent({
  name: 'City Translator',
  model: gptllm, // ローカルのmistral-small3.1を使用→ ollama chatgpt→gptllm
  tools: { braveSearchTool }, // BraveSearchを追加
  instructions: `
あなたはgeocoding-apiに送る値を決めるために正しい都市名を返すためのエージェントです。
以下の手順で処理してください:

1. 入力された名称が日本の市区町村名かどうか判断するために必ずBraveSearchを使用する
   - "<入力値> 市区町村"のような検索クエリを使用して、それが市区町村かどうかを確認する
   - 検索結果から、入力が市区町村名であるか、または駅名/地域名/観光地名などの別の種類の場所であるかを判断する
   - 市区町村名の場合:そのまま英語に翻訳
   - 市区町村名でない場合(駅名、地域名、観光地名など):その場所が属する市区町村名を特定する

2. 特定した市区町村名を英語に翻訳する

3. 国名(例:Japan, USA, France)を追加する

4. 結果を指定されたJSON形式で返す

以下の形式で回答してください:
{"translatedCity": "英語の市区町村名(1語), 国名", "confidence": 0-1の数値, "isCheckedWithBraveSearch": true}
必ず有効なJSONを返してください。説明文は不要です。
  `,
});

その真下に先ほど増やしたステップの内容…こういう書き方がわかりやすいかなと思ったのですが、
もっといろいろ方法あるかもしれません。

weatherActivity.ts
// 都市名翻訳ステップを追加
const translateCity = new Step({
  id: 'translate-city',
  description: 'AIを使用して入力された都市名を英語に変換する。BraveSearchを使用して市区町村かどうかをチェックする。',
  inputSchema: z.object({
    city: z.string().describe('任意の言語での都市名'),
  }),
  outputSchema: z.object({
    originalCity: z.string(),
    translatedCity: z.string(),
    confidence: z.number(),
    isCheckedWithBraveSearch: z.boolean().optional(),
  }),
  execute: async ({ context, mastra }) => {
    const triggerData = context?.getStepResult<{ city: string }>('trigger');

    if (!triggerData) {
      throw new Error('Trigger data not found');
    }

    const originalCity = triggerData.city;
    
    // 既に英語の都市名かどうかの簡易チェック
    const isLikelyEnglish = /^[a-zA-Z\s,]+$/.test(originalCity);
    
    if (isLikelyEnglish) {
      // 既に英語の場合はそのまま返す
      return {
        originalCity,
        translatedCity: originalCity,
      };
    }
    
    // AIエージェントに翻訳リクエストを送信
    // translationAgentを作成した場合はそれを使用、そうでなければ既存のagentを使用
    const translationPrompt = `
    "${originalCity}"
    で与えられた都市名について:
    
    1. まず、braveSearchToolを使用して、この入力が市区町村名かどうかを確認してください。
       検索クエリ: "${originalCity} 市区町村" または "${originalCity} municipality"
    
    2. 検索結果を分析し、入力が市区町村名かどうかを判断してください。
       - 市区町村名の場合:そのまま英語に翻訳
       - 市区町村名でない場合(駅名、地域名、観光地名など):その場所が属する市区町村名を特定してください
    
    3. 特定した市区町村名を英語に翻訳してください。国名が含まれていない場合は、日本語の都市名なら "Japan" を、それ以外なら適切な国名を追加してください。
    
    4. 市や区など都道府県の一つ下のレイヤー名で出してください。(駅名や地方名は不可)

    (例:新宿→"Shinjuku, Japan"、中百舌鳥→"Sakai, Japan"、阿倍野区→"Abeno, Japan")
    
    以下の形式でJSONを返してください:
    {"translatedCity": "英語の都市名"}
    
    JSONだけを返し、説明は不要です。`;
    
    // AIによる翻訳実行
    const response = await translationAgent.generate([
      {
        role: 'user',
        content: translationPrompt,
      },
    ]);
    
    console.log(`Translation response: ${response.text}`);
    
    // JSONレスポースのパース
    let translationResult;
    try {
      // レスポンスからJSONを抽出
      const jsonMatch = response.text.match(/\{[\s\S]*\}/);
      if (jsonMatch) {
        translationResult = JSON.parse(jsonMatch[0]);
      } else {
        throw new Error('JSONが見つかりません');
      }
    } catch (error) {
      console.error('Translation response parsing error:', error);
    }
    
   
    console.log(`City translation: "${originalCity}" → "${translationResult.translatedCity})`);
    
    return {
      originalCity,
      translatedCity: translationResult.translatedCity,
    };
  },
});

これで、Mastraのdevで立ち上げたときの画面からworkflowのテスト画面で、市町村じゃないものをいれても正しく表示されるのを確認できます。
お天気APIの仕様を見ていないのでこうなりましたが、緯度経度で天気とかが出せるなら、もっとピンポイントに出せるような改善をしたりも考えられますね。

外からの使い方

ここで結構謎だったのが、dev画面はともかくどういう風にワークフローって使うんだろ?ってことでした。
APIがEndpointで確認できますが、実際の利用方法は全然わからないなーと。

ここで参考にしたのがこちらの記事ですが、
https://qiita.com/sumihiro3/items/e37be6b16798b12fb48d

APIはSwagger形式で参照できるみたいです。
これを見ながら、Workflowクライアントを作った(API-DOCを与えてAIで書いた)のがこちらです。

呼び出しはこうする。

index.js
// 天気ワークフローID
const WEATHER_WORKFLOW_ID = 'weatherWorkflow';
// MastraWorkflowClientのインスタンスを作成
const mastraClient = new MastraWorkflowClient(WEATHER_WORKFLOW_ID);

ほんでワークフロー作る。

mastra-client.js
// Mastraワークフローとの連携を提供するクライアントクラス
const fetch = require('node-fetch');

// API設定
const API_HOST = 'http://localhost:4111';

/**
 * Mastraワークフローの操作を行うクライアントクラス
 */
class MastraWorkflowClient {
  /**
   * クライアントの初期化
   * @param {string} workflowId 対象ワークフローID(必須)
   */
  constructor(workflowId) {
    this.workflowId = workflowId;
    this.apiBaseUrl = `${API_HOST}/api/workflows/${workflowId}`;
    this.activeWatchers = new Map();
    this.defaultTimeout = 30000; // 30秒
  }

  /**
   * ワークフロー実行を作成する
   * @returns {Promise<{runId: string}>} 実行ID
   */
  async createRun() {
    try {
      const url = `${this.apiBaseUrl}/createRun`;
      console.log(`実行を作成: ${url}`);
      
      const response = await fetch(url, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        timeout: this.defaultTimeout
      });

      if (!response.ok) {
        const errorText = await response.text();
        throw new Error(`実行の作成に失敗: ${response.status} ${response.statusText}\nResponse: ${errorText}`);
      }

      const data = await response.json();
      console.log(`実行作成結果:`, data);
      return { runId: data.runId };
    } catch (error) {
      console.error(`実行作成エラー: ${error.message}`);
      throw error;
    }
  }

  /**
   * ワークフローを実行する
   * @param {Object} params パラメータ
   * @param {string} params.runId 実行ID
   * @param {Object} params.triggerData トリガーデータ
   * @returns {Promise<Object>} 実行結果
   */
  async start({ runId, triggerData }) {
    try {
      console.log(`ワークフロー実行開始: ${runId}`, triggerData);
      
      const response = await fetch(`${this.apiBaseUrl}/start-async`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify(triggerData),
        timeout: this.defaultTimeout
      });

      if (!response.ok) {
        const errorText = await response.text();
        throw new Error(`ワークフロー開始失敗: ${response.status} ${response.statusText}\nレスポンス: ${errorText}`);
      }

      const data = await response.json();
      return data;
    } catch (error) {
      console.error(`ワークフロー開始エラー: ${error.message}`);
      throw error;
    }
  }

  /**
   * 実行状態を取得する
   * @param {string} runId 実行ID
   * @returns {Promise<Object>} 実行状態
   */
  async getRunStatus(runId) {
    try {
      const url = `${this.apiBaseUrl}/runs/${runId}`;
      console.log(`実行状態を取得: ${url}`);
      
      const response = await fetch(url, {
        method: 'GET',
        headers: {
          'Content-Type': 'application/json',
        },
        timeout: this.defaultTimeout
      });
      
      if (!response.ok) {
        // 404などの場合はnullを返す
        if (response.status === 404) {
          console.log(`実行が見つかりません: ${runId}`);
          return null;
        }
        
        const errorText = await response.text();
        throw new Error(`実行状態の取得に失敗: ${response.status} ${response.statusText}\nレスポンス: ${errorText}`);
      }
      
      const data = await response.json();
      console.log(`実行状態取得成功:`, data);
      return data;
    } catch (error) {
      console.error(`実行状態取得エラー: ${error.message}`);
      throw error;
    }
  }

  /**
   * ワークフロー実行状態を監視する
   * @param {Object} params パラメータ
   * @param {string} params.runId 実行ID
   * @param {Function} callback コールバック関数
   */
  watch({ runId }, callback) {
    console.log(`ワークフロー実行を監視中: ${runId}`);
    
    // 開始直後は少し遅延を入れる
    let isFirstCall = true;
    
    // ポーリングで監視
    const intervalId = setInterval(async () => {
      if (isFirstCall) {
        isFirstCall = false;
        // 最初の呼び出しでは2秒待つ
        await new Promise(resolve => setTimeout(resolve, 2000));
      }
      try {
        const runData = await this.getRunStatus(runId);
        
        if (!runData) {
          console.log(`警告: ワークフロー実行が見つかりません: ${runId}`);
          return;
        }
        
        // コールバック関数を呼び出す
        callback({
          activePaths: runData.snapshot?.value || {},
          results: runData.snapshot?.context?.steps || {},
          timestamp: Date.now(),
          runId: runId,
          status: runData.snapshot?.status || 'unknown',
        });
        
        // ワークフローが完了した場合、監視を停止
        if (runData.snapshot?.status === 'completed' || runData.snapshot?.status === 'failed') {
          console.log(`ワークフロー実行が${runData.snapshot.status === 'completed' ? '完了' : '失敗'}しました`);
          this.stopWatch(runId);
        }
      } catch (error) {
        console.error(`ワークフロー監視エラー: ${error.message}`);
      }
    }, 1000); // 1秒ごとに確認
    
    // 監視情報を保持
    this.activeWatchers.set(runId, {
      intervalId,
      callback,
    });
  }

  /**
   * 特定のワークフロー監視を停止
   * @param {string} runId 実行ID
   */
  stopWatch(runId) {
    const watcher = this.activeWatchers.get(runId);
    if (watcher) {
      clearInterval(watcher.intervalId);
      this.activeWatchers.delete(runId);
      console.log(`ワークフロー実行の監視を停止: ${runId}`);
    }
  }

  /**
   * すべての監視を停止
   */
  stopAllWatchers() {
    for (const [runId, watcher] of this.activeWatchers.entries()) {
      clearInterval(watcher.intervalId);
      console.log(`ワークフロー実行の監視を停止: ${runId}`);
    }
    this.activeWatchers.clear();
  }

  /**
   * 単一実行のための簡易メソッド(非同期を扱いやすくするためのラッパー)
   * @param {Object} triggerData ワークフロートリガーデータ
   * @param {number} timeoutMs タイムアウト時間(ミリ秒)
   * @param {Object} options オプション設定
   * @param {Object} options.stepHandlers ステップごとのハンドラー関数
   * @param {string} options.resultStepId 結果を取得するステップID
   * @returns {Promise<Object>} 処理結果
   */
  async executeSingleRun(triggerData, timeoutMs = 60000, options = {}) {
    const { stepHandlers = {}, resultStepId = null } = options;
    
    return new Promise(async (resolve, reject) => {
      try {
        // Mastraのstart-asyncは同期的に結果を返すため、直接結果を取得
        const response = await fetch(`${this.apiBaseUrl}/start-async`, {
          method: 'POST',
          headers: {
            'Content-Type': 'application/json',
          },
          body: JSON.stringify(triggerData),
          timeout: timeoutMs
        });

        if (!response.ok) {
          const errorText = await response.text();
          throw new Error(`ワークフロー開始失敗: ${response.status} ${response.statusText}\nレスポンス: ${errorText}`);
        }

        const startResult = await response.json();
        
        console.log(`ワークフロー開始結果:`, startResult);
        
        // start-asyncがすべての結果を返している場合
        if (startResult.results) {
          // ステップハンドラーを実行
          Object.entries(startResult.results).forEach(([stepId, step]) => {
            if (stepHandlers[stepId] && step.status === 'success') {
              stepHandlers[stepId](step.output);
            }
          });
          
          // 結果を返す
          if (resultStepId && startResult.results[resultStepId]) {
            resolve(startResult.results[resultStepId].output);
          } else {
            resolve(startResult.results);
          }
        } else {
          // 結果が含まれていない場合は、runIdを使って監視する
          const runId = startResult.runId;
          console.log(`実行ID: ${runId} で監視開始`);
          
          // タイムアウト設定
          const timeoutId = setTimeout(() => {
            this.stopWatch(runId);
            reject(new Error(`実行がタイムアウトしました: ${timeoutMs}ms経過`));
          }, timeoutMs);
          
          // 結果を待機
          let workflowResult = null;
          
          // 監視開始
          this.watch({ runId }, (record) => {
            const steps = record.results;
            
            // 各ステップの処理
            Object.entries(steps).forEach(([stepId, step]) => {
              // カスタムハンドラーがある場合は実行
              if (stepHandlers[stepId] && step.status === 'success' && !step._handled) {
                stepHandlers[stepId](step.output);
                step._handled = true;
              }
              
              if (step.status === 'success') {
                if (!step._logged) {
                  console.log(`✅ ステップ成功: ${stepId}`);
                  console.log(`${stepId}の出力:`, JSON.stringify(step.output).substring(0, 200) + '...');
                  step._logged = true;
                }
                
                // 結果ステップが指定されている場合は結果を保存
                if (resultStepId && stepId === resultStepId) {
                  workflowResult = step.output;
                }
              } else if (step.status === 'failed' && !step._logged) {
                console.error(`❌ ステップ失敗: ${stepId}`, step.error || '不明なエラー');
                step._logged = true;
              }
            });
            
            // エラー確認
            Object.entries(steps).forEach(([stepId, step]) => {
              if (step.status === 'failed') {
                clearTimeout(timeoutId);
                this.stopWatch(runId);
                reject(new Error(`ステップ ${stepId} が失敗: ${step.error || 'エラー詳細なし'}`));
              }
            });
            
            // ワークフローが完了したら結果を返す
            if (record.status === 'completed') {
              clearTimeout(timeoutId);
              this.stopWatch(runId);
              resolve(workflowResult || steps);
            }
          });
        }
        
      } catch (error) {
        reject(error);
      }
    });
  }
}

// API_HOST定数をエクスポート
MastraWorkflowClient.API_HOST = API_HOST;

module.exports = MastraWorkflowClient;

これはワークフローは発行して、その状態監視をして、ステップごとの出力をもらうという単純なものですが、それでも作ってしまうことでなんとなくワークフローの動作をどう使えばいいかはわかりました。
実は今回はじめてClaude codeでAIにコーディングを頼んで開発したのですが、なかなか大変でした。人間側で誤りを発見したり指示を的確に出せないとだめなんですね。全然読めないと作れなくない?
まあ、それでもなんとか形になりました。

で、やったー!と思ってたら、
こんなクライアント1から作らなくても、SDKありました。
なんかしかも今日見たらさらにいろいろ更新されてたし…熱いフレームワークはこれがあるんですわ。
https://mastra.ai/ja/reference/client-js/workflows

これを使えば、特にたぶん行数は半分くらいで済みそうです。
しかも、もうちょっと探したらMastraのGithubもめっちゃ更新されてて、なんとこっちにはMastraのクライアントそのものがありました。これでいいやん!

https://github.com/mastra-ai/mastra/tree/main/client-sdks/client-js

というわけで、使える最新情報を得るにはgithubを監視するのが一番よさそうです。
ってか、これがあるならSwaggerAPIいる?😂

まとめ

とはいえ、1から作ることには結構意味がありました。
思ってた「AIを扱う」はもっとAIの賢さを活かすイメージだったんですが、というよりは単にシステムのワークフローの1部にMastraなんかを使って、ほしい結果を最短で得るためのAPIを作る使い方なんだなと。
ワークフローというのはシステムにも運用(人間)にもあると思いますが、
AIを使うことでバックエンド処理のワークフローを自動化したり、n8nなんかでやっているような人間側のワークフローを自動化してあげたり、システムと人間の中間を埋めるのが得意そうだなと思いました。
特に最近ちまたではやっているポスグレなどDBをMastraのtoolにすることでAIが行えるワークフローはもっと多様化していきそうだなという感想です。
昭和のおじさんエンジニアがよくやってた大体のワークフローをサーバーのBATファイル作ってCronでなんとかしてた人はそろそろやることなくなりそうですね。

では皆様も、世界中のワークフローをAIに置き換えてハッピーAIワールドを目指しましょう。

TryAngle@大阪公立大学

Discussion