🤖
OpenAI Realtime API における文字起こし(Transcribe)処理についてまとめてみた
OpenAI Realtime API における文字起こし(Transcribe)処理についてまとめてみた
— Push-to-Talk 音声対話における入力音声の逐次文字起こし —
概要
本記事では、:contentReference[oaicite:0]{index=0} の Realtime API を用いて、
マイク入力音声をリアルタイムに文字起こし(transcribe)する処理を技術的に整理する。
対象とするシステムは、以下の特徴を持つ。
- WebSocket による Realtime API 接続
- 入力音声の 逐次文字起こし(streaming transcription)
- Push-to-Talk(スペースキー押下中のみ録音)
- 音声応答とテキスト応答の同時ストリーミング
本稿では特に、input_audio_transcription に関するイベント設計と処理フローに焦点を当てる。
全体構成
文字起こし処理は、次の要素で構成される。
| 要素 | 役割 |
|---|---|
| WebSocket | Realtime API との双方向通信 |
session.update |
セッション作成時に文字起こし設定を有効化 |
input_audio_buffer.append |
PCM 音声フレームの逐次送信 |
input_audio_buffer.commit |
発話区間の確定 |
| transcription events | サーバー側文字起こし結果の受信 |
main.py
"""
main-transcribe.py
処理の概要:
1. OpenAI Realtime APIにWebSocketで接続。
2. セッション作成時(session.created)に `session.update` を送り、入力音声の文字起こし(input_audio_transcription)設定を有効化。
3. マイク入力を制御する `mic_loop` と、サーバーイベントを処理する `on_message` を並行動作。
4. スペースキー押下で録音開始(input_audio_buffer.append)、離すと最低100ms以上の音声がある場合のみコミット(input_audio_buffer.commit)して回答生成(response.create)を要求。
5. サーバーからは音声(delta)と同時に、ユーザー/エージェントのテキスト(conversation.item.input_audio_transcription.delta / response.output_audio_transcript.delta)が届く。
6. それらをストリーミング再生・表示する。
"""
import os
import json
import base64
import threading
import queue
import time
import numpy as np
import sounddevice as sd
import websocket
from pynput import keyboard
from dotenv import load_dotenv
load_dotenv()
# =========================
# 初期設定
# =========================
URL = "wss://api.openai.com/v1/realtime?model=gpt-realtime"
HEADERS = ["Authorization: Bearer " + os.environ["OPENAI_API_KEY"]]
INPUT_SR = 16000
OUTPUT_SR = 24000
FRAME_MS = 20
FRAME_SAMPLES = int(INPUT_SR * FRAME_MS / 1000)
# commit 最低条件(100ms)
MIN_COMMIT_MS = 100
MIN_FRAMES = int(MIN_COMMIT_MS / FRAME_MS) # 20msなら5
RELEASE_GRACE_SEC = 0.12 # release直後の取りこぼし防止
play_q: "queue.Queue[np.ndarray]" = queue.Queue()
recording_flag = threading.Event()
ws_ready = threading.Event()
# websocket-client はスレッド跨ぎsendが不安定になりやすいのでロック
ws_send_lock = threading.Lock()
# =========================
# commit 二重実行防止
# =========================
commit_inflight_lock = threading.Lock()
commit_inflight = False
def set_commit_inflight(v: bool) -> bool:
global commit_inflight
with commit_inflight_lock:
if v and commit_inflight:
return False
commit_inflight = v
return True
# =========================
# ストリーミング表示用の状態
# =========================
user_printing = False
agent_printing = False
user_transcript_buf = {} # key: item_id -> text
agent_audio_tr_buf = "" # assistantの音声トランスクリプト(output_audio_transcript.delta)
print_lock = threading.Lock()
# commit ガード用:今回発話で append できたフレーム数
utter_frames_lock = threading.Lock()
utter_frames = 0
def utter_frames_reset():
global utter_frames
with utter_frames_lock:
utter_frames = 0
def utter_frames_inc():
global utter_frames
with utter_frames_lock:
utter_frames += 1
def utter_frames_get() -> int:
with utter_frames_lock:
return int(utter_frames)
def send(ws, obj: dict):
payload = json.dumps(obj)
with ws_send_lock:
ws.send(payload)
def safe_print(s: str, end: str = "\n"):
with print_lock:
print(s, end=end, flush=True)
def audio_player():
with sd.OutputStream(samplerate=OUTPUT_SR, channels=1, dtype="int16") as out:
while True:
chunk = play_q.get()
if chunk is None:
break
out.write(chunk)
def on_open(ws):
ws_ready.set()
def on_error(ws, err):
safe_print(f"\n[ws error] {err}")
def on_close(ws, status_code, msg):
safe_print(f"\n[ws closed] code={status_code} msg={msg}")
def on_message(ws, message: str):
global user_printing, agent_printing, agent_audio_tr_buf
evt = json.loads(message)
t = evt.get("type")
# -------------------------
# セッション初期化
# -------------------------
if t == "session.created":
send(ws, {
"type": "session.update",
"session": {
"type": "realtime",
"model": "gpt-realtime",
"audio": {"output": {"voice": "marin"}},
"input_audio_transcription": {
"model": "gpt-4o-mini-transcribe",
# 必要なら: "language": "ja"
},
}
})
return
# -------------------------
# ユーザー音声:逐次文字起こし
# -------------------------
if t == "conversation.item.input_audio_transcription.delta":
item_id = evt.get("item_id")
delta = evt.get("delta", "")
if not item_id:
return
prev = user_transcript_buf.get(item_id, "")
now = prev + delta
user_transcript_buf[item_id] = now
if delta:
if not user_printing:
user_printing = True
safe_print("\n[you] ", end="")
safe_print(delta, end="")
return
if t == "conversation.item.input_audio_transcription.completed":
transcript = evt.get("transcript", "")
if user_printing:
safe_print("")
else:
safe_print(f"\n[you] {transcript}")
user_printing = False
return
# -------------------------
# assistant:応答開始
# -------------------------
if t == "response.created":
agent_audio_tr_buf = ""
agent_printing = False
return
# -------------------------
# assistant:音声トランスクリプト
# -------------------------
if t == "response.output_audio_transcript.delta":
delta = evt.get("delta", "")
if delta:
if not agent_printing:
agent_printing = True
safe_print("\n[agent] ", end="")
agent_audio_tr_buf += delta
safe_print(delta, end="")
return
if t == "response.output_audio_transcript.done":
if agent_printing:
safe_print("")
agent_printing = False
return
# -------------------------
# assistant:音声デルタ
# -------------------------
if t == "response.output_audio.delta":
b = base64.b64decode(evt["delta"])
pcm = np.frombuffer(b, dtype=np.int16)
play_q.put(pcm)
return
# -------------------------
# 互換:assistant 音声トランスクリプト(来る環境では来る)
# -------------------------
if t == "response.audio_transcript.delta":
delta = evt.get("delta", "")
if delta:
if not agent_printing:
agent_printing = True
safe_print("\n[agent] ", end="")
agent_audio_tr_buf += delta
safe_print(delta, end="")
return
if t == "response.audio_transcript.done":
if agent_printing:
safe_print("")
agent_printing = False
return
# -------------------------
# エラー
# -------------------------
if t == "error":
safe_print("\n[server error] " + json.dumps(evt, ensure_ascii=False))
return
def mic_loop(ws):
send(ws, {"type": "input_audio_buffer.clear"})
def callback(indata, frames, time_info, status):
if status:
# sounddevice の警告は最低限表示
safe_print(f"\n[audio status] {status}")
if not recording_flag.is_set():
return
utter_frames_inc()
pcm16 = indata[:, 0].copy()
b64 = base64.b64encode(pcm16.tobytes()).decode("ascii")
send(ws, {"type": "input_audio_buffer.append", "audio": b64})
with sd.InputStream(
samplerate=INPUT_SR,
channels=1,
dtype="int16",
blocksize=FRAME_SAMPLES,
callback=callback,
):
safe_print("Hold SPACE to talk. Release SPACE to get reply. (Ctrl+C to exit)")
while True:
time.sleep(0.05)
def main():
threading.Thread(target=audio_player, daemon=True).start()
ws = websocket.WebSocketApp(
URL,
header=HEADERS,
on_open=on_open,
on_message=on_message,
on_error=on_error,
on_close=on_close,
)
threading.Thread(target=lambda: ws.run_forever(), daemon=True).start()
ws_ready.wait()
def on_press(key):
if key == keyboard.Key.space and not recording_flag.is_set():
recording_flag.set()
utter_frames_reset()
send(ws, {"type": "input_audio_buffer.clear"})
def on_release(key):
if key == keyboard.Key.space and recording_flag.is_set():
# ✅ ここでは recording_flag を落とさない(grace中にappendさせる)
if not set_commit_inflight(True):
return
def delayed_commit():
try:
# ✅ release直後のappend取りこぼし防止(この間 recording_flag は True)
time.sleep(RELEASE_GRACE_SEC)
# ✅ grace が終わったら append を止める
recording_flag.clear()
frames = utter_frames_get()
if frames < MIN_FRAMES:
safe_print(f"\n[skip] audio too short: frames={frames} (<{MIN_FRAMES})")
send(ws, {"type": "input_audio_buffer.clear"})
return
send(ws, {"type": "input_audio_buffer.commit"})
send(ws, {
"type": "response.create",
"response": {"output_modalities": ["audio"]},
})
finally:
set_commit_inflight(False)
threading.Thread(target=delayed_commit, daemon=True).start()
listener = keyboard.Listener(on_press=on_press, on_release=on_release)
listener.start()
mic_loop(ws)
if __name__ == "__main__":
main()
セッション初期化と文字起こし設定
WebSocket 接続後、サーバーから session.created イベントが届く。
このタイミングで 文字起こし機能を有効化する。
{
"type": "session.update",
"session": {
"model": "gpt-realtime",
"audio": {
"output": { "voice": "marin" }
},
"input_audio_transcription": {
"model": "gpt-4o-mini-transcribe"
}
}
}
ポイント
-
input_audio_transcriptionを指定しない場合、文字起こしイベントは送信されない - 音声認識モデルは 応答生成モデルとは独立して指定できる
- 言語指定(
language: "ja")も必要に応じて追加可能
マイク入力と音声バッファ送信
マイク入力は sounddevice.InputStream により取得する。
取得した PCM16 音声は 20ms 単位のフレームとして送信する。
send(ws, {
"type": "input_audio_buffer.append",
"audio": base64_pcm
})
設計意図
- Realtime API は 生 PCM 音声のストリーミング入力を前提としている
- クライアント側で音声認識は行わない
- 音声はそのまま API に送信し、transcribe はサーバー側で実行する
Push-to-Talk と発話区間の確定
本実装では、スペースキー操作により発話区間を制御する。
録音開始
- スペースキー押下
-
input_audio_buffer.clearを送信 - 録音フラグを有効化
録音終了と commit
- スペースキー解放
- 一定時間(grace)待機後に録音停止
- フレーム数が閾値以上の場合のみ commit
{ "type": "input_audio_buffer.commit" }
最低発話長のガード
- 20ms × 5 フレーム(100ms)未満の音声は破棄
- 無音・誤操作による不要な推論を防止
逐次文字起こしイベントの処理
サーバーは、入力音声に対して 逐次的に文字起こし結果を返す。
delta イベント
{
"type": "conversation.item.input_audio_transcription.delta",
"item_id": "...",
"delta": "こんに"
}
-
deltaは部分文字列 - 同一
item_idに対して累積する - クライアント側で連結して表示する
completed イベント
{
"type": "conversation.item.input_audio_transcription.completed",
"transcript": "こんにちは"
}
- 発話区間全体の確定テキスト
- UI 上では改行・確定表示に使用する
応答生成との関係
文字起こし処理と応答生成は、明確に分離されている。
-
文字起こし
input_audio_buffer.append / commitconversation.item.input_audio_transcription.*
-
応答生成
response.createresponse.output_audio.*response.output_audio_transcript.*
この分離により、
- 文字起こしのみを利用する
- 応答生成を行わない音声入力処理
といった構成も可能になる。
実装上の注意点
WebSocket の send 排他制御
-
websocket-clientはスレッド跨ぎ送信が不安定になりやすい -
Lockによる send の直列化が必須
commit の二重実行防止
- キーイベントは短時間に複数回発火する可能性がある
-
commit_inflightフラグで多重 commit を防止する
release grace の重要性
- キー解放と音声 callback は非同期
- 解放直後に数フレーム欠落する可能性がある
- 短時間の grace を設けることで安定性が向上する
まとめ
本記事では、Realtime API における 文字起こし(transcribe)処理を以下の観点で整理した。
- セッション初期化時の transcription 有効化
- 生音声ストリーミング設計
- Push-to-Talk による発話区間制御
- delta / completed イベントの扱い
- 応答生成との責務分離
この構成により、
- 低遅延な逐次文字起こし
- 音声 UI とテキスト UI の同時表示
- 柔軟な対話制御
を実現できる。
Discussion