RailsからOpenAI APIなどを呼んでストリーミングレスポンスを少しずつ処理する方法(HTTParty)
近頃寒くなってきており、非常に過ごし易い季節になりましたね。
さて今回はHTTPartyというgemを使った際にstreamingで受け取ったレスポンスを少しずつ処理する方法についてシェアしたいと思います!
やりたいこと
やりたいことは以下です。
- Railsで動いているサーバーからOpenAIなどの生成AIを呼びたい
- レスポンスに時間がかかることが考えられるため、少しずつRailsサーバーからレスポンスを返したい
- リクエストはHTTPartyで送りたい
OpenAIなどの生成AIはstreamingでレスポンスを返す仕組みがあります。
これらのAPIからのレスポンスを少しずつ処理したいときってありますよね。
方法
以下みたいな形で stream_body というオプションを渡しつつ、streamingで受け取ったデータをblockで処理します。
chunked_body = ''
HTTParty.post(
'叩きたいAPIのエンドポイント',
{
headers: headers,
body: body.to_json,
stream_body: true,
timeout: 30,
read_timeout: 60, # ストリーミングデータ読み取り中の無応答タイムアウト(秒)
},
) do |response_fragment|
chunked_body += response_fragment
while (index = chunked_body.index("\n"))
line = chunked_body[0..index]
chunked_body = chunked_body[(index + 1)..] || ''
# 何らかの処理
end
end
OpenAIのStreaming APIは、Server-Sent Events (SSE) 形式でレスポンスを返します。
各イベントは data: {json}\n という形式で、改行で区切られています。
ここでの何らかの処理とはDBへの書き込みやRedisのPub/Subを使っている際のpublishなどが挙げられます。
ここで気になるのはblockで渡されるオブジェクトがどんなオブジェクトか、ですよね。
ではライブラリでも見ましょう。
コードリーディング
HTTParty.post を呼ぶと最終的に以下の関数が実行されます。
blockが渡されている場合はL164で ResponseFragment というクラスのインスタンスが渡されていますね。
なので、先ほどの response_fragment は HTTParty::ResponseFragment というクラスのオブジェクトだということが分かりました。
ResponseFragment は SimpleDelegator を継承しているらしいですね。
L18で super fragment をしているため、perform関数で渡していた encoded_fragment がsuperされていそうです。
encoded_fragment はStringだということが分かりました。
なので例のコードの response_fragment はStringに以下の関数(attributes)が生えているオブジェクトっぽいですね。
- code
- http_response
- connection
まとめ
以下のような処理になっています!
HTTPartyではなく、FaradayやHTTP.rbで少しずつレスポンスを処理する記事は他にもあったので、今回はHTTPartyについて書いてみました。
blockで渡されるオブジェクトがStringを拡張したもので、なかなか厄介でしたが他のライブラリも同じような実装なのか気になって、夜はよく眠れます。
それでは!
Discussion