😀

Python × OpenAI Realtime API で作る「Push-to-Talk」な音声対話

に公開

Python × OpenAI Realtime API で作る「Push-to-Talk」な音声対話

— スペースキーを押している間だけ喋る!実装の勘所を解説 —

はじめに

OpenAI の Realtime API は「AI とリアルタイムに喋れる」点が魅力です。一方で、VAD(発話区間検出)の調整や割り込みが気になる場面もあります。

そこで本記事では、スペースキーを押している間だけマイク入力を送る Push-to-Talk 方式で、音声対話を実装します。

実装するフローは次のとおりです。

  1. SPACE 押下中:マイク音声をリアルタイム送信
  2. SPACE 解放:発話を確定(Commit)してレスポンスを要求
  3. 応答受信:返ってきた音声をストリーミング再生

全体構成と登場人物

今回のシステムを構成する主なライブラリと役割は次のとおりです。

要素 役割 メモ
sounddevice マイク入力 / スピーカー出力 Python で音を扱うならコレ
websocket-client API との通信 永続的な接続を維持する
threading.Event 状態管理 録音中などの状態をスレッド間で共有する
queue.Queue 再生バッファ 受信音声を溜める

まずは必要なライブラリをインストールします。

pip install websocket-client sounddevice numpy pynput

実装コード全文

環境変数 OPENAI_API_KEY をセットして実行します。

import os, json, base64, threading, queue, time
import numpy as np
import sounddevice as sd
import websocket
from pynput import keyboard

# APIへの接続先URL
URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
# 環境変数からAPIキーを取得してヘッダーを設定
HEADERS = ["Authorization: Bearer " + os.environ["OPENAI_API_KEY"]]

# 音声設定
INPUT_SR = 16000       # 入力サンプリングレート
OUTPUT_SR = 24000      # 出力サンプリングレート(モデル出力に合わせて調整)
FRAME_MS = 20          # 1フレームあたりのミリ秒数
FRAME_SAMPLES = int(INPUT_SR * FRAME_MS / 1000) # 1フレームあたりのサンプル数

# 音声再生用のキュー
play_q: "queue.Queue[np.ndarray]" = queue.Queue()
# 録音中かどうかのフラグ
recording_flag = threading.Event()
# WebSocketの準備完了フラグ
ws_ready = threading.Event()

def send(ws, obj: dict):
    """
    WebSocket経由でJSONデータを送信するヘルパー関数
    """
    ws.send(json.dumps(obj))

def audio_player():
    """
    音声再生専用のスレッド関数
    play_qから音声データを取得して再生する
    """
    with sd.OutputStream(samplerate=OUTPUT_SR, channels=1, dtype="int16") as out:
        while True:
            chunk = play_q.get()
            if chunk is None:
                break
            out.write(chunk)

def on_open(ws):
    """
    WebSocket接続確立時のコールバック
    """
    ws_ready.set()

def on_message(ws, message: str):
    """
    サーバーからのメッセージ受信時のコールバック
    """
    evt = json.loads(message)
    t = evt.get("type")

    # セッション作成完了時の処理
    if t == "session.created":
        # GAでは session.update に type を入れる必要があります
        # セッション設定を更新(音声出力の設定など)
        send(ws, {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "model": "gpt-realtime",
                "turn_detection": None,  # Push-to-Talkの可否
                # システムプロンプトの設定
                #"instructions": "あなたは東京駅の案内アシスタントです。",
                # 音声出力(voice指定は任意)
                "audio": {"output": {"voice": "marin"}},
            }
        })
        return

    # サーバーからの音声データ(デルタ)受信時の処理
    # GAの音声デルタ名:response.output_audio.delta
    if t == "response.output_audio.delta":
        b = base64.b64decode(evt["delta"])
        pcm = np.frombuffer(b, dtype=np.int16)
        play_q.put(pcm) # 再生キューに追加
        return

    # (任意)入力音声の文字起こし完了イベント
    if t == "conversation.item.input_audio_transcription.completed":
        print("\n[you]", evt.get("transcript", ""))
        return

def mic_loop(ws):
    """
    マイク入力を処理するメインループ
    """
    # 録音開始前にバッファを空に
    send(ws, {"type": "input_audio_buffer.clear"})

    def callback(indata, frames, time_info, status):
        """
        sounddeviceのInputStreamからのコールバック
        録音フラグが立っている場合のみデータを送信
        """
        if not recording_flag.is_set():
            return
        pcm16 = indata[:, 0].copy()  # int16 mono
        b64 = base64.b64encode(pcm16.tobytes()).decode("ascii")
        # 音声データをバッファに追加
        # appendは input_audio_format に従う必要あり
        send(ws, {"type": "input_audio_buffer.append", "audio": b64})

    # マイク入力ストリームの開始
    with sd.InputStream(
        samplerate=INPUT_SR,
        channels=1,
        dtype="int16",
        blocksize=FRAME_SAMPLES,
        callback=callback,
    ):
        print("Hold SPACE to talk. Release SPACE to get reply. (Ctrl+C to exit)")
        while True:
            time.sleep(0.05)

def main():
    # 音声再生スレッドの開始
    threading.Thread(target=audio_player, daemon=True).start()

    # WebSocket接続の設定
    ws = websocket.WebSocketApp(
        URL,
        header=HEADERS,
        on_open=on_open,
        on_message=on_message,
    )
    # WebSocketスレッドの開始
    threading.Thread(target=lambda: ws.run_forever(), daemon=True).start()
    ws_ready.wait() # 接続完了待ち

    # Push-to-talk: SPACE押下で録音、離したら commit → response.create
    def on_press(key):
        if key == keyboard.Key.space and not recording_flag.is_set():
            recording_flag.set() # 録音開始
            send(ws, {"type": "input_audio_buffer.clear"})

    def on_release(key):
        if key == keyboard.Key.space and recording_flag.is_set():
            recording_flag.clear() # 録音停止

            # commit:空バッファだとエラー
            # バッファ内の音声をコミット(会話アイテムとして確定)
            send(ws, {"type": "input_audio_buffer.commit"})

            # commitだけでは返答は出ないので response.create が必要
            # レスポンス生成をリクエスト
            send(ws, {
                "type": "response.create",
                "response": {"output_modalities": ["audio"]},
            })

    # キーボードイベントのリスナー開始
    listener = keyboard.Listener(on_press=on_press, on_release=on_release)
    listener.start()

    # マイク入力ループへ(メインスレッドはここでブロッキングされる)
    mic_loop(ws)

if __name__ == "__main__":
    main()

コードのポイント解説

1. send() ヘルパー関数

Realtime API は JSON 送受信が基本です。そこで json.dumps() を隠して、送信処理を読みやすくします。

def send(ws, obj: dict):
    ws.send(json.dumps(obj))

2. threading.Event() で状態を共有する

このアプリは「キーボード監視」「マイク入力」「WebSocket通信」が並行動作します。threading.Event を使うと、スレッド間の状態共有が簡単になります。

  • recording_flag:録音の ON/OFF
  • ws_ready:WebSocket 接続完了の待機

3. 音声送信は 20ms ごとに append する

sounddevice.InputStream の callback は一定間隔で呼ばれます。録音中だけ、PCM を Base64 化して input_audio_buffer.append を送ります。

def callback(indata, frames, time_info, status):
    if not recording_flag.is_set():
        return
    pcm16 = indata[:, 0].copy()
    b64 = base64.b64encode(pcm16.tobytes()).decode("ascii")
    send(ws, {"type": "input_audio_buffer.append", "audio": b64})

処理フローの可視化

「SPACEキーを押してから、AIが喋り出すまで」の流れを図解します。


実装におけるハマりポイント

1. commit だけでは返事が来ない

input_audio_buffer.commit は「ユーザーの発話を確定する」合図です。返答生成は別リクエストなので、必ず response.create を送ります。

2. 応答は delta で届く

応答音声は response.output_audio.delta として分割配信されます。queue.Queue に積み、別スレッドで順次再生すると低遅延になります。


まとめ

この実装は次の基本要素を一通り含みます。

  • input_audio_buffer.append(音声送信)
  • input_audio_buffer.commit / response.create(ターン制御)
  • response.output_audio.delta(ストリーミング再生)

このコードを起点に、GUI 追加、プロンプト設計、ログ保存などを拡張すると、音声対話アプリの土台として使えます。

Discussion