stream-json で ChatGPT の JSON を徐々に読む!
はじめに
ChatGPT の stream モードは試したことがありますか? AsyncIterable や ReadableStream として出力が得られるので、出力がテキストの場合は ChatGPT の UI のような連続でテキストを表示させるという体験をすぐに作れます。
ですが、出力を JSON で受け取りたい場合もあります。ChatGPT API は response_format: { type: "json_object" }
にすることで JSON で出力を受け取ることができますが、stream にした際に json が一文字ずつやってきて厳しい状態になります。
厳しい JSON が帰ってきている様子
そこで今回は stream-json
というライブラリを使用して、stream で出力される JSON をパースして扱う方法を紹介します。
stream-json
で嬉しい JSON になった
stream-json とは
stream-json は、stream で JSON をパースするためのライブラリです。サイズの大きい json を streaming で parse するために開発されたようです。node:stream
に対応しているので、Node.js での使用が前提ですが、Web の ReadableStream も node:stream
に簡単に変換することができます。
Cloudflare Worker でも nodejs_compat: true
すると node:stream
が使えるそうなので、Worker でも使えるかもしれません。
試しに tsconfig.json
を stream で parse してみます。json を split で一文字ずつ stream に流していきます。tsconfig の include を array として取り出してみます。
import { parser } from "stream-json";
import { readFile } from "node:fs/promises";
import { ReadableStream } from "stream/web";
import { Readable } from "node:stream";
import { pick } from "stream-json/filters/Pick";
import { streamArray } from "stream-json/streamers/StreamArray";
const main = async () => {
const file = await readFile("./tsconfig.json", "utf-8");
// ReadableStream に変換
const stream = new ReadableStream({
start(controller) {
for (const str of file.split("")) {
controller.enqueue(str);
}
controller.close();
},
});
// ReadableStream を node:stream に変換
const readable = Readable.fromWeb(stream);
readable
.pipe(parser())
.pipe(pick({ filter: "include" }))
.pipe(streamArray())
.addListener("data", (data) => {
console.log(data);
});
};
void main();
このような形で配列の要素を組み立てられた順に取り出せます。
> { key: 0, value: 'src' }
> { key: 1, value: 'types' }
> { key: 2, value: 'scripts' }
> { key: 3, value: 'drizzle' }
では次は Object を取り出してみます。
import { parser } from "stream-json";
import { readFile } from "node:fs/promises";
import { ReadableStream } from "stream/web";
import { Readable } from "node:stream";
import { pick } from "stream-json/filters/Pick";
import { streamObject } from "stream-json/streamers/StreamObject";
const main = async () => {
const file = await readFile("./tsconfig.json", "utf-8");
const stream = new ReadableStream({
start(controller) {
for (const str of file.split("")) {
controller.enqueue(str);
}
controller.close();
},
});
const readable = Readable.fromWeb(stream);
readable
.pipe(parser())
.pipe(pick({ filter: "compilerOptions.paths" }))
.pipe(streamObject())
.addListener("data", (data) => {
console.log(data);
});
};
void main();
compilerOptions.paths
の内容が順番に取り出されます。
> { key: '@adapters/*', value: [ './src/adapters/*' ] }
> { key: '@web/*', value: [ './src/web/*' ] }
> { key: '@discord/*', value: [ './src/discord/*' ] }
streamArray
の注意点
streamObject
は key が json の key ですが、 streamArray
は key は index です。そのため pick で複数の array field に対してパースを行うとどこの要素が取り出されたのかわからなくなります。
readable
.pipe(parser())
.pipe(pick({ filter: /include|compilerOptions.types/ }))
.pipe(streamArray())
.addListener("data", (data) => {
console.log(data);
});
> { key: 0, value: 'vitest/globals' }
> { key: 1, value: 'src' }
> { key: 2, value: 'types' }
> { key: 3, value: 'scripts' }
> { key: 4, value: 'drizzle' }
こんなときは pick する key 毎に parse するのが良いでしょう。僕は rxjs
の Observable にして key をつけたうえで merge して扱っています。
こういう streaming の処理をするときは rxjs
のようなオペレーターが多数あるものを使うと操作が楽になるように思えます。
const readable = Readable.fromWeb(stream);
const readableWithParser = readable.pipe(parser());
const include$ = new Observable<{ key: number; value: string }>((subscriber) => {
const pipeline = readableWithParser
.pipe(pick({ filter: "include" }))
.pipe(streamArray())
.addListener("data", (data) => {
subscriber.next(data);
})
.addListener("error", (error) => {
subscriber.error(error);
})
.addListener("end", () => {
subscriber.complete();
});
return () => {
pipeline.removeAllListeners();
};
});
// 中略
merge(
include$.pipe(map((x) => ({ key: "include", value: x.value }))),
types$.pipe(map((x) => ({ key: "types", value: x.value }))),
).subscribe({
next(data) {
console.log(data);
},
error(error) {
console.error(error);
},
complete() {
console.log("done");
},
});
> { key: 'types', value: 'vitest/globals' }
> { key: 'include', value: 'src' }
> { key: 'include', value: 'types' }
> { key: 'include', value: 'scripts' }
> { key: 'include', value: 'drizzle' }
ChatGPT の stream を stream-json でパースする
ある程度 stream-json
のできることがわかったところで、ChatGPT の stream をパースしてみます。stream: true
にしたときは chunk.choices[0]?.delta.content
にコンテンツがあり、これをつなげていくことで JSON になります。これを stream-json
でパースして message を順番に取り出してみます。
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.OPENAI_KEY,
});
const prompt = `
## 返答ルール
- 返答は JSON で "message" を配列で返す
\`\`\`json
{
"message": [
"おはよう",
" 今日も一日がんばろうね。",
"何か手伝えることあるかな?"
]
}
\`\`\`
`;
const main = async () => {
const stream = await openai.chat.completions.create({
model: "gpt-3.5-turbo",
messages: [
{
role: "system",
content: prompt,
},
{
role: "user",
content: "json を stream でパースするよ!どうしたらいい?",
},
],
response_format: { type: "json_object" },
stream: true,
});
for await (const chunk of stream) {
process.stdout.write(chunk.choices[0]?.delta.content ?? "");
}
};
void main();
GPT が返す stream のままでは stream-json
がパースできないので加工します。ReadableStream
に変換して一文字ずつ流すようにします。
const webStream = new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
controller.enqueue(chunk.choices[0]?.delta.content);
}
controller.close();
},
});
const readable = Readable.fromWeb(webStream);
あとはこの readable
を stream-json
でパースしていきます。
readable
.pipe(parser())
.pipe(pick({ filter: "message" }))
.pipe(streamArray())
.addListener("data", (data) => {
console.log(data);
});
コード全文
import OpenAI from "openai";
import { Readable } from "stream";
import { parser } from "stream-json";
import { pick } from "stream-json/filters/Pick";
import { streamArray } from "stream-json/streamers/StreamArray";
import { ReadableStream } from "stream/web";
const openai = new OpenAI({
apiKey: process.env.OPENAI_KEY,
});
const prompt = `
## 返答ルール
- 返答は JSON で "message" を配列で返す
\`\`\`json
{
"message": [
"おはよう",
" 今日も一日がんばろうね。",
"何か手伝えることあるかな?"
]
}
\`\`\`
`;
const main = async () => {
const stream = await openai.chat.completions.create({
model: "gpt-3.5-turbo",
messages: [
{
role: "system",
content: prompt,
},
{
role: "user",
content: "json を stream でパースするよ!どうしたらいい?",
},
],
response_format: { type: "json_object" },
stream: true,
});
const webStream = new ReadableStream({
async start(controller) {
for await (const chunk of stream) {
controller.enqueue(chunk.choices[0]?.delta.content);
}
controller.close();
},
});
const readable = Readable.fromWeb(webStream);
readable
.pipe(parser())
.pipe(pick({ filter: "message" }))
.pipe(streamArray())
.addListener("data", (data) => {
console.log(data);
});
};
void main();
まとめ
stream-json
を使うことで stream で出力される JSON をパースすることができました。
Cloudflare Worker でも node:stream
が使えるので、Worker でも使えるかもしれません。これは試したいですね。
discord bot を作るうえで最初のレスポンスが速くしたかったので Stream な JSON と戦ってみました。
次は function_calling でも stream を試してみたいですね。
おわり!
Discussion