VPSで「認証つきDify環境」を作る⑥:「Custom送信ルート」とSSEでDifyと会話する
💡 この記事はシリーズ⑥です。
前章では、Difyのフローを“モデル”として選べるようになりました。
ここでは、選んだ時の“送信先”を付け替えて、Difyにメッセージを送り、SSE(ストリーミング)で返事をリアルタイム表示できるようにします。
1. 送信先を「Custom」に切り替える(画面側のハンドラ)
「このモデルの送り先はCustomだ」と分かったら、標準の送信ではなく、自前の送信API(後述 /api/chat/custom)に投げるようにします。
編集するファイル
components/chat/chat-hooks/use-chat-handler.tsx
-
ポイント
送る前に選んだモデルの provider を見て、custom なら- 1.チャットが無ければ handleCreateChat() で作る
- 2./api/chat/custom に chatId、message、model(=Difyのslug) をPOST
返ってきたデータは processResponse() でSSEとして読んで、画面に少しずつ表示します。また、添付ファイルを使うなら、アップロード後のIDを最小限にくっつけます。
●編集
cd /opt/chatbot-ui
sudo nano components/chat/chat-hooks/use-chat-handler.tsx
「-」部分は削除し、「+」部分を追記します。
import { ChatbotUIContext } from "@/context/context"
import { getAssistantCollectionsByAssistantId } from "@/db/assistant-collections"
// --- 中略 ---
export const useChatHandler = () => {
const router = useRouter()
const {
+ availableHostedModels,
userInput,
chatFiles,
setUserInput,
// --- 中略 ---
const modelData = [
- ...models.map(model => ({
- modelId: model.model_id as LLMID,
- modelName: model.name,
- provider: "custom" as ModelProvider,
- hostedId: model.id,
- platformLink: "",
- imageInput: false
- })),
+ ...availableHostedModels,
...LLM_LIST,
...availableLocalModels,
...availableOpenRouterModels
].find(llm => llm.modelId === chatSettings?.model)
// --- 中略 ---
} else {
if (modelData!.provider === "ollama") {
generatedText = await handleLocalChat(
payload,
profile!,
chatSettings!,
tempAssistantChatMessage,
isRegeneration,
newAbortController,
setIsGenerating,
setFirstTokenReceived,
setChatMessages,
setToolInUse
)
+ // ★追加: custom(Dify) はここで直接 /api/chat/custom に送る
+ } else if (modelData!.provider === "custom") {
+ // まだチャットが無ければ先に作る(/api/chat/custom は chatId 必須)
+ if (!currentChat) {
+ currentChat = await handleCreateChat(
+ chatSettings!,
+ profile!,
+ selectedWorkspace!,
+ messageContent,
+ selectedAssistant!,
+ newMessageFiles,
+ setSelectedChat,
+ setChats,
+ setChatFiles
+ )
+ }
+ // ★追加: 直前にアップロード成功したファイルIDを添付(最小実装)
+ const pending: Array<{ id: string; type: "image" | "document" }> =
+ ((globalThis as any).__difyPendingFiles || []).slice()
+ const filesForThisMessage =
+ pending.map(f => ({
+ type: f.type,
+ transfer_method: "local_file" as const,
+ upload_file_id: f.id
+ }))
+ if (pending.length) {
+ ;(globalThis as any).__difyPendingFiles = [] // 使い終わったらクリア
+ console.log("[SEND files]", { count: filesForThisMessage.length })
+ }
+ const response = await fetch("/api/chat/custom", {
+ method: "POST",
+ headers: { "Content-Type": "application/json" },
+ body: JSON.stringify({
+ chatId: currentChat.id,
+ message: messageContent,
+ model: chatSettings!.model, // ← Dify の slug
+ // inputs を使うならここで { inputs: {...} } を追加
+ // ★追加: 添付ファイル
+ ...(filesForThisMessage.length
+ ? { files: filesForThisMessage }
+ : {})
+ })
+ })
+ // ★追記: Dify からの SSE を UI にストリーム表示
+ generatedText = await processResponse(
+ response,
+ isRegeneration
+ ? payload.chatMessages[payload.chatMessages.length - 1]
+ : tempAssistantChatMessage,
+ true,
+ newAbortController,
+ setFirstTokenReceived,
+ setChatMessages,
+ setToolInUse
+ )
} else {
generatedText = await handleHostedChat(
payload,
profile!,
modelData!,
tempAssistantChatMessage,
isRegeneration,
newAbortController,
newMessageImages,
chatImages,
setIsGenerating,
setFirstTokenReceived,
setChatMessages,
setToolInUse
)
}
}
// --- 中略 ---
return {
chatInputRef,
- prompt,
handleNewChat,
handleSendMessage,
handleFocusChatInput,
handleStopMessage,
handleSendEdit
}
}
2. Custom専用の送信APIを作る(サーバ側)
鍵を外に出さずにサーバ側で代理送信します。そのための手順は、①ログイン確認 → ②チャットの持ち主か確認 → ③dify_apps から鍵など取得 → ④Difyの /v1/chat-messages にSSEで中継 → ⑤初回だけ会話IDを保存、です。
編集するファイル
/opt/chatbot-ui/app/api/chat/custom/route.ts
-
ポイント
- 受け取る内容:chatId、message(または messages)、model(= Difyのslug)、inputs(任意)
- ログイン確認:@supabase/auth-helpers-nextjs を使い、Cookieからユーザー取得
- DBで確認:chats に存在するか、持ち主が本人かをチェック(supabaseAdmin=サーバ用キー)
- Dify設定の取得:dify_apps から api_key / base_url / default_inputs を取得
- 送信:fetch(baseUrl + /v1/chat-messages, { Accept: text/event-stream })
- 会話IDの保存:ReadableStream.tee() を使い、一方をそのままブラウザへ、もう一方で最初に届くJSONから conversation_id を抜き出し chats.provider_thread_id に保存(初回だけでOK)
- ヘッダー:Content-Type: text/event-stream、X-Accel-Buffering: no(Nginx側は前章で設定済み)
●編集
cd /opt/chatbot-ui
sudo nano /opt/chatbot-ui/app/api/chat/custom/route.ts
内容を丸ごと以下にコピペで置き換えてください。
// /app/api/chat/custom/route.ts
import { NextResponse } from 'next/server';
import { cookies } from 'next/headers';
import { createRouteHandlerClient } from '@supabase/auth-helpers-nextjs';
import { supabaseAdmin } from '@/lib/supabaseAdmin';
type ChatRequestBody = {
chatId: string;
message?: string;
messages?: Array<{ role: 'user' | 'assistant' | 'system'; content: string }>;
model?: string; // ← Dify の slug をここに入れる運用(UIの「モデル」に相当)
inputs?: Record<string, any>; // Dify Flow の inputs(任意)
};
export const dynamic = 'force-dynamic'; // キャッシュ抑止
export const runtime = 'nodejs'; // Edge だと Service Role が使いづらいので nodejs 実行
function pickLastUserText(body: ChatRequestBody): string | null {
if (body.message && body.message.trim()) return body.message.trim();
const arr = body.messages ?? [];
for (let i = arr.length - 1; i >= 0; i--) {
if (arr[i].role === 'user' && arr[i].content?.trim()) return arr[i].content.trim();
}
return null;
}
export async function POST(req: Request) {
try {
const body = (await req.json()) as ChatRequestBody;
// 0) 入力バリデーション(最小限)
if (!body.chatId) return NextResponse.json({ error: 'chatId is required' }, { status: 400 });
const userText = pickLastUserText(body);
if (!userText) return NextResponse.json({ error: 'message is empty' }, { status: 400 });
// 1) 認証ユーザー取得(Cookie ベース)
const supabaseServer = createRouteHandlerClient({ cookies });
const { data: { user } } = await supabaseServer.auth.getUser();
if (!user) return NextResponse.json({ error: 'Unauthorized' }, { status: 401 });
// 2) チャット行の取得&所有者チェック(Service Role)
const { data: chat, error: chatErr } = await supabaseAdmin
.from('chats')
.select('id, user_id, model, provider_thread_id, provider')
.eq('id', body.chatId)
.single();
if (chatErr || !chat) return NextResponse.json({ error: 'Chat not found' }, { status: 404 });
if (chat.user_id !== user.id) return NextResponse.json({ error: 'Forbidden' }, { status: 403 });
const slug = body.model?.trim() || chat.model?.trim();
if (!slug) return NextResponse.json({ error: 'model (Dify slug) is required' }, { status: 400 });
// 3) Dify アプリ設定の取得(鍵は DB から)
const { data: app, error: appErr } = await supabaseAdmin
.from('dify_apps')
.select('slug, display_name, api_key, base_url, default_inputs, is_active')
.eq('slug', slug)
.eq('is_active', true)
.single();
if (appErr || !app) return NextResponse.json({ error: 'Dify app not found or inactive' }, { status: 404 });
// 初回送信など provider が custom でなければ custom に自動更新
if (chat.provider !== 'custom' || chat.model !== slug) {
await supabaseAdmin
.from('chats')
.update({ provider: 'custom', model: slug })
.eq('id', chat.id);
}
const baseUrl = app.base_url.replace(/\/+$/, '');
const apiKey = app.api_key;
const inputs = { ...(app.default_inputs ?? {}), ...(body.inputs ?? {}) };
// 4) Dify へストリーミング中継(SSE)
const payload: Record<string, any> = {
inputs,
query: userText,
response_mode: 'streaming',
user: user.id,
};
if (chat.provider_thread_id) payload.conversation_id = chat.provider_thread_id;
const upstream = await fetch(`${baseUrl}/v1/chat-messages`, {
method: 'POST',
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
'Accept': 'text/event-stream',
},
body: JSON.stringify(payload),
});
if (!upstream.ok || !upstream.body) {
const errText = await upstream.text().catch(() => '');
return NextResponse.json({ error: 'Upstream error', detail: errText || upstream.statusText }, { status: 502 });
}
// 5) conversation_id を初回のみ保存するための tee(片方でパース)
const [streamForClient, streamForParse] = (upstream.body as ReadableStream).tee();
(async () => {
// パーサ側:最初に届く JSON の中から conversation_id を拾う
try {
const reader = streamForParse.getReader();
let buf = '';
let done = false;
let saved = false;
while (!done) {
const { value, done: d } = await reader.read();
done = d;
if (value) buf += new TextDecoder().decode(value);
// SSE は「data: {...}\n\n」形式。最初の data 行から抽出できれば十分
const match = buf.match(/"conversation_id"\s*:\s*"([0-9a-f-]{36})"/i);
if (match && !saved && !chat.provider_thread_id) {
const convId = match[1];
await supabaseAdmin.from('chats')
.update({ provider_thread_id: convId })
.eq('id', chat.id);
saved = true;
// 以降のパースは不要なので抜ける
try { reader.cancel(); } catch {}
break;
}
// バッファが大きくなりすぎないよう適宜切り捨て
if (buf.length > 64 * 1024) buf = buf.slice(-32 * 1024);
}
} catch {
// 失敗してもストリーミング自体は継続するので握りつぶし
}
})();
// 6) クライアントへそのまま中継(SSEヘッダ)
const headers = new Headers();
headers.set('Content-Type', 'text/event-stream; charset=utf-8');
headers.set('Cache-Control', 'no-cache, no-transform');
headers.set('Connection', 'keep-alive');
headers.set('X-Accel-Buffering', 'no');
return new Response(streamForClient, { headers, status: 200 });
} catch (e: any) {
return NextResponse.json({ error: 'Internal error', detail: e?.message ?? String(e) }, { status: 500 });
}
}
3.SSE(ストリーミング)の読み込み(画面側)
Difyの返事は SSE(text/event-stream)という方式で届きます。これをイベントごとに読んで、「ユーザーの目の前で文字が流れていく」ように表示します。
編集するファイル
components/chat/chat-helpers/index.ts
-
ポイント
レスポンスの Content-Type に text/event-stream が含まれていたらSSEモードにします。
また、data: の行だけを集めて、イベント単位で処理します。
- event: ping は無視
- event: message の answer だけ画面に追加
- event: message_end や usage のような大きなメタ情報は表示しない
●編集
cd /opt/chatbot-ui
sudo nano components/chat/chat-helpers/index.ts
「-」部分は削除し、「+」部分を追記します。
// Only used in use-chat-handler.tsx to keep it clean
import { createChatFiles } from "@/db/chat-files"
import { createChat } from "@/db/chats"
// --- 中略 ---
export const processResponse = async (
response: Response,
lastChatMessage: ChatMessage,
isHosted: boolean,
controller: AbortController,
setFirstTokenReceived: React.Dispatch<React.SetStateAction<boolean>>,
setChatMessages: React.Dispatch<React.SetStateAction<ChatMessage[]>>,
setToolInUse: React.Dispatch<React.SetStateAction<string>>
) => {
- let fullText = ""
- let contentToAdd = ""
- if (response.body) {
- await consumeReadableStream(
- response.body,
- chunk => {
- setFirstTokenReceived(true)
- setToolInUse("none")
- try {
- contentToAdd = isHosted
- ? chunk
- : // Ollama's streaming endpoint returns new-line separated JSON
- // objects. A chunk may have more than one of these objects, so we
- // need to split the chunk by new-lines and handle each one
- // separately.
- chunk
- .trimEnd()
- .split("\n")
- .reduce(
- (acc, line) => acc + JSON.parse(line).message.content,
- ""
- )
- fullText += contentToAdd
- } catch (error) {
- console.error("Error parsing JSON:", error)
- }
+ let fullText = ""
+ let contentToAdd = ""
+ // ★追加: SSE 断片をまたぐ場合に備えたバッファ(Difyなどの text/event-stream 用)
+ let sseBuffer = ""
+ // Content-Type でSSEかどうかを判定
+ const contentType = (response.headers.get("content-type") || "").toLowerCase()
+ const isSSE = contentType.includes("text/event-stream")
+ if (response.body) {
+ await consumeReadableStream(
+ response.body,
+ chunk => {
+ try {
+ if (isHosted) {
+ // ★変更(最小): Hosted かつ SSE の場合のみ、SSEイベントを厳密パース
+ if (isSSE) {
+ sseBuffer += chunk
+ const events = sseBuffer.split("\n\n")
+ sseBuffer = events.pop() || ""
+ let added = ""
+ for (const ev of events) {
+ const lines = ev.split("\n")
+ // event名を拾う(無い場合もある)
+ const eventLine = lines.find(l => l.startsWith("event:"))
+ const eventName = eventLine ? eventLine.slice(6).trim() : null
+ // data行をまとめる
+ const dataStr = lines
+ .filter(l => l.startsWith("data:"))
+ .map(l => l.slice(5).trim())
+ .join("\n")
+ if (!dataStr) continue
+ // ping などは描画しない
+ if (eventName === "ping") continue
+ // JSONとして解釈を試み、本文のみ抽出
+ let obj: any = null
+ try {
+ obj = JSON.parse(dataStr)
+ } catch {
+ // JSONでない data は描画しない
+ continue
+ }
+ // usage / model_provider などの巨大JSONは描画しない
+ if (obj?.usage || obj?.model_provider || obj?.execution_metadata) {
+ continue
+ }
+ // 本文: event: message の answer だけを描画
+ if ((eventName === "message" || eventName === null) && typeof obj?.answer === "string") {
+ added += obj.answer
+ continue
+ }
+ // message_end は会話IDなどのメタのみ(描画しない)
+ if (eventName === "message_end") {
+ continue
+ }
+ // その他イベントは描画しない
+ }
+ contentToAdd = added
+ } else {
+ // 既存ホステッド(OpenAI 等)はプレーンテキスト想定
+ contentToAdd = chunk
+ }
+ } else {
+ // Ollama: 1行1 JSON なので行ごとに content を抽出
+ contentToAdd = chunk
+ .trimEnd()
+ .split("\n")
+ .reduce((acc, line) => acc + JSON.parse(line).message.content, "")
+ }
+ } catch (error) {
+ console.error("Error parsing streamed chunk:", error)
+ contentToAdd = ""
+ }
+ // 文字が増えたときだけ UI を進める
+ if (contentToAdd) {
+ setFirstTokenReceived(true)
+ setToolInUse("none")
+ fullText += contentToAdd
+ }
setChatMessages(prev =>
prev.map(chatMessage => {
// --- 以下略 ---
4. ビルド&テスト&常駐化
ここまででいったん、ビルド&テストしてみましょう。
cd /opt/chatbot-ui
rm -rf .next
npm run build
npm run start -- -H 0.0.0.0 -p 3000
ここまでの設定で会話ができるようになるはず。
chat.example.com にブラウザでアクセスしてChatbotUIにログインします。
右上のモデル選択のところをクリックして、Difyで作ったカスタムフローを選択して「こんにちは」と送ってみましょう。無事に返事が返ってくれば完了です。

お疲れ様でした。
あとは、Difyのチャットフローを作りこめば、どんな設定であれば返事を返してくれます。
テストも終わりなので、ここからはPM2 を使って、常駐化させましょう。
PM2 を使って「サーバ再起動後も自動で立ち上がる」ようにします。
落ちたときの自動再起動やログ確認もワンコマンドでOKになります。
※前提:ChatbotUIのコードは /opt/chatbot-ui、Nginxは 3000番へプロキシ済み(第3章)。
●PM2をインストール
npm install -g pm2
pm2 -v # バージョンが出ればOK
●アプリを PM2 で起動(ビルド済み前提)
cd /opt/chatbot-ui
pm2 start "npm run start -- -H 0.0.0.0 -p 3000" --name chatbot-ui --cwd /opt/chatbot-ui --time
● サーバ再起動時に自動起動させる
# ① systemd にPM2の自動起動を登録(出力される“最後の1行”をコピペ実行)
pm2 startup
# ② 現在のPM2プロセス一覧を保存(「自動起動する対象」を記録)
pm2 save
これで、サーバ再起動後も 自動でChatbotUIが立ち上がるようになりました。
以下はご参考まで。
●参考①:よく使うPM2コマンド(控え)
pm2 ls # 一覧
pm2 logs chatbot-ui # リアルタイムログ
pm2 restart chatbot-ui # 再起動
pm2 stop chatbot-ui # 停止
pm2 delete chatbot-ui # PM2管理から削除
pm2 save # 自動起動に登録される“現在の一覧”を保存
●参考②:デプロイ更新の流れ(控え)
cd /opt/chatbot-ui
git pull
sudo npm i
npm run build
pm2 restart chatbot-ui
以下は付録です。必要に応じて設定してください。
6章では、ChatbotUIからDifyに添付ファイルを送れるようにします。ここでは、画像とテキストのみを扱います。
7章では、Difyで設定した初期入力フィールドをChatbotUIにも表示するように設定変更します。初期入力フィールドは短文、段落、選択、数値の4つにだけ対応し、単一ファイル、複数ファイルには対応しませんのでご了承ください。
🚀 まとめ
これで、Difyをバックエンドにした「認証つき・ストリーミング対応の安全なチャット環境」が完成です。
次章では、添付ファイルの送信や初期入力フィールドの設定など、より実践的な連携機能を追加していきます。
Discussion