🥰

Open AI Realtime APIのPythonサンプルコードを作成

2024/10/04に公開

はじめに

タイトルの通りです。
AzureのAPIは先行で公開しておりましたが、ついにOpenAIからも、APIが解放されたので、取り急ぎpythonでサンプルコードを作成してみました。
(公式からはPythonのサンプルコードが公開されていないので、java scriptのコードを参考にしてpythonに書き換える形で作成しました)

サンプルコードなので、できるだけ1ファイルかつ、最小限の機能で実装しました。
なのでわかりやすいとは思います。
(その代わり、システムプロンプトなどはベタ書きしてます)

リアルタイム音声対話ができるRealtime APIをpythonで動かしたいけど、pythonのサンプルコードがなくて困っているという人向けに参考になれば幸いです。

(追記)
サンプルコードを少し変更することで、Azure版にも接続できることを確認しました。
サンプルコードの章にて変更箇所を紹介します。

(2024年10月6日 AM11:52 追記)
AI発話中にこちらが発話しても、発話を止めてくれなかった問題ですが、解決方法を見つけましたので、改めて記事にする予定です。(今日は時間がなく・・・申し訳ございません)
(ちょっとサンプルコードからは離れると思ったので、別記事の予定です)
おそらく10月7日になると思います。
記事書きましたら、こちらにもリンクを貼ります。

下記にて取り急ぎコードだけ共有します。コードの解説は後日更新します。
https://zenn.dev/asap/articles/563500af4649da

なお、実践上は上記で紹介するコードを利用するのをお勧めしますが、Realtime APIをpythonで利用するにあたっての仕様などの理解は、本記事で紹介するサンプルコードを利用する方が理解しやすいと思います。

参考

下記の公式ドキュメントを参考にしながら組みました。(というかそれ以外に情報がない・・・)

https://platform.openai.com/docs/guides/realtime?text-generation-quickstart-example=stream
https://platform.openai.com/docs/api-reference/realtime-client-events

サンプルコード

取り急ぎ下記がサンプルコードです。
下記コマンドで実行できるとおもいます。
~/.zshrc~/.bashrcにOPENAI_API_KEYが記録されていることを前提にしていますが、わからなければ直接コード内にAPIKEYを書き下してもよいです。推奨はしません。)

pip install websockets soundfile numpy pyaudio
python main.py

実行したら、AIが発話するので、その後こちらはマイクに向かって発話すれば、会話を始めることができます。

main.py
import asyncio
import websockets
import pyaudio
import numpy as np
import base64
import json
import wave
import io
import os

API_KEY = os.environ.get('OPENAI_API_KEY')
#わからない人は、上の行をコメントアウトして、下記のように直接API KEYを書き下してもよい
#API_KEY = "sk-xxxxx"

# WebSocket URLとヘッダー情報
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
HEADERS = {
    "Authorization": "Bearer "+ API_KEY, 
    "OpenAI-Beta": "realtime=v1"
}

# PCM16形式に変換する関数
def base64_to_pcm16(base64_audio):
    audio_data = base64.b64decode(base64_audio)
    return audio_data

# 音声を送信する非同期関数
async def send_audio(websocket, stream, CHUNK):
    def read_audio_block():
        """同期的に音声データを読み取る関数"""
        try:
            return stream.read(CHUNK, exception_on_overflow=False)
        except Exception as e:
            print(f"音声読み取りエラー: {e}")
            return None

    print("マイクから音声を取得して送信中...")
    while True:
        # マイクから音声を取得
        audio_data = await asyncio.get_event_loop().run_in_executor(None, read_audio_block)
        if audio_data is None:
                continue  # 読み取りに失敗した場合はスキップ
        
        # PCM16データをBase64にエンコード
        base64_audio = base64.b64encode(audio_data).decode("utf-8")

        audio_event = {
            "type": "input_audio_buffer.append",
            "audio": base64_audio
        }

        # WebSocketで音声データを送信
        await websocket.send(json.dumps(audio_event))

        await asyncio.sleep(0)

# サーバーから音声を受信して再生する非同期関数
async def receive_audio(websocket, output_stream):
    print("assistant: ", end = "", flush = True)
    loop = asyncio.get_event_loop()
    while True:
        # サーバーからの応答を受信
        response = await websocket.recv()
        response_data = json.loads(response)

        # サーバーからの応答をリアルタイム(ストリーム)で表示
        if "type" in response_data and response_data["type"] == "response.audio_transcript.delta":
            print(response_data["delta"], end = "", flush = True)
        # サーバからの応答が完了したことを取得
        elif "type" in response_data and response_data["type"] == "response.audio_transcript.done":
            print("\nassistant: ", end = "", flush = True)

        # サーバーからの応答に音声データが含まれているか確認
        if "delta" in response_data:
            if response_data["type"] == "response.audio.delta":
                base64_audio_response = response_data["delta"]
                if base64_audio_response:
                    pcm16_audio = base64_to_pcm16(base64_audio_response)
                    #音声データがある場合は、出力ストリームから再生
                    await loop.run_in_executor(None, output_stream.write, pcm16_audio)

# マイクからの音声を取得し、WebSocketで送信しながらサーバーからの音声応答を再生する非同期関数
async def stream_audio_and_receive_response():
    # WebSocketに接続
    async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
        print("WebSocketに接続しました。")

        # 初期リクエスト (モダリティ設定)
        init_request = {
            "type": "response.create",
            "response": {
                "modalities": ["audio", "text"],
                "instructions": "ユーザーをサポートしてください。",
                "voice": "echo" #"alloy", "echo", "shimmer"
            }
        }
        await websocket.send(json.dumps(init_request))
        print("初期リクエストを送信しました。")
        
        # PyAudioの設定
        CHUNK = 2048          # マイクからの入力データのチャンクサイズ
        FORMAT = pyaudio.paInt16  # PCM16形式
        CHANNELS = 1          # モノラル
        RATE = 24000          # サンプリングレート(24kHz)

        # PyAudioインスタンス
        p = pyaudio.PyAudio()

        # マイクストリームの初期化
        stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

        # サーバーからの応答音声を再生するためのストリームを初期化
        output_stream = p.open(format=FORMAT, channels=CHANNELS, rate=RATE, output=True, frames_per_buffer=CHUNK)

        print("マイク入力およびサーバーからの音声再生を開始...")

        try:
            # 音声送信タスクと音声受信タスクを非同期で並行実行
            send_task = asyncio.create_task(send_audio(websocket, stream, CHUNK))
            receive_task = asyncio.create_task(receive_audio(websocket, output_stream))

            # タスクが終了するまで待機
            await asyncio.gather(send_task, receive_task)

        except KeyboardInterrupt:
            # キーボードの割り込みで終了
            print("終了します...")
        finally:
            # ストリームを閉じる
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            output_stream.stop_stream()
            output_stream.close()
            p.terminate()


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(stream_audio_and_receive_response())

Azure版に接続する

接続情報を下記のように変更し、"type": "response.create",を使わない形に変更することで、Azure版でも動作することを確認しました


#Azure OpenAI
WS_URL = "wss://realtimeapi-inst-sample.openai.azure.com/openai/realtime?deployment=gpt-4o-realtime-xxxx&api-version=2024-10-01-preview"
HEADERS = {
    "api-key": "xxxxxxxx", 
}

URLの中身のwss://realtimeapi-inst-sample.openai.azure.comはAzure OpenAI リソースの「エンドポイント」を参照してください。
(最初の「https://」はWebsocketに接続するために「wss://」に変更してください)

また、URLにて、deployment=gpt-4o-realtime-xxxxの部分はAzure OpenAI Studioにて、gpt-4o-realtime-previewをデプロイする際の「デプロイ名」を入れてください。
"api-key": "xxxxxxxx"の部分はAzure OpenAI リソースの「キー」です。

async def stream_audio_and_receive_response():
    # WebSocketに接続
    async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
        print("WebSocketに接続しました。")

        update_request = {
            "type": "session.update",
            "session": {
                "modalities": ["audio", "text"],
                "instructions": "日本語かつ関西弁で回答してください。",
                "voice": "alloy", #"alloy", "echo", "shimmer"
                "turn_detection": {
                    "type": "server_vad",
                    "threshold": 0.5,
                },
                "input_audio_transcription":{
                    "model": "whisper-1"
                }
            }
        }
        await websocket.send(json.dumps(update_request))
        
        
        print("初期リクエストを送信しました。")

実行結果

実行結果を表示します。(音量小さめなので、静かな場所でのご視聴をお勧めします)

https://youtu.be/JjcBQxrtU3o

相手の発話開始までの応答時間はめちゃくちゃ早かったです。1.5秒かかってないんじゃないかな
こんなすごいのが出てくると、今まで、音声認識やらLLMやら合成音声を繋げて頑張ってた自分の自信が完全になくなりますね。

ちなみに1ドルちょっと使ったら、すぐにRate Limitになりました。
tier1だからというのもあると思いますが・・・
→一瞬Rate Limitになりましたが、記事書き終わった頃には解消されていました。結構早かったです。

詰まった点

基本的には下記の記事をよく読めばわかる内容ですが、私はつまづいたので忘備録も兼ねて
https://platform.openai.com/docs/api-reference/realtime-client-events

とにかく重要なのは下記の部分

audio_event = {
    "type": "input_audio_buffer.append",
    "audio": base64_audio
}

今回のように、入出力をストリーミングで処理する場合は、"type": "input_audio_buffer.appendを指定する必要があります。

この部分を下記のように書いていると一生うまくいきません(1敗)

event = {
  type: 'conversation.item.create',
  item: {
    type: 'message',
    role: 'user',
    content: [
      {
        type: 'input_audio',
        audio: base64AudioData
      }
    ]
  }

あとは単純にpythonの実装の話ですが、pyaudioを利用する場合は、入出力で同じスレッドを利用するとぶつぶつした音声になってしまうので、ちゃんとスレッドを分けつつ、時間がかかる処理は非同期で実施することをお勧めします(1敗)

コードの解説

簡単にコードの解説をしていきます。
上記のコードは、OpenAIのモデルにWebsocketで接続する部分と、PyAudioによる入出力音声の制御の主に二つの処理によって構成されていますが、今回は、WebsocketでOpenAIのサーバと通信する部分を主に説明します。

(解説のために順番などはバラバラになっています。実際の処理は実際のコードをご覧ください)

Websocketへの接続

最初の接続は下記のようなコードで達成されます。

API_KEY = os.environ.get('OPENAI_API_KEY')

# WebSocket URLとヘッダー情報
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
HEADERS = {
    "Authorization": "Bearer " + API_KEY, 
    "OpenAI-Beta": "realtime=v1"
}

# WebSocketに接続
async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
    print("WebSocketに接続しました。")

API_KEYを環境変数から取得し、 OpenAIが指定するサーバURLと認証情報を付与したHEADERSを利用して、Websocket通信で接続しています。

最初のリクエスト

接続後、最初のリクエストは下記のように実施します。

async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
    print("WebSocketに接続しました。")

    # 初期リクエスト (モダリティ設定)
    init_request = {
        "type": "response.create",
        "response": {
            "modalities": ["audio", "text"],
            "instructions": "関西弁で回答してください。",
            "voice": "echo" #"alloy", "echo", "shimmer"
        }
    }

    await websocket.send(json.dumps(init_request))

最初の2行は前節と共通で、Websocket通信が接続している範囲内で処理をしてくださいという意味です。

その上で、最初は"type": "response.create"のtypeを持つJSONを作成します。
詳細か下記をご覧ください。
https://platform.openai.com/docs/api-reference/realtime-client-events/response-create

公式のドキュメントにも記載されていますが、下記の要素を指定できます・

{
    "event_id": "event_234",
    "type": "response.create",
    "response": {
        "modalities": ["text", "audio"],
        "instructions": "Please assist the user.",
        "voice": "alloy",
        "output_audio_format": "pcm16",
        "tools": [
            {
                "type": "function",
                "name": "calculate_sum",
                "description": "Calculates the sum of two numbers.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "a": { "type": "number" },
                        "b": { "type": "number" }
                    },
                    "required": ["a", "b"]
                }
            }
        ],
        "tool_choice": "auto",
        "temperature": 0.7,
        "max_output_tokens": 150
    }
}

今回はFunction Callingなどは利用しないため、最低限の指定に収めていますが、temperaturemax_output_tokensなどのよく見る要素も、最初に指定できます。

本記事ではmodalitiesinstructionsvoiceを主に設定しており、そのほかはDefaultの値(つまり上に記載した全文の情報)を利用しています。

modalitiesは、何の種類のモダリティを利用するかを指定しており、今回はSystem PromptにはTextデータを、会話にはAudioデータを利用するため、両方指定しています。

instructionsは俗にいうSystem Promptです。ここでAIにどのように振る舞ってほしいかを指定することができます。
今回はサンプルなので「関西弁で回答してください。」の一行だけですが、普通に長いPromptを投入することも可能です。

voiceは対話でモデルが出す声のキャラクターです。具体的に現在どのモデルが利用できるのかが調べてもわからなかったため、PlayGroundで選択できるキャラクター(3名)のうち一人を選んでいます。
PlayGroundでは名前の先頭が大文字で書かれていますが、JSONでは小文字で書く必要があることに注意してください。

最初に発話させたくなかったら

ちなみに、このコードでは、相手からの発話を受けてこちらが話し始めるように設計していますが、こちらから話しかけたい場合は、代わりに下記のように設定すると良いです。

async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
    print("WebSocketに接続しました。")

    update_request = {
        "type": "session.update",
        "session": {
            "modalities": ["audio", "text"],
            "instructions": "日本語かつ関西弁で回答してください。",
            "voice": "alloy", #"alloy", "echo", "shimmer"
        }
    }
    await websocket.send(json.dumps(update_request))

typeとして、session.updateを利用することで、AIからの返答をさせずに、System Promptなどの設定が可能です。(若干response.createと書き方が変わるので、ドキュメントを参考にしてください)

自分が発話した音声を送信する

発話した音声を送信する方法はいくつかありますが、音声対話を行う場合は、ストリーミング(音声を細かいCHUNK単位で区切って、リアルタイムに送信する)で音声を送信する必要があります。
公式のドキュメントではあまり触れられていないので、私も沼りましたが、結論、下記のように送信すれば良いです。

# PCM16データをBase64にエンコード
base64_audio = base64.b64encode(audio_data).decode("utf-8")

audio_event = {
    "type": "input_audio_buffer.append",
    "audio": base64_audio
}

# WebSocketで音声データを送信
await websocket.send(json.dumps(audio_event))

当然、上記もWebsocketが接続している中で実行してください。

まず、前提としてaudio_dataはPCM16というフォーマットの音声データのチャンクで、サンプリングレートは24,000Hzであることとします。
(そのほかの設定でも上手くいくのかもしれないですが、私はよく知らないです)
今回、私は音声のチャンクサイズを2048としています。おおよそ0.085秒程度の音声データです。

今回は音声データをそのままWebsocketで送信できるわけではなく、一度、Base64形式にエンコードしたのちにJSONの要素として送る必要があるため、上記のような実装コードになっています。

また、Websocketで送るJSONのtype"type": "input_audio_buffer.append"を利用する必要があります。
こちらは下記の公式ドキュメントをご覧ください。
https://platform.openai.com/docs/api-reference/realtime-client-events/input-audio-buffer-append

ここで、audioには、エンコードされた音声のチャンクが入ります。
これをループして、マイク入力音声を少しずつ、全てWebsocketで送信していることになります。

AIが発話した音声を受信する

受信に関しては下記のように取得できます。

response = await websocket.recv()
response_data = json.loads(response)

もちろん、Websocketが接続されている状態で実行してください。
OpenAIからはJSON形式でデータが返ってくるので、それを辞書型としてPythonで処理できるようにロードしています。

取得後のデータの中身の処理は、実装するアプリケーションに依存しますが、音声対話では、最低限取得した音声を再生できることが求められます。

それは、下記で実行されます。

if "delta" in response_data:
    if response_data["type"] == "response.audio.delta":
        base64_audio_response = response_data["delta"]
        if base64_audio_response:
            pcm16_audio = base64.b64decode(base64_audio_response)
            #音声データがある場合は、出力ストリームから再生
            await loop.run_in_executor(None, output_stream.write, pcm16_audio)

音声自体も、一度に全ての発話音声データが送られてくるわけではなく、一定のCHUNKに分割して返却されます。
返却されるJSONのtype"response.audio.delta"となります。
公式のドキュメントは下記になります。
https://platform.openai.com/docs/api-reference/realtime-server-events/response-audio-delta

このJSONのdeltaにBase64でEncodeされた音声データが格納されています。
したがって、デコードすれば音声データになるため、デコード後、PyAudioで非同期に再生しています。

AIが発話した音声の文字起こしをストリーミングで取得する

音声対話システムを実装するなら、AIが発話した内容の音声だけではなく、テキストでも表示がしたいです。
テキストを取得するには下記を実行します。


print("assistant: ", end = "", flush = True)
・・・
while True:
    ・・・
    # サーバーからの応答をリアルタイムに表示
    if "type" in response_data and response_data["type"] == "response.audio_transcript.delta":
        print(response_data["delta"], end = "", flush = True)
    # サーバからの応答が完了したことを取得
    elif "type" in response_data and response_data["type"] == "response.audio_transcript.done":
        print("\nassistant: ", end = "", flush = True)

ストリーミングでテキストを取得するには、response.audio_transcript.deltatypeのJSONの中身を取得して、それをコンソールに表示します。
また、ストリーミングで取得するテキストが一段落ついたらresponse.audio_transcript.donetypeのJSONが発行されるため、それを取得したら、次のテキストを表示する準備をしています。

実装できなかった部分

AIが話しているときに、こちらが話し始めても、うまく音声を止めてくれませんでした。
(止めてくれることもあれば、そのまま話し続けることもある)
上記のサンプルコードだけでは、その部分が弱いみたいです。
上手な実装を知っている方がいたら教えてください。

上記に関しても実装できました!
下記にて取り急ぎコードだけ共有します。コードの解説は後日更新します。
https://zenn.dev/asap/articles/563500af4649da

なお、実践上は上記で紹介するコードを利用するのをお勧めしますが、Realtime APIをpythonで利用するにあたっての仕様などの理解は、本記事で紹介するサンプルコードを利用する方が理解しやすいと思います。

あと、キャラクターの音声を変える部分は記載してないですので、これもRate Limitが解除されたら記載します。
こちらは実装して、更新しました!

init_request = {
    "type": "response.create",
    "response": {
        "modalities": ["audio", "text"],
        "instructions": "ユーザーをサポートしてください。",
        "voice": "echo" #"alloy", "echo", "shimmer"
    }
}

上記のinstructionsの部分が、最初にAIに与えるプロンプトになります。system promptのようなものです。
またvoiceのところを変更すると、声を変えることができます。
誰の声が使えるのかよくわからなかったですが、PlayGroundにて利用できる名前は全て動作確認できました。
もっと増えるといいですね。

まとめ

取り急ぎ、pythonで動かしたいけどサンプルコードがなくて困っているという方向けに、実装したものを公開します。
Rate Limitが解除されたら、もっといろいろ試して機能追加できたらいいなと思っています。

今回の記事の本題とは異なるため紹介しませんでしたが、実はユーザが発話した音声の文字起こしを取得する機能もあります。したがってRAGのQueryに利用すれば、Realtime APIとRAGを組み合わせることもできそうです。(試していない+文字起こしの処理速度は遅いので、応答時間が長くなると思うが)

下記に置いているコードは、ユーザが発話した音声の文字起こしも取得しています。(取得しただけ)
https://github.com/personabb/OpenAI_RealtimeAPI_Python_SampleCode

ありがとうございました!

Discussion