💰

Open AI Realtime APIで、会話履歴を削除して高額請求を回避したい!【Pythonサンプルコード】

2024/10/07に公開

はじめに

最近は、Realtime APIの実験が楽しくて止まらないasapです。

これまでも、pythonサンプルコードの作成と、ユーザ割り込み機能の実装に関しての記事を書かせていただきました。

https://zenn.dev/asap/articles/4368fd306b592a
https://zenn.dev/asap/articles/563500af4649da

Microsoftなどが、質の高いRealtime APIのフロント実装を公開しており、API KEYを入れるだけで、簡単に試すことができるものが、たくさん公開されています。
しかし、個人的には、やはり中身の実装を理解してこそ、だと思っています。
そのためには、一番簡単なサンプルコードの状態で、何が起きているのかを理解するのが手っ取り早いです。

そういう意味で、私の記事が皆様の参考になっているようで、すごく嬉しいです。(たくさんの「いいね」ありがとうございます)

本記事を書いた背景

さて、今回の記事を書いたきっかけは、下記のポストです。

https://x.com/ai_shirohana/status/1842176628228780151

こちら、拝見されている方多いと思いますが、Realtime APIをAITuberの方の配信のゲストとして、1時間利用したら、利用料が5000円もかかってしまったという話です。

APIの利用料はこちらの公式ページで公開されていますが、補足部分には、

*Audio input costs approximately 6¢ per minute; Audio output costs approximately 24¢ per minute

と記載されているので、1時間に1000円前後だと理解していたのですが、実際の金額はかなり乖離しており、多額の請求がなされてしまったようです。

ちなみに、話は変わりますが、「シロハナ / AIヒロイン研究所」さんの配信はすごく質が高かったので、ぜひ見てみてほしいです。Realtime APIの力がよくわかると思います。
https://www.youtube.com/watch?v=qy5MZK7Y6RU

(ちなみに、恐ろしいのは「シロハナ」さんは、入力には音声ではなくテキストを利用しているようなので、普段使いで音声入力音声出力で実施していたら、さらに料金が膨れ上がっていたことになります・・・)

高コストの原因考察

上記のポストのリプライでも議論がなされていますが、私の想定も大体同じです。
1時間の配信の中で、会話履歴が膨れ上がった結果、入力token数が増え、請求金額も増えているようです。
(おそらくですが)

Realtime APIはAssistants APIと同じで、「ステートフル」なAPIになります。つまり、過去の会話履歴などがセッションを切断するまで、明示的に与えなくても、モデルが反映してくれます。
この機能は非常に便利なのですが、意識しないと、会話履歴が膨れ上がることになります。
会話履歴が膨れ上がると、モデルのコンテキストウインドウのtoken数を超えるまで、全ての会話履歴(ユーザだけでなく、AI側の発話(音声)も含む)がモデルに毎回入力されることになり、コストが高くなってしまいます。

もしかしたら、今後OpenAIがプロンプトキャッシュを適用するなどして、改善される可能性はありますが、Assistants APIでも似たような話題が上がった時に、改善されなかったような気がするので、Realtime APIでもこのままの可能性はあります。

したがって、今回は、会話履歴を膨れ上がらせる前に、直近「xx」件より前の会話履歴は、自動的に消去するような機能を、過去の記事のサンプルコードに実装しようと思います。

必ず読んでください 注意喚起

サンプルコード

サンプルコードでは直近5件の会話履歴だけを保持して、それより過去の会話履歴をサーバから削除するようにしています。(ユーザ割り込みも込みです)

実行方法は前回の記事と同じです。

import asyncio
import websockets
import pyaudio
import numpy as np
import base64
import json
import queue
import threading
import os
import time

API_KEY = os.environ.get('OPENAI_API_KEY')

# WebSocket URLとヘッダー情報
# OpenAI
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
HEADERS = {
    "Authorization": "Bearer " + API_KEY,
    "OpenAI-Beta": "realtime=v1"
}

# キューを初期化
audio_send_queue = queue.Queue()
audio_receive_queue = queue.Queue()

#会話履歴IDリストの初期化
conversation_history_id_Queue = queue.Queue()

# PCM16形式に変換する関数
def base64_to_pcm16(base64_audio):
    audio_data = base64.b64decode(base64_audio)
    return audio_data

# 音声を送信する非同期関数
async def send_audio_from_queue(websocket):
    while True:
        audio_data = await asyncio.get_event_loop().run_in_executor(None, audio_send_queue.get)
        if audio_data is None:
            continue
        
        # PCM16データをBase64にエンコード
        base64_audio = base64.b64encode(audio_data).decode("utf-8")

        audio_event = {
            "type": "input_audio_buffer.append",
            "audio": base64_audio
        }

        # WebSocketで音声データを送信
        await websocket.send(json.dumps(audio_event))

        # キューの処理間隔を少し空ける
        await asyncio.sleep(0)

# マイクからの音声を取得しキューに入れる関数
def read_audio_to_queue(stream, CHUNK):
    while True:
        try:
            audio_data = stream.read(CHUNK, exception_on_overflow=False)
            audio_send_queue.put(audio_data)
        except Exception as e:
            print(f"音声読み取りエラー: {e}")
            break

# サーバーから音声を受信してキューに格納する非同期関数
async def receive_audio_to_queue(websocket):
    print("assistant: ", end = "", flush = True)
    while True:
        response = await websocket.recv()
        if response:
            response_data = json.loads(response)

            # 会話履歴IDをキューに格納
            if "type" in response_data and response_data["type"] == "conversation.item.created":
                conversation_history_id_Queue.put(response_data['item']['id'])
            
            # 会話履歴IDが5以上ある場合、最も古い会話履歴IDを取得し、削除する
            if conversation_history_id_Queue.qsize() >= 5:
                item_id = conversation_history_id_Queue.get()

                delite_event = {
                    "type": "conversation.item.delete",
                    "item_id": item_id
                }

                # WebSocketで音声データを送信
                await websocket.send(json.dumps(delite_event))
                print(f"conversation_history_id: {item_id}を削除しました。")

            # サーバーからの応答をリアルタイムに表示
            if "type" in response_data and response_data["type"] == "response.audio_transcript.delta":
                print(response_data["delta"], end = "", flush = True)
            # サーバからの応答が完了したことを取得
            elif "type" in response_data and response_data["type"] == "response.audio_transcript.done":
                print("\nassistant: ", end = "", flush = True)

            #こちらの発話がスタートしたことをサーバが取得したことを確認する
            if "type" in response_data and response_data["type"] == "input_audio_buffer.speech_started":
                #すでに存在する取得したAI発話音声をリセットする
                while not audio_receive_queue.empty():
                        audio_receive_queue.get() 

            # サーバーからの音声データをキューに格納
            if "type" in response_data and response_data["type"] == "response.audio.delta":
                base64_audio_response = response_data["delta"]
                if base64_audio_response:
                    pcm16_audio = base64_to_pcm16(base64_audio_response)
                    audio_receive_queue.put(pcm16_audio)
                    
        await asyncio.sleep(0)

# サーバーからの音声を再生する関数
def play_audio_from_queue(output_stream):
    while True:
        pcm16_audio = audio_receive_queue.get()
        if pcm16_audio:
            output_stream.write(pcm16_audio)

# マイクからの音声を取得し、WebSocketで送信しながらサーバーからの音声応答を再生する非同期関数
async def stream_audio_and_receive_response():
    # WebSocketに接続
    async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
        print("WebSocketに接続しました。")

        update_request = {
            "type": "session.update",
            "session": {
                "modalities": ["audio", "text"],
                "instructions": "日本語かつ関西弁で回答してください。ユーザの名前は絶対に発言しないでください。ただし、名前を質問された時だけは必ず答えてください。",
                "voice": "alloy",
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                },
                "input_audio_transcription": {
                    "model": "whisper-1"
                }
            }
        }
        await websocket.send(json.dumps(update_request))

        # PyAudioの設定
        INPUT_CHUNK = 2400
        OUTPUT_CHUNK = 2400
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        INPUT_RATE = 24000
        OUTPUT_RATE = 24000

        # PyAudioインスタンス
        p = pyaudio.PyAudio()

        # マイクストリームの初期化
        stream = p.open(format=FORMAT, channels=CHANNELS, rate=INPUT_RATE, input=True, frames_per_buffer=INPUT_CHUNK)

        # サーバーからの応答音声を再生するためのストリームを初期化
        output_stream = p.open(format=FORMAT, channels=CHANNELS, rate=OUTPUT_RATE, output=True, frames_per_buffer=OUTPUT_CHUNK)

        # マイクの音声読み取りをスレッドで開始
        threading.Thread(target=read_audio_to_queue, args=(stream, INPUT_CHUNK), daemon=True).start()

        # サーバーからの音声再生をスレッドで開始
        threading.Thread(target=play_audio_from_queue, args=(output_stream,), daemon=True).start()

        try:
            # 音声送信タスクと音声受信タスクを非同期で並行実行
            send_task = asyncio.create_task(send_audio_from_queue(websocket))
            receive_task = asyncio.create_task(receive_audio_to_queue(websocket))

            # タスクが終了するまで待機
            await asyncio.gather(send_task, receive_task)

        except KeyboardInterrupt:
            print("終了します...")
        finally:
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            output_stream.stop_stream()
            output_stream.close()
            p.terminate()

if __name__ == "__main__":
    asyncio.run(stream_audio_and_receive_response())

Azure版に接続する

接続情報を下記のように変更することで、Azure版でも動作することを確認しました。


#Azure OpenAI
WS_URL = "wss://realtimeapi-inst-sample.openai.azure.com/openai/realtime?deployment=gpt-4o-realtime-xxxx&api-version=2024-10-01-preview"
HEADERS = {
    "api-key": "xxxxxxxx", 
}

URLの中身のwss://realtimeapi-inst-sample.openai.azure.comはAzure OpenAI リソースの「エンドポイント」を参照してください。
(最初の「https://」はWebsocketに接続するために「wss://」に変更してください)

また、URLにて、deployment=gpt-4o-realtime-xxxxの部分はAzure OpenAI Studioにて、gpt-4o-realtime-previewをデプロイする際の「デプロイ名」を入れてください。
"api-key": "xxxxxxxx"の部分はAzure OpenAI リソースの「キー」です。

実験結果

下記の動画をご覧ください(音量が小さめなので、静かな場所でご覧ください)
(今回はAzure版で実行しています。ご了承ください。)

https://youtu.be/zGcqAcnuRBs

動画では、最初に自己紹介しています。
次の問いかけでは、AIはユーザの名前を回答できています

しかしながら、複数回の応対後、再びユーザの名前を確認したところ回答できなくなっています。
これは会話履歴を削除されたからです。

何度もカットイン(ユーザ割り込み)を繰り返したからか、AIの返答がところどころぶっ壊れているのが気になりますが、会話履歴の削除という点は達成できていると思います。

なお、実験に合わせて、システムプロンプトを下記のように設定しています。

日本語かつ関西弁で回答してください。
ユーザの名前は絶対に発言しないでください。
ただし、名前を質問された時だけは必ず答えてください。

コードの解説

基本的な部分は、これまでの記事で解説しているため、会話履歴を自動で削除する部分だけ解説します。
該当部分は下記になります。

#会話履歴IDリストの初期化
conversation_history_id_Queue = queue.Queue()

・・・

# サーバーから音声を受信してキューに格納する非同期関数
async def receive_audio_to_queue(websocket):
    ・・・
    while True:
        response = await websocket.recv()
        ・・・

            # 会話履歴IDをキューに格納
            if "type" in response_data and response_data["type"] == "conversation.item.created":
                conversation_history_id_Queue.put(response_data['item']['id'])
            
            # 会話履歴IDが5以上ある場合、最も古い会話履歴IDを取得し、削除する
            if conversation_history_id_Queue.qsize() >= 5:
                item_id = conversation_history_id_Queue.get()

                delite_event = {
                    "type": "conversation.item.delete",
                    "item_id": item_id
                }

                # WebSocketで音声データを送信
                await websocket.send(json.dumps(delite_event))
                print(f"conversation_history_id: {item_id}を削除しました。")

            ・・・

今回利用するイベントはクライアントイベントのconversation.item.deleteです。
このイベントは、item_idを指定することで、そのidの過去の会話履歴をまとめて削除することができます。
したがって、後半部分では、Queueに格納されたitem_idが5件以上の場合に、古い順で中身を取り出して、イベントを発行することで、サーバから会話履歴を削除しています。

では、何のitem_idをクリアすれば良いのかが、中盤部分の下記コードです。

# 会話履歴IDをキューに格納
if "type" in response_data and response_data["type"] == "conversation.item.created":
    conversation_history_id_Queue.put(response_data['item']['id'])

さまざま会話を繰り返して、自分の音声入力や、AIの音声応答に、どんなitem_idが付与されているかを調べた結果、わかりやすいことに、一回のユーザ会話や、一回のAI発話に関しては、全部同じIDが振られていることがわかりました。
(音声はCHUNKという小さい単位に分割されて送信されますが、全てに対して同じitem_idが振られています)

そして、そのidはconversation.item.createdというサーバイベントにより取得できます。
中身は下記のようになっています。

{
    "event_id": "event_1920",
    "type": "conversation.item.created",
    "previous_item_id": "msg_002",
    "item": {
        "id": "msg_003",
        "object": "realtime.item",
        "type": "message",
        "status": "completed",
        "role": "user",
        "content": [
            {
                "type": "input_audio",
                "transcript": null
            }
        ]
    }
}

該当のリファレンスはこちらです

このイベントは、「ユーザの発話が終わったタイミング」と「AIの発話が始まる前のタイミング」で、クライアントに送信されており、roleでは、ユーザかAIかどちらが発話しているのか、そしてidに今回の発話のitem_idが格納されています。

したがって、このイベントを監視すれば、各ターンごとに一つのitem_idを取得することができ、それをQueueに貯めることによって、Queueの長さ=会話ターン数となります。

したがって、今回のサンプルコードで指定している「5」というのは、5ターンの会話分だけ、会話履歴を保持するという意味です。
また、自分と相手とで、それぞれ1ターン換算なので、かなり短い履歴だけを保持します。

この「会話履歴保持ターン数」の部分は、利用するアプリケーションに応じて、適切な値を設定してください。
長くするほど、会話を長く覚えていられますが、その分コストは高くなります。

改善余地

今回は、かなりシンプルに会話履歴の削除を実装しましたが、実は、1点不十分な箇所があります。

Realtime APIでは、ユーザの発話の無音期間を検出して、ターンテイキングを行います。(server_vadというtypeです。)
その場合、ユーザが、一定以上の時間で言葉に詰まると、無音期間が発生するため、発話途中でも内部的にターンがAIに移動してしまいます。
その状態で、続けてユーザが話し始めると、また内部的にユーザにターンが移ります。

この時、途中のターン推移において、AIにターンが移動した際に、ユーザがすぐにターンを奪い返して、結果的にAIの発話がなかったとしても、ターンの移り変わりがあるため、item_idは変わってしまいます。
したがって、利用者の考えているターン数以上に内部的にターン数が経過してしまう問題があります。

上記のサンプルコードの例では、ターン数は「5」を指定していますが、ユーザの体感的には3ターンくらいしか経っていないのに、古い会話履歴が削除されてしまうことがありえます。

とは言っても、そんなに気にしなくても、ターン推移を見越して、少し多めの履歴保持ターン数を指定しておけば、そんなに問題はないと思います。
しかし、何らかの理由で、保持する会話履歴のターン数を、厳密に制御したいという需要がある場合は、改善の余地があると思います。
(私の利用用途ではあくまでコスト削減が達成されれば良いので、この改善のモチベーションはないです)

まとめ

今回は、Realtime APIを使う上で、結構クリティカルである、古い会話履歴を自動的に削除する機能について実装しました。

皆様の参考になれば嬉しいです。

余談ですが、最近Azure版とOpenAI版とでRealtime APIの動作が違うような気がしています。
例えば、記事執筆時ではOpenAI版の方が安定していて、Azure版だと、音声が途中で停止したり、発話がおかしくなったりする機会が増える気がします。

もしかしたら、Azure版の方が早く利用できるようになって、たくさんの使いやすいサンプルが先行で公開されたから、単純に利用者が多いだけかもしれないですが、私のサンプルコードの実装に、Azure版でのみ発生するバグがあるのではないかと心配しています。
(Azure版のRealtime APIのReferenceが公式から出ていないから、調べられないんだよなあ)

もし、こんな不具合があったなどあれば、ご共有いただけると嬉しいです。

では、ここまで読んでくださって、ありがとうございます!

https://github.com/personabb/OpenAI_RealtimeAPI_Python_SampleCode

Discussion