🤖

OpenAI Realtime API における文字起こし(Transcribe)処理についてまとめてみた

に公開

OpenAI Realtime API における文字起こし(Transcribe)処理についてまとめてみた

— Push-to-Talk 音声対話における入力音声の逐次文字起こし —

概要

本記事では、:contentReference[oaicite:0]{index=0} の Realtime API を用いて、
マイク入力音声をリアルタイムに文字起こし(transcribe)する処理を技術的に整理する。

対象とするシステムは、以下の特徴を持つ。

  • WebSocket による Realtime API 接続
  • 入力音声の 逐次文字起こし(streaming transcription)
  • Push-to-Talk(スペースキー押下中のみ録音)
  • 音声応答とテキスト応答の同時ストリーミング

本稿では特に、input_audio_transcription に関するイベント設計と処理フローに焦点を当てる。


全体構成

文字起こし処理は、次の要素で構成される。

要素 役割
WebSocket Realtime API との双方向通信
session.update セッション作成時に文字起こし設定を有効化
input_audio_buffer.append PCM 音声フレームの逐次送信
input_audio_buffer.commit 発話区間の確定
transcription events サーバー側文字起こし結果の受信
main.py
"""
main-transcribe.py

処理の概要:
1. OpenAI Realtime APIにWebSocketで接続。
2. セッション作成時(session.created)に `session.update` を送り、入力音声の文字起こし(input_audio_transcription)設定を有効化。
3. マイク入力を制御する `mic_loop` と、サーバーイベントを処理する `on_message` を並行動作。
4. スペースキー押下で録音開始(input_audio_buffer.append)、離すと最低100ms以上の音声がある場合のみコミット(input_audio_buffer.commit)して回答生成(response.create)を要求。
5. サーバーからは音声(delta)と同時に、ユーザー/エージェントのテキスト(conversation.item.input_audio_transcription.delta / response.output_audio_transcript.delta)が届く。
6. それらをストリーミング再生・表示する。
"""

import os
import json
import base64
import threading
import queue
import time

import numpy as np
import sounddevice as sd
import websocket
from pynput import keyboard
from dotenv import load_dotenv

load_dotenv()

# =========================
# 初期設定
# =========================
URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
HEADERS = ["Authorization: Bearer " + os.environ["OPENAI_API_KEY"]]

INPUT_SR = 16000
OUTPUT_SR = 24000
FRAME_MS = 20
FRAME_SAMPLES = int(INPUT_SR * FRAME_MS / 1000)

# commit 最低条件(100ms)
MIN_COMMIT_MS = 100
MIN_FRAMES = int(MIN_COMMIT_MS / FRAME_MS)  # 20msなら5
RELEASE_GRACE_SEC = 0.12  # release直後の取りこぼし防止

play_q: "queue.Queue[np.ndarray]" = queue.Queue()
recording_flag = threading.Event()
ws_ready = threading.Event()

# websocket-client はスレッド跨ぎsendが不安定になりやすいのでロック
ws_send_lock = threading.Lock()

# =========================
# commit 二重実行防止
# =========================

commit_inflight_lock = threading.Lock()
commit_inflight = False

def set_commit_inflight(v: bool) -> bool:
    global commit_inflight
    with commit_inflight_lock:
        if v and commit_inflight:
            return False
        commit_inflight = v
        return True

# =========================
# ストリーミング表示用の状態
# =========================
user_printing = False
agent_printing = False
user_transcript_buf = {}   # key: item_id -> text
agent_audio_tr_buf = ""    # assistantの音声トランスクリプト(output_audio_transcript.delta)

print_lock = threading.Lock()

# commit ガード用:今回発話で append できたフレーム数
utter_frames_lock = threading.Lock()
utter_frames = 0

def utter_frames_reset():
    global utter_frames
    with utter_frames_lock:
        utter_frames = 0

def utter_frames_inc():
    global utter_frames
    with utter_frames_lock:
        utter_frames += 1

def utter_frames_get() -> int:
    with utter_frames_lock:
        return int(utter_frames)

def send(ws, obj: dict):
    payload = json.dumps(obj)
    with ws_send_lock:
        ws.send(payload)

def safe_print(s: str, end: str = "\n"):
    with print_lock:
        print(s, end=end, flush=True)

def audio_player():
    with sd.OutputStream(samplerate=OUTPUT_SR, channels=1, dtype="int16") as out:
        while True:
            chunk = play_q.get()
            if chunk is None:
                break
            out.write(chunk)

def on_open(ws):
    ws_ready.set()

def on_error(ws, err):
    safe_print(f"\n[ws error] {err}")

def on_close(ws, status_code, msg):
    safe_print(f"\n[ws closed] code={status_code} msg={msg}")

def on_message(ws, message: str):
    global user_printing, agent_printing, agent_audio_tr_buf

    evt = json.loads(message)
    t = evt.get("type")

    # -------------------------
    # セッション初期化
    # -------------------------
    if t == "session.created":
        send(ws, {
            "type": "session.update",
            "session": {
                "type": "realtime",
                "model": "gpt-realtime",
                "audio": {"output": {"voice": "marin"}},
                "input_audio_transcription": {
                    "model": "gpt-4o-mini-transcribe",
                    # 必要なら: "language": "ja"
                },
            }
        })
        return

    # -------------------------
    # ユーザー音声:逐次文字起こし
    # -------------------------
    if t == "conversation.item.input_audio_transcription.delta":
        item_id = evt.get("item_id")
        delta = evt.get("delta", "")
        if not item_id:
            return

        prev = user_transcript_buf.get(item_id, "")
        now = prev + delta
        user_transcript_buf[item_id] = now

        if delta:
            if not user_printing:
                user_printing = True
                safe_print("\n[you] ", end="")
            safe_print(delta, end="")
        return

    if t == "conversation.item.input_audio_transcription.completed":
        transcript = evt.get("transcript", "")
        if user_printing:
            safe_print("")
        else:
            safe_print(f"\n[you] {transcript}")
        user_printing = False
        return

    # -------------------------
    # assistant:応答開始
    # -------------------------
    if t == "response.created":
        agent_audio_tr_buf = ""
        agent_printing = False
        return

    # -------------------------
    # assistant:音声トランスクリプト
    # -------------------------
    if t == "response.output_audio_transcript.delta":
        delta = evt.get("delta", "")
        if delta:
            if not agent_printing:
                agent_printing = True
                safe_print("\n[agent] ", end="")
            agent_audio_tr_buf += delta
            safe_print(delta, end="")
        return

    if t == "response.output_audio_transcript.done":
        if agent_printing:
            safe_print("")
        agent_printing = False
        return

    # -------------------------
    # assistant:音声デルタ
    # -------------------------
    if t == "response.output_audio.delta":
        b = base64.b64decode(evt["delta"])
        pcm = np.frombuffer(b, dtype=np.int16)
        play_q.put(pcm)
        return

    # -------------------------
    # 互換:assistant 音声トランスクリプト(来る環境では来る)
    # -------------------------
    if t == "response.audio_transcript.delta":
        delta = evt.get("delta", "")
        if delta:
            if not agent_printing:
                agent_printing = True
                safe_print("\n[agent] ", end="")
            agent_audio_tr_buf += delta
            safe_print(delta, end="")
        return

    if t == "response.audio_transcript.done":
        if agent_printing:
            safe_print("")
        agent_printing = False
        return

    # -------------------------
    # エラー
    # -------------------------
    if t == "error":
        safe_print("\n[server error] " + json.dumps(evt, ensure_ascii=False))
        return

def mic_loop(ws):
    send(ws, {"type": "input_audio_buffer.clear"})

    def callback(indata, frames, time_info, status):
        if status:
            # sounddevice の警告は最低限表示
            safe_print(f"\n[audio status] {status}")

        if not recording_flag.is_set():
            return

        utter_frames_inc()

        pcm16 = indata[:, 0].copy()
        b64 = base64.b64encode(pcm16.tobytes()).decode("ascii")
        send(ws, {"type": "input_audio_buffer.append", "audio": b64})

    with sd.InputStream(
        samplerate=INPUT_SR,
        channels=1,
        dtype="int16",
        blocksize=FRAME_SAMPLES,
        callback=callback,
    ):
        safe_print("Hold SPACE to talk. Release SPACE to get reply. (Ctrl+C to exit)")
        while True:
            time.sleep(0.05)

def main():
    threading.Thread(target=audio_player, daemon=True).start()

    ws = websocket.WebSocketApp(
        URL,
        header=HEADERS,
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    threading.Thread(target=lambda: ws.run_forever(), daemon=True).start()
    ws_ready.wait()

    def on_press(key):
        if key == keyboard.Key.space and not recording_flag.is_set():
            recording_flag.set()
            utter_frames_reset()
            send(ws, {"type": "input_audio_buffer.clear"})

    def on_release(key):
        if key == keyboard.Key.space and recording_flag.is_set():
            # ✅ ここでは recording_flag を落とさない(grace中にappendさせる)
            if not set_commit_inflight(True):
                return

            def delayed_commit():
                try:
                    # ✅ release直後のappend取りこぼし防止(この間 recording_flag は True)
                    time.sleep(RELEASE_GRACE_SEC)

                    # ✅ grace が終わったら append を止める
                    recording_flag.clear()

                    frames = utter_frames_get()
                    if frames < MIN_FRAMES:
                        safe_print(f"\n[skip] audio too short: frames={frames} (<{MIN_FRAMES})")
                        send(ws, {"type": "input_audio_buffer.clear"})
                        return

                    send(ws, {"type": "input_audio_buffer.commit"})
                    send(ws, {
                        "type": "response.create",
                        "response": {"output_modalities": ["audio"]},
                    })
                finally:
                    set_commit_inflight(False)

            threading.Thread(target=delayed_commit, daemon=True).start()

    listener = keyboard.Listener(on_press=on_press, on_release=on_release)
    listener.start()

    mic_loop(ws)

if __name__ == "__main__":
    main()


セッション初期化と文字起こし設定

WebSocket 接続後、サーバーから session.created イベントが届く。
このタイミングで 文字起こし機能を有効化する。

{
  "type": "session.update",
  "session": {
    "model": "gpt-realtime",
    "audio": {
      "output": { "voice": "marin" }
    },
    "input_audio_transcription": {
      "model": "gpt-4o-mini-transcribe"
    }
  }
}

ポイント

  • input_audio_transcription を指定しない場合、文字起こしイベントは送信されない
  • 音声認識モデルは 応答生成モデルとは独立して指定できる
  • 言語指定(language: "ja")も必要に応じて追加可能

マイク入力と音声バッファ送信

マイク入力は sounddevice.InputStream により取得する。
取得した PCM16 音声は 20ms 単位のフレームとして送信する。

send(ws, {
  "type": "input_audio_buffer.append",
  "audio": base64_pcm
})

設計意図

  • Realtime API は 生 PCM 音声のストリーミング入力を前提としている
  • クライアント側で音声認識は行わない
  • 音声はそのまま API に送信し、transcribe はサーバー側で実行する

Push-to-Talk と発話区間の確定

本実装では、スペースキー操作により発話区間を制御する。

録音開始

  • スペースキー押下
  • input_audio_buffer.clear を送信
  • 録音フラグを有効化

録音終了と commit

  • スペースキー解放
  • 一定時間(grace)待機後に録音停止
  • フレーム数が閾値以上の場合のみ commit
{ "type": "input_audio_buffer.commit" }

最低発話長のガード

  • 20ms × 5 フレーム(100ms)未満の音声は破棄
  • 無音・誤操作による不要な推論を防止

逐次文字起こしイベントの処理

サーバーは、入力音声に対して 逐次的に文字起こし結果を返す。

delta イベント

{
  "type": "conversation.item.input_audio_transcription.delta",
  "item_id": "...",
  "delta": "こんに"
}
  • delta は部分文字列
  • 同一 item_id に対して累積する
  • クライアント側で連結して表示する

completed イベント

{
  "type": "conversation.item.input_audio_transcription.completed",
  "transcript": "こんにちは"
}
  • 発話区間全体の確定テキスト
  • UI 上では改行・確定表示に使用する

応答生成との関係

文字起こし処理と応答生成は、明確に分離されている。

  • 文字起こし

    • input_audio_buffer.append / commit
    • conversation.item.input_audio_transcription.*
  • 応答生成

    • response.create
    • response.output_audio.*
    • response.output_audio_transcript.*

この分離により、

  • 文字起こしのみを利用する
  • 応答生成を行わない音声入力処理

といった構成も可能になる。


実装上の注意点

WebSocket の send 排他制御

  • websocket-client はスレッド跨ぎ送信が不安定になりやすい
  • Lock による send の直列化が必須

commit の二重実行防止

  • キーイベントは短時間に複数回発火する可能性がある
  • commit_inflight フラグで多重 commit を防止する

release grace の重要性

  • キー解放と音声 callback は非同期
  • 解放直後に数フレーム欠落する可能性がある
  • 短時間の grace を設けることで安定性が向上する

まとめ

本記事では、Realtime API における 文字起こし(transcribe)処理を以下の観点で整理した。

  • セッション初期化時の transcription 有効化
  • 生音声ストリーミング設計
  • Push-to-Talk による発話区間制御
  • delta / completed イベントの扱い
  • 応答生成との責務分離

この構成により、

  • 低遅延な逐次文字起こし
  • 音声 UI とテキスト UI の同時表示
  • 柔軟な対話制御

を実現できる。

Discussion