Edge Runtime上でChat Completion API経由のストリーム形式のレスポンスを実装する
OpenAIのChat Completion APIでstream: true
に指定した時にSSEのレスポンスからコンテンツを逐次読み込みしてUIに反映させようとすると、意外に途中のパース処理に手間取ることが知られています。
JavaScript版openai-nodeモジュールのリポジトリにもissueがあるのですが、ユーザー間で試行錯誤しておりまだインターフェイスとして こなれていない印象です。
Cloudflare WorkersやVercel Edge FunctionのEdge Runtimeだとさらにややこしくて、openai-nodeはaxiosで作られたクライアントなのでそのまま動きません。
いろいろな回避策はあるのですが、今回は任意のモジュールを利用せずにAPIに直接リクエストを送信しつつストリーム形式のレスポンスに対応します。
Cloudflare Workers版
Cloudflare Workers環境ではブラウザのStreams APIを使うことができます。
fetch()レスポンスがReadableStreamなのでTransformStreamでストリーム内のデータのJSONをパースして1行づつ処理していきます。
TransformStreamでレスポンスを変換する過程でコンテンツを抜き出し、enqueue()
に詰め込むことでストリーム形式のままUIに応答します。
内部データがどういう形式になっているかは「ChatGPTをぬるぬるにする🐌Server-Sent Eventsの基礎知識」が参考になります。
export interface Env {
OPENAI_API_KEY: string;
}
export default {
async fetch(
request: Request,
env: Env,
ctx: ExecutionContext
): Promise<Response> {
const params = new URL(request.url).searchParams; // GET
// const params = await request.formData(); // POST
const query = params.get('query');
if (!query) {
return new Response('No query provided', {status: 400});
}
const apiEndpoint = 'https://api.openai.com/v1/chat/completions';
const headers = new Headers({
'accept': 'text/event-stream',
'authorization': `Bearer ${env.OPENAI_API_KEY}`,
'content-type': 'application/json'
});
const body = JSON.stringify({
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": query}],
"stream": true
});
const response = await fetch(apiEndpoint, {
method: 'POST',
headers,
body
});
if (response.body === null) {
throw new Error('No response body');
}
let buffer = '';
const stream = response.body.pipeThrough(new TransformStream({
transform(chunk, controller) {
const encoder = new TextEncoder();
const lines = new TextDecoder().decode(chunk).split('\n\n');
for (const line of lines) {
if (line === 'data: [DONE]') {
controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
return;
}
const substr = line.replace(/^data: /, '');
if (substr.startsWith('{')) {
buffer = substr;
} else {
buffer += substr;
}
if (substr.length === 0 || !buffer.startsWith('{') || !buffer.endsWith('}')) {
continue;
}
try {
const parsedLine = JSON.parse(buffer);
const input = parsedLine.choices[0].delta.content;
if (input) {
controller.enqueue(encoder.encode(`data: ${input}\n\n`));
}
} catch (e) {
console.error('Parse Error: ' + e);
} finally {
buffer = '';
}
}
}
}));
return new Response(stream, {
headers: {
"Access-Control-Allow-Origin": "*",
"Content-Type": "text/event-stream;charset=utf-8"
},
});
},
};
npx wrangler secret put OPENAI_API_KEY
Google ChromeはSSEなレスポンスを返すURLを直接開いてストリームをダンプ表示してくれるのでnpx wrangler dev
してhttp://0.0.0.0:8787/?query=入力テキスト
すると以下のようにレスポンスを確認できます。
Vercel Edge Functions版
Cloudflare Workers版とOPENAI_API_KEYのenvの渡し方が違う程度で動きます。
export const config = {
runtime: 'edge',
};
export default async (request: Request) => {
const params = new URL(request.url).searchParams; // GET
// const params = await request.formData(); // POST
const query = params.get('query');
if (!query) {
return new Response('No query provided', { status: 400 });
}
const apiEndpoint = 'https://api.openai.com/v1/chat/completions';
const headers = new Headers({
'accept': 'text/event-stream',
'authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
'content-type': 'application/json'
});
const body = JSON.stringify({
"model": "gpt-3.5-turbo",
"messages": [{ "role": "user", "content": query }],
"stream": true
});
const response = await fetch(apiEndpoint, {
method: 'POST',
headers,
body
});
if (response.body === null) {
throw new Error('No response body');
}
let buffer = '';
const stream = response.body.pipeThrough(new TransformStream({
transform(chunk, controller) {
const encoder = new TextEncoder();
const lines = new TextDecoder().decode(chunk).split('\n\n');
for (const line of lines) {
if (line === 'data: [DONE]') {
controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
return;
}
const substr = line.replace(/^data: /, '');
if (substr.startsWith('{')) {
buffer = substr;
} else {
buffer += substr;
}
if (substr.length === 0 || !buffer.startsWith('{') || !buffer.endsWith('}')) {
continue;
}
try {
const parsedLine = JSON.parse(buffer);
const input = parsedLine.choices[0].delta.content;
if (input) {
controller.enqueue(encoder.encode(`data: ${input}\n\n`));
}
} catch (e) {
console.error('Parse Error: ' + e);
} finally {
buffer = '';
}
}
}
}));
return new Response(stream, {
headers: {
"Access-Control-Allow-Origin": "*",
"Content-Type": "text/event-stream;charset=utf-8"
},
});
};
npx vercel env add OPENAI_API_KEY
UI側の実装(1): fetchしてwhileループ
Vercelのexample内に参考になるコードがあります。
以下ではレスポンスからReadableStreamを取得し、whileループでstateを上書きし続けます。
このコードはtwitterbio.comの元になっているようで、以下から動作を試せます。
UI側の実装(2): EventSource API
EventSourceが使えることに気付いたので試しました。
ただ「streamオプションを使う時はEventSourceではなくfetchを使うべき」にあるようにおすすめできません。
Azureチームが公開している@microsoft/fetch-event-sourceは上記問題に対する解決をしているようですが私は未検証です。
<html>
<script>
function send() {
const query = document.querySelector("input").value;
const withCredentials = new URL(document.URL).protocol === "https";
const eventSource = new EventSource(`/api/chat?query=` + encodeURIComponent(query), {withCredentials});
const eventList = document.querySelector("ul");
eventSource.onmessage = (ev) => {
const element = document.querySelector("#result");
element.textContent += `${ev.data}`;
};
eventSource.onopen = console.log
eventSource.onerror = (ev, a, b) => {
console.error(ev)
eventSource.close()
}
}
</script>
<body>
<input type="text" value="ポエム書いて"/>
<button onclick="send()">Send</button>
<p id="result"></p>
</body>
</html>
Chrome DevToolsはEventStreamレスポンスを表示するビューがあり以下のように確認できます。
注意点としてはEvent stream formatにある仕様どうりdata: ...
とフィールドを入れて返す必要があります。
トラブルシューテング: パース処理がうまくいかない
vercel/examplesのリポジトリではeventsource-parserというモジュールを使ってこの部分を実装しています。
このモジュールの評価ができてないので今回は採用しませんでしたが、うまくいかない場合は試してみてください。
Discussion