読み上げbotにOpenJTalkを組み込んで、Discord読み上げをリアルタイム化した話
はじめに
Discord向け読み上げBot「Swiftly」では、もともとVOICEVOXを中心に音声合成をしていました。
VOICEVOXは音質が良く、キャラクター性も強く、読み上げBotとしてはかなり魅力的です。
ただ、実際にDiscordのVCで使われるサービスとして運用していると、だんだん別の問題が見えてきます。
それは 「音声合成が終わるまで、読み上げが始まらない」 という問題です。
短いメッセージならまだいいのですが、長文や混雑時になると、
- キューに入る
- 音声合成が走る
- WAVファイルができる
- Discordで再生される
という流れになり、最初の音が出るまでの待ち時間が気になってきます。
そこでSwiftlyでは、VOICEVOXとは別に OpenJTalkエンジン を組み込みました。
ただし、単純に「OpenJTalkをHTTPで叩いてWAVを返す」だけでは面白くありません。
今回の実装では、OpenJTalkを独立したFastAPIコンポーネントとして切り出し、Bot側にはTTSエンジンの抽象化レイヤーを追加し、さらにOpenJTalkだけは PCMを直接DiscordのAudioSourceへ流す低遅延ルート を作りました。
今回は、OpenJTalkを単なる追加エンジンではなく、低遅延・高可用性のための別経路として組み込みました。
特に、WAVファイルを作ってから再生するのではなく、PCMを直接Discord AudioSourceへ流すことで、最初の音が出るまでの待ち時間を減らしています。
この記事では、SwiftlyにOpenJTalkをどう組み込んだのか、そしてリアルタイム読み上げをどう実装したのかを、実際のコードを見ながら解説します。
全体構成
Swiftlyでは、TTSエンジンを voicevox と openjtalk に分離しています。
ただし、読み上げキュー、辞書適用、WAV生成後の FFmpegPCMAudio 再生経路は共通化されています。
ざっくり構成はこうです。
ポイントは、OpenJTalkをBot本体にベタ書きしていないことです。
Bot側から見ると、VOICEVOXもOpenJTalkも synthesize() を持つTTSエンジンとして扱われます。
このおかげで、既存の読み上げキューや辞書処理を壊さずに、TTSエンジンだけ差し替えられるようになっています。
speaker_idでエンジンを切り替える
Swiftlyでは、OpenJTalk用の話者IDを 100000 番台に割り当てています。
OPENJTALK_SPEAKER_ID = 100000
OPENJTALK_SPEAKER_NAMES = {
100000: "mei_normal",
100001: "mei_happy",
100002: "mei_bashful",
100003: "mei_angry",
100004: "mei_sad",
100005: "nitech_male",
}
OPENJTALK_SPEAKER_IDS = frozenset(OPENJTALK_SPEAKER_NAMES)
この設計が地味に効いています。
VOICEVOX側のspeaker_idとOpenJTalk側のspeaker_idを同じDBカラムで扱いつつ、IDの範囲でエンジンを判定できます。
判定部分はシンプルです。
def engine_for_speaker_id(speaker_id: int | str) -> str:
normalized_id = int(speaker_id)
if normalized_id in OPENJTALK_SPEAKER_IDS:
return OPENJTALK_ENGINE_NAME
return VOICEVOX_ENGINE_NAME
つまり、Bot側は「このユーザーがOpenJTalkを選んでいるか?」を深く考えなくてよくなります。
speaker_idを見れば、どのエンジンに投げればいいか分かる。
こういう分岐は、最初は単純すぎるように見えるかもしれません。
でも運用しているBotでは、こういう「見れば分かるID設計」がかなり強いです。
DB構造を大きく変えなくていいし、既存ユーザーのVOICEVOX設定も壊しません。
TTSGatewayで既存コードを壊さない
OpenJTalk追加の中心になっているのが TTSGateway です。
TTSGateway は既存の音声キュー・再生処理から見た互換レイヤーで、内部でVOICEVOXかOpenJTalkへ振り分けます。
class TTSGateway:
"""Compatibility facade used by the existing voice queue and playback code."""
def __init__(self, voicevox=None, openjtalk=None) -> None:
self.voicevox = voicevox or VOICEVOXLib()
self.openjtalk = openjtalk or OpenJTalkSynthesizer()
self.default_engine = normalize_tts_engine(
os.getenv("TTS_DEFAULT_ENGINE"),
default=VOICEVOX_ENGINE_NAME,
)
self.fallback_to_openjtalk = _parse_bool_env(
"TTS_VOICEVOX_FALLBACK_TO_OPENJTALK",
True,
)
通常の合成では、まずspeaker_idからエンジンを解決します。
async def synthesize(self, text, speaker_id, output_path, speed=1.0, dictionary_signature="default"):
engine = engine_for_speaker_id(speaker_id)
if engine == OPENJTALK_ENGINE_NAME:
return await self.openjtalk.synthesize(
text,
speaker_id,
output_path,
speed,
dictionary_signature,
)
return await self.voicevox.synthesize(
text,
speaker_id,
output_path,
speed,
dictionary_signature,
)
ここで大事なのは、既存の再生処理からは self.voicelib.synthesize(...) のままでいいことです。
つまり、読み上げ側の巨大な処理にOpenJTalk固有の分岐を大量に混ぜ込まずに済みます。
VOICEVOX障害時のフォールバック
さらに、VOICEVOXが使えないときはOpenJTalkへフォールバックできるようになっています。
except Exception as exc:
if not self.fallback_to_openjtalk or self._voicevox_is_health_check_available():
raise
return await self.openjtalk.synthesize(
text,
OPENJTALK_SPEAKER_ID,
output_path,
speed,
dictionary_signature,
)
ポイントは、VOICEVOXの合成が失敗したら常にOpenJTalkへ逃がすわけではない ところです。
has_available_voicevox_server() を見て、VOICEVOX側のヘルスチェックが落ちている場合にOpenJTalkへフォールバックします。
つまり、
- VOICEVOX自体が死んでいるならOpenJTalkへ逃がす
- 単発のリクエストエラーなら無理に握りつぶさない
という考え方です。
読み上げBotの運用では、これはかなり重要です。
全部フォールバックすると障害に気づきにくくなるし、逆に一切フォールバックしないとユーザー体験が悪くなる。
この中間を取っているのが良いです。
OpenJTalkを独立FastAPIサービスにする
OpenJTalk本体はSwiftlyとは独立して実装されています。
FastAPIアプリとして独立していて、Bot側からはHTTPで叩きます。
app = FastAPI(title="Swiftly OpenJTalk Engine")
リクエストモデルはかなり素直です。
class SynthesisRequest(BaseModel):
text: str = Field(min_length=1, max_length=500)
speaker: str = Field(default="mei_normal", pattern=r"^[a-zA-Z0-9_-]{1,64}$")
speed: float = Field(default=1.0, ge=0.5, le=2.0)
テキスト長、話者ID、速度の範囲をFastAPI/Pydantic側で制限しています。
Discordの読み上げBotは、ユーザー入力がそのまま来るので、こういう入口の制限はかなり大事です。
OpenJTalk側には、通常のWAV合成エンドポイントがあります。
@app.post("/synthesis")
async def synthesis(request: SynthesisRequest) -> Response:
text = _normalize_synthesis_text(request.text)
if not text:
raise HTTPException(status_code=422, detail="text must not be empty")
cache_key = (text, request.speaker, round(float(request.speed), 3))
cached_wav = await _get_cached_wav(cache_key)
if cached_wav is not None:
return Response(
content=cached_wav,
media_type="audio/wav",
headers={"X-TTS-Cache": "hit"},
)
async with _get_synthesis_semaphore():
wav_bytes = await asyncio.to_thread(
_run_openjtalk_synthesis,
text,
request.speaker,
request.speed,
)
await _put_cached_wav(cache_key, wav_bytes)
return Response(
content=wav_bytes,
media_type="audio/wav",
headers={"X-TTS-Cache": "miss"},
)
この時点で、
- テキスト正規化
- LRU風キャッシュ
- 同時実行数制御
- ブロッキング処理の
asyncio.to_thread()化
が入っています。
OpenJTalkのCLI実行はブロッキングなので、イベントループ上で直接叩くとBot全体に悪影響が出ます。
そのため、asyncio.to_thread() で別スレッドへ逃がしています。
コマンド実行パス
CLI経由のOpenJTalk合成では、一時ディレクトリを作り、入力テキストと出力WAVを扱います。
def _run_openjtalk_synthesis(text: str, speaker: str, speed: float) -> bytes:
with tempfile.TemporaryDirectory(prefix="swiftly-openjtalk-") as tmp_dir:
tmp_path = Path(tmp_dir)
text_path = tmp_path / "input.txt"
output_path = tmp_path / "output.wav"
text_path.write_text(text, encoding="utf-8")
command = _build_command(text_path, output_path, speed, speaker)
completed = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=_timeout_seconds(),
check=False,
)
open_jtalk のコマンドには辞書、音声ファイル、速度、ピッチ、音量などが渡されます。
合成結果のWAVは読み込まれ、必要なら無音トリムされます。
OpenJTalkは短い読み上げでもプロセス起動やファイルI/Oが入るので、ここが遅延の原因になりやすいです。
なのでSwiftlyでは、次の一手として PCM直通のネイティブルート を用意しています。
低遅延のための /synthesis/pcm
OpenJTalkエンジンには、WAVではなくPCMを直接返す /synthesis/pcm エンドポイントがあります。Swiftlyでは、基本的に/synthesisではなく/synthesis/pcmを利用して音声合成を行っています。
@app.post("/synthesis/pcm")
async def synthesis_pcm(request: SynthesisRequest) -> Response:
text = _normalize_synthesis_text(request.text)
if not text:
raise HTTPException(status_code=422, detail="text must not be empty")
_target_sample_rate()
cache_key = (
text,
request.speaker,
round(float(request.speed), 3),
PCM_FORMAT_VERSION,
)
返す形式は固定です。
PCM_MEDIA_TYPE = "audio/L16; rate=48000; channels=1"
PCM_FORMAT_VERSION = "pcm_s16le_48000_mono_v1"
Discordの音声は基本的に48kHzが前提なので、ここで 48kHz / mono / signed 16-bit little endian にそろえています。
OPENJTALK_SAMPLE_RATE が48000以外ならエラーにしているのも、低遅延PCMルートの前提を崩さないためです。
このエンドポイントでは、まずネイティブワーカーを使おうとします。
worker = await _get_native_worker()
if worker is not None:
try:
pcm_bytes = await asyncio.to_thread(
worker.synthesize_pcm,
text,
request.speaker,
request.speed,
)
source = "native"
except Exception:
if _native_required():
raise
pcm_bytes = await asyncio.to_thread(
_run_openjtalk_pcm_synthesis,
text,
request.speaker,
request.speed,
)
source = "cli-fallback"
else:
pcm_bytes = await asyncio.to_thread(
_run_openjtalk_pcm_synthesis,
text,
request.speaker,
request.speed,
)
source = "cli"
ネイティブが使えるならネイティブで、ダメならCLIへフォールバック。
しかもレスポンスヘッダーに X-TTS-Backend として native / cli-fallback / cli が入ります。
これは運用でかなり嬉しいです。
ログやメトリクスを見たときに「今どの経路で音声が作られているのか」が分かります。
ネイティブOpenJTalk Worker
低遅延ルートで一番面白いのが NativeOpenJTalkWorker です。
class NativeOpenJTalkWorker:
"""In-process OpenJTalk frontend + HTSEngine backend.
The existing CLI path starts `open_jtalk` and writes temporary files for
every request. This worker keeps the dictionary frontend and HTS voice
engines loaded in the process, which removes the dominant cold path for
short Discord messages.
"""
CLI版では、毎回 open_jtalk プロセスを起動して、一時ファイルを書いて、WAVを読む必要があります。
でもネイティブワーカーでは、OpenJTalkのフロントエンドとHTSエンジンをプロセス内に保持します。
self._frontend = OpenJTalk(
dn_mecab=str(_env_path("OPENJTALK_DIC", required=True)).encode("utf-8")
)
self._engines: dict[str, Any] = {}
for preset in _load_voice_presets().values():
self._engines[preset.id] = self._load_engine(preset.voice_path)
話者ごとのHTSエンジンもキャッシュされます。
合成処理はこうです。
labels = self._frontend.run_frontend(text)
engine.set_speed(effective_speed)
waveform = engine.synthesize(labels)
audio = np.asarray(waveform, dtype=np.float64)
audio = np.clip(audio, -1.0, 1.0)
pcm = (audio * 32767.0).astype("<i2").tobytes()
OpenJTalkの出力をnumpy配列として扱い、16-bit PCMに変換しています。
サンプルレートが48000でなければ audioop.ratecv() で変換します。
ここでWAVファイルを作らず、PCM bytesとして返せるのが強いです。
Bot側のPCMクライアント
Bot側の OpenJTalkSynthesizer には synthesize_pcm() が実装されています。
async def synthesize_pcm(
self,
text: str,
speaker_id: int,
speed: float = 1.0,
dictionary_signature: str = "default",
) -> PCM16MonoAudio:
このメソッドは /synthesis/pcm を叩き、PCM bytesを受け取ります。
レスポンスヘッダーからキャッシュ状態、サンプルレート、チャンネル数も読みます。
async with session.post(f"{self.config.base_url}/synthesis/pcm", json=payload) as response:
response.raise_for_status()
pcm_bytes = await response.read()
cache_status = response.headers.get("X-TTS-Cache", "miss")
sample_rate = int(response.headers.get("X-TTS-Sample-Rate", "48000"))
channels = int(response.headers.get("X-TTS-Channels", "1"))
そして、もし /synthesis/pcm が404なら、古いOpenJTalkエンジン互換として /synthesis のWAVを取ってPCMへ変換するフォールバックもあります。
この互換性の持たせ方は地味だけど大事です。
Bot側だけ先に更新されたり、OpenJTalkエンジン側だけ古かったりしても、いきなり壊れにくくなります。
キャッシュとinflight制御
OpenJTalkSynthesizerでは、単純なキャッシュだけでなく inflight制御 が入っています。
self._cache: OrderedDict[tuple[str, int, float, str], bytes] = OrderedDict()
self._pcm_cache: OrderedDict[tuple[str, int, float, str, str], PCM16MonoAudio] = OrderedDict()
self._inflight: dict[tuple[str, int, float, str], asyncio.Future[tuple[str, bytes]]] = {}
self._pcm_inflight: dict[tuple[str, int, float, str, str], asyncio.Future[PCM16MonoAudio]] = {}
キャッシュキーには、
- text
- speaker_id
- speed
- dictionary_signature
- PCM format version
が入ります。
dictionary_signature が入っているのが良いところです。
同じ本文でも、サーバー辞書の内容が違えば読みが変わる可能性があります。
なので、辞書適用後の状態をキャッシュキーへ含めています。
inflight制御は、「同じ音声が同時にリクエストされたとき、合成を1回だけにする」ためのものです。
Discordの読み上げBotでは、同じような短文が連続することがあります。
例えば「w」「草」「了解」みたいな短文です。
ここで毎回合成すると無駄ですが、同時に来た場合は通常のキャッシュだけでは間に合いません。
まだ1本目の合成が終わっていないからです。
そこでinflightを見て、同じキーの合成が進行中なら、そのFutureをawaitします。
これは高負荷時にかなり効くタイプの最適化です。
リアルタイム再生ルート
通常のTTSでは、合成が終わってからWAVファイルを discord.FFmpegPCMAudio で再生します。
しかしOpenJTalkでは、専用のリアルタイム経路があります。
def _should_use_openjtalk_realtime(speaker_id: int) -> bool:
return _openjtalk_realtime_enabled() and _is_openjtalk_speaker(speaker_id)
環境変数 TTS_OPENJTALK_REALTIME_ENABLED が有効で、speaker_idがOpenJTalkならリアルタイム経路に入ります。
キュー処理側では、OpenJTalkリアルタイムが使える場合、通常のWAV合成ルートへ行かずに _play_openjtalk_realtime_text() を呼びます。
if _should_use_openjtalk_realtime(speaker_id):
ok = await _play_openjtalk_realtime_text(
self,
guild,
guild_id,
text,
speaker_id,
speed,
dictionary_signature,
user_id,
enqueue_ts,
enqueue_ts,
)
if not ok:
...
continue
オーバーラップ再生が有効な場合も同じです。
メッセージを受け取った直後にOpenJTalkリアルタイム経路へ入れます。
テキストを先頭チャンクに分ける
リアルタイム再生では、まずテキストをチャンクに分けます。
def _split_first_audio_chunks(text: str) -> list[str]:
chunks = split_tts_text(
text,
min_chars=int(os.getenv("TTS_OPENJTALK_FIRST_CHUNK_MIN_CHARS", "24")),
target_chars=int(os.getenv("TTS_OPENJTALK_FIRST_CHUNK_TARGET_CHARS", "56")),
max_chars=int(os.getenv("TTS_OPENJTALK_FIRST_CHUNK_MAX_CHARS", "70")),
split_every_punctuation=False,
)
return chunks or [text]
ここでは、最初の音を早く出すために、かなり小さめのチャンクサイズを使っています。
デフォルトでは最小24文字、目標56文字、最大70文字です。
長文を全部合成してから流すのではなく、まず先頭チャンクだけ合成して流す。
その間に次のチャンクを裏で合成する。
これで、ユーザー体験上は「全文の合成完了」ではなく「最初のチャンクの合成完了」で音が出始めます。
RealtimeMixerAudioSource
リアルタイム再生の心臓部が RealtimeMixerAudioSource です。
class RealtimeMixerAudioSource(discord.AudioSource):
"""Discord PCM source that can accept new mono PCM streams while playing."""
これはDiscord.pyの AudioSource として動作しながら、再生中に新しいPCMを追加できます。
def add_pcm_mono(
self,
pcm: bytes,
*,
user_id: int | None = None,
first_frame_callback: object | None = None,
) -> bool:
if not pcm:
return False
with self._lock:
if self._closed:
return False
if user_id is not None and self._has_user_locked(user_id):
self._queued_by_user.setdefault(user_id, deque()).append(pcm)
else:
self._active.append(_PCMStream(memoryview(pcm), user_id=user_id))
同じユーザーの音声は同時に重ねず、ユーザーごとにキューへ積みます。
一方で、別ユーザーの音声は同時に混ぜることができます。
read() では20msごとのPCMフレームを返します。
FRAME_MS = 20
MONO_SAMPLES_PER_FRAME = SAMPLE_RATE * FRAME_MS // 1000
Discordへ渡すため、mono PCMをstereoへ複製します。
stereo = array.array("h")
for value in mixed:
clipped = max(-32768, min(32767, value))
stereo.append(clipped)
stereo.append(clipped)
return stereo.tobytes()
複数ストリームを足し合わせ、クリッピングして、stereo PCMとして返す。
ここがかなり面白いです。
通常の FFmpegPCMAudio は「完成済みファイルを再生する」発想です。
でもこの実装は「再生しながら音声片を足していく」発想です。
Discord読み上げBotでこれをやると、かなり体感が変わります。
リアルタイム合成の実装
実際のリアルタイム再生処理は _play_openjtalk_realtime_text() にあります。
async def _play_openjtalk_realtime_text(...):
chunks = _split_first_audio_chunks(text)
mixer = await _ensure_realtime_mixer(self, guild, guild_id)
next_task = asyncio.create_task(synthesize_chunk(chunks[0]))
try:
for index, chunk_text in enumerate(chunks):
audio = await next_task
next_task = None
if index + 1 < len(chunks):
next_task = asyncio.create_task(
synthesize_chunk(chunks[index + 1])
)
if not mixer.add_pcm_mono(
audio.pcm,
user_id=user_id,
first_frame_callback=callback,
):
return False
return True
ここでやっていることはシンプルです。
- 最初のチャンク合成を開始
- 合成完了したらMixerへ追加
- 同時に次のチャンク合成を開始
- 次のチャンクができたらまたMixerへ追加
つまり 合成と再生をパイプライン化 しています。
この形にすると、長文でも「全部合成し終わるまで待つ」必要がありません。
もちろん、チャンク2の合成がチャンク1の再生終了に間に合わないと途切れます。
ただ、OpenJTalkの短いチャンクならかなり間に合いやすいです。
メトリクスも取る
Swiftlyでは、リアルタイム読み上げの遅延もPrometheusメトリクスとして取っています。
TTS_MESSAGE_TO_FIRST_AUDIO = Histogram(
"tts_message_to_first_audio_seconds",
"Seconds from Discord message handling to first realtime audio frame",
buckets=(0.025, 0.05, 0.075, 0.1, 0.15, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0),
)
OpenJTalkの最初のチャンクが準備できるまでの時間も別で取っています。
TTS_OPENJTALK_FIRST_CHUNK_READY = Histogram(
"tts_openjtalk_first_chunk_ready_seconds",
"Seconds until the first OpenJTalk realtime chunk is ready",
buckets=(0.025, 0.05, 0.075, 0.1, 0.15, 0.25, 0.5, 0.75, 1.0, 1.5, 2.0, 3.0),
)
メトリクスとしては、
- キュー投入から再生開始まで
- Discordメッセージ処理から最初の音声フレームまで
- OpenJTalkの最初のチャンク準備まで
- ストリーミングチャンク数
- 合成エラー数
などを取っています。
これはかなり大事です。
「速くなった気がする」ではなく、
どこが遅いのかを分解して見られる ようになります。
リアルタイム音声合成は、体感だけで評価すると沼ります。
ネットワークなのか、キュー待ちなのか、合成なのか、Discord再生なのか分からなくなるからです。
なので、最初からメトリクスを埋め込んでおくのは正解です。
テキスト分割も雑にやらない
長文を分割する処理は tts_segmenter.py にあります。
ここでは、URLやDiscordメンションを壊さないように保護範囲を作り、句読点や括弧の深さを見ながら分割しています。
_URL_RE = re.compile(r"https?://[^\s<>()]+|www\.[^\s<>()]+", re.IGNORECASE)
_DISCORD_TOKEN_RE = re.compile(r"<(?:@[!&]?|#|a?:[A-Za-z0-9_~]+:)[0-9]+>")
_STRONG_BOUNDARY_CHARS = frozenset("。!?!?;;\n")
_WEAK_BOUNDARY_CHARS = frozenset("、, ")
分割処理では、
- URLの途中で切らない
- Discordメンションの途中で切らない
- 括弧の中で切りにくくする
- 強い区切りを優先する
- 弱い区切りも使う
- それでも長すぎる場合はハード分割する
という処理になっています。
これは読み上げBotだとかなり重要です。
雑に文字数で切ると、URLの途中で分割されたり、メンションの途中で壊れたりします。
リアルタイム化のために分割するのは大事ですが、分割したせいで読み上げ品質が落ちたら意味がありません。
通常のストリーミングチャンク再生
OpenJTalk専用のPCMリアルタイム経路とは別に、通常のTTSでもチャンク分割再生があります。
def _should_stream_chunks(text: str, chunks: list[str]) -> bool:
if not is_streaming_chunking_enabled() or len(chunks) <= 1:
return False
if is_streaming_split_every_punctuation_enabled():
return True
return len(text) >= streaming_min_chars()
一定以上の長文ならチャンク分割し、チャンクごとに合成・再生します。
ただしOpenJTalkのリアルタイムPCM経路とは違い、こちらは基本的にWAVファイルを合成してから再生するルートです。
OpenJTalkだけ特別扱いしているのは、PCMを直接返せるようにしたことで、ファイルを経由せずMixerへ流せるからです。
なぜOpenJTalkなのか
OpenJTalkはVOICEVOXほどキャラクター性のある音声ではありません。
でも、読み上げBotの運用では「音質」だけが価値ではありません。
特にDiscord VCでは、
- 短文が多い
- レスポンス速度が体感に直結する
- 多少機械っぽくても、すぐ読まれる方が嬉しい場面がある
- 障害時のフォールバック先が欲しい
- GPUに依存しないTTSエンジンが欲しい
という事情があります。
VOICEVOXは魅力的ですが、GPUやモデルロード、サーバー負荷の影響を受けやすいです。
一方でOpenJTalkは軽量で、CPUだけでも動き、短文読み上げの安定枠として使いやすい。
Swiftlyでは、OpenJTalkを「VOICEVOXの代替」ではなく、
低遅延・高可用性のための別ルート として組み込んでいます。
実装して学んだこと
1. TTSエンジンは最初から抽象化した方がいい
読み上げBotを作ると、最初は1つのTTSエンジンだけで十分に見えます。
でも運用が続くと、必ずこういう要望が出ます。
- 別エンジンを追加したい
- 障害時に逃がしたい
- 話者ごとにルーティングしたい
- 軽量エンジンと高品質エンジンを使い分けたい
- 将来AquesTalkなどを追加したい
Swiftlyのドキュメントにも、将来AquesTalkを追加する場合は lib/tts_engines.py に互換クラスを追加し、speaker IDからengineを解決する分岐を増やす設計が書かれています。
この形にしておくと、TTS基盤が育てやすくなります。
2. リアルタイム化は「速い合成」だけでは足りない
最初の音を早く出すには、単に合成を高速化するだけでは足りません。
必要なのは、
- テキストを小さく分割する
- 最初のチャンクを先に合成する
- 再生しながら次のチャンクを合成する
- 完成済みファイルではなくPCMを流せるようにする
- DiscordのAudioSourceとして継続的にフレームを返す
という一連のパイプラインです。
つまり、リアルタイム音声合成はTTSエンジン単体の話ではなく、
入力処理、合成、キャッシュ、再生、メトリクスまで含めたシステム設計 です。
3. キャッシュには辞書状態も含める
読み上げBotでは、サーバーごとの辞書が音声に影響します。
同じ「鯖」という文字でも、辞書によって「さば」と読むか「サーバー」と読むか変わるかもしれません。
なので、キャッシュキーに dictionary_signature を入れるのはかなり重要です。
cache_key = (
text,
normalized_speaker_id,
round(speed, 3),
dictionary_signature or "default",
format_version,
)
これを入れないと、辞書変更後も古い音声がキャッシュから返る可能性があります。
4. ファイルベース再生とPCM再生は別物
FFmpegPCMAudio にWAVを渡す方式は簡単です。
しかし、リアルタイムに音声を足していくには向いていません。
一方で、discord.AudioSource を自前実装して read() で20msごとのPCMを返せば、再生中に音声を追加できます。
これは少し面倒ですが、できることが一気に増えます。
- 合成済みチャンクを順次追加
- 複数ユーザーの音声をミックス
- 同じユーザーの音声は重ねずキュー化
- 最初のフレームが出た瞬間にメトリクス記録
このあたりはファイル再生だけでは難しいです。
まとめ
SwiftlyのOpenJTalk対応は、単に「OpenJTalkを追加しました」というより、TTS基盤を一段階抽象化して、低遅延な読み上げ経路を作る実装でした。
今回のポイントは次の通りです。
-
speaker_idの範囲でVOICEVOX/OpenJTalkをルーティング -
TTSGatewayで既存の読み上げ処理を壊さずTTSエンジンを抽象化 - OpenJTalkをFastAPIサービスとして独立
-
/synthesisでWAV、/synthesis/pcmで低遅延PCMを返す - ネイティブOpenJTalk WorkerでCLI起動・一時ファイルのコストを削減
- Bot側でキャッシュとinflight制御
- テキストをチャンク分割して、最初の音を早く出す
-
RealtimeMixerAudioSourceでPCMを再生中に追加 - Prometheusメトリクスで体感速度を観測可能にする
読み上げBotは、一見すると「メッセージを音声にするだけ」のサービスに見えます。
でも実際に運用すると、遅延、負荷、フォールバック、辞書、再生キュー、VC接続、同時再生、メトリクスなど、かなり分散システムっぽい顔をしてきます。
OpenJTalkを入れたことで、Swiftlyは「高品質なVOICEVOX読み上げ」と「軽量で低遅延なOpenJTalk読み上げ」を使い分けられるようになりました。
そして何より、Discordで誰かがメッセージを送った瞬間に、できるだけ早く声が返ってくる。
読み上げBotにとって、この体感はかなり大きいです。
音声合成は、ただWAVを作るだけじゃない。
ユーザーの会話にどれだけ自然に割り込まず、遅れず、馴染めるか。
そこまで考え始めると、TTS基盤は一気に面白くなります。
Discussion