Open AI Realtime APIで、会話履歴を削除して高額請求を回避したい!【Pythonサンプルコード】
はじめに
最近は、Realtime APIの実験が楽しくて止まらないasapです。
これまでも、pythonサンプルコードの作成と、ユーザ割り込み機能の実装に関しての記事を書かせていただきました。
Microsoftなどが、質の高いRealtime APIのフロント実装を公開しており、API KEYを入れるだけで、簡単に試すことができるものが、たくさん公開されています。
しかし、個人的には、やはり中身の実装を理解してこそ、だと思っています。
そのためには、一番簡単なサンプルコードの状態で、何が起きているのかを理解するのが手っ取り早いです。
そういう意味で、私の記事が皆様の参考になっているようで、すごく嬉しいです。(たくさんの「いいね」ありがとうございます)
本記事を書いた背景
さて、今回の記事を書いたきっかけは、下記のポストです。
こちら、拝見されている方多いと思いますが、Realtime APIをAITuberの方の配信のゲストとして、1時間利用したら、利用料が5000円もかかってしまったという話です。
APIの利用料はこちらの公式ページで公開されていますが、補足部分には、
*Audio input costs approximately 6¢ per minute; Audio output costs approximately 24¢ per minute
と記載されているので、1時間に1000円前後だと理解していたのですが、実際の金額はかなり乖離しており、多額の請求がなされてしまったようです。
ちなみに、話は変わりますが、「シロハナ / AIヒロイン研究所」さんの配信はすごく質が高かったので、ぜひ見てみてほしいです。Realtime APIの力がよくわかると思います。
(ちなみに、恐ろしいのは「シロハナ」さんは、入力には音声ではなくテキストを利用しているようなので、普段使いで音声入力音声出力で実施していたら、さらに料金が膨れ上がっていたことになります・・・)
高コストの原因考察
上記のポストのリプライでも議論がなされていますが、私の想定も大体同じです。
1時間の配信の中で、会話履歴が膨れ上がった結果、入力token数が増え、請求金額も増えているようです。
(おそらくですが)
Realtime APIはAssistants APIと同じで、「ステートフル」なAPIになります。つまり、過去の会話履歴などがセッションを切断するまで、明示的に与えなくても、モデルが反映してくれます。
この機能は非常に便利なのですが、意識しないと、会話履歴が膨れ上がることになります。
会話履歴が膨れ上がると、モデルのコンテキストウインドウのtoken数を超えるまで、全ての会話履歴(ユーザだけでなく、AI側の発話(音声)も含む)がモデルに毎回入力されることになり、コストが高くなってしまいます。
もしかしたら、今後OpenAIがプロンプトキャッシュを適用するなどして、改善される可能性はありますが、Assistants APIでも似たような話題が上がった時に、改善されなかったような気がするので、Realtime APIでもこのままの可能性はあります。
したがって、今回は、会話履歴を膨れ上がらせる前に、直近「xx」件より前の会話履歴は、自動的に消去するような機能を、過去の記事のサンプルコードに実装しようと思います。
必ず読んでください 注意喚起
サンプルコード
サンプルコードでは直近5件の会話履歴だけを保持して、それより過去の会話履歴をサーバから削除するようにしています。(ユーザ割り込みも込みです)
実行方法は前回の記事と同じです。
import asyncio
import websockets
import pyaudio
import numpy as np
import base64
import json
import queue
import threading
import os
import time
API_KEY = os.environ.get('OPENAI_API_KEY')
# WebSocket URLとヘッダー情報
# OpenAI
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
HEADERS = {
"Authorization": "Bearer " + API_KEY,
"OpenAI-Beta": "realtime=v1"
}
# キューを初期化
audio_send_queue = queue.Queue()
audio_receive_queue = queue.Queue()
#会話履歴IDリストの初期化
conversation_history_id_Queue = queue.Queue()
# PCM16形式に変換する関数
def base64_to_pcm16(base64_audio):
audio_data = base64.b64decode(base64_audio)
return audio_data
# 音声を送信する非同期関数
async def send_audio_from_queue(websocket):
while True:
audio_data = await asyncio.get_event_loop().run_in_executor(None, audio_send_queue.get)
if audio_data is None:
continue
# PCM16データをBase64にエンコード
base64_audio = base64.b64encode(audio_data).decode("utf-8")
audio_event = {
"type": "input_audio_buffer.append",
"audio": base64_audio
}
# WebSocketで音声データを送信
await websocket.send(json.dumps(audio_event))
# キューの処理間隔を少し空ける
await asyncio.sleep(0)
# マイクからの音声を取得しキューに入れる関数
def read_audio_to_queue(stream, CHUNK):
while True:
try:
audio_data = stream.read(CHUNK, exception_on_overflow=False)
audio_send_queue.put(audio_data)
except Exception as e:
print(f"音声読み取りエラー: {e}")
break
# サーバーから音声を受信してキューに格納する非同期関数
async def receive_audio_to_queue(websocket):
print("assistant: ", end = "", flush = True)
while True:
response = await websocket.recv()
if response:
response_data = json.loads(response)
# 会話履歴IDをキューに格納
if "type" in response_data and response_data["type"] == "conversation.item.created":
conversation_history_id_Queue.put(response_data['item']['id'])
# 会話履歴IDが5以上ある場合、最も古い会話履歴IDを取得し、削除する
if conversation_history_id_Queue.qsize() >= 5:
item_id = conversation_history_id_Queue.get()
delite_event = {
"type": "conversation.item.delete",
"item_id": item_id
}
# WebSocketで音声データを送信
await websocket.send(json.dumps(delite_event))
print(f"conversation_history_id: {item_id}を削除しました。")
# サーバーからの応答をリアルタイムに表示
if "type" in response_data and response_data["type"] == "response.audio_transcript.delta":
print(response_data["delta"], end = "", flush = True)
# サーバからの応答が完了したことを取得
elif "type" in response_data and response_data["type"] == "response.audio_transcript.done":
print("\nassistant: ", end = "", flush = True)
#こちらの発話がスタートしたことをサーバが取得したことを確認する
if "type" in response_data and response_data["type"] == "input_audio_buffer.speech_started":
#すでに存在する取得したAI発話音声をリセットする
while not audio_receive_queue.empty():
audio_receive_queue.get()
# サーバーからの音声データをキューに格納
if "type" in response_data and response_data["type"] == "response.audio.delta":
base64_audio_response = response_data["delta"]
if base64_audio_response:
pcm16_audio = base64_to_pcm16(base64_audio_response)
audio_receive_queue.put(pcm16_audio)
await asyncio.sleep(0)
# サーバーからの音声を再生する関数
def play_audio_from_queue(output_stream):
while True:
pcm16_audio = audio_receive_queue.get()
if pcm16_audio:
output_stream.write(pcm16_audio)
# マイクからの音声を取得し、WebSocketで送信しながらサーバーからの音声応答を再生する非同期関数
async def stream_audio_and_receive_response():
# WebSocketに接続
async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
print("WebSocketに接続しました。")
update_request = {
"type": "session.update",
"session": {
"modalities": ["audio", "text"],
"instructions": "日本語かつ関西弁で回答してください。ユーザの名前は絶対に発言しないでください。ただし、名前を質問された時だけは必ず答えてください。",
"voice": "alloy",
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
},
"input_audio_transcription": {
"model": "whisper-1"
}
}
}
await websocket.send(json.dumps(update_request))
# PyAudioの設定
INPUT_CHUNK = 2400
OUTPUT_CHUNK = 2400
FORMAT = pyaudio.paInt16
CHANNELS = 1
INPUT_RATE = 24000
OUTPUT_RATE = 24000
# PyAudioインスタンス
p = pyaudio.PyAudio()
# マイクストリームの初期化
stream = p.open(format=FORMAT, channels=CHANNELS, rate=INPUT_RATE, input=True, frames_per_buffer=INPUT_CHUNK)
# サーバーからの応答音声を再生するためのストリームを初期化
output_stream = p.open(format=FORMAT, channels=CHANNELS, rate=OUTPUT_RATE, output=True, frames_per_buffer=OUTPUT_CHUNK)
# マイクの音声読み取りをスレッドで開始
threading.Thread(target=read_audio_to_queue, args=(stream, INPUT_CHUNK), daemon=True).start()
# サーバーからの音声再生をスレッドで開始
threading.Thread(target=play_audio_from_queue, args=(output_stream,), daemon=True).start()
try:
# 音声送信タスクと音声受信タスクを非同期で並行実行
send_task = asyncio.create_task(send_audio_from_queue(websocket))
receive_task = asyncio.create_task(receive_audio_to_queue(websocket))
# タスクが終了するまで待機
await asyncio.gather(send_task, receive_task)
except KeyboardInterrupt:
print("終了します...")
finally:
if stream.is_active():
stream.stop_stream()
stream.close()
output_stream.stop_stream()
output_stream.close()
p.terminate()
if __name__ == "__main__":
asyncio.run(stream_audio_and_receive_response())
Azure版に接続する
接続情報を下記のように変更することで、Azure版でも動作することを確認しました。
#Azure OpenAI
WS_URL = "wss://realtimeapi-inst-sample.openai.azure.com/openai/realtime?deployment=gpt-4o-realtime-xxxx&api-version=2024-10-01-preview"
HEADERS = {
"api-key": "xxxxxxxx",
}
URLの中身のwss://realtimeapi-inst-sample.openai.azure.com
はAzure OpenAI リソースの「エンドポイント」を参照してください。
(最初の「https://」はWebsocketに接続するために「wss://」に変更してください)
また、URLにて、deployment=gpt-4o-realtime-xxxx
の部分はAzure OpenAI Studioにて、gpt-4o-realtime-preview
をデプロイする際の「デプロイ名」を入れてください。
"api-key": "xxxxxxxx"
の部分はAzure OpenAI リソースの「キー」です。
実験結果
下記の動画をご覧ください(音量が小さめなので、静かな場所でご覧ください)
(今回はAzure版で実行しています。ご了承ください。)
動画では、最初に自己紹介しています。
次の問いかけでは、AIはユーザの名前を回答できています
しかしながら、複数回の応対後、再びユーザの名前を確認したところ回答できなくなっています。
これは会話履歴を削除されたからです。
何度もカットイン(ユーザ割り込み)を繰り返したからか、AIの返答がところどころぶっ壊れているのが気になりますが、会話履歴の削除という点は達成できていると思います。
なお、実験に合わせて、システムプロンプトを下記のように設定しています。
日本語かつ関西弁で回答してください。
ユーザの名前は絶対に発言しないでください。
ただし、名前を質問された時だけは必ず答えてください。
コードの解説
基本的な部分は、これまでの記事で解説しているため、会話履歴を自動で削除する部分だけ解説します。
該当部分は下記になります。
#会話履歴IDリストの初期化
conversation_history_id_Queue = queue.Queue()
・・・
# サーバーから音声を受信してキューに格納する非同期関数
async def receive_audio_to_queue(websocket):
・・・
while True:
response = await websocket.recv()
・・・
# 会話履歴IDをキューに格納
if "type" in response_data and response_data["type"] == "conversation.item.created":
conversation_history_id_Queue.put(response_data['item']['id'])
# 会話履歴IDが5以上ある場合、最も古い会話履歴IDを取得し、削除する
if conversation_history_id_Queue.qsize() >= 5:
item_id = conversation_history_id_Queue.get()
delite_event = {
"type": "conversation.item.delete",
"item_id": item_id
}
# WebSocketで音声データを送信
await websocket.send(json.dumps(delite_event))
print(f"conversation_history_id: {item_id}を削除しました。")
・・・
今回利用するイベントはクライアントイベントのconversation.item.delete
です。
このイベントは、item_id
を指定することで、そのidの過去の会話履歴をまとめて削除することができます。
したがって、後半部分では、Queueに格納されたitem_id
が5件以上の場合に、古い順で中身を取り出して、イベントを発行することで、サーバから会話履歴を削除しています。
では、何のitem_id
をクリアすれば良いのかが、中盤部分の下記コードです。
# 会話履歴IDをキューに格納
if "type" in response_data and response_data["type"] == "conversation.item.created":
conversation_history_id_Queue.put(response_data['item']['id'])
さまざま会話を繰り返して、自分の音声入力や、AIの音声応答に、どんなitem_id
が付与されているかを調べた結果、わかりやすいことに、一回のユーザ会話や、一回のAI発話に関しては、全部同じIDが振られていることがわかりました。
(音声はCHUNKという小さい単位に分割されて送信されますが、全てに対して同じitem_id
が振られています)
そして、そのidはconversation.item.created
というサーバイベントにより取得できます。
中身は下記のようになっています。
{
"event_id": "event_1920",
"type": "conversation.item.created",
"previous_item_id": "msg_002",
"item": {
"id": "msg_003",
"object": "realtime.item",
"type": "message",
"status": "completed",
"role": "user",
"content": [
{
"type": "input_audio",
"transcript": null
}
]
}
}
該当のリファレンスはこちらです
このイベントは、「ユーザの発話が終わったタイミング」と「AIの発話が始まる前のタイミング」で、クライアントに送信されており、role
では、ユーザかAIかどちらが発話しているのか、そしてid
に今回の発話のitem_id
が格納されています。
したがって、このイベントを監視すれば、各ターンごとに一つのitem_id
を取得することができ、それをQueueに貯めることによって、Queueの長さ=会話ターン数となります。
したがって、今回のサンプルコードで指定している「5」というのは、5ターンの会話分だけ、会話履歴を保持するという意味です。
また、自分と相手とで、それぞれ1ターン換算なので、かなり短い履歴だけを保持します。
この「会話履歴保持ターン数」の部分は、利用するアプリケーションに応じて、適切な値を設定してください。
長くするほど、会話を長く覚えていられますが、その分コストは高くなります。
改善余地
今回は、かなりシンプルに会話履歴の削除を実装しましたが、実は、1点不十分な箇所があります。
Realtime APIでは、ユーザの発話の無音期間を検出して、ターンテイキングを行います。(server_vad
というtype
です。)
その場合、ユーザが、一定以上の時間で言葉に詰まると、無音期間が発生するため、発話途中でも内部的にターンがAIに移動してしまいます。
その状態で、続けてユーザが話し始めると、また内部的にユーザにターンが移ります。
この時、途中のターン推移において、AIにターンが移動した際に、ユーザがすぐにターンを奪い返して、結果的にAIの発話がなかったとしても、ターンの移り変わりがあるため、item_id
は変わってしまいます。
したがって、利用者の考えているターン数以上に内部的にターン数が経過してしまう問題があります。
上記のサンプルコードの例では、ターン数は「5」を指定していますが、ユーザの体感的には3ターンくらいしか経っていないのに、古い会話履歴が削除されてしまうことがありえます。
とは言っても、そんなに気にしなくても、ターン推移を見越して、少し多めの履歴保持ターン数を指定しておけば、そんなに問題はないと思います。
しかし、何らかの理由で、保持する会話履歴のターン数を、厳密に制御したいという需要がある場合は、改善の余地があると思います。
(私の利用用途ではあくまでコスト削減が達成されれば良いので、この改善のモチベーションはないです)
まとめ
今回は、Realtime APIを使う上で、結構クリティカルである、古い会話履歴を自動的に削除する機能について実装しました。
皆様の参考になれば嬉しいです。
余談ですが、最近Azure版とOpenAI版とでRealtime APIの動作が違うような気がしています。
例えば、記事執筆時ではOpenAI版の方が安定していて、Azure版だと、音声が途中で停止したり、発話がおかしくなったりする機会が増える気がします。
もしかしたら、Azure版の方が早く利用できるようになって、たくさんの使いやすいサンプルが先行で公開されたから、単純に利用者が多いだけかもしれないですが、私のサンプルコードの実装に、Azure版でのみ発生するバグがあるのではないかと心配しています。
(Azure版のRealtime APIのReferenceが公式から出ていないから、調べられないんだよなあ)
もし、こんな不具合があったなどあれば、ご共有いただけると嬉しいです。
では、ここまで読んでくださって、ありがとうございます!
Discussion