🐈

VPSで「認証つきDify環境」を作る⑥:「Custom送信ルート」とSSEでDifyと会話する

に公開

💡 この記事はシリーズ⑥です。
前章では、Difyのフローを“モデル”として選べるようになりました。
ここでは、選んだ時の“送信先”を付け替えて、Difyにメッセージを送り、SSE(ストリーミング)で返事をリアルタイム表示できるようにします。

1. 送信先を「Custom」に切り替える(画面側のハンドラ)

「このモデルの送り先はCustomだ」と分かったら、標準の送信ではなく、自前の送信API(後述 /api/chat/custom)に投げるようにします。

編集するファイル
components/chat/chat-hooks/use-chat-handler.tsx

  • ポイント
    送る前に選んだモデルの provider を見て、custom なら

    • 1.チャットが無ければ handleCreateChat() で作る
    • 2./api/chat/custom に chatId、message、model(=Difyのslug) をPOST

    返ってきたデータは processResponse() でSSEとして読んで、画面に少しずつ表示します。また、添付ファイルを使うなら、アップロード後のIDを最小限にくっつけます。

●編集

cd /opt/chatbot-ui
sudo nano components/chat/chat-hooks/use-chat-handler.tsx

「-」部分は削除し、「+」部分を追記します。

import { ChatbotUIContext } from "@/context/context"
import { getAssistantCollectionsByAssistantId } from "@/db/assistant-collections"

// --- 中略 ---

export const useChatHandler = () => {
  const router = useRouter()

  const {
+     availableHostedModels,
    userInput,
    chatFiles,
    setUserInput,

// --- 中略 ---

      const modelData = [
-         ...models.map(model => ({
-           modelId: model.model_id as LLMID,
-           modelName: model.name,
-           provider: "custom" as ModelProvider,
-           hostedId: model.id,
-           platformLink: "",
-           imageInput: false
-         })),
+         ...availableHostedModels,
        ...LLM_LIST,
        ...availableLocalModels,
        ...availableOpenRouterModels
      ].find(llm => llm.modelId === chatSettings?.model)


// --- 中略 ---

      } else {
        if (modelData!.provider === "ollama") {
          generatedText = await handleLocalChat(
            payload,
            profile!,
            chatSettings!,
            tempAssistantChatMessage,
            isRegeneration,
            newAbortController,
            setIsGenerating,
            setFirstTokenReceived,
            setChatMessages,
            setToolInUse
          )

+         // ★追加: custom(Dify) はここで直接 /api/chat/custom に送る
+         } else if (modelData!.provider === "custom") {
+           // まだチャットが無ければ先に作る(/api/chat/custom は chatId 必須)
+           if (!currentChat) {
+             currentChat = await handleCreateChat(
+               chatSettings!,
+              profile!,
+               selectedWorkspace!,
+               messageContent,
+               selectedAssistant!,
+               newMessageFiles,
+               setSelectedChat,
+               setChats,
+               setChatFiles
+             )
+           }

+           // ★追加: 直前にアップロード成功したファイルIDを添付(最小実装)
+           const pending: Array<{ id: string; type: "image" | "document" }> =
+             ((globalThis as any).__difyPendingFiles || []).slice()
+           const filesForThisMessage =
+             pending.map(f => ({
+               type: f.type,
+               transfer_method: "local_file" as const,
+               upload_file_id: f.id
+             }))
+           if (pending.length) {
+             ;(globalThis as any).__difyPendingFiles = [] // 使い終わったらクリア
+             console.log("[SEND files]", { count: filesForThisMessage.length })
+           }

+           const response = await fetch("/api/chat/custom", {
+             method: "POST",
+             headers: { "Content-Type": "application/json" },
+             body: JSON.stringify({
+               chatId: currentChat.id,
+               message: messageContent,
+               model: chatSettings!.model, // ← Dify の slug
+               // inputs を使うならここで { inputs: {...} } を追加
+               // ★追加: 添付ファイル
+               ...(filesForThisMessage.length
+                 ? { files: filesForThisMessage }
+                 : {})
+             })
+           })
+           // ★追記: Dify からの SSE を UI にストリーム表示
+           generatedText = await processResponse(
+             response,
+             isRegeneration
+             ? payload.chatMessages[payload.chatMessages.length - 1]
+             : tempAssistantChatMessage,
+           true,
+           newAbortController,
+           setFirstTokenReceived,
+           setChatMessages,
+           setToolInUse
+           )

        } else {
          generatedText = await handleHostedChat(
            payload,
            profile!,
            modelData!,
            tempAssistantChatMessage,
            isRegeneration,
            newAbortController,
            newMessageImages,
            chatImages,
            setIsGenerating,
            setFirstTokenReceived,
            setChatMessages,
            setToolInUse
          )
        }
      }

// --- 中略 ---

  return {
    chatInputRef,
-     prompt,
    handleNewChat,
    handleSendMessage,
    handleFocusChatInput,
    handleStopMessage,
    handleSendEdit
  }
}

2. Custom専用の送信APIを作る(サーバ側)

鍵を外に出さずにサーバ側で代理送信します。そのための手順は、①ログイン確認 → ②チャットの持ち主か確認 → ③dify_apps から鍵など取得 → ④Difyの /v1/chat-messages にSSEで中継 → ⑤初回だけ会話IDを保存、です。

編集するファイル
/opt/chatbot-ui/app/api/chat/custom/route.ts

  • ポイント
    • 受け取る内容:chatId、message(または messages)、model(= Difyのslug)、inputs(任意)
    • ログイン確認:@supabase/auth-helpers-nextjs を使い、Cookieからユーザー取得
    • DBで確認:chats に存在するか、持ち主が本人かをチェック(supabaseAdmin=サーバ用キー)
    • Dify設定の取得:dify_apps から api_key / base_url / default_inputs を取得
    • 送信:fetch(baseUrl + /v1/chat-messages, { Accept: text/event-stream })
    • 会話IDの保存:ReadableStream.tee() を使い、一方をそのままブラウザへ、もう一方で最初に届くJSONから conversation_id を抜き出し chats.provider_thread_id に保存(初回だけでOK)
    • ヘッダー:Content-Type: text/event-stream、X-Accel-Buffering: no(Nginx側は前章で設定済み)

●編集

cd /opt/chatbot-ui
sudo nano /opt/chatbot-ui/app/api/chat/custom/route.ts

内容を丸ごと以下にコピペで置き換えてください。

// /app/api/chat/custom/route.ts
import { NextResponse } from 'next/server';
import { cookies } from 'next/headers';
import { createRouteHandlerClient } from '@supabase/auth-helpers-nextjs';
import { supabaseAdmin } from '@/lib/supabaseAdmin';

type ChatRequestBody = {
  chatId: string;
  message?: string;
  messages?: Array<{ role: 'user' | 'assistant' | 'system'; content: string }>;
  model?: string;       // ← Dify の slug をここに入れる運用(UIの「モデル」に相当)
  inputs?: Record<string, any>; // Dify Flow の inputs(任意)
};

export const dynamic = 'force-dynamic';   // キャッシュ抑止
export const runtime = 'nodejs'; // Edge だと Service Role が使いづらいので nodejs 実行

function pickLastUserText(body: ChatRequestBody): string | null {
  if (body.message && body.message.trim()) return body.message.trim();
  const arr = body.messages ?? [];
  for (let i = arr.length - 1; i >= 0; i--) {
    if (arr[i].role === 'user' && arr[i].content?.trim()) return arr[i].content.trim();
  }
  return null;
}

export async function POST(req: Request) {
  try {
    const body = (await req.json()) as ChatRequestBody;

    // 0) 入力バリデーション(最小限)
    if (!body.chatId) return NextResponse.json({ error: 'chatId is required' }, { status: 400 });

    const userText = pickLastUserText(body);
    if (!userText) return NextResponse.json({ error: 'message is empty' }, { status: 400 });

    // 1) 認証ユーザー取得(Cookie ベース)
    const supabaseServer = createRouteHandlerClient({ cookies });
    const { data: { user } } = await supabaseServer.auth.getUser();
    if (!user) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });

    // 2) チャット行の取得&所有者チェック(Service Role)
    const { data: chat, error: chatErr } = await supabaseAdmin
      .from('chats')
      .select('id, user_id, model, provider_thread_id, provider')
      .eq('id', body.chatId)
      .single();

    if (chatErr || !chat) return NextResponse.json({ error: 'Chat not found' }, { status: 404 });
    if (chat.user_id !== user.id) return NextResponse.json({ error: 'Forbidden' }, { status: 403 });

    const slug = body.model?.trim() || chat.model?.trim();
    if (!slug) return NextResponse.json({ error: 'model (Dify slug) is required' }, { status: 400 });

    // 3) Dify アプリ設定の取得(鍵は DB から)
    const { data: app, error: appErr } = await supabaseAdmin
      .from('dify_apps')
      .select('slug, display_name, api_key, base_url, default_inputs, is_active')
      .eq('slug', slug)
      .eq('is_active', true)
      .single();

    if (appErr || !app) return NextResponse.json({ error: 'Dify app not found or inactive' }, { status: 404 });

    // 初回送信など provider が custom でなければ custom に自動更新
    if (chat.provider !== 'custom' || chat.model !== slug) {
      await supabaseAdmin
      .from('chats')
      .update({ provider: 'custom', model: slug })
      .eq('id', chat.id);
    }

    const baseUrl = app.base_url.replace(/\/+$/, '');
    const apiKey = app.api_key;
    const inputs = { ...(app.default_inputs ?? {}), ...(body.inputs ?? {}) };

    // 4) Dify へストリーミング中継(SSE)
    const payload: Record<string, any> = {
      inputs,
      query: userText,
      response_mode: 'streaming',
      user: user.id,
    };
    if (chat.provider_thread_id) payload.conversation_id = chat.provider_thread_id;

    const upstream = await fetch(`${baseUrl}/v1/chat-messages`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${apiKey}`,
        'Content-Type': 'application/json',
        'Accept': 'text/event-stream',
      },
      body: JSON.stringify(payload),
    });

    if (!upstream.ok || !upstream.body) {
      const errText = await upstream.text().catch(() => '');
      return NextResponse.json({ error: 'Upstream error', detail: errText || upstream.statusText }, { status: 502 });
    }

    // 5) conversation_id を初回のみ保存するための tee(片方でパース)
    const [streamForClient, streamForParse] = (upstream.body as ReadableStream).tee();

    (async () => {
      // パーサ側:最初に届く JSON の中から conversation_id を拾う
      try {
        const reader = streamForParse.getReader();
        let buf = '';
        let done = false;
        let saved = false;

        while (!done) {
          const { value, done: d } = await reader.read();
          done = d;
          if (value) buf += new TextDecoder().decode(value);

          // SSE は「data: {...}\n\n」形式。最初の data 行から抽出できれば十分
          const match = buf.match(/"conversation_id"\s*:\s*"([0-9a-f-]{36})"/i);
          if (match && !saved && !chat.provider_thread_id) {
            const convId = match[1];
            await supabaseAdmin.from('chats')
              .update({ provider_thread_id: convId })
              .eq('id', chat.id);
            saved = true;
            // 以降のパースは不要なので抜ける
            try { reader.cancel(); } catch {}
            break;
          }
          // バッファが大きくなりすぎないよう適宜切り捨て
          if (buf.length > 64 * 1024) buf = buf.slice(-32 * 1024);
        }
      } catch {
        // 失敗してもストリーミング自体は継続するので握りつぶし
      }
    })();

    // 6) クライアントへそのまま中継(SSEヘッダ)
    const headers = new Headers();
    headers.set('Content-Type', 'text/event-stream; charset=utf-8');
    headers.set('Cache-Control', 'no-cache, no-transform');
    headers.set('Connection', 'keep-alive');
    headers.set('X-Accel-Buffering', 'no');

    return new Response(streamForClient, { headers, status: 200 });
  } catch (e: any) {
    return NextResponse.json({ error: 'Internal error', detail: e?.message ?? String(e) }, { status: 500 });
  }
}

3.SSE(ストリーミング)の読み込み(画面側)

Difyの返事は SSE(text/event-stream)という方式で届きます。これをイベントごとに読んで、「ユーザーの目の前で文字が流れていく」ように表示します。

編集するファイル
components/chat/chat-helpers/index.ts

  • ポイント
    レスポンスの Content-Type に text/event-stream が含まれていたらSSEモードにします。
    また、data: の行だけを集めて、イベント単位で処理します。
     - event: ping は無視
     - event: message の answer だけ画面に追加
     - event: message_end や usage のような大きなメタ情報は表示しない

●編集

cd /opt/chatbot-ui
sudo nano components/chat/chat-helpers/index.ts

「-」部分は削除し、「+」部分を追記します。

// Only used in use-chat-handler.tsx to keep it clean

import { createChatFiles } from "@/db/chat-files"
import { createChat } from "@/db/chats"

// --- 中略 ---


export const processResponse = async (
  response: Response,
  lastChatMessage: ChatMessage,
  isHosted: boolean,
  controller: AbortController,
  setFirstTokenReceived: React.Dispatch<React.SetStateAction<boolean>>,
  setChatMessages: React.Dispatch<React.SetStateAction<ChatMessage[]>>,
  setToolInUse: React.Dispatch<React.SetStateAction<string>>
) => {
-   let fullText = ""
-   let contentToAdd = ""

-   if (response.body) {
-     await consumeReadableStream(
-       response.body,
-       chunk => {
-         setFirstTokenReceived(true)
-         setToolInUse("none")

-         try {
-           contentToAdd = isHosted
-             ? chunk
-             : // Ollama's streaming endpoint returns new-line separated JSON
-               // objects. A chunk may have more than one of these objects, so we
-               // need to split the chunk by new-lines and handle each one
-               // separately.
-               chunk
-                 .trimEnd()
-                 .split("\n")
-                 .reduce(
-                   (acc, line) => acc + JSON.parse(line).message.content,
-                   ""
-                 )
-           fullText += contentToAdd
-         } catch (error) {
-           console.error("Error parsing JSON:", error)
-         }

+   let fullText = ""
+   let contentToAdd = ""
+   // ★追加: SSE 断片をまたぐ場合に備えたバッファ(Difyなどの text/event-stream 用)
+   let sseBuffer = ""

+   // Content-Type でSSEかどうかを判定
+   const contentType = (response.headers.get("content-type") || "").toLowerCase()
+   const isSSE = contentType.includes("text/event-stream")

+   if (response.body) {
+     await consumeReadableStream(
+       response.body,
+      chunk => {
+         try {
+           if (isHosted) {
+             // ★変更(最小): Hosted かつ SSE の場合のみ、SSEイベントを厳密パース
+             if (isSSE) {
+               sseBuffer += chunk
+               const events = sseBuffer.split("\n\n")
+               sseBuffer = events.pop() || ""

+               let added = ""
+               for (const ev of events) {
+                 const lines = ev.split("\n")
+                 // event名を拾う(無い場合もある)
+                 const eventLine = lines.find(l => l.startsWith("event:"))
+                 const eventName = eventLine ? eventLine.slice(6).trim() : null

+                 // data行をまとめる
+                 const dataStr = lines
+                   .filter(l => l.startsWith("data:"))
+                   .map(l => l.slice(5).trim())
+                   .join("\n")

+                 if (!dataStr) continue

+                 // ping などは描画しない
+                 if (eventName === "ping") continue

+                 // JSONとして解釈を試み、本文のみ抽出
+                 let obj: any = null
+                 try {
+                   obj = JSON.parse(dataStr)
+                 } catch {
+                   // JSONでない data は描画しない
+                   continue
+                 }

+                 // usage / model_provider などの巨大JSONは描画しない
+                 if (obj?.usage || obj?.model_provider || obj?.execution_metadata) {
+                   continue
+                 }

+                 // 本文: event: message の answer だけを描画
+                 if ((eventName === "message" || eventName === null) && typeof obj?.answer === "string") {
+                   added += obj.answer
+                   continue
+                 }

+                 // message_end は会話IDなどのメタのみ(描画しない)
+                 if (eventName === "message_end") {
+                   continue
+                 }

+                 // その他イベントは描画しない
+               }

+               contentToAdd = added
+             } else {
+               // 既存ホステッド(OpenAI 等)はプレーンテキスト想定
+               contentToAdd = chunk
+             }
+           } else {
+             // Ollama: 1行1 JSON なので行ごとに content を抽出
+             contentToAdd = chunk
+               .trimEnd()
+               .split("\n")
+               .reduce((acc, line) => acc + JSON.parse(line).message.content, "")
+           }
+         } catch (error) {
+           console.error("Error parsing streamed chunk:", error)
+           contentToAdd = ""
+         }

+         // 文字が増えたときだけ UI を進める
+         if (contentToAdd) {
+           setFirstTokenReceived(true)
+           setToolInUse("none")
+           fullText += contentToAdd
+         }

        setChatMessages(prev =>
          prev.map(chatMessage => {

// --- 以下略 ---

4. ビルド&テスト&常駐化

ここまででいったん、ビルド&テストしてみましょう。

cd /opt/chatbot-ui
rm -rf .next
npm run build
npm run start -- -H 0.0.0.0 -p 3000

ここまでの設定で会話ができるようになるはず。

chat.example.com にブラウザでアクセスしてChatbotUIにログインします。
右上のモデル選択のところをクリックして、Difyで作ったカスタムフローを選択して「こんにちは」と送ってみましょう。無事に返事が返ってくれば完了です。

お疲れ様でした。
あとは、Difyのチャットフローを作りこめば、どんな設定であれば返事を返してくれます。

テストも終わりなので、ここからはPM2 を使って、常駐化させましょう。
PM2 を使って「サーバ再起動後も自動で立ち上がる」ようにします。
落ちたときの自動再起動やログ確認もワンコマンドでOKになります。
※前提:ChatbotUIのコードは /opt/chatbot-ui、Nginxは 3000番へプロキシ済み(第3章)。

●PM2をインストール

npm install -g pm2
pm2 -v   # バージョンが出ればOK

●アプリを PM2 で起動(ビルド済み前提)

cd /opt/chatbot-ui
pm2 start "npm run start -- -H 0.0.0.0 -p 3000" --name chatbot-ui --cwd /opt/chatbot-ui --time

● サーバ再起動時に自動起動させる

# ① systemd にPM2の自動起動を登録(出力される“最後の1行”をコピペ実行)
pm2 startup

# ② 現在のPM2プロセス一覧を保存(「自動起動する対象」を記録)
pm2 save

これで、サーバ再起動後も 自動でChatbotUIが立ち上がるようになりました。
以下はご参考まで。

●参考①:よく使うPM2コマンド(控え)

pm2 ls                    # 一覧
pm2 logs chatbot-ui       # リアルタイムログ
pm2 restart chatbot-ui    # 再起動
pm2 stop chatbot-ui       # 停止
pm2 delete chatbot-ui     # PM2管理から削除
pm2 save                  # 自動起動に登録される“現在の一覧”を保存

●参考②:デプロイ更新の流れ(控え)

cd /opt/chatbot-ui
git pull
sudo npm i
npm run build
pm2 restart chatbot-ui

以下は付録です。必要に応じて設定してください。
6章では、ChatbotUIからDifyに添付ファイルを送れるようにします。ここでは、画像とテキストのみを扱います。
7章では、Difyで設定した初期入力フィールドをChatbotUIにも表示するように設定変更します。初期入力フィールドは短文、段落、選択、数値の4つにだけ対応し、単一ファイル、複数ファイルには対応しませんのでご了承ください。


🚀 まとめ

これで、Difyをバックエンドにした「認証つき・ストリーミング対応の安全なチャット環境」が完成です。
次章では、添付ファイルの送信や初期入力フィールドの設定など、より実践的な連携機能を追加していきます。


🪄 次の記事:
VPSで「認証つきDify環境」を作る⑦:添付ファイルを使えるようにする

Discussion