Azure OpenAIの返答をAzure Functionを中継してStream(SSE)したい
クライアントとOpenAIのエンドポイントの間にAzure Functionsを配置するケースが時折存在します。
- AOAIのキーを隠蔽したい
- ユーザー認証などの処理をしたい
- クライアントに余計なプロパティを設定させたくない
- 複数のAOAIを切り替えて負荷分散をしたい
などなど、理由は様々です。
場合によってはAzure FunctionsだけではなくAPI ManamgenetやApp Serviceを使った方がよい場合もありますが、FaaSのお手軽さを享受するためにもAzure Functionsを使ってみます。通常のChatCompletionAPIへのリクエストであればリクエストとレスポンスの単純な通信で済むのですが、多くの場合Streamを使って生成結果をヌルヌル表示したいことが多いと思います。今回はクライアントとAzure OpenAIの間にAzure Functionsを配置して、Stream(SSE)を有効にした状態でChatCompletionを行う方法を紹介します。
実装
ポイントのみ一部抜粋で紹介します。すべてのコードはこちらのリポジトリにあります。
Azure Functions
APIの仲介をするのでHTTPトリガーの関数を作成します。
ポイントとしてはつぎの通りです。
- リクエストのHttpContextからレスポンスを取得。
- GetChatCompletionsStreamingAsyncを呼び出すと、OpenAIのAPIからのレスポンスがストリーミングされる。
- そのデータをWriteAsyncで適宜データを書き込み、FlushAsyncでバッファされている出力をクライアントに送信する。
isolatedモードの場合は送信の順番が前後する現象が発生したため暫定処理で遅延を追加しています。(In-procモードの場合は現象が発生しなかったため調査中です。)
var response = req.HttpContext.Response;
response.Headers.Add(HeaderNames.ContentType, "text/event-stream");
response.Headers.Add(HeaderNames.CacheControl, CacheControlHeaderValue.NoCacheString);
string dataformat = "data: {0}\r\n\r\n";
await foreach (StreamingChatCompletionsUpdate chatUpdate in await client.GetChatCompletionsStreamingAsync(chatOptions))
{
log.LogInformation(chatUpdate.ContentUpdate);
await response.WriteAsync(string.Format(dataformat, chatUpdate.ContentUpdate));
await response.Body.FlushAsync();
await Task.Delay(10);
}
await response.WriteAsync(string.Format(dataformat, "[DONE]"));
await response.Body.FlushAsync();
return new EmptyResult();
クライアント
SSEを実装したサーバーはMIMEタイプtext/event-streamで応答が返されます。
この応答はReadableStreamにラップされているため、res.body.getReader()でデータを読み取るためのReadableStreamDefaultReaderが取得できます。
このReadableStreamDefaultReaderのread()メソッドを使用することでストリームのチャンクへのアクセスを提供するプロミスを取得できます。
この時に返されるdoneがfalse以外のときにはエラーまたはストリームがすでに閉じられていることを示しています。doneがfalseの場合は、valueにはチャンクのデータが含まれているため、それをデコードして表示します。デコードを行うと通常のテキストデータとして操作できるようになるため、不要な文字などを整形して表示します。
const res = await fetch("http://localhost:7221/api/chat-stream", {
headers: {
"content-type": "application/json",
"accept":"text/event-stream"
},
method: "POST",
body: JSON.stringify({
messages:messages
}),
});
const reader = res.body.getReader();
const decoder = new TextDecoder();
var assistant_message = "";
const read = async () => {
const { done, value } = await reader.read();
if (done) {
return;
}
assistant_message += decoder.decode(value).split('data:')[1].trim();
document.getElementById("assistant-temp-message").innerHTML = assistant_message;
return read();
};
await read();
reader.releaseLock();
左側の黒地の画面がFcuntionのデバッグログ、右側が実際に実装をしたクライアントの画面です。
サンプルコード
実際に動作するサンプルコードをGitHubにアップしています。今回のメインテーマに関係ない以下のような処理は省略しています。実際に使用する際はご注意ください。
- エラー・例外処理
- finish_reasonなどの判定処理
参考
Discussion