🤖

Vercel AI SDK の Data Parts をカスタムして検索進捗を可視化する

に公開

はじめに

AIを使った検索システムを構築する際、システムは Knowledge Base 検索や文書の精査、LLM の生成といった複数のプロセスを実行しますが、ユーザーに見えるのは「回答生成中...」というローディング表示のみであると、プロセスの状況がわからずに待たされ UX が悪いです。

Vercel AI SDK の Data Parts[1] を使うと、検索処理の各ステージをリアルタイムで可視化できます。検索開始、文書精査、回答生成といった進捗をユーザーに伝えることで、体感的な待ち時間を改善できます。
本記事では、Data Parts を使った検索進捗の可視化について実装例を交えて解説します。

また筆者は過去に Vercel AI SDK に関する記事を書いています。そちらもあわせてご覧ください。

https://zenn.dev/chot/articles/997bb34bab0062

検索進捗処理の可視化 🔄

実装例に入る前に、まず Vercel AI SDK v5 の UIMessage システムについて触れておきます。これは検索処理の進捗をリアルタイムで可視化するための基盤となる機能です。

前提① UIMessage の構造と型定義

UIMessage の parts 配列

Vercel AI SDK v5 の UIMessage は、parts 配列を使用してメッセージを表現します[2]。従来のような単一のテキストメッセージではなく、テキスト・ツール呼び出し・カスタムデータなど、複数の異なる型のパーツを1つのメッセージに含められます。

interface UIMessage<
  METADATA = unknown,
  DATA_PARTS extends UIDataTypes = UIDataTypes,
  TOOLS extends UITools = UITools,
> {
  /**
   * メッセージの一意な識別子
   */
  id: string;
  /**
   * メッセージのロール
   */
  role: 'system' | 'user' | 'assistant';
  /**
   * メッセージのメタデータ
   */
  metadata?: METADATA;
  /**
   * メッセージのパーツ。UI でのレンダリングに使用
   */
  parts: Array<UIMessagePart<DATA_PARTS, TOOLS>>;
}

この構造では、メッセージに複数の異なる型のパーツを含められます。テキスト・ツール呼び出し・カスタムデータを統一的に扱えます。各パーツには以下のような種類があります。

  • TextPart - LLM からのテキスト応答
  • DataPart - カスタムデータ(検索ステージ、参照資料など)
  • ToolCallPart - AI によるツール呼び出し
  • ToolResultPart - ツール実行結果

これらのパーツは parts 配列に格納され、インターリーブ(混在して配置)できます。これにより、検索ステージ → 参照資料 → LLM 応答というように、異なる種類のパーツを自由な順序で配置できます。

UIMessage の3つの型引数

UIMessage は、3つの型引数を受け取ることができます。

type UIMessage<
  METADATA = unknown,                            // 1. メッセージメタデータ
  DATA_PARTS extends UIDataTypes = UIDataTypes,  // 2. Data Parts
  TOOLS extends UITools = UITools                // 3. ツール定義
>

第1引数の METADATA は、メッセージ全体のメタデータ(ユーザー情報、タイムスタンプ、評価スコアなど)を定義します。第2引数の DATA_PARTS は、Data Parts のマップで、検索ステージや参照資料、進捗情報などを型安全に送信できます。第3引数の TOOLS は、AIが使用できるツール(関数呼び出し、API統合など)を定義します。

検索システムでは、主に DATA_PARTS を使用してカスタム情報をストリーミングします。

型安全性の実例

以下は、型安全性がどのように機能するかを示す例です。

// カスタムメッセージ型の定義
export type MyUIMessage = UIMessage<
  unknown,  // 第1引数: Metadata(使用しない場合は unknown)
  {
    hello: string    // 第2引数: Data Parts
    goodbye: string
  }
>

// ✅ OK: hello は string 型
writer.write({
  type: 'data-hello',
  data: 'Bonjour!',
})

// ❌ コンパイルエラー: hello は number 型ではない
writer.write({
  type: 'data-hello',
  data: 123,  // Type 'number' is not assignable to type 'string'
})

// ❌ コンパイルエラー: 存在しないキー
writer.write({
  type: 'data-unknown',  // Type '"data-unknown"' is not assignable to ...
  data: 'test',
})

このように Data Parts は、streamText からの LLM 応答と混在して配置できます。

前提② createUIMessageStream の仕組み

Vercel AI SDK v5 では、createUIMessageStream[3] が導入され、LLM の応答テキストだけでなく、任意の型安全なデータをストリーミングできるようになりました。検索ステージや参照資料など、アプリケーション固有のメタデータをリアルタイムでクライアントに送信できます。

サーバー側とクライアント側の流れ

Data Parts をストリーミングする際の基本的な流れは以下の通りです。

1. サーバー側:ストリームの作成

API ルートでカスタムメッセージ型を使用してストリームを作成します。

import { createUIMessageStream } from "ai"

const stream = createUIMessageStream<MyUIMessage>({
  execute: async ({ writer }) => {
    writer.write({
      type: 'data-hello',
      data: 'Hello!',
    })
    // さらに writer.write で追加のデータを送信...
  },
})

2. クライアント側:useChat での受信

クライアントコンポーネントで、同じ型を useChat に渡します。

const { messages, sendMessage } = useChat<MyUIMessage>({})

messagesMyUIMessage[] 型となり、型安全にアクセスできます。

3. メッセージのレンダリング

messages 配列をループして各メッセージをレンダリングします。各メッセージの parts 配列には、テキストや Data Parts が含まれています。

// Chat コンポーネント内
{messages.map((message) => (
  <Message key={message.id} parts={message.parts} />
))}

Message コンポーネントでは、parts 配列から Data Parts をフィルタリングして表示します。

export const Message = ({ parts }: { parts: MyUIMessage['parts'] }) => {
  return (
    <>
      {parts.map((part) => {
        if (part.type === 'data-hello') {
          return <div key={part.id}>{part.data}</div>
        }
        return null
      })}
    </>
  )
}

フルスタック型安全性

この仕組みの最大の利点は、エンドツーエンドの型安全性です。カスタムメッセージ型を createUIMessageStreamuseChat の両方にジェネリクスとして渡すことで、サーバーからクライアントまで一貫した型チェックが保証されます。

  • サーバー側で誤った型のデータを送信するとコンパイルエラー
  • クライアント側で存在しないプロパティにアクセスするとコンパイルエラー
  • リファクタリング時も型エラーで問題を即座に検出

Data Parts の種類

AI SDK v5 では、3種類の Data Parts があります[1:1][4]

種類 用途 メッセージ履歴への保存 識別方法
Regular data parts 一般的なカスタムデータ ✅ 保存される(message.parts に追加) type: 'data-*' で識別(例:'data-search-stage'
Sources RAG での参照資料・URL ✅ 保存される type: 'source-url' または type: 'source-document'
Transient parts 一時的な進捗情報 ❌ 保存されない(onData でのみアクセス可能) transient: true フラグ付き

例えば RAG アプリケーションでは、検索ステージは Transient parts として、参照資料は Sources(基本情報)と Regular data parts(関連度スコアなどの追加情報)を組み合わせて実装します。

Transient parts で体感速度を改善

Transient parts を使うと、体感的な待ち時間を改善できます。処理開始と同時に transient: true フラグ付きの進捗データを送信することで、ユーザーは即座にフィードバックを得られます。

// 処理開始直後に即座に進捗を送信
writer.write({
  type: "data-search-stage",
  data: { stage: "searching" },
  transient: true,  // 履歴には保存せず、即時表示のみ
})

この方式により、実際の処理時間が変わらなくても体感的な待ち時間が改善されます。ストリーミングにより、進捗が即座にクライアントに送信されるためです。

機密情報の取り扱い

Transient parts のもう一つの重要な用途は、署名付き URL や機密情報を履歴に残さないことです。参照資料のプレビューに一時的にアクセスURLを表示する場合、transient: true を使用すると安全です。

// ✅ 一時的なプレビュー表示には transient を使用(履歴に残らない)
writer.write({
  type: "data-file-preview",
  data: {
    signedUrl: "https://s3.amazonaws.com/...",
    // 注: 署名付きURLには有効期限があるため、
    // transientフラグで履歴に残さないことが重要
  },
  transient: true,
})

メモリ効率とパフォーマンス

Transient parts はメモリ効率の向上にも貢献します。高頻度で更新される進捗情報(ファイルアップロードの進捗など)をデフォルト設定(transient: false)で送信すると、すべての Data Parts がメッセージ履歴に蓄積され、履歴が肥大化します。一方、transient: true を使用すると onData でのみ処理され履歴には残らないため、最終結果のみを保存することで履歴をコンパクトに保てます。

Data Parts と Metadata のユースケース

Data Parts と Metadata は、どちらもメッセージに追加情報を付与する仕組みですが、用途と特性が異なります。適切に使い分けることで、型安全で保守性の高い実装が可能になります。

Metadata とは

Metadata[5] は、メッセージ全体に付与するメタ情報です。UIMessage の第1型引数で定義し、message.metadata オブジェクトとしてアクセスします。タイムスタンプ、使用トークン数、モデル名など、メッセージの属性情報を格納するのに適しています。

Data Parts と Metadata の違い

両者の違いを理解するために、以下の比較表を確認しましょう。

特性 Data Parts Metadata
格納場所 message.parts 配列 message.metadata オブジェクト
配置可能な数 メッセージごとに複数存在可能 メッセージごとに1つのみ
主な用途 メッセージの内容の一部 メッセージ全体のメタ情報
ストリーミング 複数回送信可能 最終的に1回設定
型定義の位置 UIMessage の第2引数 UIMessage の第1引数

使い分けのガイドライン

両者の使い分けは、メッセージの内容または属性で判断できます。

Data Parts の使いどころ

Data Parts は、ユーザーがメッセージの内容として見る必要がある情報です。検索システムを例に考えると、進捗状態の変化や参照資料のリストは、ユーザーが直接確認すべき情報です。これらはメッセージの本体として表示され、履歴にも残ります。

使用例

  • 検索処理の進捗状態:searching → reranking → generating
  • 参照資料のリスト:複数のソースURLや文書の引用情報
  • 一時的な進捗情報:ファイルアップロード進捗(transient フラグ付き)
  • Follow-up suggestions:LLMの応答後に「次に何を聞くべきか」の提案
  • インタラクティブな要素:ボタン、フォーム、アクション可能な UI コンポーネント

Metadata の使いどころ

一方、Metadata は、メッセージそのものの属性を記録します。メッセージ生成に何秒かかったのか、何トークン使用したのか、どのモデルで生成されたのかといった情報は、メッセージの内容ではなく、メッセージについての情報です。これらは、UIでは小さなバッジやツールチップで補助的に表示されることが多く、メッセージごとに1つの値として確定します。

使用例

  • パフォーマンス指標:メッセージ生成時間、初回バイトまでの時間(TTFB)
  • コスト情報:使用トークン数、API コスト、使用モデル
  • 品質指標:ユーザー評価スコア、信頼度スコア
  • コンテキスト情報:タイムスタンプ、ユーザーID、セッション ID

型定義の例

// Data Parts と Metadata を組み合わせた型定義
export type SearchStage = "searching" | "reranking" | "generating"

export type MyUIMessage = UIMessage<
  // 1. Metadata(第1引数): メッセージ全体の属性
  {
    duration: number;      // 生成時間(ミリ秒)
    tokenCount: number;    // 使用トークン数
    model: string;         // 使用モデル
  },
  // 2. Data Parts(第2引数): メッセージの内容
  {
    'search-stage': {
      stage: SearchStage
    };
    // Note: 'source-url' は組み込み型のため定義不要
  }
>

この使い分けにより、型安全で保守性の高い実装が可能になります。

実装例:段階的な検索ステージの表示

従来の実装では、回答生成のみが可視化されていました。

// ❌ 「回答生成中...」表示 → 裏で検索・ReRank・生成が実行される
const docs = await retrieveDocuments(...)        // 例: 3秒(ユーザーには見えない)
const rerankResult = await rerankDocuments(...)  // 例: 2秒(ユーザーには見えない)
// ↑ この5秒間、ユーザーは「回答生成中...」しか見えない
const textStream = streamText(...)                // 例: 5秒(ストリーミング表示)

この方法では、例えば Knowledge Base 検索に3秒、ReRankに2秒かかっていても、ユーザーには「回答生成中...」という単一のローディング状態しか表示されません。実際には検索や精査を実行しているのに、何が起きているか分からない状態で待たされます。

Data Parts による可視化

Data Parts を使うと、各ステージの進捗をリアルタイムで通知でき、段階的な進捗表示で UX が向上します。

Vercel AI SDK はストリーミングプロトコル[6]を使用しているため、HTTP 接続を保持したまま複数のイベントを順次送信でき、処理の進行に合わせて検索ステージや参照資料などのカスタムデータをリアルタイムで送信できます。

次のように段階的に表示されます。

段階的な進捗表示の例
[0秒] 🔍 検索中...
  ↓
[3秒] 📊 関連文書を精査中...
  ↓
[5秒] 📄 参照資料のプレビュー
  ↓
[5秒〜] ✨ 回答ストリーミング

実装のポイント

ステージ通知のタイミングが重要です。各非同期処理の前後でステージを更新することで、ユーザーは何が起きているかを理解できます。

const stream = createUIMessageStream<MyUIMessage>({
  execute: async ({ writer }) => {
    // 1️⃣ 検索開始
    writer.write({
      type: "data-search-stage",
      data: { stage: "searching" },
      transient: true,
    })
    const docs = await retrieveDocuments({ query, topK, knowledgeBaseId })

    if (docs && docs.length > 0) {
      // 2️⃣ ReRank 実行
      writer.write({
        type: "data-search-stage",
        data: { stage: "reranking" },
        transient: true,
      })
      const rerankResult = await rerankDocuments({ query, documents: docs, topN: 5 })

      // 3️⃣ 参照資料を Source パーツとして送信(履歴に保存される)
      for (const doc of rerankResult.documents) {
        writer.write({
          type: "source-url",
          sourceId: doc.id,
          url: doc.fileUrl,
          title: doc.fileName,
        })
      }
    }

    // 4️⃣ 生成開始 + LLM ストリームをマージ
    writer.write({
      type: "data-search-stage",
      data: { stage: "generating" },
      transient: true,
    })
    const textStream = streamText({ model: bedrockModel, messages, system: enhancedSystemPrompt })
    writer.merge(textStream.toUIMessageStream())
  },
})

writer.merge() の役割: LLM のテキストストリームと Data Parts を単一のストリームに統合し、クライアントで一貫して処理できるようにします。

順序保証: writer.write() で送信した Data Parts は、merge() との位置関係で順序が決まります。

// 順序の例
writer.write({
  type: "data-search-stage",
  data: { stage: "searching" },
  transient: true,
})  // ① 先に届く
const textStream = streamText({ model, messages })
writer.merge(textStream.toUIMessageStream())  // ② LLMテキストがストリーミング
writer.write({
  type: "data-complete",
  data: { done: true },
})  // ③ LLM完了後に届く

これにより、検索ステージ → 参照資料 → LLM応答 → 完了通知という自然な順序が保証されます。

エラーハンドリング: merge() されたストリーム内でエラーが発生した場合、ストリーム全体が中断されます。本番環境では、LLM 呼び出しを try-catch で囲み、エラー時には Data Parts でエラー状態を通知することを推奨します。

エラーハンドリングの実装例
const stream = createUIMessageStream<MyUIMessage>({
  execute: async ({ writer }) => {
    try {
      // 検索処理
      writer.write({ type: "data-search-stage", data: { stage: "searching" }, transient: true })
      const docs = await retrieveDocuments({ query, topK, knowledgeBaseId })

      // LLM ストリーミング
      writer.write({ type: "data-search-stage", data: { stage: "generating" }, transient: true })
      const textStream = streamText({ model: bedrockModel, messages, system })
      writer.merge(textStream.toUIMessageStream())
    } catch (error) {
      // エラー発生時はエラー状態を通知
      writer.write({
        type: "data-search-stage",
        data: { stage: "error", message: error instanceof Error ? error.message : "Unknown error" },
        transient: true,
      })
      throw error  // エラーを再スローしてストリームを終了
    }
  },
})

クライアント側では、onData でエラー状態を受信します。

const { messages, status } = useChat<MyUIMessage>({
  api: "/api/chat",
  onData(part) {
    if (part.type === "data-search-stage") {
      if (part.data.stage === "error") {
        // エラー状態を UI に反映
        setStage("error")
      } else {
        setStage(part.data.stage)
      }
    }
  },
})

const MessageGenerating = ({ stage }: { stage: SearchStage }) => {
  if (stage === "error") {
    return <div className="text-red-500">エラーが発生しました</div>
  }

  // 通常の進捗表示
  return <div>処理中... ({stage})</div>
}
より詳細なコード実装
import {
  createUIMessageStream,
  createUIMessageStreamResponse,
  streamText,
  convertToModelMessages,
  generateId
} from "ai"
import type { MyUIMessage } from "@/types/chat"

export const POST = async (request: Request) => {
  const { messages, query } = await request.json()

  const stream = createUIMessageStream<MyUIMessage>({
    originalMessages: messages,  // 元のメッセージを渡す(オプション)
    generateId: generateId,      // メッセージIDの生成関数(オプション)
    execute: async ({ writer }) => {
      // 1️⃣ 検索ステージを送信(transient で即時表示、履歴には残さない)
      writer.write({
        type: "data-search-stage",
        data: { stage: "searching" },
        transient: true,  // 体感レイテンシ短縮:即座に進捗を表示
      })

      // Knowledge Base から検索
      const docs = await retrieveDocuments({ query, topK, knowledgeBaseId })

      if (docs && docs.length > 0) {
        // 2️⃣ ReRank 実行中ステージを送信
        writer.write({
          type: "data-search-stage",
          data: { stage: "reranking" },
          transient: true,
        })

        // ReRank を実行
        const rerankResult = await rerankDocuments({
          query,
          documents: docs,
          topN: 5,
        })

        // 3️⃣ 参照資料を Source パーツとして送信(履歴に保存される)
        for (const ref of rerankResult.documents) {
          writer.write({
            type: "source-url",
            sourceId: ref.id,
            url: ref.fileUrl,
            title: ref.fileName,
          })
        }
      }

      // 4️⃣ 生成ステージを送信
      writer.write({
        type: "data-search-stage",
        data: { stage: "generating" },
        transient: true,
      })

      // 5️⃣ LLM ストリーミングをマージ
      const textStream = streamText({
        model: bedrockModel,
        // UIMessage には 'data-search-stage' のような Data Parts が含まれるが、
        // LLM はこれらを理解できない(LLM が理解できるのは text/tool-call/tool-result のみ)
        // そのため、convertToModelMessages で ModelMessage(標準形式)に変換する
        messages: convertToModelMessages(messages),
        system: enhancedSystemPrompt,
      })

      writer.merge(textStream.toUIMessageStream())
    },
  })

  return createUIMessageStreamResponse({ stream })
}

originalMessagesgenerateId は任意のパラメータです。これらは createUIMessageStreamResponse でストリームレスポンスを作成する際、メッセージを適切に永続化し履歴管理を行うために使用されます。originalMessages を渡すことで既存のメッセージとの重複を防ぎ、generateId により一貫性のある ID 生成が保証されます。省略した場合はデフォルトの動作が適用されます。

クライアント側での受信

transient: true を付けたパーツは message.parts に追加されず履歴に残らないため、useChatonData コールバックで受信します。

"use client"

import { useState } from "react"
import { useChat } from "@ai-sdk/react"
import type { MyUIMessage, SearchStage } from "@/types/chat"

export function Chat() {
  const [stage, setStage] = useState<SearchStage>("searching")

  const { messages, status, input, handleSubmit } = useChat<MyUIMessage>({
    api: "/api/chat",
    onData(part) {
      // Transient な Data Parts を onData で受信
      if (part.type === "data-search-stage") {
        setStage(part.data.stage)
      }
    },
  })

  return (
    <>
      {/* 生成中の表示 */}
      {status === "streaming" && (
        <MessageGenerating stage={stage} />
      )}

      {/* チャットメッセージ */}
      {messages.map((message) => (
        <Message key={message.id} message={message} />
      ))}
    </>
  )
}

const MessageGenerating = ({ stage }: { stage: SearchStage }) => {
  const stageText: Record<SearchStage, string> = {
    searching: "検索中...",
    reranking: "関連文書を精査中...",
    generating: "回答生成中...",
  }

  return <span>{stageText[stage]}</span>
}

クライアント側のポイントは以下の3点です。

  1. onData コールバックの使用:Transient なパーツは onData で受信します
  2. ローカル state での管理useState でステージ情報を保持し、UI に反映します
  3. 即時フィードバック:処理開始直後からステージ情報が表示されます

この実装により、プログレス効果(処理段階のリアルタイム表示により待ち時間の体験を改善する心理的効果)を活用し、長時間の処理でもユーザーの不安を軽減できます。

動的更新:同じ ID による部分更新

ここまでは検索ステージ(searching → reranking → generating)の切り替えを説明してきましたが、より詳細な進捗情報を表示したい場合もあります。たとえば、「検索中... 10/100」のように具体的な進捗パーセンテージを表示する場合、サーバー側で writer.write()同じパーツID を指定することで、クライアント側で同じパーツを上書き更新できます[1:2]。これにより、進捗バーのリアルタイム更新が可能になります。

注:ここでの id は、メッセージ全体を識別する message.id ではなく、パーツ固有の識別子です。同じ id を持つパーツが送信されると、クライアント側で既存のパーツが上書きされます。

// 検索進捗の動的更新例
writer.write({
  id: "search-progress",  // 同じ ID
  type: "data-progress",  // 同じ type
  data: { current: 10, total: 100 },
})

// ... 処理中 ...

writer.write({
  id: "search-progress",  // 同じ ID で更新
  type: "data-progress",  // 同じ type
  data: { current: 50, total: 100 },
})

まとめ 🖌️

Vercel AI SDK v5 の Data Parts を活用することで、検索処理の進捗をリアルタイムで可視化し、ユーザー体験を改善できます。

  • フルスタック型安全性 - createUIMessageStreamuseChat にカスタムメッセージ型を渡すことで、サーバーからクライアントまで一貫した型チェックが保証されます。
  • 段階的な進捗表示 - 「検索中 → 文書精査中 → 回答生成中」という段階的な進捗により、ユーザーは処理状況を理解でき、体感的な待ち時間が改善されます。
  • Transient parts による即時フィードバック - transient: true フラグにより、処理開始直後に進捗を表示でき、機密情報を履歴に残さずに済みます。
  • onData による即時受信 - Transient なパーツは onData コールバックで受信し、UI に即座に反映できます。

以上です!

脚注
  1. AI SDK UI: Streaming Custom Data ↩︎ ↩︎ ↩︎

  2. AI SDK UI: UIMessage Interface ↩︎

  3. AI SDK UI: createUIMessageStream Reference ↩︎

  4. AI SDK UI: Types of Streamable Data ↩︎

  5. AI SDK UI: Message Metadata ↩︎

  6. AI SDK UI: Stream Protocol ↩︎

Discussion