Google text to speech ストリーミングチュートリアル
音声をストリーミング録音する
ストリーミング録音にはsounddeviceを使います。
※pythonでオーディオを扱うライブラリとしてはpyaudioも有名ですが、pyaudioは古いライブラリであり最新のpythonバージョンでは不具合も多いです。
ストリーミング録音にはsounddeviceのInputStreamを使います。
これはストリーミング録音したデータをnumpyのnp.ndarray形式で返してくれるものです。
google sttに投げるには音声はバイナリデータにする必要ががあるので、bytes形式を返すRawInputStreamを使ったほうがいいんじゃないかとも思われますが、音量解析にはnp.ndarrayで計算する必要があるのと、bytes→np.ndarrayを書くのは手間ですがnp.ndarray→bytes変換はtobytes()で一撃で書けるので、np形式の方が扱いやすいです
InputStreamの設定について
google speech to textに使うためには、dtypeを'int16'に指定する必要があります。
float32など指定すると録音はできますが、google sttが動作しません(エラーは出ず、結果が返ってこないだけなので厄介)
samplerateは世間一般で用いられている16000を指定しておけば問題ないです。
blocksizeは録音したデータを何秒ごとの塊にするかを指定する数値です。
google sttでは0.1秒が推奨されていますので、16000のサンプルレートの0.1秒分ということで1600を指定しています
Queueについて
Queueとはpythonでマルチスレッドを扱うときに、データを入れたり抜いたりする時の順番や唯一性を保証してくれるものです。
(最初のうちはリストで考えててもOK。でもリストはマルチスレッドで先入先出のような順番保証ができません)
サンプルコード①
ストリーミングで音声データを片っ端から_audio_buffに詰め込んでいく一番シンプルなコードです
from queue import Queue
import numpy as np
import sounddevice as sd
_audio_buff:Queue[np.ndarray] = Queue()
def input_stream_callback(indata, frames, time, status):
"""音声データを片っ端から_audio_buffに詰め込んでいく"""
_audio_buff.put(indata)
############## ここから実行部分 ##############
with sd.InputStream(channels=1, dtype='int16', callback=input_stream_callback, samplerate=16000, blocksize=int(16000 * 0.1)):
print('start')
sd.sleep(4*1000) # 4秒間録音
サンプルコード②
サンプルコード①に、片っ端からデータを回収するget_buffを追加しました。
この関数はメインスレッドと並行する別スレッドで、_audio_buffに詰め込まれたデータを取り出し、reproduced_audioに蓄積するものです。
これだけだとストリーミング録音が終了したタイミングが掴めないので、ストリーミング録音が終了したタイミングで_audio_buffにはNoneを入れ、Noneを検知したら回収ループを抜け出すようにしています。
最後にreproduced_audioに蓄積したデータから音声ファイルを復元します。
np配列から音声ファイルを作成するにはsoundfileを使うと便利です。
# ライブラリimport部分は省略
import soundfile as sf
_audio_buff:Queue[np.ndarray|None] = Queue()
def get_buff():
"""
_audio_buffに詰め込まれたデータを取り出し、reproduced_audioに蓄積
最後にreproduced_audioに詰め込んだデータから音声ファイルを作成
"""
reproduced_audio:List[np.ndarray]=[]
# スレッド進行中は_audio_buffに詰め込まれたデータを片っ端から回収する。
while True:
chunk = audio_buff.get()
if chunk is None: # Noneが来たら終了
break
reproduced_audio.append(chunk.flatten())
# データがある場合のみ保存
if reproduced_audio:
sf.write("./myrecording.mp3", np.concatenate(reproduced_audio, axis=0), 16000)
print("Audio file saved!")
else:
print("No audio data to save.")
def input_stream_callback(indata, frames, time, status):
"""音声データを片っ端から_audio_buffに詰め込んでいく"""
_audio_buff.put(indata)
############## ここから実行部分 ##############
t1 = Thread(target=get_buff)
t1.start()
with sd.InputStream(channels=1, dtype='int16', callback=input_stream_callback, samplerate=16000, blocksize=int(16000 * 0.1)):
print('start')
sd.sleep(4*1000) # 4秒間録音
audio_buff.put(None) # 録音終了後、get_buffスレッドが終了するようフラグを変更
t1.join()
print('fin!')
サンプルコード③
サンプルコード②で作成した音声ファイル(myrecording.mp3)を再生すると、音声にプツプツ音が感じられるかとかと思います。
この原因は、コールバック関数内において、indataのようなストリームデータを受け取ってから_audio_buffに格納されるまでわずかな間に、次のストリームデータが来た場合、前のindataが書き換えられてしまうために発生します。
解決法として、.copy()でindataのコピーをとって_audio_buffに格納することで、次のindataが来ても上書きされることなく_audio_buffに詰め込んでいくことができます。
以下のように書き換えることで、音声のプツプツ音を解消することができます
def input_stream_callback(indata, frames, time, status):
"""音声データを片っ端から_audio_buffに詰め込んでいく"""
_audio_buff.put(indata.copy())
google sttに投げる
次はgoogle sttに投げるようにしましょう。
google sttは最終的に以下のように、クライアントライブラリにジェネレーターを投げる形になります。
responses_iterator = speech_client.streaming_recognize(requests=request_generator(config_request,generate_audio()))
このジェネレーターを作成するのがこの節の趣旨になります。
1. コンフィグ
公式チュートリアルに倣って、以下のようにコンフィグを設定します。
project_idはGCPで設定してください。
from google.cloud.speech_v2.types import cloud_speech as cloud_speech_types ,RecognitionConfig, StreamingRecognizeResponse, StreamingRecognizeRequest, StreamingRecognitionResult
recognition_config = cloud_speech_types.RecognitionConfig(
explicit_decoding_config=cloud_speech_types.ExplicitDecodingConfig(
sample_rate_hertz = 16000,
encoding=cloud_speech_types.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
audio_channel_count=1
),
language_codes=["ja-JP"],
model="long",
)
streaming_config = cloud_speech_types.StreamingRecognitionConfig(
config=recognition_config,
streaming_features=cloud_speech_types.StreamingRecognitionFeatures(
interim_results=True, # 日本語ではTrue限定?
enable_voice_activity_events=True,
)
)
project_id = os.environ.get('GOOGLE_PROJECT_ID')
config_request = cloud_speech_types.StreamingRecognizeRequest(
recognizer=f"projects/{project_id}/locations/global/recognizers/_",
streaming_config=streaming_config,
)
2. ヘルパーメソッドの作成
generate_audioは_audio_buffに蓄積された音声データの細切れを回収し、バイナリデータに変換してジェネレーターとして返す関数です。
request_generatorは、1で作成したコンフィグとgenerate_audioをセットにして、speech_client.streaming_recognizeに投げられる形にします。
def generate_audio():
while True:
data:List[bytes] = []
while True:
try:
chunk = _audio_buff.get(block=False)
if chunk is None:continue
data.append(chunk.tobytes())
except Empty:
break
if not len(data):continue
yield b"".join(data)
def request_generator(config: cloud_speech_types.RecognitionConfig, audio: Generator[bytes,None,None]
) -> Generator[Union[RecognitionConfig, StreamingRecognizeRequest],None,None]:
yield config
for chunk in audio:
yield cloud_speech_types.StreamingRecognizeRequest(audio=chunk)
3. google sttに投げて結果を得る
クライアントライブラリを使用して、google sttに投げます。
結果はイテレーターで返されますので、順次解読していく必要があります。
speech_client = SpeechClient()
def stt_with_google():
responses_iterator = speech_client.streaming_recognize(requests=request_generator(config_request,generate_audio()))
for response in responses_iterator:
if not response.results:continue
result:StreamingRecognitionResult = response.results[0] # response.resultsは複数存在する。これはうまく聞き取れなかったときに、いくつかの候補を挙げるもので、[0]が一番有力
if not result.alternatives:continue # alternativesはMutableSequence。よく聞き取れなかったときに、他に何と喋っていたのかの候補をを返す
if result.is_final:
print('final!')
transcript = result.alternatives[0].transcript
print(transcript)
まとめ
""" google stt """
import os
from typing import Iterable, List, Dict, Any, Generator, Union
from queue import Queue,Empty, Full
from threading import Thread, Event
import soundfile as sf
import sounddevice as sd
import numpy as np
import soundfile as sf
from google.cloud.speech_v2 import SpeechClient
from google.cloud.speech_v2.types import cloud_speech as cloud_speech_types ,RecognitionConfig, StreamingRecognizeResponse, StreamingRecognizeRequest, StreamingRecognitionResult
_audio_buff:Queue[np.ndarray|None] = Queue()
def input_stream_callback(indata, frames, time, status):
"""音声データを片っ端から_audio_buffに詰め込んでいく"""
_audio_buff.put(indata.copy())
recognition_config = cloud_speech_types.RecognitionConfig(
explicit_decoding_config=cloud_speech_types.ExplicitDecodingConfig(
sample_rate_hertz = 16000,
encoding=cloud_speech_types.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
audio_channel_count=1
),
language_codes=["ja-JP"],
model="long",
)
streaming_config = cloud_speech_types.StreamingRecognitionConfig(
config=recognition_config,
streaming_features=cloud_speech_types.StreamingRecognitionFeatures(
interim_results=True, # 日本語ではTrue限定?
enable_voice_activity_events=True,
)
)
project_id = os.environ.get('GOOGLE_PROJECT_ID')
config_request = cloud_speech_types.StreamingRecognizeRequest(
recognizer=f"projects/{project_id}/locations/global/recognizers/_",
streaming_config=streaming_config,
)
def generate_audio():
while True:
data:List[bytes] = []
while True:
try:
chunk = _audio_buff.get(block=False)
if chunk is None:break
data.append(chunk.tobytes())
except Empty:
break
if not len(data):continue
yield b"".join(data)
def request_generator(config: cloud_speech_types.RecognitionConfig, audio: Generator[bytes,None,None]):
yield config
for chunk in audio:
yield cloud_speech_types.StreamingRecognizeRequest(audio=chunk)
speech_client = SpeechClient()
def stt_with_google():
responses_iterator = speech_client.streaming_recognize(requests=request_generator(config_request,generate_audio()))
for response in responses_iterator:
if not response.results:continue
result:StreamingRecognitionResult = response.results[0] # response.resultsは複数存在する。これはうまく聞き取れなかったときに、いくつかの候補を挙げるもので、[0]が一番有力
if not result.alternatives:continue # alternativesはMutableSequence。よく聞き取れなかったときに、他に何と喋っていたのかの候補をを返す
if result.is_final:
print('final!')
transcript = result.alternatives[0].transcript
print(transcript)
############## ここから実行部分 ##############
t1 = Thread(target=stt_with_google)
t1.start()
with sd.InputStream(channels=1, dtype='int16', callback=input_stream_callback, samplerate=16000, blocksize=int(16000 * 0.1)):
print('start')
sd.sleep(4*1000) # 4秒間録音
_audio_buff.put(None) # 録音終了後、get_buffスレッドが終了するようフラグを変更
t1.join()
print('fin!')
Discussion