🕌

Edge Runtime上でChat Completion API経由のストリーム形式のレスポンスを実装する

2023/05/07に公開

OpenAIのChat Completion APIstream: trueに指定した時にSSEのレスポンスからコンテンツを逐次読み込みしてUIに反映させようとすると、意外に途中のパース処理に手間取ることが知られています。

JavaScript版openai-nodeモジュールのリポジトリにもissueがあるのですが、ユーザー間で試行錯誤しておりまだインターフェイスとして こなれていない印象です。

https://github.com/openai/openai-node/issues/18

Cloudflare WorkersやVercel Edge FunctionのEdge Runtimeだとさらにややこしくて、openai-nodeはaxiosで作られたクライアントなのでそのまま動きません。

いろいろな回避策はあるのですが、今回は任意のモジュールを利用せずにAPIに直接リクエストを送信しつつストリーム形式のレスポンスに対応します。

Cloudflare Workers版

Cloudflare Workers環境ではブラウザのStreams APIを使うことができます。

https://developers.cloudflare.com/workers/learning/using-streams/

fetch()レスポンスがReadableStreamなのでTransformStreamでストリーム内のデータのJSONをパースして1行づつ処理していきます。

TransformStreamでレスポンスを変換する過程でコンテンツを抜き出し、enqueue()に詰め込むことでストリーム形式のままUIに応答します。

内部データがどういう形式になっているかは「ChatGPTをぬるぬるにする🐌Server-Sent Eventsの基礎知識」が参考になります。

index.ts
export interface Env {
    OPENAI_API_KEY: string;
}

export default {
    async fetch(
        request: Request,
        env: Env,
        ctx: ExecutionContext
    ): Promise<Response> {
        const params = new URL(request.url).searchParams; // GET
        // const params = await request.formData(); // POST
        const query = params.get('query');

        if (!query) {
            return new Response('No query provided', {status: 400});
        }

        const apiEndpoint = 'https://api.openai.com/v1/chat/completions';
        const headers = new Headers({
            'accept': 'text/event-stream',
            'authorization': `Bearer ${env.OPENAI_API_KEY}`,
            'content-type': 'application/json'
        });

        const body = JSON.stringify({
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": query}],
            "stream": true
        });

        const response = await fetch(apiEndpoint, {
            method: 'POST',
            headers,
            body
        });

        if (response.body === null) {
            throw new Error('No response body');
        }

        let buffer = '';
        const stream = response.body.pipeThrough(new TransformStream({
            transform(chunk, controller) {
                const encoder = new TextEncoder();
                const lines = new TextDecoder().decode(chunk).split('\n\n');
                for (const line of lines) {
                    if (line === 'data: [DONE]') {
                        controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
                        return;
                    }

                    const substr = line.replace(/^data: /, '');
                    if (substr.startsWith('{')) {
                        buffer = substr;
                    } else {
                        buffer += substr;
                    }

                    if (substr.length === 0 || !buffer.startsWith('{') || !buffer.endsWith('}')) {
                        continue;
                    }

                    try {
                        const parsedLine = JSON.parse(buffer);
                        const input = parsedLine.choices[0].delta.content;
                        if (input) {
                            controller.enqueue(encoder.encode(`data: ${input}\n\n`));
                        }
                    } catch (e) {
                        console.error('Parse Error: ' + e);
                    } finally {
                        buffer = '';
                    }
                }
            }
        }));
        return new Response(stream, {
            headers: {
                "Access-Control-Allow-Origin": "*",
                "Content-Type": "text/event-stream;charset=utf-8"
            },
        });
    },
};

npx wrangler secret put OPENAI_API_KEY

Google ChromeはSSEなレスポンスを返すURLを直接開いてストリームをダンプ表示してくれるのでnpx wrangler devしてhttp://0.0.0.0:8787/?query=入力テキストすると以下のようにレスポンスを確認できます。

Vercel Edge Functions版

Cloudflare Workers版とOPENAI_API_KEYのenvの渡し方が違う程度で動きます。

api/chat.ts
export const config = {
    runtime: 'edge',
};

export default async (request: Request) => {
    const params = new URL(request.url).searchParams; // GET
    // const params = await request.formData(); // POST
    const query = params.get('query');

    if (!query) {
        return new Response('No query provided', { status: 400 });
    }

    const apiEndpoint = 'https://api.openai.com/v1/chat/completions';
    const headers = new Headers({
        'accept': 'text/event-stream',
        'authorization': `Bearer ${process.env.OPENAI_API_KEY}`,
        'content-type': 'application/json'
    });

    const body = JSON.stringify({
        "model": "gpt-3.5-turbo",
        "messages": [{ "role": "user", "content": query }],
        "stream": true
    });

    const response = await fetch(apiEndpoint, {
        method: 'POST',
        headers,
        body
    });

    if (response.body === null) {
        throw new Error('No response body');
    }

    let buffer = '';
    const stream = response.body.pipeThrough(new TransformStream({
        transform(chunk, controller) {
            const encoder = new TextEncoder();
                const lines = new TextDecoder().decode(chunk).split('\n\n');
                for (const line of lines) {
                    if (line === 'data: [DONE]') {
                        controller.enqueue(encoder.encode(`data: [DONE]\n\n`));
                        return;
                    }

                    const substr = line.replace(/^data: /, '');
                    if (substr.startsWith('{')) {
                        buffer = substr;
                    } else {
                        buffer += substr;
                    }

                    if (substr.length === 0 || !buffer.startsWith('{') || !buffer.endsWith('}')) {
                        continue;
                    }

                    try {
                        const parsedLine = JSON.parse(buffer);
                        const input = parsedLine.choices[0].delta.content;
                        if (input) {
                            controller.enqueue(encoder.encode(`data: ${input}\n\n`));
                        }
                    } catch (e) {
                        console.error('Parse Error: ' + e);
                    } finally {
                        buffer = '';
                    }
                }
        }
    }));
    return new Response(stream, {
        headers: {
            "Access-Control-Allow-Origin": "*",
            "Content-Type": "text/event-stream;charset=utf-8"
        },
    });
};
npx vercel env add OPENAI_API_KEY

UI側の実装(1): fetchしてwhileループ

Vercelのexample内に参考になるコードがあります。

以下ではレスポンスからReadableStreamを取得し、whileループでstateを上書きし続けます。

https://github.com/vercel/examples/blob/de90db0fb90e873dc5b94c3747b04ce1d934cbab/solutions/ai-chatgpt/components/Chat.tsx#L71-L113

このコードはtwitterbio.comの元になっているようで、以下から動作を試せます。

https://www.twitterbio.com/

UI側の実装(2): EventSource API

EventSourceが使えることに気付いたので試しました。

ただ「streamオプションを使う時はEventSourceではなくfetchを使うべき」にあるようにおすすめできません。

Azureチームが公開している@microsoft/fetch-event-sourceは上記問題に対する解決をしているようですが私は未検証です。

https://github.com/Azure/fetch-event-source

index.html
<html>
<script>
  function send() {
    const query = document.querySelector("input").value;
    const withCredentials = new URL(document.URL).protocol === "https";
    const eventSource = new EventSource(`/api/chat?query=` + encodeURIComponent(query), {withCredentials});
    const eventList = document.querySelector("ul");

    eventSource.onmessage = (ev) => {
      const element = document.querySelector("#result");
      element.textContent += `${ev.data}`;
    };

    eventSource.onopen = console.log
    eventSource.onerror = (ev, a, b) => {
      console.error(ev)
      eventSource.close()
    }
  }
</script>
<body>
<input type="text" value="ポエム書いて"/>
<button onclick="send()">Send</button>
<p id="result"></p>
</body>
</html>

Chrome DevToolsはEventStreamレスポンスを表示するビューがあり以下のように確認できます。

注意点としてはEvent stream formatにある仕様どうりdata: ...とフィールドを入れて返す必要があります。

トラブルシューテング: パース処理がうまくいかない

vercel/examplesのリポジトリではeventsource-parserというモジュールを使ってこの部分を実装しています。

https://github.com/vercel/examples/blob/962b82fa611c54366469d673184efd4caa807b5d/solutions/ai-chatgpt/utils/OpenAIStream.ts#L52-L76

このモジュールの評価ができてないので今回は採用しませんでしたが、うまくいかない場合は試してみてください。

Discussion