🙌

【streamlit】音声会話アプリ開発手順③

2024/08/31に公開

この章ではユーザーとAIが音声でやり取りする方法を解説します。

3-1 音声で入力する

今回の肝は、単発の録音ではなくユーザーが話している時にリアルタイムに録音していく(これをストリーミングと言います)方法です。

音声を扱うライブラリはいくつかあるのですが(いくつもあるがゆえに、結局どれ使ったら良いのかわからんのが一番困る)、いろいろ試してpyaudioとwaveが一番シンプルかつ今回のアプリケーションを作成するための要件を満たしているので、これらを選びました。

音声で入力するには、いったん録音したファイルを書き出してから、そのファイルをwhisperにぶち込んでテキスト化します。
(わざわざファイルに書き出す必要あるの?バイナリ直接渡されへんの?、と感じましたが、コンピューター的には変数にぶち込んだバイナリであろうが、ディレクトリに存在するファイルであろうが、メモリに何か書き込んで、それを参照するという動作に変わりはありません。ファイル書き出しをするから時間がかかるということもないです。)

音声入力を初めて扱うような人だと音声関係のライブラリに馴染みがないと思いますので、ここでは録音したデータをファイルに書き出すだけにとどめておきます。

サンプルコード
# 音声で入力する

######## Streamlitの設定 ########
import streamlit as st

st.title("音声で入力する")
st.write("マイクに話しかけてください。")

######## 録音関係の設定 ########
import pyaudio
import wave

p = pyaudio.PyAudio()  # PyAudioのインスタンス化

######## 録音のフラグ ########
if 'recording' not in st.session_state:
    st.session_state.recording = False

######## 録音停止ボタン ########
if st.button("停止", key="stop_button"):
    st.session_state.recording = False

######## 録音開始ボタン ########
if st.button("おしゃべり", key="start_button"):
    st.session_state.recording = True
    st.write("何か話しかけてください")

    chunk = 1024  # フレーム単位での音声の読み込みサイズ
    sample_format = pyaudio.paInt16  # 16ビットの音声
    channels = 1  # モノラル音声
    rate = 16000  # サンプリングレート
    record_seconds = 2

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    frames_per_buffer=chunk)
    
    for i in range(5):

        data = stream.read(chunk, exception_on_overflow=False)

        frames = []
        for _ in range(0, int(rate / chunk * record_seconds)):
            data = stream.read(chunk)
            frames.append(data)

        # waveを使って、framesをwavファイルとして書き出す
        with wave.open(f'output{i}.wav', 'wb') as wf:
            wf.setnchannels(channels)
            wf.setsampwidth(p.get_sample_size(sample_format))
            wf.setframerate(rate)
            wf.writeframes(b''.join(frames))


    # トークが終わったら録音ストリームを閉じる
    stream.stop_stream()
    stream.close()
    p.terminate()

ストリーム録音の方法は、p = pyaudio.PyAudio()でインスタンス化した後、open()でストリームをオープンすることです。
(カセットテープを知っているなら、カセットに録音している状態をイメージしてもらうとわかりやすいかな)

そして、stream.readで細切れにした音声を読み取った後、それらを変数framesに格納します。

最後にflamesに貯められたデータ(byte配列)をwaveライブラリを使って、wav形式で書き出すというのが一連の流れです。

ここで、チャンクやサンプリングレートの意味がよくわからない人のために、これらの持つ意味をざっくりイメージしてもらいます。

音声というのは流しそうめんのように、次々と音声情報(周波数の塊)が流れてきます。

これを掬い上げるお箸をチャンク、お椀をフレームとイメージしてください。

より抜け漏れなくそうめんを掬い上げる(stream.readの部分)には、小さいお箸を高速に掬い上げる(チャンクサイズを小さくする)のが良いのですが、あんまり小さくするとコンピューターの処理が追いつかなくなってしまいます。

サンプリングレートやレコーディング時間は、お椀に溜めたそうめんをどれだけの頻度で食べる(waveで書き出し)かをイメージしてもらうとわかりやすいかと思います。

ひとつ注意すべきは、これらのパラメーターは音質に影響するものであり、音声認識がうまくいかない(違う言葉に翻訳されてしまう)時にこれらの数値をいじっても認識にはほとんど効果はないことです。

例えばサンプリングレートの16000はほとんど決まりみたいなものなので、音質にこだわらない限り調整することはありません。(ちなみにハイレゾ音源だとサンプリングレートは9万もあるらしいです)

知り合いのエンジニアに聞いた話では、音声認識はモデルの選び方が肝心で、音質や周波数のフィルタリングはあんまり音声認識に影響はないとのことです。

3-2 音声をテキストに変換

次に音声をテキスト変換する方法を学びます。

これもいくつかライブラリがあるのですが、今回はopenAI API Keyだけで使えるopenAIのaudio.transcriptionsライブラリを使います。
(ただ、性能的にはgoogle cloudのspeech to textのほうが良い気がしました)

サンプルコード
# 音声を日本語に翻訳

######## Streamlitの設定 ########
import streamlit as st

st.title("音声で入力する")
st.write("マイクに話しかけてください。")

######## 録音関係の設定 ########
import pyaudio
import wave

p = pyaudio.PyAudio()  # PyAudioのインスタンス化

######## 音声翻訳関係の設定 ########
import openai

# ハルっているか判定する関数
def hallcinated_transcription(t:str):
    hallcination_texts = ['ご視聴ありがとう', '最後まで視聴','最後までご視聴','視聴してくださって', '本日はご覧いただき', 'おやすみなさい']
    return any(phrase in t for phrase in hallcination_texts)

######## 録音のフラグ ########
if 'recording' not in st.session_state:
    st.session_state.recording = False

######## 録音停止ボタン ########
if st.button("停止", key="stop_button"):
    st.session_state.recording = False

######## 録音開始ボタン ########
if st.button("おしゃべり", key="start_button"):
    st.session_state.recording = True
    st.write("何か話しかけてください")

    chunk = 1024  # フレーム単位での音声の読み込みサイズ
    sample_format = pyaudio.paInt16  # 16ビットの音声
    channels = 1  # モノラル音声
    rate = 16000  # サンプリングレート
    record_seconds = 2

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    frames_per_buffer=chunk)
    
    for i in range(5):

        data = stream.read(chunk, exception_on_overflow=False)

        frames = []
        for _ in range(0, int(rate / chunk * record_seconds)):
            data = stream.read(chunk)
            frames.append(data)

        # waveを使って、framesをwavファイルとして書き出す
        with wave.open(f'output{i}.wav', 'wb') as wf:
            wf.setnchannels(channels)
            wf.setsampwidth(p.get_sample_size(sample_format))
            wf.setframerate(rate)
            wf.writeframes(b''.join(frames))

        # openaiのtranscriptionsを使って音声をテキスト変換
        transcription = openai.audio.transcriptions.create(
            file=open(f'output{i}.wav', "rb"),
            model="whisper-1",
            language="ja"
        )
        
        print(f'---------{i}---------')
        print(transcription.text,f'。ハルシネーション:{hallcinated_transcription(transcription.text)}')

        if hallcinated_transcription(transcription.text) is False:
            st.write(transcription.text)


    # トークが終わったら録音ストリームを閉じる
    stream.stop_stream()
    stream.close()
    p.terminate()

openai.audio.transcriptionsを使って音声をテキスト翻訳する部分は以下の箇所です。
このライブラリはバイナリやnumpy配列、AudioSegment等では読み込めないので、書き出したファイルを読み込むことになります。

        transcription = openai.audio.transcriptions.create(
            file=open(f'output{i}.wav', "rb"),
            model="whisper-1",
            language="ja"
        )

このopenai.audio.transcriptions、何も喋っていない状態だとご視聴ありがとうございましたなどを返してきます。
いわゆるハルシネーションなのですが、無音の時に返してくる翻訳が全てYouTubeの終わりの言葉なので、YouTubeから学習したんだなということがよくわかります。

今回のコードではハルシネーションを起こしたときに返されるワードをリスト化し、これらが返ってきたらユーザーが話を終えたと判断する基準にしています。

def hallcinated_transcription(t:str):
    hallcination_texts = ['ご視聴ありがとう', '最後まで視聴','最後までご視聴','視聴してくださって', '本日はご覧いただき', 'おやすみなさい']
    return any(phrase in t for phrase in hallcination_texts)

3-3 音声をユーザー入力にする

3-2で音声を細切れにテキストとする作業をしました。
次にこの細切れのテキストをまとめて、ユーザー入力にしましょう。

また3-2では、録音したデータをファイルに書き出す時にoutput○.wavという名前で書き出していましたが、このファイルは音声をテキスト化したら不要なので、片っ端から消していっても問題ありません。(というか消さないとメモリが不足してしまいます)

これにはpython組み込みライブラリのtemporaryfile.NamedTemporaryFileを使います。

サンプルコード
# user_inputとして蓄積

######## Streamlitの設定 ########
import streamlit as st

st.title("音声で入力する")
st.write("マイクに話しかけてください。")

######## 録音関係の設定 ########
import pyaudio
import wave

p = pyaudio.PyAudio()  # PyAudioのインスタンス化

######## 音声翻訳関係の設定 ########
import openai
from tempfile import NamedTemporaryFile

# ハルっているか判定する関数
def hallcinated_transcription(t:str):
    hallcination_texts = ['ご視聴ありがとう', '最後まで視聴','最後までご視聴','視聴してくださって', '本日はご覧いただき', 'おやすみなさい']
    return any(phrase in t for phrase in hallcination_texts)

######## 録音のフラグ ########
if 'recording' not in st.session_state:
    st.session_state.recording = False

######## 録音停止ボタン ########
if st.button("停止", key="stop_button"):
    st.session_state.recording = False

######## 録音開始ボタン ########
if st.button("おしゃべり", key="start_button"):
    st.session_state.recording = True
    st.write("何か話しかけてください")

    chunk = 1024  # フレーム単位での音声の読み込みサイズ
    sample_format = pyaudio.paInt16  # 16ビットの音声
    channels = 1  # モノラル音声
    rate = 16000  # サンプリングレート
    record_seconds = 2

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    frames_per_buffer=chunk)
    
    while st.session_state.recording:

        user_input:str = ""
        
        while True:

            data = stream.read(chunk, exception_on_overflow=False)

            frames = []
            for _ in range(0, int(rate / chunk * record_seconds)):
                data = stream.read(chunk)
                frames.append(data)

            with NamedTemporaryFile(suffix='.wav') as temp_wav_file:
                # waveを使って、framesをwavファイルとして書き出す
                with wave.open(temp_wav_file, 'wb') as wf:
                    wf.setnchannels(channels)
                    wf.setsampwidth(p.get_sample_size(sample_format))
                    wf.setframerate(rate)
                    wf.writeframes(b''.join(frames))

                # openaiのtranscriptionsを使って音声をテキスト変換
                transcription = openai.audio.transcriptions.create(
                    file= open(temp_wav_file.name, "rb"),
                    model="whisper-1",
                    language="ja"
                )
            
            print(transcription.text,f'。ハルシネーション:{hallcinated_transcription(transcription.text)}')

            # 何も喋らないとハルシネーションを起こすので、ハルシネーションを起こしている=話し終えたと判断
            if hallcinated_transcription(transcription.text):
                break

            user_input += transcription.text
        
        st.write(user_input)


    # トークが終わったら録音ストリームを閉じる
    stream.stop_stream()
    stream.close()
    p.terminate()

user_inputという変数を用意して、ここに音声から翻訳された細切れのテキストを継ぎ足していきます(whileループの部分)。

openai.audio.transcriptionsは無音だとハルシネーションを返すので、それをユーザーが話し終えた手がかりとしてwhileループをブレイクしています。

また、wavファイル書き出す部分はtemporaryFile.NamedTemporaryFileを使って書き直しています。

with NamedTemporaryFile(suffix='.wav') as temp_wav_file:
    # waveを使って、framesをwavファイルとして書き出す
    with wave.open(temp_wav_file, 'wb') as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(p.get_sample_size(sample_format))
        wf.setframerate(rate)
        wf.writeframes(b''.join(frames))

    # openaiのtranscriptionsを使って音声をテキスト変換
    transcription = openai.audio.transcriptions.create(
        file= open(temp_wav_file.name, "rb"),
        model="whisper-1",
        language="ja"
    )

temporaryfileにはTemporaryFileというのもありますが、openai.audio.transcriptions.createでfileを指定する時はpython組み込みのopenメソッドで開いて渡すのですが、この時ファイル名を指定する必要があるのでTemporaryFileは使えません。
※ちなみにファイル名はランダムな英数です。開発者が知る必要はありません。

3-4 ユーザーとAIがチャットする

3-3でユーザーの音声をテキストにして投げる準備ができたので、次はAIが回答する機能を加えましょう。

サンプルコード
# ユーザーとチャットする(返答はテキスト)

######## Streamlitの設定 ########
import streamlit as st

st.title("音声で入力する")
st.write("マイクに話しかけてください。")

######## StreamlitChat関係の設定 ########
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain_community.chat_message_histories import StreamlitChatMessageHistory

# StreamlitChatMessageHistoryの初期化
chat_history = StreamlitChatMessageHistory(key="chat_messages")

chat_history.add_messages([
    SystemMessage("ユーザーの入力に対して、日本語で返答してください")
])

######## AI返答関係の設定 ########
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI

import os
from dotenv import load_dotenv
load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

chat_model = ChatOpenAI(api_key=OPENAI_API_KEY, model="gpt-4o-mini") # type: ignore

######## 録音関係の設定 ########
import pyaudio
import wave

p = pyaudio.PyAudio()  # PyAudioのインスタンス化

######## 音声翻訳関係の設定 ########
import openai
from tempfile import NamedTemporaryFile

# ハルっているか判定する関数
def hallcinated_transcription(t:str):
    hallcination_texts = ['ご視聴ありがとう', '最後まで視聴','最後までご視聴','視聴してくださって','見てくれてありがとう', '本日はご覧いただき', 'おやすみなさい']
    return any(phrase in t for phrase in hallcination_texts)

######## 録音のフラグ ########
if 'recording' not in st.session_state:
    st.session_state.recording = False

######## 録音停止ボタン ########
if st.button("停止", key="stop_button"):
    st.session_state.recording = False

######## 録音開始ボタン ########
if st.button("おしゃべり", key="start_button"):
    st.session_state.recording = True
    st.write("何か話しかけてください")

    chunk = 1024  # フレーム単位での音声の読み込みサイズ
    sample_format = pyaudio.paInt16  # 16ビットの音声
    channels = 1  # モノラル音声
    rate = 16000  # サンプリングレート
    record_seconds = 2

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    frames_per_buffer=chunk)
    
    while st.session_state.recording:

        user_input:str = ""
        
        while True:

            data = stream.read(chunk, exception_on_overflow=False)

            frames = []
            for _ in range(0, int(rate / chunk * record_seconds)):
                data = stream.read(chunk)
                frames.append(data)

            with NamedTemporaryFile(suffix='.wav') as temp_wav_file:
                # waveを使って、framesをwavファイルとして書き出す
                with wave.open(temp_wav_file, 'wb') as wf:
                    wf.setnchannels(channels)
                    wf.setsampwidth(p.get_sample_size(sample_format))
                    wf.setframerate(rate)
                    wf.writeframes(b''.join(frames))

                # openaiのtranscriptionsを使って音声をテキスト変換
                transcription = openai.audio.transcriptions.create(
                    file= open(temp_wav_file.name, "rb"),
                    model="whisper-1",
                    language="ja"
                )
            
            print(transcription.text,f'。ハルシネーション:{hallcinated_transcription(transcription.text)}')

            # 何も喋らないとハルシネーションを起こすので、ハルシネーションを起こしている=話し終えたと判断
            if hallcinated_transcription(transcription.text):
                break

            user_input += transcription.text
        
        # user_inputをchat_historyに追加
        chat_history.add_user_message(HumanMessage(user_input))

        # user_inputを表示する
        with st.chat_message("human"):
            st.markdown(user_input)

        # AIの返答
        ai_response = chat_model.invoke(chat_history.messages)

        # user_inputをchat_historyに追加
        chat_history.add_ai_message(AIMessage(ai_response.content))

        # ChatBotの返答を表示する
        with st.chat_message("ai"):
            st.markdown(ai_response.content)


    # トークが終わったら録音ストリームを閉じる
    stream.stop_stream()
    stream.close()
    p.terminate()

チャットをするためのモデルをChatOpenAIで作成し(この時変数名がmodelだけだと音声認識モデルとややこしくなるので、変数名はchat_modelとしています)、あとはchat_model.invoke()でAPIに投げています。

返答後の処理をchat_historyに追加したりst.chat_messageに表示させる部分は2章で説明しましたので割愛します。

3-5 ユーザーとAIがチャットする(返答は音声)

この章の終わりとして、テキストで返されたAIの返答を音声に変換してやりましょう。

テキストを音声に変換するライブラリもいくつかあるのですが、ここでもopenAI API Keyがあれば使えるopenai.audio.speechライブラリを使います

サンプルコード
# ユーザーとチャットする(返答は音声)

import time

######## Streamlitの設定 ########
import streamlit as st

st.title("音声で入力する")
st.write("マイクに話しかけてください。")

######## StreamlitChat関係の設定 ########
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain_community.chat_message_histories import StreamlitChatMessageHistory

# StreamlitChatMessageHistoryの初期化
chat_history = StreamlitChatMessageHistory(key="chat_messages")

chat_history.add_messages([
    SystemMessage("ユーザーの入力に対して、日本語で返答してください")
])

######## AI返答関係の設定 ########
from langchain.schema import HumanMessage, AIMessage, SystemMessage
from langchain_openai import ChatOpenAI

import os
from dotenv import load_dotenv
load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

chat_model = ChatOpenAI(api_key=OPENAI_API_KEY, model="gpt-4o-mini") # type: ignore

######## 録音関係の設定 ########
import pyaudio
import wave
from pydub import AudioSegment

p = pyaudio.PyAudio()  # PyAudioのインスタンス化

######## 音声翻訳関係の設定 ########
import openai
from tempfile import NamedTemporaryFile

client = openai.OpenAI()

# ハルっているか判定する関数
def hallcinated_transcription(t:str):
    hallcination_texts = ['ご視聴ありがとう', '最後まで視聴','最後までご視聴','視聴してくださって','見てくれてありがとう', '本日はご覧いただき', 'おやすみなさい']
    return any(phrase in t for phrase in hallcination_texts)

######## 録音のフラグ ########
if 'recording' not in st.session_state:
    st.session_state.recording = False

######## 録音停止ボタン ########
if st.button("停止", key="stop_button"):
    st.session_state.recording = False

######## 録音開始ボタン ########
if st.button("おしゃべり", key="start_button"):
    st.session_state.recording = True
    st.write("何か話しかけてください")

    chunk = 1024  # フレーム単位での音声の読み込みサイズ
    sample_format = pyaudio.paInt16  # 16ビットの音声
    channels = 1  # モノラル音声
    rate = 16000  # サンプリングレート
    record_seconds = 2

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=rate,
                    input=True,
                    frames_per_buffer=chunk)
    
    while st.session_state.recording:

        user_input:str = ""
        
        while True:

            data = stream.read(chunk, exception_on_overflow=False)

            frames = []
            for _ in range(0, int(rate / chunk * record_seconds)):
                data = stream.read(chunk)
                frames.append(data)

            with NamedTemporaryFile(suffix='.wav') as temp_wav_file:
                # waveを使って、framesをwavファイルとして書き出す
                with wave.open(temp_wav_file, 'wb') as wf:
                    wf.setnchannels(channels)
                    wf.setsampwidth(p.get_sample_size(sample_format))
                    wf.setframerate(rate)
                    wf.writeframes(b''.join(frames))

                # openaiのtranscriptionsを使って音声をテキスト変換
                transcription = openai.audio.transcriptions.create(
                    file= open(temp_wav_file.name, "rb"),
                    model="whisper-1",
                    language="ja"
                )
            
            print(transcription.text,f'。ハルシネーション:{hallcinated_transcription(transcription.text)}')

            # 何も喋らないとハルシネーションを起こすので、ハルシネーションを起こしている=話し終えたと判断
            if hallcinated_transcription(transcription.text):
                break

            user_input += transcription.text
        
        # user_inputをchat_historyに追加
        chat_history.add_user_message(HumanMessage(user_input))

        # user_inputを表示する
        with st.chat_message("human"):
            st.markdown(user_input)

        # AIの返答
        ai_response = chat_model.invoke(chat_history.messages)

        # テキストを音声に変換して再生
        with client.audio.speech.with_streaming_response.create(
            model = "tts-1",
            voice = "alloy",# 話者を入力、2024年2月現在の話者は「alloy", "echo", "fable", "onyx", "nova", "shimmer」
            input = str(ai_response.content), 
            response_format='mp3'
        ) as r:
                with NamedTemporaryFile(suffix=".mp3") as temp_audio_file:
                    r.stream_to_file(temp_audio_file.name)

                    st.audio(temp_audio_file.name,autoplay=True)
                    # 注意!AudioSegmentやsimpleaudio等で音声再生すると「zsh: segmentation fault」エラーとなる

                    # 再生中は録音しないようにstreamを停止する
                    stream.stop_stream()
                    audio = AudioSegment.from_mp3(temp_audio_file.name)
                    duration = len(audio) / 1000.0  # オーディオファイルの長さ
                    time.sleep(duration)

                    # 待機後、stream録音を再開
                    stream.start_stream()

        # user_inputをchat_historyに追加
        chat_history.add_ai_message(AIMessage(ai_response.content))

        # ChatBotの返答を表示する
        with st.chat_message("ai"):
            st.markdown(ai_response.content)


    # トークが終わったら録音ストリームを閉じる
    stream.stop_stream()
    stream.close()
    p.terminate()

openai.audio.speechclient = openai.OpenAI()の部分でOpenAIライブラリをインスタンス化して使います。

※ OpenAIクラスは、openaiライブラリとlangchain_openaiライブラリの2通りあります。langchainを使うならlangchain_openaiのほうが便利なのですが、openai.audio.speechを使うにはopenaiライブラリのOpenAIクラスを使う必要があります

以下の部分がテキストを音声に変換して再生する部分です。

# テキストを音声に変換して再生
with client.audio.speech.with_streaming_response.create(
    model = "tts-1",
    voice = "alloy",# 話者を入力、2024年2月現在の話者は「alloy", "echo", "fable", "onyx", "nova", "shimmer」
    input = str(ai_response.content), 
    response_format='mp3'
) as r:
        with NamedTemporaryFile(suffix=".mp3") as temp_audio_file:
            r.stream_to_file(temp_audio_file.name)

            st.audio(temp_audio_file.name,autoplay=True)
            # 注意!AudioSegmentやsimpleaudio等で音声再生すると「zsh: segmentation fault」エラーとなる

            # 再生中は録音しないようにstreamを停止する
            stream.stop_stream()
            audio = AudioSegment.from_mp3(temp_audio_file.name)
            duration = len(audio) / 1000.0  # オーディオファイルの長さ
            time.sleep(duration)

            # 待機後、stream録音を再開
            stream.start_stream()

注意書きにも書いてありますが、音声を再生する方法はpydub.AudioSegmentやsimpleaudioなどいくつか方法があるのですが、streamlitでこれらを使おうとすると「zsh: segmentation fault」エラーとなります。
streamlitのaudioは、デフォルトではユーザーが再生ボタンを押したら再生できるのですが、autoplay=Trueを指定することで自動的に音声が流れるようにできます。

またstreamlit.audioは、音声アイコンが出現したら次の処理に進んでしまいますので、自動再生で再生が終わるまで待ってくれません。
そこで再生が終わるまでtime.sleepで次の処理を待たせます。再生時間はAudioSegmentを使って取得するのが一番シンプルです。

Discussion