🦊

ChatGPTのAPIを使ってパタパタ表現を実現する

2023/06/24に公開

はじめに

ChatGPTを使用していると以下のようなパタパタと順次テキストが出てくる表現があります。これは、Sever-Sent-Event(以下SSEと略称)というWeb技術が使用されています。

https://developer.mozilla.org/ja/docs/Web/API/Server-sent_events/Using_server-sent_events

ChatGPTのAPIを使う際、デフォルトでは、リクエストに対して、応答が全て取得されたタイミングでレスポンスが返ってくるようになっています。単純にAPIを叩くだけの実装の場合、以下のように全文が返ってくるまでに、待ち時間が発生します。

ローディングアニメーション等で対処しても良いですが、できれば本家ChatGPTのUIのように順次パタパタ表示されるstreamに対応したUIを実現したいですよね。

本記事では、パタパタUIを実現するためのSSEの概要とReactでの実装例を紹介したいと思います。

ServerSentEventとは?

サーバーとクライアント間のリアルタイム通信というとWebSocketが知られていると思いますが、SSEは双方向通信はせず、サーバーからクライアントにのみリアルタイムでイベントを送信することが可能になります。1回繋いでやり取りして、終わりというものでなく、コネクションは明示的に閉じない限り、張られ続け、サーバーから何度もレスポンスを返すことが可能です。

SSEは、HTTP上で動作し、Content-Typeはtext/event-streamで通信を行います。一方で、WebSocketは専用のプロトコルを使う必要があります。SSEの方が標準のHTTPプロトコルをそのまま使うことができるので、通信の互換性が高く、より簡単に扱える技術だと思います。

今回紹介するようなChatGPTが生成するトークン単位で順次表示するようなサーバーからのデータストリームのみであれば、SSEを選択する方がベターな印象です。(自動再接続もサポートされているのも嬉しい)

実装例

※ ChatGPT APIのkeyを取得する方法などは割愛します。

以下はReactのプロジェクトでの実装サンプルです。

注意点として、実際のプロダクトでChatGPT APIを使う際は、クライアントにAPI Keyを持たせず、ちゃんとサーバー側に持たせるようにしましょうね。(クライアントに埋め込むと、簡単にAPI Keyは知ることができます。)

UI層では、質問内容をローカルステートに持ち、submitMessageメソッドに渡します。

import React, { useState } from 'react'

import { useChat } from './useChat'

export const SampleChatGpt = () => {
  const [question, setQuestion] = useState('')
  const [messages, submitMessage] = useChat({
    model: 'gpt-3.5-turbo',
    apiKey: '発行したAPIKeyを入れる',
  })

  const onSend = () => {
    submitMessage([{ content: question, role: 'user' }])
    setQuestion('')
  }

  return (
    <div style={{ padding: '10px' }}>
      <div className="chat-wrapper">
        {messages.length < 1 ? (
          <div className="empty">No messages</div>
        ) : (
          messages.map((msg, i) => (
            <div className="message-wrapper" key={i}>
              <pre className="chat-message" style={{ whiteSpace: 'pre-wrap' }}>
                {msg.content}
              </pre>
            </div>
          ))
        )}
      </div>

      <textarea
        rows={5}
        cols={50}
        value={question}
        placeholder="Write a prompt"
        onChange={event => {
          setQuestion(event.target.value)
        }}
      />

      <div>
        <button type="button" onClick={onSend}>
          質問する
        </button>
      </div>
    </div>
  )
}

hooksのSampleコードは以下の通りです。

リクエストパラメータにstream: true の設定を追加した上で、 EventSourseオブジェクトを作成するとデータストリームに接続することができます。

EventSourceのコネクションを閉じるタイミングは、サーバーから返ってくるdataの[DONE] トークンを見て判定しています。

import { useState } from 'react'
import { SSE } from 'sse.js'

type ParseData = {
  id: string
  object: string
  created: number
  model: string
  choices: {
    delta: {
      content: string
    }
    index: number
    finish_reason: string
  }[]
}

type ChatMessageIncomingChunk = {
  content: string
  role?: string
}

type OpenAIChatMessage = {
  content: string
  role: string
}

type ChatMessageToken = OpenAIChatMessage

type ChatMessageParams = {
  meta?: {
    chunks?: ChatMessageToken[]
  }
} & OpenAIChatMessage

type ChatMessage = {
  meta: {
    chunks: ChatMessageToken[]
  }
} & ChatMessageParams

type OpenAIStreamingProps = {
  apiKey: string
  model: 'gpt-3.5-turbo' | 'gpt-4'
}
const CHAT_COMPLETIONS_URL = 'https://api.openai.com/v1/chat/completions'

// OpenAI APIが期待するデータ型
const officialOpenAIParams = ({
  content,
  role,
}: ChatMessage): OpenAIChatMessage => ({ content, role })

const createChatMessage = ({
  content,
  role,
  ...restOfParams
}: ChatMessageParams): ChatMessage => ({
  content,
  role,
  meta: {
    chunks: [],
    ...restOfParams.meta,
  },
})

export const useChat = ({ model, apiKey }: OpenAIStreamingProps) => {
  // [0]の配列には、Userが入力したcontentとroleが入る
  // [1]の配列には、OpenAIが返したcontentとrole、metaデータとして、chunkとloadingのstatusが入る
  const [messages, setMessages] = useState<ChatMessage[]>([])

  const submitMessage = (newMessages?: ChatMessageParams[]) => {
    // 送信されたメッセージがない場合は空の配列をセット
    if (!newMessages) {
      setMessages([])
      return
    }

    const updatedMessages: ChatMessage[] = [
      ...newMessages.map(createChatMessage), // Userが入力したテキストをセット
      createChatMessage({
        content: '',
        role: '',
      }), // OpenAIが返すテキストをセット
    ]

    // SSEで更新するリストをセット
    setMessages(updatedMessages)

    // SSEのpayload
    const payload = JSON.stringify({
      model,
      // ユーザーの入力したテキストをセット。
      messages: updatedMessages
        .filter((_, i) => updatedMessages.length - 1 !== i)
        .map(officialOpenAIParams),
      stream: true,
    })

    // OPENAIに送信するヘッダー
    const CHAT_HEADERS = {
      'Content-Type': 'application/json',
      Authorization: `Bearer ${apiKey}`,
    }

    // SSEリクエストを作成
    const source = new SSE(CHAT_COMPLETIONS_URL, {
      headers: CHAT_HEADERS,
      method: 'POST',
      payload,
    })

    // 受信したチャンクを保存
    source.addEventListener('message', e => {
      // [DONE]tokenが送信されたらストリーミングを終了
      if (e.data !== '[DONE]') {
        let parseData

        // APIから返ってくるデータ例
        // {
        // "id":"chatcmpl-73zuUbFIU3ru9TEclR3z7D6g6gpB4",
        // "object":"chat.completion.chunk",
        // "created":1681187322,
        // "model":"gpt-3.5-turbo-0301",
        // "choices":[{"delta":{"content":"き"},
        // "index":0,"finish_reason":null}]
        // }
        try {
          parseData = JSON.parse(e.data) as ParseData | undefined
        } catch {
          parseData = undefined
        }

        if (!parseData?.choices[0]) return

        const chunk: ChatMessageIncomingChunk = parseData.choices[0]?.delta

        setMessages(msgs =>
          msgs.map((message, i) => {
            // chatGPTからの返答を更新
            if (updatedMessages.length - 1 === i) {
              return {
                content: message.content + (chunk.content || ''),
                role: message.role + (chunk.role || ''),
                meta: {
                  ...message.meta,
                  id: i,
                  chunks: [
                    ...message.meta.chunks,
                    {
                      content: chunk.content || '',
                      role: chunk.role || '',
                    },
                  ],
                },
              }
            }
            return message
          })
        )
      } else {
        source.close() // DONEを受け取ったら、イベントストリームを閉じる
      }
    })

    source.stream()
  }

  return [messages, submitMessage] as [ChatMessage[], typeof submitMessage]
}

では次のセクションで、上記を読み解くためのSSEの要素を紹介します。

パタパタUIを実現するための主要なSSEの要素

SSEのイベント

デフォルトでは、以下の3つのイベントを生成します。

  • message: メッセージを受信時に発生。 event.data で利用できる
  • open: EventSourseに接続された時に発生。
  • error: EventSourseへの接続が失敗したときに発生

※イベント名を独自に指定することも可能です。(addEventListenerを使用)

SSEのプロパティ

  • readyState: 接続の状態を表します。接続の状態によって、UIを変化させるケースなどが出てきた場合にこのプロパティは役立つと思います。
    • 0 — 接続試行中(EventSource.CONNECTING)
    • 1 — 接続中(EventSource.OPEN)
    • 2 — 接続終了(EventSource.CLOSED)
  • withCredentials: EventSourseオブジェクトがCORS設定してインスタンス化されたのかを判定する値です。

SSEのメソッド

  • close(): EventSourseの接続を閉じます

サーバーレスポンス

  • event : イベント名。data: の前になければなりません。サーバーから指定したイベントを送信したい場合に利用します。その場合、addEventListener(イベント名)で指定したイベントリスナーにイベントが送られます。
  • data : メッセージ本文。複数の data は各パート間に \n を含む単一のメッセージとして解釈されます。
  • id : 一意なIDが付与されており、再接続時が必要な際に利用します。クライアントはメッセージのIDを記録し、最後に受信したメッセージのidを受け取るとブラウザ側は、リクエストヘッダーのLast-Event-ID にそのidをセットして再度リクエストをサーバーに投げます。サーバーはヘッダーを見つけると、クライアントがそこまでは受信成功したものと見なし、それ以降のメッセージを送信します。
  • retry : EventSourceの接続が切れた際に、再接続を試みるまでの時間をミリ秒で指定します。クライアントから設定することはできません。

実際にChatGPTのAPIを叩いた時に返ってくるレスポンスは以下の通りです。

注目してほしいのは、 content-type の部分です。イベントを送信する側であるサーバーの実装では、Content-Typetext/event-stream で応答する必要があります。

* We are completely uploaded and fine
* Connection state changed (MAX_CONCURRENT_STREAMS == 256)!
< HTTP/2 200
< date: Sat, 24 Jun 2023 12:09:02 GMT
< content-type: text/event-stream
< access-control-allow-origin: *
< cache-control: no-cache, must-revalidate
< openai-organization: user-ozq2sj2m3r06ncvqdxjrqvvh
< openai-processing-ms: 173
< openai-version: 2020-10-01
< strict-transport-security: max-age=15724800; includeSubDomains
< x-ratelimit-limit-requests: 3
< x-ratelimit-limit-tokens: 40000
< x-ratelimit-remaining-requests: 2
< x-ratelimit-remaining-tokens: 39979
< x-ratelimit-reset-requests: 20s
< x-ratelimit-reset-tokens: 31ms
< x-request-id: bcca4dc6586c02e665e86c6ffff575af
< cf-cache-status: DYNAMIC
< server: cloudflare
< cf-ray: 7dc4dc085d1680d7-NRT
< alt-svc: h3=":443"; ma=86400
<

ChatGPTからのレスポンスは以下のような形式で返ってきます。
最後に[DONE]が返ってくるので、このレスポンスでconnectionを閉じるかどうかの判定ができます。

data: {"id":"chatcmpl-7UwMYmwxbiWRcPke8tShRCtQINqOi","object":"chat.completion.chunk","created":1687608542,"model":"gpt-3.5-turbo-0301","choices":[{"index":0,"delta":{"content":"か"},"finish_reason":null}]}

data: {"id":"chatcmpl-7UwMYmwxbiWRcPke8tShRCtQINqOi","object":"chat.completion.chunk","created":1687608542,"model":"gpt-3.5-turbo-0301","choices":[{"index":0,"delta":{"content":"?"},"finish_reason":null}]}

data: {"id":"chatcmpl-7UwMYmwxbiWRcPke8tShRCtQINqOi","object":"chat.completion.chunk","created":1687608542,"model":"gpt-3.5-turbo-0301","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]

まとめ

今回、ChatGPT APIを利用するにあたり、初めてSSEに触れたのですが、HTTPベースで作られており、Webエンジニアなら少しキャッチアップすれば、WebSocketより簡単に使える技術だと感じました。

実際のプロダクトに適応するには、サンプルコード以外にも考慮するポイントはありますが、この記事を入り口にstream設定をonにしたパタパタUIが実現できるようになると幸いです。

Discussion