ChatGPTのAPIを使ってパタパタ表現を実現する
はじめに
ChatGPTを使用していると以下のようなパタパタと順次テキストが出てくる表現があります。これは、Sever-Sent-Event(以下SSEと略称)というWeb技術が使用されています。
ChatGPTのAPIを使う際、デフォルトでは、リクエストに対して、応答が全て取得されたタイミングでレスポンスが返ってくるようになっています。単純にAPIを叩くだけの実装の場合、以下のように全文が返ってくるまでに、待ち時間が発生します。
ローディングアニメーション等で対処しても良いですが、できれば本家ChatGPTのUIのように順次パタパタ表示されるstreamに対応したUIを実現したいですよね。
本記事では、パタパタUIを実現するためのSSEの概要とReactでの実装例を紹介したいと思います。
ServerSentEventとは?
サーバーとクライアント間のリアルタイム通信というとWebSocketが知られていると思いますが、SSEは双方向通信はせず、サーバーからクライアントにのみリアルタイムでイベントを送信することが可能になります。1回繋いでやり取りして、終わりというものでなく、コネクションは明示的に閉じない限り、張られ続け、サーバーから何度もレスポンスを返すことが可能です。
SSEは、HTTP上で動作し、Content-Typeはtext/event-streamで通信を行います。一方で、WebSocketは専用のプロトコルを使う必要があります。SSEの方が標準のHTTPプロトコルをそのまま使うことができるので、通信の互換性が高く、より簡単に扱える技術だと思います。
今回紹介するようなChatGPTが生成するトークン単位で順次表示するようなサーバーからのデータストリームのみであれば、SSEを選択する方がベターな印象です。(自動再接続もサポートされているのも嬉しい)
実装例
※ ChatGPT APIのkeyを取得する方法などは割愛します。
以下はReactのプロジェクトでの実装サンプルです。
注意点として、実際のプロダクトでChatGPT APIを使う際は、クライアントにAPI Keyを持たせず、ちゃんとサーバー側に持たせるようにしましょうね。(クライアントに埋め込むと、簡単にAPI Keyは知ることができます。)
UI層では、質問内容をローカルステートに持ち、submitMessageメソッドに渡します。
import React, { useState } from 'react'
import { useChat } from './useChat'
export const SampleChatGpt = () => {
const [question, setQuestion] = useState('')
const [messages, submitMessage] = useChat({
model: 'gpt-3.5-turbo',
apiKey: '発行したAPIKeyを入れる',
})
const onSend = () => {
submitMessage([{ content: question, role: 'user' }])
setQuestion('')
}
return (
<div style={{ padding: '10px' }}>
<div className="chat-wrapper">
{messages.length < 1 ? (
<div className="empty">No messages</div>
) : (
messages.map((msg, i) => (
<div className="message-wrapper" key={i}>
<pre className="chat-message" style={{ whiteSpace: 'pre-wrap' }}>
{msg.content}
</pre>
</div>
))
)}
</div>
<textarea
rows={5}
cols={50}
value={question}
placeholder="Write a prompt"
onChange={event => {
setQuestion(event.target.value)
}}
/>
<div>
<button type="button" onClick={onSend}>
質問する
</button>
</div>
</div>
)
}
hooksのSampleコードは以下の通りです。
リクエストパラメータにstream: true
の設定を追加した上で、 EventSourseオブジェクトを作成するとデータストリームに接続することができます。
EventSourceのコネクションを閉じるタイミングは、サーバーから返ってくるdataの[DONE]
トークンを見て判定しています。
import { useState } from 'react'
import { SSE } from 'sse.js'
type ParseData = {
id: string
object: string
created: number
model: string
choices: {
delta: {
content: string
}
index: number
finish_reason: string
}[]
}
type ChatMessageIncomingChunk = {
content: string
role?: string
}
type OpenAIChatMessage = {
content: string
role: string
}
type ChatMessageToken = OpenAIChatMessage
type ChatMessageParams = {
meta?: {
chunks?: ChatMessageToken[]
}
} & OpenAIChatMessage
type ChatMessage = {
meta: {
chunks: ChatMessageToken[]
}
} & ChatMessageParams
type OpenAIStreamingProps = {
apiKey: string
model: 'gpt-3.5-turbo' | 'gpt-4'
}
const CHAT_COMPLETIONS_URL = 'https://api.openai.com/v1/chat/completions'
// OpenAI APIが期待するデータ型
const officialOpenAIParams = ({
content,
role,
}: ChatMessage): OpenAIChatMessage => ({ content, role })
const createChatMessage = ({
content,
role,
...restOfParams
}: ChatMessageParams): ChatMessage => ({
content,
role,
meta: {
chunks: [],
...restOfParams.meta,
},
})
export const useChat = ({ model, apiKey }: OpenAIStreamingProps) => {
// [0]の配列には、Userが入力したcontentとroleが入る
// [1]の配列には、OpenAIが返したcontentとrole、metaデータとして、chunkとloadingのstatusが入る
const [messages, setMessages] = useState<ChatMessage[]>([])
const submitMessage = (newMessages?: ChatMessageParams[]) => {
// 送信されたメッセージがない場合は空の配列をセット
if (!newMessages) {
setMessages([])
return
}
const updatedMessages: ChatMessage[] = [
...newMessages.map(createChatMessage), // Userが入力したテキストをセット
createChatMessage({
content: '',
role: '',
}), // OpenAIが返すテキストをセット
]
// SSEで更新するリストをセット
setMessages(updatedMessages)
// SSEのpayload
const payload = JSON.stringify({
model,
// ユーザーの入力したテキストをセット。
messages: updatedMessages
.filter((_, i) => updatedMessages.length - 1 !== i)
.map(officialOpenAIParams),
stream: true,
})
// OPENAIに送信するヘッダー
const CHAT_HEADERS = {
'Content-Type': 'application/json',
Authorization: `Bearer ${apiKey}`,
}
// SSEリクエストを作成
const source = new SSE(CHAT_COMPLETIONS_URL, {
headers: CHAT_HEADERS,
method: 'POST',
payload,
})
// 受信したチャンクを保存
source.addEventListener('message', e => {
// [DONE]tokenが送信されたらストリーミングを終了
if (e.data !== '[DONE]') {
let parseData
// APIから返ってくるデータ例
// {
// "id":"chatcmpl-73zuUbFIU3ru9TEclR3z7D6g6gpB4",
// "object":"chat.completion.chunk",
// "created":1681187322,
// "model":"gpt-3.5-turbo-0301",
// "choices":[{"delta":{"content":"き"},
// "index":0,"finish_reason":null}]
// }
try {
parseData = JSON.parse(e.data) as ParseData | undefined
} catch {
parseData = undefined
}
if (!parseData?.choices[0]) return
const chunk: ChatMessageIncomingChunk = parseData.choices[0]?.delta
setMessages(msgs =>
msgs.map((message, i) => {
// chatGPTからの返答を更新
if (updatedMessages.length - 1 === i) {
return {
content: message.content + (chunk.content || ''),
role: message.role + (chunk.role || ''),
meta: {
...message.meta,
id: i,
chunks: [
...message.meta.chunks,
{
content: chunk.content || '',
role: chunk.role || '',
},
],
},
}
}
return message
})
)
} else {
source.close() // DONEを受け取ったら、イベントストリームを閉じる
}
})
source.stream()
}
return [messages, submitMessage] as [ChatMessage[], typeof submitMessage]
}
では次のセクションで、上記を読み解くためのSSEの要素を紹介します。
パタパタUIを実現するための主要なSSEの要素
SSEのイベント
デフォルトでは、以下の3つのイベントを生成します。
-
message
: メッセージを受信時に発生。event.data
で利用できる -
open
: EventSourseに接続された時に発生。 -
error
: EventSourseへの接続が失敗したときに発生
※イベント名を独自に指定することも可能です。(addEventListenerを使用)
SSEのプロパティ
-
readyState
: 接続の状態を表します。接続の状態によって、UIを変化させるケースなどが出てきた場合にこのプロパティは役立つと思います。-
0
— 接続試行中(EventSource.CONNECTING) -
1
— 接続中(EventSource.OPEN) -
2
— 接続終了(EventSource.CLOSED)
-
-
withCredentials
: EventSourseオブジェクトがCORS設定してインスタンス化されたのかを判定する値です。
SSEのメソッド
-
close()
: EventSourseの接続を閉じます
サーバーレスポンス
-
event
: イベント名。data:
の前になければなりません。サーバーから指定したイベントを送信したい場合に利用します。その場合、addEventListener(イベント名)で指定したイベントリスナーにイベントが送られます。 -
data
: メッセージ本文。複数のdata
は各パート間に\n
を含む単一のメッセージとして解釈されます。 -
id
: 一意なIDが付与されており、再接続時が必要な際に利用します。クライアントはメッセージのIDを記録し、最後に受信したメッセージのidを受け取るとブラウザ側は、リクエストヘッダーのLast-Event-ID
にそのidをセットして再度リクエストをサーバーに投げます。サーバーはヘッダーを見つけると、クライアントがそこまでは受信成功したものと見なし、それ以降のメッセージを送信します。 -
retry
: EventSourceの接続が切れた際に、再接続を試みるまでの時間をミリ秒で指定します。クライアントから設定することはできません。
実際にChatGPTのAPIを叩いた時に返ってくるレスポンスは以下の通りです。
注目してほしいのは、 content-type
の部分です。イベントを送信する側であるサーバーの実装では、Content-Typetext/event-stream
で応答する必要があります。
* We are completely uploaded and fine
* Connection state changed (MAX_CONCURRENT_STREAMS == 256)!
< HTTP/2 200
< date: Sat, 24 Jun 2023 12:09:02 GMT
< content-type: text/event-stream
< access-control-allow-origin: *
< cache-control: no-cache, must-revalidate
< openai-organization: user-ozq2sj2m3r06ncvqdxjrqvvh
< openai-processing-ms: 173
< openai-version: 2020-10-01
< strict-transport-security: max-age=15724800; includeSubDomains
< x-ratelimit-limit-requests: 3
< x-ratelimit-limit-tokens: 40000
< x-ratelimit-remaining-requests: 2
< x-ratelimit-remaining-tokens: 39979
< x-ratelimit-reset-requests: 20s
< x-ratelimit-reset-tokens: 31ms
< x-request-id: bcca4dc6586c02e665e86c6ffff575af
< cf-cache-status: DYNAMIC
< server: cloudflare
< cf-ray: 7dc4dc085d1680d7-NRT
< alt-svc: h3=":443"; ma=86400
<
ChatGPTからのレスポンスは以下のような形式で返ってきます。
最後に[DONE]が返ってくるので、このレスポンスでconnectionを閉じるかどうかの判定ができます。
data: {"id":"chatcmpl-7UwMYmwxbiWRcPke8tShRCtQINqOi","object":"chat.completion.chunk","created":1687608542,"model":"gpt-3.5-turbo-0301","choices":[{"index":0,"delta":{"content":"か"},"finish_reason":null}]}
data: {"id":"chatcmpl-7UwMYmwxbiWRcPke8tShRCtQINqOi","object":"chat.completion.chunk","created":1687608542,"model":"gpt-3.5-turbo-0301","choices":[{"index":0,"delta":{"content":"?"},"finish_reason":null}]}
data: {"id":"chatcmpl-7UwMYmwxbiWRcPke8tShRCtQINqOi","object":"chat.completion.chunk","created":1687608542,"model":"gpt-3.5-turbo-0301","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
まとめ
今回、ChatGPT APIを利用するにあたり、初めてSSEに触れたのですが、HTTPベースで作られており、Webエンジニアなら少しキャッチアップすれば、WebSocketより簡単に使える技術だと感じました。
実際のプロダクトに適応するには、サンプルコード以外にも考慮するポイントはありますが、この記事を入り口にstream設定をonにしたパタパタUIが実現できるようになると幸いです。
Discussion