Zenn
Closed9

あらためて「Google Cloud Speech-to-Text」を試す

kun432kun432

短い音声ファイルのSTT

https://cloud.google.com/speech-to-text/docs/sync-recognize?hl=ja

60 秒未満の短い音声を同期で音声認識する。60秒を超える場合は非同期で行う必要があるらさいい。

パッケージインストール。google-cloud-speechをアップデートするとランタイム再起動が必要になる。

!pip install --upgrade google-cloud-speech datasets

認証。画面に従って行う。

!gcloud auth application-default login

事前にGoogle Cloudプロジェクトを作成し、Cloud Speech-to-Text APIを有効にしておいて、そのプロジェクトIDを指定。

!gcloud auth application-default set-quota-project <プロジェクトID>

データセットは以下を使う

https://huggingface.co/datasets/reazon-research/reazonspeech

from datasets import load_dataset

ds = load_dataset("reazon-research/reazonspeech", "tiny", split="train[:100]", trust_remote_code=True)

適当にピックアップ

ds[6]
出力
{'name': '000/0048b7384d276.flac',
 'audio': {'path': '/root/.cache/huggingface/datasets/downloads/extracted/c50543a6ab5c6805e43f77f8957ae81e61f5b17cecb476837fd0dca8789b0484/000/0048b7384d276.flac',
  'array': array([-0.05941772, -0.05899048, -0.05633545, ...,  0.00518799,
          0.00595093,  0.00311279]),
  'sampling_rate': 16000},
 'transcription': '積極的にお金を使うべきだと主張する政治家や省庁と支出を抑えたい財務省との間でせめぎ合いが続きます。'}
ds[18]
出力
{'name': '000/00b835a47edf9.flac',
 'audio': {'path': '/root/.cache/huggingface/datasets/downloads/extracted/c50543a6ab5c6805e43f77f8957ae81e61f5b17cecb476837fd0dca8789b0484/000/00b835a47edf9.flac',
  'array': array([4.16870117e-02, 3.16162109e-02, 1.47705078e-02, ...,
         1.46484375e-03, 6.10351562e-04, 6.10351562e-05]),
  'sampling_rate': 16000},
 'transcription': 'さてここまで直線を表す方程式がどのような直線になるかを見てきましたがここからは逆にある1点を通って傾きmの直線がどのような方程式で表されるのかを考えてみましょう。'}

Colab上で再生確認する場合は以下のような感じで。

from IPython.display import Audio, display

display(Audio(ds[6]["audio"]["path"], autoplay=True))

ではSTTしてみる。encodingsample_rate_hertzは音声ファイルに合わせて変更。

from google.cloud import speech

client = speech.SpeechClient()

with open(ds[6]["audio"]["path"], "rb") as f:
    audio_content = f.read()

audio = speech.RecognitionAudio(content=audio_content)
config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.FLAC,
    sample_rate_hertz=16000,
    language_code="ja-JP"
)

response = client.recognize(config=config, audio=audio)
print(response)
出力
results {
  alternatives {
    transcript: "積極的にお金を使うべきだと主張する政治家や省庁と失踪を抑えたい 財務省との間で せめぎ合いが続きます"
    confidence: 0.984510839
  }
  result_end_time {
    seconds: 8
    nanos: 110000000
  }
  language_code: "ja-jp"
}
total_billed_time {
  seconds: 9
}
request_id: 3057035447765523242

実際にテキストを取得する場合、response.resultsに分割された形で含まれるらしいので、ループで取得する。あと、実際に確認したわけではないけども、書き起こしは複数の文章が返される場合があり、その場合には可能性(confidence)が高いものから順番に並ぶ様子。

# 各結果は、オーディオの連続した部分に対応しているため、
# 繰り返し実行してオーディオファイル全体の書き起こしを取得する。
for result in response.results:
    # 最初の選択肢が最も可能性が高い
    print(f"書き起こし: {result.alternatives[0].transcript}")

別のデータでも。

with open(ds[18]["audio"]["path"], "rb") as f:
    audio_content = f.read()

audio = speech.RecognitionAudio(content=audio_content)
config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.FLAC,
    sample_rate_hertz=16000,
    language_code="ja-JP"
)

response = client.recognize(config=config, audio=audio)
print(response)

こちらは複数に分割されているのがわかる

出力
results {
  alternatives {
    transcript: "直線の方程式"
    confidence: 0.787059665
  }
  result_end_time {
    seconds: 1
    nanos: 930000000
  }
  language_code: "ja-jp"
}
results {
  alternatives {
    transcript: " さてここまで直線を表す方程式がどのような直線になるかを見てきましたが ここからは 逆にある一点を通って 傾き M の直線がどのような方程式で表されるのかを考えてみましょう"
    confidence: 0.987571955
  }
  result_end_time {
    seconds: 19
    nanos: 830000000
  }
  language_code: "ja-jp"
}
total_billed_time {
  seconds: 20
}
request_id: 5461322970594425852
for result in response.results:
    print(f"書き起こし: {result.alternatives[0].transcript}")
出力
書き起こし: 直線の方程式
書き起こし:  さてここまで直線を表す方程式がどのような直線になるかを見てきましたが ここからは 逆にある一点を通って 傾き M の直線がどのような方程式で表されるのかを考えてみましょう

なお、GCS上のファイルならURIで直接指定ができる。

kun432kun432

長い音声ファイルのSTT

https://cloud.google.com/speech-to-text/docs/async-recognize?hl=ja

60秒を超える音声データの場合は以下が前提となる。

  • 非同期で行う
  • GCSに音声データを保存しておく

上記を除けばコードの記述はそれほど大きく変わるわけではなく、

  • 音声データはspeech.RecognitionAudio(uri=gcs_uri)を使う
  • speech.SpeechClient().recognize()の代わりに、speech.SpeechClient().long_running_recognize()を使う
  • long_running_recognize()が完了するのを待って結果を取得する

という感じ。

以下で公開されている会議の議事録練習用の音声データを使用させていただく

https://note.com/note_0530/n/neb6a205744bd

GCSにバケットを作成してファイルをアップロードしておくこと。

from google.cloud import speech

client = speech.SpeechClient()

audio = speech.RecognitionAudio(uri="gs://<GCSバケット名>/sample.wav")
config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=44100,
    language_code="ja-JP"
)

operation = client.long_running_recognize(config=config, audio=audio)
response = operation.result(timeout=90)
print(response)
出力
results {
  alternatives {
    transcript: "今日は鈴木さんは研修のため不在ですそれ以外の皆さんは(snip)"
    confidence: 0.920531213
  }
  result_end_time {
    seconds: 59
    nanos: 970000000
  }
  language_code: "ja-jp"
}
results {
  alternatives {
    transcript: "はいおかげさまで良くなりましたするとのことですが(snip)"
    confidence: 0.948999882
  }
  result_end_time {
    seconds: 112
    nanos: 200000000
  }
  language_code: "ja-jp"
}
(snip)

結合すればすべての音声の文字起こしを取得できる。

transcription = "".join([result.alternatives[0].transcript for result in response.results])

結果をGCSに直接出力することもできるみたい。

kun432kun432

ストリーミング入力の音声を文字に変換する

https://cloud.google.com/speech-to-text/docs/transcribe-streaming-audio?hl=ja

本題はこれ。

ストリーミング音声認識を使用すると、音声を Speech-to-Text にストリーミングし、音声を処理しながらリアルタイムでストリーム音声認識の結果を受信できます。

サンプルとして、自分が開催した勉強会のYouTube動画から冒頭5分程度の音声(WAV)を抜き出して、疑似ストリーミング的に処理させてみた。

https://www.youtube.com/watch?v=Yl2kR6zLRY8

from google.cloud import speech
import wave

# ストリーミングの代替としてWAVファイルをチャンク分割して返すジェネレータ
def wav_chunk_generator(file_path: str, chunk_size: int = 1024):
    with wave.open(file_path, 'rb') as wf:
        while True:
            chunk = wf.readframes(chunk_size)
            if not chunk:
                break
            yield chunk

with open("sample.wav", "rb") as f:
    audio_content = f.read()

stream = wav_chunk_generator("sample.wav", chunk_size=1024)

client = speech.SpeechClient()

requests = (
    speech.StreamingRecognizeRequest(audio_content=chunk) for chunk in stream
)

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=16000,
    language_code="ja-JP",
)

streaming_config = speech.StreamingRecognitionConfig(config=config)

# streaming_recognizeはジェネレータを返す
responses = client.streaming_recognize(
    config=streaming_config,
    requests=requests,
)

for response in responses:
    # 音声認識が確定した場合、最初の結果にis_finalが含まれる。
    # その他の結果は、音声データの後続部分に対応している。
    for result in response.results:
        print(f"Finished: {result.is_final}")
        print(f"Stability: {result.stability}")
        alternatives = result.alternatives
        # alternativesは、最も確からしいものから順に並んでいる。
        for alternative in alternatives:
            print(f"Confidence: {alternative.confidence}")
            print(f"Transcript: {alternative.transcript}")
    print("--------")
出力
Finished: True
Stability: 0.0
Confidence: 0.7634750604629517
Transcript: はいじゃあ始めます ちょっと まだ来られてない方もいらっしゃるんですけど ボイス ランチ jp 始めます 皆さん見てる
--------
Finished: True
Stability: 0.0
Confidence: 0.6886197328567505
Transcript:  日曜日にお集まりいただきましてありがとうございます 今日久しぶりにですね オフラインということで今日は ですね スペシャルなゲストをお二人 来ていただいておりますということではいえーと 今日ちょっと トピックに参りますけれども VOICE 4 の仕様であるプレゼンリングさんとあと セールスフォースのかぼちゃと デザインのディレクターである グレッグベネットさんに来ていただいてます
--------
Finished: True
Stability: 0.0
Confidence: 0.8904028534889221
Transcript: ということで 日本に来ていただいてありがとうございます
--------
(snip)
--------
Finished: True
Stability: 0.0
Confidence: 0.5103464722633362
Transcript:  分けていただきますというところでボイスをアップデート 2022 というところで今年の新京成について少しお話をします 自己紹介です 清水 と申します 神戸でインフラのエンジニアをやってましたので 普段はくわね ですとか 選別とかテラフォンとかをいじってまして最近ちょっとフリーランスになります ちょっと調べてみたら ボイス フロー 一番最初に始めたのは 2019年の 頭ぐらいなんでだいたい4年弱 ぐらいですね 色々と触ってましてあと 音声 関連のコミュニティのとこでは VOICE lunch JP 今回のやつですね 以外に A ジャグ Amazon Alexa ジャパン ユーザーグループとかあと VOICE Pro の日本語ユーザーグループということで vfj LG というのをやっています
--------
Finished: True
Stability: 0.0
Confidence: 0.7589678168296814
Transcript:  日本ファミリーの方は Facebook の方でやってますので もしよろしければ見ていただければなと思います
--------
Finished: True
Stability: 0.0
Confidence: 0.8742498755455017
Transcript: あと2年ぐらい前にですね 技術書店の方でここに今日 スタッフで来ていただいてる皆さんとですね 一緒に あの 同時に作ろうぜ ということで作ったんですけれども もうこれちょっと2年ぐらい経って中身がだいぶ 古くなってしまっているのですでにちょっと販売は終了しております 今日ちょっと持ってきたかったんですけど すいません 忘れてしまいましたはいなのでこういうこともやっています
--------

実行してみるとストリーミングで処理されているのがわかる。

kun432kun432

上の続き。

マイクからの入力をストリーミングで受けてみる。こちらはColaboratoryだと難しいので、ローカルのMac上でやる。

Python仮想環境を作成。自分はmiseを使うけど、適宜。

mkdir google-cloud-stt-work && cd google-cloud-stt-work
mise use python@3.12
cat << 'EOS' >> .mise.toml

[env]
_.python.venv = { path = ".venv", create = true }
EOS
mise trust

パッケージインストール

pip install google-cloud-speech pyaudio

Google Cloudの認証は実施済みとする。

ドキュメントには2つの例があるが、まず1つ目のサンプル。

import queue
import re
import sys

from google.cloud import speech

import pyaudio

# オーディオ録音パラメータ
RATE = 16000
CHUNK = int(RATE / 10)  # 100ms


class MicrophoneStream:
    """オーディオチャンクを生成するレコーディングストリームを開く"""

    def __init__(self: object, rate: int = RATE, chunk: int = CHUNK) -> None:
        """オーディオとジェネレーターはメインスレッド上で実行されることが保証される"""
        self._rate = rate
        self._chunk = chunk

        # オーディオデータのスレッドセーフなバッファを作成する
        self._buff = queue.Queue()
        self.closed = True

    def __enter__(self: object) -> object:
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            # 現在、APIは1チャンネル(モノラル)オーディオのみをサポートしてい
            # https://goo.gl/z757pE
            channels=1,
            rate=self._rate,
            input=True,
            frames_per_buffer=self._chunk,
            # オーディオストリームを非同期で実行し、バッファオブジェクトを満たす。
            # これは、呼び出しスレッドがネットワークリクエストを行っている間など、
            # 入力デバイスのバッファがオーバーフローしないようにするために必要。
            stream_callback=self._fill_buffer,
        )

        self.closed = False

        return self

    def __exit__(
        self: object,
        type: object,
        value: object,
        traceback: object,
    ) -> None:
        """接続が失われたかどうかに関係なく、ストリームを閉じる"""
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # ジェネレーターを終了させて、クライアントのstreaming_recognizeメソッドが
        # プロセス終了をブロックしないようにする
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(
        self: object,
        in_data: object,
        frame_count: int,
        time_info: object,
        status_flags: object,
    ) -> object:
        """オーディオストリームからデータを継続的に収集し、バッファに入れる

        引数:
            in_data: バイトオブジェクトとしてのオーディオデータ
            frame_count: キャプチャされたフレームの数
            time_info: 時間情報
            status_flags: ステータスフラグ

        戻り値:
            バイトオブジェクトとしてのオーディオデータ。
        """
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self: object) -> object:
        """オーディオデータのストリームからオーディオチャンクを生成する。

        引数:
            self: MicrophoneStreamオブジェクト

        戻り値:
            オーディオチャンクを出力するジェネレーター。
        """
        while not self.closed:
            # ブロッキングget()を使用して、少なくとも1つのデータチャンクがあることを確認し、
            # チャンクがNoneの場合はイテレーションを停止する。これはオーディオストリームの
            # 終了を示す。
            chunk = self._buff.get()
            if chunk is None:
                return
            data = [chunk]

            # ここで、まだバッファに残っている他のデータを消費する。
            while True:
                try:
                    chunk = self._buff.get(block=False)
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)


def listen_print_loop(responses: object) -> str:
    """サーバーのレスポンスを反復処理して表示する。

    渡されるresponsesは、サーバーからレスポンスが提供されるまでブロックするジェネレーター。

    各レスポンスには複数の結果が含まれる場合があり、各結果には複数の代替案が含まれる場合がある。
    詳細はhttps://goo.gl/tjCPAUを参照。ここでは、最上位の結果の最上位の代替案の
    トランスクリプションのみを表示する。

    この場合、レスポンスは中間結果も提供する。レスポンスが中間のものである場合、
    次の結果がそれを上書きできるように、行末に改行を追加する。レスポンスが最終的なものである場合、
    確定したトランスクリプションを保持するために改行を追加する。

    引数:
        responses: サーバーのレスポンスのリスト

    戻り値:
        トランスクリプションされたテキスト。
    """
    num_chars_printed = 0
    for response in responses:
        if not response.results:
            continue

        # `results`リストは連続している。ストリーミングの場合、最初の結果のみを考慮する。
        # 一度`is_final`になると、次の発話を考慮する。
        result = response.results[0]
        if not result.alternatives:
            continue

        # 最上位の代替案のトランスクリプションを表示する。
        transcript = result.alternatives[0].transcript

        # 中間結果を表示するが、行末にキャリッジリターンを追加して、
        # 後続の行がそれを上書きするようにする。
        #
        # 前の結果がこの結果よりも長い場合、前の結果を上書きするために
        # いくつかの余分なスペースを印刷する必要がある。
        overwrite_chars = " " * (num_chars_printed - len(transcript))

        if not result.is_final:
            sys.stdout.write(transcript + overwrite_chars + "\r")
            sys.stdout.flush()

            num_chars_printed = len(transcript)

        else:
            print(transcript + overwrite_chars)

            # トランスクリプションされたフレーズのいずれかがキーワードの1つである場合、
            # 認識を終了する。
            if re.search(r"\b(終了|ストップ)\b", transcript, re.I):
                print("\n### 終了ワードが検出されました。音声認識を終了します。 ###\n")
                break

            num_chars_printed = 0

    return transcript


def main() -> None:
    """オーディオファイルから音声を文字起こしする。"""
    print("\n### 音声認識を開始します。 終了する場合は「終了」または「ストップ」と言ってください。 ###\n")

    # サポートされている言語のリストについては、
    # http://g.co/cloud/speech/docs/languages を参照。
    language_code = "ja-JP"  # BCP-47言語タグ

    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=RATE,
        language_code=language_code,
    )

    streaming_config = speech.StreamingRecognitionConfig(
        config=config, interim_results=True
    )

    with MicrophoneStream(RATE, CHUNK) as stream:
        audio_generator = stream.generator()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in audio_generator
        )

        responses = client.streaming_recognize(streaming_config, requests)

        # ここで、文字起こしのレスポンスを利用する。
        listen_print_loop(responses)


if __name__ == "__main__":
    main()

こんな感じで動作する。

https://www.youtube.com/watch?v=qJQvNlzy4zw

ほぼリアルタイムで処理されているのがわかる。

もう1つの例。

import queue
import re
import sys
import time

from google.cloud import speech
import pyaudio

# オーディオ録音パラメータ
STREAMING_LIMIT = 240000  # 4分
SAMPLE_RATE = 16000
CHUNK_SIZE = int(SAMPLE_RATE / 10)  # 100ms

RED = "\033[0;31m"
GREEN = "\033[0;32m"
YELLOW = "\033[0;33m"


def get_current_time() -> int:
    """現在の時間をミリ秒で返す。

    Returns:
        int: 現在の時間(ミリ秒)。
    """

    return int(round(time.time() * 1000))


class ResumableMicrophoneStream:
    """オーディオチャンクを生成するジェネレータとして録音ストリームを開く。"""

    def __init__(
        self: object,
        rate: int,
        chunk_size: int,
    ) -> None:
        """再開可能なマイクストリームを作成する。

        Args:
        self: クラスのインスタンス。
        rate: オーディオファイルのサンプリングレート。
        chunk_size: オーディオファイルのチャンクサイズ。

        returns: None
        """
        self._rate = rate
        self.chunk_size = chunk_size
        self._num_channels = 1
        self._buff = queue.Queue()
        self.closed = True
        self.start_time = get_current_time()
        self.restart_counter = 0
        self.audio_input = []
        self.last_audio_input = []
        self.result_end_time = 0
        self.is_final_end_time = 0
        self.final_request_end_time = 0
        self.bridging_offset = 0
        self.last_transcript_was_final = False
        self.new_stream = True
        self._audio_interface = pyaudio.PyAudio()
        self._audio_stream = self._audio_interface.open(
            format=pyaudio.paInt16,
            channels=self._num_channels,
            rate=self._rate,
            input=True,
            frames_per_buffer=self.chunk_size,
            # バッファオブジェクトを埋めるためにオーディオストリームを非同期で実行する。
            # これは、呼び出しスレッドがネットワークリクエストなどを行っている間に
            # 入力デバイスのバッファがオーバーフローしないようにするために必要。
            stream_callback=self._fill_buffer,
        )

    def __enter__(self: object) -> object:
        """ストリームを開く。

        Args:
        self: クラスのインスタンス。

        returns: None
        """
        self.closed = False
        return self

    def __exit__(
        self: object,
        type: object,
        value: object,
        traceback: object,
    ) -> object:
        """ストリームを閉じてリソースを解放する。

        Args:
        self: クラスのインスタンス。
        type: 例外のタイプ。
        value: 例外の値。
        traceback: 例外のトレースバック。

        returns: None
        """
        self._audio_stream.stop_stream()
        self._audio_stream.close()
        self.closed = True
        # ジェネレータに終了を通知して、クライアントの
        # streaming_recognize メソッドがプロセスの終了をブロックしないようにする。
        self._buff.put(None)
        self._audio_interface.terminate()

    def _fill_buffer(
        self: object,
        in_data: object,
        *args: object,
        **kwargs: object,
    ) -> object:
        """オーディオストリームからデータを継続的に収集して、バッファに入れる。

        Args:
        self: クラスのインスタンス。
        in_data: バイトオブジェクトとしてのオーディオデータ。
        args: 追加の引数。
        kwargs: 追加の引数。

        returns: None
        """
        self._buff.put(in_data)
        return None, pyaudio.paContinue

    def generator(self: object) -> object:
        """マイクからAPIおよびローカルバッファへのオーディオストリーム

        Args:
            self: クラスのインスタンス。

        returns:
            オーディオストリームからのデータ。
        """
        while not self.closed:
            data = []

            if self.new_stream and self.last_audio_input:
                chunk_time = STREAMING_LIMIT / len(self.last_audio_input)

                if chunk_time != 0:
                    if self.bridging_offset < 0:
                        self.bridging_offset = 0

                    if self.bridging_offset > self.final_request_end_time:
                        self.bridging_offset = self.final_request_end_time

                    chunks_from_ms = round(
                        (self.final_request_end_time - self.bridging_offset)
                        / chunk_time
                    )

                    self.bridging_offset = round(
                        (len(self.last_audio_input) - chunks_from_ms) * chunk_time
                    )

                    for i in range(chunks_from_ms, len(self.last_audio_input)):
                        data.append(self.last_audio_input[i])

                self.new_stream = False

            # 少なくとも1つのデータチャンクがあることを保証するためにブロッキングget()を使用し、
            # チャンクがNoneの場合はオーディオストリームの終了を示すために反復を停止する。
            chunk = self._buff.get()
            self.audio_input.append(chunk)

            if chunk is None:
                return
            data.append(chunk)
            # まだバッファリングされている他のデータを消費する。
            while True:
                try:
                    chunk = self._buff.get(block=False)

                    if chunk is None:
                        return
                    data.append(chunk)
                    self.audio_input.append(chunk)

                except queue.Empty:
                    break

            yield b"".join(data)


def listen_print_loop(responses: object, stream: object) -> None:
    """サーバーのレスポンスを反復処理して表示する。

    渡されたレスポンスは、サーバーからレスポンスが提供されるまでブロックするジェネレータ。

    各レスポンスには複数の結果が含まれる場合があり、各結果には複数の代替案が含まれる場合がある。
    詳細は https://goo.gl/tjCPAU を参照。ここでは、トップ結果のトップ代替案の文字起こしのみを表示する。

    この場合、レスポンスは中間結果も提供する。レスポンスが中間の場合、次の結果がそれを上書きできるように
    行末に改行を追加する。最終的なレスポンスの場合、確定した文字起こしを保持するために改行を追加する。

    引数:
        responses: APIから返されたレスポンス。
        stream: 処理するオーディオストリーム。
    """
    for response in responses:
        if get_current_time() - stream.start_time > STREAMING_LIMIT:
            stream.start_time = get_current_time()
            break

        if not response.results:
            continue

        result = response.results[0]

        if not result.alternatives:
            continue

        transcript = result.alternatives[0].transcript

        result_seconds = 0
        result_micros = 0

        if result.result_end_time.seconds:
            result_seconds = result.result_end_time.seconds

        if result.result_end_time.microseconds:
            result_micros = result.result_end_time.microseconds

        stream.result_end_time = int((result_seconds * 1000) + (result_micros / 1000))

        corrected_time = (
            stream.result_end_time
            - stream.bridging_offset
            + (STREAMING_LIMIT * stream.restart_counter)
        )
        # 中間結果を表示するが、行末にキャリッジリターンを追加して、
        # 後続の行がそれを上書きするようにする。

        if result.is_final:
            sys.stdout.write(GREEN)
            sys.stdout.write("\033[K")
            sys.stdout.write(str(corrected_time) + ": " + transcript + "\n")

            stream.is_final_end_time = stream.result_end_time
            stream.last_transcript_was_final = True

            # 文字起こしされたフレーズがキーワードのいずれかである場合、認識を終了する。
            if re.search(r"\b(終了|ストップ)\b", transcript, re.I):
                sys.stdout.write(YELLOW)
                sys.stdout.write("終了します...\n")
                stream.closed = True
                break
        else:
            sys.stdout.write(RED)
            sys.stdout.write("\033[K")
            sys.stdout.write(str(corrected_time) + ": " + transcript + "\r")

            stream.last_transcript_was_final = False


def main() -> None:
    """マイク入力から音声APIへの双方向ストリーミングを開始する"""
    client = speech.SpeechClient()
    config = speech.RecognitionConfig(
        encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
        sample_rate_hertz=SAMPLE_RATE,
        language_code="ja-JP",
        max_alternatives=1,
    )

    streaming_config = speech.StreamingRecognitionConfig(
        config=config, interim_results=True
    )

    mic_manager = ResumableMicrophoneStream(SAMPLE_RATE, CHUNK_SIZE)
    print(mic_manager.chunk_size)
    sys.stdout.write(YELLOW)
    sys.stdout.write('\n「終了」または「ストップ」と言って停止します。\n\n')
    sys.stdout.write("終了 (ms)       文字起こし結果/ステータス\n")
    sys.stdout.write("=====================================================\n")

    with mic_manager as stream:
        while not stream.closed:
            sys.stdout.write(YELLOW)
            sys.stdout.write(
                "\n" + str(STREAMING_LIMIT * stream.restart_counter) + ": 新しいリクエスト\n"
            )

            stream.audio_input = []
            audio_generator = stream.generator()

            requests = (
                speech.StreamingRecognizeRequest(audio_content=content)
                for content in audio_generator
            )

            responses = client.streaming_recognize(streaming_config, requests)

            # 文字起こしのレスポンスを利用する。
            listen_print_loop(responses, stream)

            if stream.result_end_time > 0:
                stream.final_request_end_time = stream.is_final_end_time
            stream.result_end_time = 0
            stream.last_audio_input = []
            stream.last_audio_input = stream.audio_input
            stream.audio_input = []
            stream.restart_counter = stream.restart_counter + 1

            if not stream.last_transcript_was_final:
                sys.stdout.write("\n")
            stream.new_stream = True


if __name__ == "__main__":
    main()

こちらはこんな感じ。

https://youtu.be/NEtnVVTj_Dc

音声を認識すると最初は赤文字で表示されるが、これがInterimという中間状態の音声認識。最終的に確定した音声認識は緑文字で出力されている。

なるほど、中間状態はかなり認識が速いが精度が低め、最終確定状態だと精度は上がるが、中間〜最終までには多少時間がかかる、という感じなのね。

kun432kun432

一応当初確認したかったことはできた。この中間状態の精度でどこまで使えるかはやってみないとわからない。ただLLMなら、ある程度間違っててもよしなに答えてくれそうな気はする。ChatGPTなんかでtypoしててもそこまでおかしな回答になることはない気がするし。

kun432kun432

その他

他にも色々機能があるので、サラッと。

単語レベルの信頼度

https://cloud.google.com/speech-to-text/docs/word-confidence?hl=ja

レスポンスに含まれるconfidenceで認識信頼度が返されていたが、上の方で試した限り、ある程度の文章単位になっていたものを単語単位の細かいところまで返すことができるみたい。ただ、日本語の場合どうなるのかはわからない。

話者ダイアライぜーション

https://cloud.google.com/speech-to-text/docs/multiple-voices?hl=ja

話者ダイアライゼーションも一応できるとあるので、ちょっと違うデータになるが、Kotoba-Whisperのサンプル音声(3人の話者が含まれる)で試してみる。

https://zenn.dev/kun432/scraps/c3111462f8b70f

#from google.cloud import speech_v1p1beta1 as speech  # これじゃなくてもいけた
from google.cloud import speech

client = speech.SpeechClient()

with open("sample_diarization_japanese.mp3", "rb") as audio_file:
    content = audio_file.read()

audio = speech.RecognitionAudio(content=content)

# 話者台荒いぜーションの設定はSpeakerDiarizationConfigで行う
diarization_config = speech.SpeakerDiarizationConfig(
    enable_speaker_diarization=True, # 話者第あらいぜーションを有効化
    min_speaker_count=3, # 最小話者数
    max_speaker_count=3, # 最大話者数
)

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.MP3,
    sample_rate_hertz=44100,
    language_code="ja-JP",
    diarization_config=diarization_config,  # RecognitionConfigにSpeakerDiarizationConfigを追加
)

response = client.recognize(config=config, audio=audio)

# 各結果内のトランスクリプトは、各結果ごとに分離され連続したものになる。
# 
# ただし、選択肢内の単語リストには、これまでのすべての結果のすべての単語が含まれる。
# したがって、話者タグを持つすべての単語を取得するには、最後の結果から単語リストを取得するだけです:
result = response.results[-1]

words_info = result.alternatives[0].words

for word_info in words_info:
    print(f"単語: '{word_info.word}', 話者タグ: {word_info.speaker_tag}")
出力
処理が完了するのを待っています...
単語: 'そう', 話者タグ: 1
単語: 'です', 話者タグ: 1
単語: 'ね', 話者タグ: 1
単語: 'これ', 話者タグ: 1
単語: 'も', 話者タグ: 1
単語: '先ほど', 話者タグ: 1
単語: 'から', 話者タグ: 1
単語: 'ずっと', 話者タグ: 1
単語: '言っ', 話者タグ: 1
単語: 'てる', 話者タグ: 1
単語: '自分', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: '感覚', 話者タグ: 1
単語: '的', 話者タグ: 1
単語: 'に', 話者タグ: 1
単語: 'は', 話者タグ: 1
単語: '大丈夫', 話者タグ: 1
単語: 'です', 話者タグ: 1
単語: 'けれど', 話者タグ: 1
単語: 'も', 話者タグ: 1
単語: 'もう', 話者タグ: 1
単語: '今', 話者タグ: 1
単語: 'は', 話者タグ: 1
単語: '屋外', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: '気温', 話者タグ: 1
単語: '昼', 話者タグ: 1
単語: 'も', 話者タグ: 1
単語: '夜', 話者タグ: 1
単語: 'も', 話者タグ: 1
単語: '上がっ', 話者タグ: 1
単語: 'て', 話者タグ: 1
単語: 'ます', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: 'で', 話者タグ: 1
単語: '空気', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: '入れ替え', 話者タグ: 1
単語: 'だけ', 話者タグ: 1
単語: 'で', 話者タグ: 1
単語: 'は', 話者タグ: 1
単語: 'かえって', 話者タグ: 1
単語: '子孫', 話者タグ: 1
単語: 'が', 話者タグ: 1
単語: '上がっ', 話者タグ: 1
単語: 'て', 話者タグ: 1
単語: 'き', 話者タグ: 1
単語: 'ます', 話者タグ: 1
単語: 'やっぱり', 話者タグ: 1
単語: '愚直', 話者タグ: 1
単語: 'に', 話者タグ: 1
単語: 'やっぱり', 話者タグ: 1
単語: 'その', 話者タグ: 1
単語: '町', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: '良', 話者タグ: 1
単語: 'さ', 話者タグ: 1
単語: 'を', 話者タグ: 1
単語: 'アピール', 話者タグ: 1
単語: 'し', 話者タグ: 1
単語: 'て', 話者タグ: 1
単語: 'くっ', 話者タグ: 1
単語: 'て', 話者タグ: 1
単語: 'いう', 話者タグ: 1
単語: 'そう', 話者タグ: 1
単語: 'いう', 話者タグ: 1
単語: '姿勢', 話者タグ: 1
単語: 'が', 話者タグ: 1
単語: '基本', 話者タグ: 1
単語: 'に', 話者タグ: 1
単語: 'あっ', 話者タグ: 1
単語: 'た', 話者タグ: 1
単語: '上', 話者タグ: 1
単語: 'で', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: 'こう', 話者タグ: 1
単語: 'いう', 話者タグ: 1
単語: 'pr', 話者タグ: 1
単語: '作戦', 話者タグ: 1
単語: 'だ', 話者タグ: 1
単語: 'と', 話者タグ: 1
単語: '思う', 話者タグ: 1
単語: 'ん', 話者タグ: 1
単語: 'です', 話者タグ: 1
単語: 'よ', 話者タグ: 1
単語: 'ね', 話者タグ: 1
単語: '水', 話者タグ: 1
単語: 'を', 話者タグ: 1
単語: 'マレーシア', 話者タグ: 1
単語: 'から', 話者タグ: 1
単語: '買わ', 話者タグ: 1
単語: 'なく', 話者タグ: 1
単語: 'て', 話者タグ: 1
単語: 'は', 話者タグ: 1
単語: 'なら', 話者タグ: 1
単語: 'ない', 話者タグ: 1
単語: 'の', 話者タグ: 1
単語: 'です', 話者タグ: 1

書いてあるコメント、意味がよくわからんのだけども、要は、認識内容は複数のresultで返ってくるのだけど、話者タグを含むのは最後のresultになっていて、かつ、単語単位で話者タグが付与される、ということらしい。

ただ上の通り話者タグがすべて同じになってしまった。以下を見るとどうやら日本語は対応していないらしい。

https://cloud.google.com/speech-to-text/docs/speech-to-text-supported-languages?hl=ja

言語の自動認識

https://cloud.google.com/speech-to-text/docs/enable-language-recognition-speech-to-text?hl=ja

ここまでの例では言語を固定で指定していたが、複数の言語候補を渡すこともできるみたい。

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
    sample_rate_hertz=44100,
    audio_channel_count=2,
    language_code="ja-JP",
    alternative_language_codes=["en", "en-US", "es"],
)

合計4言語まで指定できるらしい。

複数のチャネル

https://cloud.google.com/speech-to-text/docs/multi-channel?hl=ja

複数のチャネル、例えばステレオの場合だと、それぞれのチャネルごとに音声認識することができるらしい。

特定のユースケースに合わせたモデルの変更

https://cloud.google.com/speech-to-text/docs/transcription-model?hl=ja

音声認識モデルは1つだけではなくて、特定のユースケース、例えば、発話の短いもの・長いもの、とか、あとは電話音声、医療向け会話、動画向け等がある模様。おー、これは知らなかった。

ちなみに言語によって提供されて言えるモデルは異なる模様。

https://cloud.google.com/speech-to-text/docs/speech-to-text-supported-languages?hl=ja

拡張モデル

https://cloud.google.com/speech-to-text/docs/enhanced-models?hl=ja

上記の特定ユースケース向けモデルの中には、通常のモデルよりも精度高くSTTができるように最適化された「拡張モデル」というものがあるらしい。拡張モデルが提供されているのは電話向け・動画向けの2つだけ。

句読点の自動挿入

https://cloud.google.com/speech-to-text/docs/automatic-punctuation?hl=ja

ここまで試してきた中で句読点が一切なかったのだけど、これは設定で有効化できるみたい。

from google.cloud import speech

client = speech.SpeechClient()

with open("sample_diarization_japanese.mp3", "rb") as audio_file:
    content = audio_file.read()

audio = speech.RecognitionAudio(content=content)

config = speech.RecognitionConfig(
    encoding=speech.RecognitionConfig.AudioEncoding.MP3,
    sample_rate_hertz=44100,
    language_code="ja-JP",
    enable_automatic_punctuation=True,
)

response = client.recognize(config=config, audio=audio)
for result in response.results:
    print(f"書き起こし: {result.alternatives[0].transcript}")
出力
書き起こし: そうですね。これも 先ほどからずっと言ってる自分の感覚的には大丈夫ですけれども、もう今は屋外の気温。昼も夜も 上がってますので、空気の入れ替え だけではかえって子孫が上がってきます。やっぱり 愚直にやっぱり その町の良さをアピールしてくっていう、そういう姿勢が基本にあった上でのこういう PR 作戦だと思うんですよね。水をマレーシアから買わなくてはならないのです。

きちんと句読点が入っている。

単語のタイムスタンプの自動挿入

https://cloud.google.com/speech-to-text/docs/async-time-offsets?hl=ja

文字起こしの発話の単語単位でタイムスタンプを挿入できるみたい。

kun432kun432

まとめ

自分が知らなかっただけで、全然機能豊富だった。

精度的にはどうなのかな?と思って、各STTの評価比較を探してみたら、以下があった。

https://picovoice.ai/docs/benchmark/stt/

これを見る限り、それほど制度が高い、というわけではなさそうなのだけど、リアルタイム性とか使い勝手とか諸々含めて取捨選択すればいいのかなと思う。

このスクラップは3ヶ月前にクローズされました
ログインするとコメントできます