OpenAI Libraryと読み解くServer-Sent Events (SSE) の仕様
こんにちは、株式会社AI Shiftの@KEY60228です。
この記事はAI Shift Advent Calendar 2024の8日目の記事です。
Server-Sent Eventsのクライアントを自前で実装しようとして詰まったので、その時に調べた仕様をOpenAIのLibrary (openai/openai-node)とともにまとめてみます。
Server-Sent Events (SSE) とは
Server-Sent Events (SSE) はサーバーからクライアントへデータをプッシュ配信するための技術です。
2009年にはW3Cがドラフトを公開しており、現在ではWHATWGのHTML Living Standardに統合されています。
上記の通り、SSE自体は新しい技術ではありませんが、ChatGPTを始めとしたここ数年のLLMの普及によって再度注目を集めている印象を受けます。
AI ShiftでもこのSSEを活用し、AI Worker[1]を開発しています。
詰まったポイント
AI Workerでは、バックエンドで各LLMへの問い合わせ、レスポンスのノーマライズ、結果のDB格納やログ出力を行っています。
バックエンド-フロントエンド間ではSSEを用いてユーザー体験の向上を図っていますが、その際に詰まったポイントを擬似的なコードで紹介します。
// LLMへの問合せ
const response = await fetch(url)
const handleStream = (stream: SSEStreamApi) => {
let sequence = 0
while (true) {
// ...
// データをSSEでpush
stream.write(JSON.stringify({
data: {
rawContent: text,
sequenceNumber: sequence++,
generatedAt: new Date().toISOString(),
},
}))
// ...
}
}
return streamSSE(context, handleStream)
// サーバーへの問合せ
const response = await fetch(url)
// デコード、チャンク分割
const decodedStream = response.body.pipeThrough(
new TextDecoderStream()
).pipeThrough(
new TransformStream({
transform(chunk, controller) {
const chunks = chunk.split('data: ').filter((str) => str.trim())
for (const chunk of chunks) {
controller.enqueue(chunk)
}
},
})
)
const reader = decodedStream.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
// JSONパース
const res = JSON.parse(value)
// 画面表示
onMessage(res)
}
起きたこと
まれにフロントエンドでJSON.parseに失敗してしまうケースが発生しました。
適切な単位でwriteしていて、ちゃんとdecodeしているはずなのに何故…と思いログデバッグをしてみたところ、1つのデータが複数のchunkに分割されて配信されているケースがあることが分かりました。
数字は気にしないでください (chunkのbyte数です)
そこで正しい実装はどうなっているべきなのか、HTML Living StandardをOpenAIのLibraryとともに読み解いてみました。
Server-Sent Eventsの仕様
イベントストリームの解釈
まずイベントストリームの形式は9.2.5 Parsing an event streamにて以下のABNFとして定義されています。
すなわち、ストリームは複数のイベントから構成されており、1つのイベントは複数のコメント (コロンから始まる1行) あるいはフィールド (コロンとスペースで区切られたキーと値のペア) から構成され、各イベントは改行 (CRLF/CR/LF) で区切られています。
また、これらのストリームをどう解釈すべきかについては9.2.6 Event stream interpretationにて定義されています。
The stream must then be parsed by reading everything line by line, with a U+000D CARRIAGE RETURN U+000A LINE FEED (CRLF) character pair, a single U+000A LINE FEED (LF) character not preceded by a U+000D CARRIAGE RETURN (CR) character, and a single U+000D CARRIAGE RETURN (CR) character not followed by a U+000A LINE FEED (LF) character being the ways in which a line can end.
まず大前提として、ストリームは行ごとに解釈されるべきです。
その点、処理するチャンクがSSEにおける"行"になることが担保されていない以上、上で記述したような擬似コードは正しい実装ではありませんでした。
一方、openai/openai-node では以下の実装でストリームのハンドリングを行っています。
iterSSEChunks
ジェネレータ関数の中で、上流で受け取ったチャンクを2つの連続する改行で分割しながらイテレータリザルトとして返却しているのが分かります。
また、分割して中途半端に残ってしまったチャンクは data
変数に格納され、次のイテレーションで次のチャンクと連結された上で処理されています。
これらの処理によって、後続の処理では常に複数の完結した"行"を1つのイベントとして扱えることが担保されています。
イベントの解釈
上述した擬似コードでは、1つのイベントは data
フィールドのみを持つ1行のデータとして扱っていましたが、SSEの仕様では data
フィールド以外にも id
, event
, retry
フィールドやコメントを含めることが可能です。
イベントの解釈については以下のように定義されており、
フィールドの解釈については以下のように定義されています。
これらの実装を openai/openai-node では SSEDecoder
が担っています。
When a stream is parsed, a data buffer, an event type buffer, and a last event ID buffer must be associated with it. They must be initialized to the empty string.
この data buffer
event type buffer
の実装がそれぞれ data
フィールドと event
フィールドにあたります。 (Last-Event-ID
[2] に関しては openai/openai-node ではハンドリングしていないようでした)
If the line is empty (a blank line)
└ Dispatch the event, as defined below.
When the user agent is required to dispatch the event, the user agent must process the data buffer, the event type buffer, and the last event ID buffer using steps appropriate for the user agent.
行が空の場合は data buffer
event type buffer
(last event ID buffer
) の内容を配信、decode
関数でいうところの return
を行います。
If the line starts with a U+003A COLON character (:)
└ Ignore the line.
行がコロンから始まる場合はコメント行のため、無視します。
If the line contains a U+003A COLON character (:)
└ Collect the characters on the line before the first U+003A COLON character (:), and let field be that string.
└ Collect the characters on the line after the first U+003A COLON character (:), and let value be that string. If value starts with a U+0020 SPACE character, remove it from value.
└ Process the field using the steps described below, using field as the field name and value as the field value.
Otherwise, the string is not empty but does not contain a U+003A COLON character (:)
└ Process the field using the steps described below, using the whole line as the field name, and the empty string as the field value.
それ以外の場合はフィールド行として解釈します。
コロンが含まれる場合はコロンで分割してフィールド名と値をそれぞれ取得し、コロンが含まれない場合は行全体をフィールド名として扱います。
この処理は partition
関数で行っています。
If the field name is "event"
└ Set the event type buffer to field value.
If the field name is "data"
└ Append the field value to the data buffer, then append a single U+000A LINE FEED (LF) character to the data buffer.
If the field name is "id"
└ If the field value does not contain U+0000 NULL, then set the last event ID buffer to the field value. Otherwise, ignore the field.
If the field name is "retry"
└ If the field value consists of only ASCII digits, then interpret the field value as an integer in base ten, and set the event stream's reconnection time to that integer. Otherwise, ignore the field.
Otherwise
└ The field is ignored.
その後、取得したフィールド名に応じて処理を行います。
SSEの仕様上は id
や retry
フィールドを受け付けていますが、 openai/openai-node では data
と event
フィールドのハンドリングのみ実装しています。
1つのイベントの中で event
フィールドは1つのみ、 data
フィールドは複数存在することが可能なことも実装から確認できます。
以上がSSEにおけるイベントの配信と解釈の仕様です。
所感
SSEに触れたのが今回が初めてで、Honoを疑ってみたりNode.jsを疑ってみたりReadableStreamを疑ってみたりとかなり遠回りしてしまいましたが、ちゃんと仕様を読めば腑に落ちる結果になりました。
また、openai/openai-node が独自仕様ではなくSSEの仕様に (ほぼ) 忠実に従っていたことは少し意外でしたが、良い具体例となり理解が捗りました。
OpenAIのコミュニティでも同様の疑問が多く挙がっていた[3][4][5]ので、この記事が少しでも誰かの役に立てば幸いです。
最後に
AI Shiftではエンジニアの採用に力を入れています!
少しでも興味を持っていただけましたら、カジュアル面談でお話しませんか?
(オンライン・19時以降の面談も可能です!)
【面談フォームはこちら】
-
https://html.spec.whatwg.org/multipage/server-sent-events.html#the-last-event-id-header ↩︎
-
https://community.openai.com/t/high-rate-of-invalid-json-response-when-streaming-response/279977 ↩︎
-
https://community.openai.com/t/parsing-json-stream-response-in-nodejs/325366 ↩︎
-
https://community.openai.com/t/malformed-streaming-answers-from-gpt-4-completions-api-lately/481686 ↩︎
Discussion