🌠

stream-json で ChatGPT の JSON を徐々に読む!

2024/05/10に公開

はじめに

ChatGPT の stream モードは試したことがありますか? AsyncIterable や ReadableStream として出力が得られるので、出力がテキストの場合は ChatGPT の UI のような連続でテキストを表示させるという体験をすぐに作れます。

ですが、出力を JSON で受け取りたい場合もあります。ChatGPT API は response_format: { type: "json_object" } にすることで JSON で出力を受け取ることができますが、stream にした際に json が一文字ずつやってきて厳しい状態になります。

ChatGPT による streaming json response をしている動画
厳しい JSON が帰ってきている様子

そこで今回は stream-json というライブラリを使用して、stream で出力される JSON をパースして扱う方法を紹介します。

要素ごとにテキストが取り出せている様子
stream-json で嬉しい JSON になった

stream-json とは

https://www.npmjs.com/package/stream-json

stream-json は、stream で JSON をパースするためのライブラリです。サイズの大きい json を streaming で parse するために開発されたようです。node:stream に対応しているので、Node.js での使用が前提ですが、Web の ReadableStream も node:stream に簡単に変換することができます。

Cloudflare Worker でも nodejs_compat: true すると node:stream が使えるそうなので、Worker でも使えるかもしれません。

https://developers.cloudflare.com/workers/runtime-apis/nodejs/streams/

試しに tsconfig.json を stream で parse してみます。json を split で一文字ずつ stream に流していきます。tsconfig の include を array として取り出してみます。

import { parser } from "stream-json";
import { readFile } from "node:fs/promises";
import { ReadableStream } from "stream/web";
import { Readable } from "node:stream";
import { pick } from "stream-json/filters/Pick";
import { streamArray } from "stream-json/streamers/StreamArray";

const main = async () => {
  const file = await readFile("./tsconfig.json", "utf-8");
  // ReadableStream に変換
  const stream = new ReadableStream({
    start(controller) {
      for (const str of file.split("")) {
        controller.enqueue(str);
      }

      controller.close();
    },
  });
  // ReadableStream を node:stream に変換
  const readable = Readable.fromWeb(stream);
  readable
    .pipe(parser())
    .pipe(pick({ filter: "include" }))
    .pipe(streamArray())
    .addListener("data", (data) => {
      console.log(data);
    });
};

void main();

このような形で配列の要素を組み立てられた順に取り出せます。

> { key: 0, value: 'src' }
> { key: 1, value: 'types' }
> { key: 2, value: 'scripts' }
> { key: 3, value: 'drizzle' }

では次は Object を取り出してみます。

import { parser } from "stream-json";
import { readFile } from "node:fs/promises";
import { ReadableStream } from "stream/web";
import { Readable } from "node:stream";
import { pick } from "stream-json/filters/Pick";
import { streamObject } from "stream-json/streamers/StreamObject";

const main = async () => {
  const file = await readFile("./tsconfig.json", "utf-8");
  const stream = new ReadableStream({
    start(controller) {
      for (const str of file.split("")) {
        controller.enqueue(str);
      }

      controller.close();
    },
  });
  const readable = Readable.fromWeb(stream);
  readable
    .pipe(parser())
    .pipe(pick({ filter: "compilerOptions.paths" }))
    .pipe(streamObject())
    .addListener("data", (data) => {
      console.log(data);
    });
};

void main();

compilerOptions.paths の内容が順番に取り出されます。

> { key: '@adapters/*', value: [ './src/adapters/*' ] }
> { key: '@web/*', value: [ './src/web/*' ] }
> { key: '@discord/*', value: [ './src/discord/*' ] }

streamArray の注意点

streamObject は key が json の key ですが、 streamArray は key は index です。そのため pick で複数の array field に対してパースを行うとどこの要素が取り出されたのかわからなくなります。

readable
  .pipe(parser())
  .pipe(pick({ filter: /include|compilerOptions.types/ }))
  .pipe(streamArray())
  .addListener("data", (data) => {
    console.log(data);
  });
> { key: 0, value: 'vitest/globals' }
> { key: 1, value: 'src' }
> { key: 2, value: 'types' }
> { key: 3, value: 'scripts' }
> { key: 4, value: 'drizzle' }

こんなときは pick する key 毎に parse するのが良いでしょう。僕は rxjs の Observable にして key をつけたうえで merge して扱っています。
こういう streaming の処理をするときは rxjs のようなオペレーターが多数あるものを使うと操作が楽になるように思えます。

const readable = Readable.fromWeb(stream);

const readableWithParser = readable.pipe(parser());

const include$ = new Observable<{ key: number; value: string }>((subscriber) => {
  const pipeline = readableWithParser
    .pipe(pick({ filter: "include" }))
    .pipe(streamArray())
    .addListener("data", (data) => {
      subscriber.next(data);
    })
    .addListener("error", (error) => {
      subscriber.error(error);
    })
    .addListener("end", () => {
      subscriber.complete();
    });

  return () => {
    pipeline.removeAllListeners();
  };
});

// 中略

merge(
  include$.pipe(map((x) => ({ key: "include", value: x.value }))),
  types$.pipe(map((x) => ({ key: "types", value: x.value }))),
).subscribe({
  next(data) {
    console.log(data);
  },
  error(error) {
    console.error(error);
  },
  complete() {
    console.log("done");
  },
});
> { key: 'types', value: 'vitest/globals' }
> { key: 'include', value: 'src' }
> { key: 'include', value: 'types' }
> { key: 'include', value: 'scripts' }
> { key: 'include', value: 'drizzle' }

ChatGPT の stream を stream-json でパースする

ある程度 stream-json のできることがわかったところで、ChatGPT の stream をパースしてみます。stream: true にしたときは chunk.choices[0]?.delta.content にコンテンツがあり、これをつなげていくことで JSON になります。これを stream-json でパースして message を順番に取り出してみます。

ChatGPT による streaming json response をしている動画

import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_KEY,
});

const prompt = `
  ## 返答ルール

  - 返答は JSON で "message" を配列で返す
  \`\`\`json
  {
    "message": [
      "おはよう",
      " 今日も一日がんばろうね。",
      "何か手伝えることあるかな?"
    ]
  }
  \`\`\`
`;

const main = async () => {
  const stream = await openai.chat.completions.create({
    model: "gpt-3.5-turbo",
    messages: [
      {
        role: "system",
        content: prompt,
      },
      {
        role: "user",
        content: "json を stream でパースするよ!どうしたらいい?",
      },
    ],
    response_format: { type: "json_object" },
    stream: true,
  });

  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta.content ?? "");
  }
};

void main();

GPT が返す stream のままでは stream-json がパースできないので加工します。ReadableStream に変換して一文字ずつ流すようにします。

const webStream = new ReadableStream({
  async start(controller) {
    for await (const chunk of stream) {
      controller.enqueue(chunk.choices[0]?.delta.content);
    }
    controller.close();
  },
});

const readable = Readable.fromWeb(webStream);

あとはこの readablestream-json でパースしていきます。

readable
  .pipe(parser())
  .pipe(pick({ filter: "message" }))
  .pipe(streamArray())
  .addListener("data", (data) => {
    console.log(data);
  });

要素ごとにテキストが取り出せている様子

コード全文

import OpenAI from "openai";
import { Readable } from "stream";
import { parser } from "stream-json";
import { pick } from "stream-json/filters/Pick";
import { streamArray } from "stream-json/streamers/StreamArray";
import { ReadableStream } from "stream/web";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_KEY,
});

const prompt = `
  ## 返答ルール

  - 返答は JSON で "message" を配列で返す
  \`\`\`json
  {
    "message": [
      "おはよう",
      " 今日も一日がんばろうね。",
      "何か手伝えることあるかな?"
    ]
  }
  \`\`\`
`;

const main = async () => {
  const stream = await openai.chat.completions.create({
    model: "gpt-3.5-turbo",
    messages: [
      {
        role: "system",
        content: prompt,
      },
      {
        role: "user",
        content: "json を stream でパースするよ!どうしたらいい?",
      },
    ],
    response_format: { type: "json_object" },
    stream: true,
  });

  const webStream = new ReadableStream({
    async start(controller) {
      for await (const chunk of stream) {
        controller.enqueue(chunk.choices[0]?.delta.content);
      }
      controller.close();
    },
  });

  const readable = Readable.fromWeb(webStream);
  readable
    .pipe(parser())
    .pipe(pick({ filter: "message" }))
    .pipe(streamArray())
    .addListener("data", (data) => {
      console.log(data);
    });
};

void main();

まとめ

stream-json を使うことで stream で出力される JSON をパースすることができました。
Cloudflare Worker でも node:stream が使えるので、Worker でも使えるかもしれません。これは試したいですね。

discord bot を作るうえで最初のレスポンスが速くしたかったので Stream な JSON と戦ってみました。

次は function_calling でも stream を試してみたいですね。

おわり!

Discussion