🗂
asyncioでなんちゃってリアルタイム音声対話システム
概要
この記事ではasyncio
の基本的な機能を使った処理例を紹介したのちに、なんちゃってリアルタイム音声対話システムの実装例を載せます。
OpenAIのwhisper, TTS, および gpt-4oのREST APIのみを使用してどこまでいけるか挑戦しました(Real-Time APIは使ってません)。
とても楽しかったです。
実装方法も今回のものがベストではなくもっといい実装方法があるはず・・・!
asyncio
は同じものを色んな方法で実装できるから面白いヨネ。
以下デモになります〜(音声収録が適当で大きかったり小さかったりしてます。):
デモ1
デモ2
処理例
以下の設定が必要です。
python 3.12 or higher
pip install openai loguru
- 環境変数に
OPENAI_API_KEY=...
をセットする
await
:即時実行
-
await
で呼び出された処理は完了するまで待機する -
await
をつけなかった非同期関数はコルーチンオブジェクトのまま実行されない
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
messages = [{"role": "user", "content": "こんにちは, asyncio!"}]
logger.info(messages[-1])
# async関数の実行はawaitをつけて行う
# APIのリクエストが返ってくるまでここで待機が起きる
resp1 = await client.chat.completions.create(messages=messages, model="gpt-4o-mini")
messages.append({"role": "assistant", "content": resp1.choices[0].message.content})
logger.info(messages[-1])
messages.append(
{"role": "user", "content": "asyncioについて3行で教えてください!"}
)
logger.info(messages[-1])
resp2 = await client.chat.completions.create(messages=messages, model="gpt-4o-mini")
messages.append({"role": "assistant", "content": resp2.choices[0].message.content})
logger.info(messages[-1])
messages.append({"role": "user", "content": "asyncioの特徴は何ですか?"})
resp3 = client.chat.completions.create(messages=messages, model="gpt-4o-mini")
try:
# エラー!awaitをつけていないため↑の処理は未実行でresp3はコルーチンオブジェクトのまま。
messages.append(
{"role": "assistant", "content": resp3.choices[0].message.content}
)
except AttributeError as e:
logger.error(e)
# awaitをつけて実行
resp3 = await resp3
messages.append({"role": "assistant", "content": resp3.choices[0].message.content})
logger.info(messages[-1])
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 19:55:27.901 | INFO | __main__:main:10 - {'role': 'user', 'content': 'こんにちは, asyncio!'}
2024-12-11 19:55:28.989 | INFO | __main__:main:14 - {'role': 'assistant', 'content': 'こんにちは!どんなことをお手伝いできますか? asyncioやPythonについて知りたいことがあれば教えてください!'}
2024-12-11 19:55:28.989 | INFO | __main__:main:16 - {'role': 'user', 'content': 'asyncioについて3行で教えてください!'}
2024-12-11 19:55:30.577 | INFO | __main__:main:19 - {'role': 'assistant', 'content': '`asyncio`は、Pythonの標準ライブラリであり、非同期プログラミングをサポートするためのツールです。イベントループを使用して、コルーチンとタスクを管理し、同時に複数のI/O操作を効率的に処理できます。これにより、スレッドやプロセスを使用せずに、スケーラブルでパフォーマンスの高いアプリケーションを構築することができます。'}
2024-12-11 19:55:30.578 | ERROR | __main__:main:27 - 'coroutine' object has no attribute 'choices'
2024-12-11 19:55:37.078 | INFO | __main__:main:32 - {'role': 'assistant', 'content': '`asyncio`の特徴には以下のような点があります:\n\n1. **非同期I/Oサポート**: `asyncio`は、非同期的にI/O操作を実行するためのAPIを提供しており、ブロッキングなしにリソースの効率的な利用を可能にします。\n\n2. **コルーチン**: Pythonの`async`および`await`キーワードを使用して、コルーチン(軽量なスレッドのようなもの)を定義し、簡潔に非同期プログラミングが行えます。\n\n3. **イベントループ**: `asyncio`は、すべての非同期操作を管理するイベントループを持っており、これにより複数のタスクを同時に実行し、処理を待ちながら他のタスクを進めることができます。\n\n4. **タスクとスケジューリング**: `asyncio`は、タスクを簡単に作成およびスケジュールする機能を提供しており、非同期処理の制御が容易になります。\n\n5. **豊富なライブラリ**: `asyncio`は、HTTPクライアントやWebSocket、サーバー、キューなど、多くの非同期ライブラリやフレームワークと統合されています。\n\nこれらの特徴により、`asyncio`はウェブサーバーやデータベースアクセス、APIとの通信などの非同期タスクを効率的に扱うことができます。'}
create_task
:遅延実行
-
asyncio.create_task
は実行をスケジューリングする - スケジューリングされた処理はI/O-boundの待機時間に随時実行されていく
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
messages = [{"role": "user", "content": "こんにちはasyncio!"}]
logger.info(messages[-1])
# タスクをスケジューリング。各タスクはI/O待ちが発生した時に順次実行される。
# ここでは実行されない!!
task1 = asyncio.create_task(
client.chat.completions.create(messages=messages, model="gpt-4o-mini")
)
# エラー。実行前に結果を取得しようとするとエラーになる。
# task1.result()
# I/O待ちを発生させる。
while not task1.done():
# PENDINGのまま
logger.debug(f"task1: {task1._state}")
# ここの待機時間で他の処理に移行し、task1のAPIリクエストがここで実行される。
# APIリクエストもI/O待ちが発生するため、さらに処理が移行しここのループにも戻ってくる。
# task1のステータスログが複数回出力されるのはその移行のため。
# このようにして他の処理と並行して進めることができる。
await asyncio.sleep(0.5)
# FINISHEDになっている
logger.debug(f"task1: {task1._state}")
# 結果(スケジューリングした関数の返り値)を取り出す
resp1 = task1.result()
messages.append({"role": "assistant", "content": resp1.choices[0].message.content})
logger.info(messages[-1])
messages.append(
{"role": "user", "content": "asyncioについて3行で教えてください!"}
)
logger.info(messages[-1])
task2 = asyncio.create_task(
client.chat.completions.create(messages=messages, model="gpt-4o-mini")
)
# タスク完了時のコールバックを差し込むこともできる。
task2.add_done_callback(
lambda res: messages.append(
{"role": "assistant", "content": res.result().choices[0].message.content}
)
)
while not task2.done():
logger.debug(f"task2: {task2._state}")
await asyncio.sleep(0.5)
logger.debug(f"task2: {task2._state}")
logger.info(messages[-1])
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 19:59:55.833 | INFO | __main__:main:10 - {'role': 'user', 'content': 'こんにちはasyncio!'}
2024-12-11 19:59:55.833 | DEBUG | __main__:main:22 - task1: PENDING
2024-12-11 19:59:56.334 | DEBUG | __main__:main:22 - task1: PENDING
2024-12-11 19:59:56.836 | DEBUG | __main__:main:22 - task1: PENDING
2024-12-11 19:59:57.337 | DEBUG | __main__:main:25 - task1: FINISHED
2024-12-11 19:59:57.337 | INFO | __main__:main:29 - {'role': 'assistant', 'content': 'こんにちは!何かお手伝いできることがありますか? asyncioに関する質問や他のトピックについて何でもお答えしますよ。'}
2024-12-11 19:59:57.337 | INFO | __main__:main:32 - {'role': 'user', 'content': 'asyncioについて3行で教えてください!'}
2024-12-11 19:59:57.338 | DEBUG | __main__:main:44 - task2: PENDING
2024-12-11 19:59:57.839 | DEBUG | __main__:main:44 - task2: PENDING
2024-12-11 19:59:58.341 | DEBUG | __main__:main:44 - task2: PENDING
2024-12-11 19:59:58.845 | DEBUG | __main__:main:44 - task2: PENDING
2024-12-11 19:59:59.347 | DEBUG | __main__:main:47 - task2: FINISHED
2024-12-11 19:59:59.348 | INFO | __main__:main:49 - {'role': 'assistant', 'content': '`asyncio`は、Pythonの標準ライブラリの一部で、非同期プログラミングをサポートするために設計されています。コルーチンやイベントループを使用して、効率的にI/Oバウンドなタスクを管理し、同時に実行できるようにします。これにより、ネットワーク通信やファイル操作などの待機時間を活用して、全体のパフォーマンスを向上させることができます。'}
gather
:同時実行
- 複数の非同期処理を**”同時”に実行**する
- 厳密には同時に実行ではなく、同時にスケジューリングをしている(GILがあるので同時実行はできない)
- 実行する非同期処理がI/O-boundである場合はほぼほぼ同時に実行される
- 実行する非同期処理がCPU-boundな処理である場合は順に処理される
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
styles = ["シンプル", "ユーモラス", "スタイリッシュ"]
logger.debug(f"asyncioについて{styles}に3行で説明して。")
messages_list = [
[{"role": "user", "content": f"asyncioについて{s}に3行で説明して。"}]
for s in styles
]
# "同時"実行(複数の非同期処理を一度にスケジューリングする)。全ての処理が完了するまで待機する。
# ここでは3つのAPIリクエストが同時に発行され、全てのリクエストが完了するまで待機する。
# APIリクエストはほとんどI/O待ちなのでほぼほぼ同時実行になっている
resps = await asyncio.gather(
*[
client.chat.completions.create(messages=messages, model="gpt-4o-mini")
for messages in messages_list
]
)
for style, resp in zip(styles, resps):
logger.info(f"[{style}] {resp.choices[0].message.content}")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:01:13.606 | DEBUG | __main__:main:10 - asyncioについて['シンプル', 'ユーモラス', 'スタイリッシュ']に3行で説明して。
2024-12-11 20:01:17.193 | INFO | __main__:main:25 - [シンプル] `asyncio`はPythonのライブラリで、非同期プログラミングを支援するためのものです。コルーチン(`async def`で定義された関数)を用いて、I/O操作などの待機中に他のタスクを実行することができます。イベントループを使用して、非同期タスクのスケジューリングと実行を管理します。
2024-12-11 20:01:17.194 | INFO | __main__:main:25 - [ユーモラス] asyncioは、Pythonの世界に潜む忍者たちが、待機することなく同時に任務をこなすための秘密道具です。彼らは「待たせてごめん、でも次のターンでちゃんとやります!」と笑顔で言いながら、非同期に動き回ります。結局、全員が任務を終えたとき、忍者たちが集まってお茶を飲むのが恒例行事です!
2024-12-11 20:01:17.194 | INFO | __main__:main:25 - [スタイリッシュ] `asyncio`は、Pythonにおける非同期プログラミングのためのライブラリで、連続するI/O操作を効率良く処理します。イベントループを利用して、コルーチンを管理し、タスクを協調的に実行できます。これにより、ブロッキングを避けつつ高いパフォーマンスを実現することが可能です。
wait
:実行待機
- 複数タスクの完了を待つ
-
return_when
パラメーターによって待機終了条件を調整できる-
gather
をもう少し細かく制御して実行できる感じ
-
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
styles = ["シンプル", "ユーモラス", "スタイリッシュ"]
logger.info(f"asyncioについて{styles}のいずれかのスタイルで説明がなされます。どのスタイルか当ててください。")
messages_list = [
[{"role": "user", "content": f"asyncioについて{s}に3行で説明して。"}]
for s in styles
]
# 複数のタスクの完了を明示的に待つ
# return_whenパラメーターで指定した条件を満たすと待機を終了する
# gatherをもう少し細かく制御して実行できる感じ
done, pending = await asyncio.wait(
[
asyncio.create_task(
client.chat.completions.create(messages=messages, model="gpt-4o-mini")
)
for messages in messages_list
],
return_when=asyncio.FIRST_COMPLETED,
# return_when=asyncio.FIRST_EXCEPTION, # 最初の例外発生時に返ってくる
# return_when=asyncio.ALL_COMPLETED # similar to asyncio.gather
)
logger.info(done.pop().result().choices[0].message.content)
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:04:21.541 | INFO | __main__:main:10 - asyncioについて['シンプル', 'ユーモラス', 'スタイリッシュ']のいずれかのスタイルで説明がなされます。どのスタイルか当ててください。
2024-12-11 20:04:23.563 | INFO | __main__:main:28 - `asyncio`は、Pythonの標準ライブラリで、非同期プログラミングをサポートします。コルーチンを使って入出力操作を効率的に行い、同時に複数のタスクを実行できます。イベントループを介して、タスクのスケジューリングと管理を行います。
as_completed
:順次処理
- 非同期処理をスケジューリングし完了した順に次の処理に回す
- 元のリストの順とは一致しない点に注意
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
styles = ["シンプル", "ユーモラス", "スタイリッシュ"]
logger.debug(f"asyncioについて{styles}に3行で説明して。")
messages_list = [
[
{
"role": "user",
"content": f"asyncioについて{s}に3行で説明して。スタイル名{s}を文頭の[]内に明記してください。",
}
]
for s in styles
]
# コルーチンを作成
# タスクにしてないのでこのままではいつまでも実行されない点に注意
coros = [
client.chat.completions.create(messages=messages, model="gpt-4o-mini")
for messages in messages_list
]
# 各コルーチンをスケジューリングし、完了した順に結果を処理する
# 元のリストの順とは一致しない
for coro in asyncio.as_completed(coros):
# coroには完了したコルーチンが入ってくる
# awaitで結果(元関数の返り値)取得できる
resp = await coro
logger.info(f"{resp.choices[0].message.content}")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:04:53.356 | DEBUG | __main__:main:10 - asyncioについて['シンプル', 'ユーモラス', 'スタイリッシュ']に3行で説明して。
2024-12-11 20:04:54.995 | INFO | __main__:main:28 - [ユーモラス]
asyncioは、Pythonの「待つのが得意なバイキング」。
同時に何人も食べ放題に行かせることで、時間を無駄にしない魔法の杖。
でも、彼らが戻ってくるまで少しだけお茶を飲むのを忘れずに! 🍵
2024-12-11 20:04:55.185 | INFO | __main__:main:28 - [シンプル] asyncioはPythonの非同期プログラミングをサポートするライブラリで、複数のタスクを同時に実行できます。イベントループを使用してI/O操作を効果的に管理し、ブロッキングを回避します。これにより、高いパフォーマンスと効率的なリソース利用が実現できます。
2024-12-11 20:04:56.717 | INFO | __main__:main:28 - [スタイリッシュ]
`asyncio`はPythonの非同期プログラミングライブラリで、効率的にI/O操作を処理します。
コルーチンを使用して、同時に複数のタスクを実行することで、スケーラブルなアプリケーション開発をサポートします。
イベントループによってタスクの管理を行い、エレガントに待機操作を行うことができます。
wait_for
:タイムアウト付き実行待機
- 非同期処理をタイムアウト付きでスケジューリングする
- 指定時間以内に処理が終わらなければ
TimeoutError
が発生する
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
times = [0.1, 0.2, 0.5, 0.7, 1.0]
logger.info("各待ち時間に対するリクエストの結果を表示します。")
for t in times:
# コルーチンを作成
# この時点では実行もされずスケジューリングもされない
coro = client.chat.completions.create(
messages=[{"role": "user", "content": f"{t}秒待ってやる。"}],
model="gpt-4o-mini",
)
try:
# コルーチンをタイムアウト付きでスケジューリングする
resp = await asyncio.wait_for(coro, timeout=t)
logger.success(f"{t} sec: {resp.choices[0].message.content}")
except asyncio.TimeoutError:
# 指定秒数以内に処理が終わらなければTimeoutErrorが発生する
logger.error(f"{t} sec: タイムアウト")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:05:41.244 | INFO | __main__:main:10 - 各待ち時間に対するリクエストの結果を表示します。
2024-12-11 20:05:41.345 | ERROR | __main__:main:20 - 0.1 sec: タイムアウト
2024-12-11 20:05:41.548 | ERROR | __main__:main:20 - 0.2 sec: タイムアウト
2024-12-11 20:05:42.053 | ERROR | __main__:main:20 - 0.5 sec: タイムアウト
2024-12-11 20:05:42.760 | ERROR | __main__:main:20 - 0.7 sec: タイムアウト
2024-12-11 20:05:43.530 | SUCCESS | __main__:main:18 - 1.0 sec: もちろん、1.0秒待ちますね。何かお手伝いできることがあれば教えてください!
timeout
:タイムアウト付き実行
-
wait_for
と機能的には一緒 - 対象の非同期関数に加えてちょっとした処理を含めてタイムアウトを設定したい場合に便利
import asyncio
import time
from loguru import logger
from openai import AsyncOpenAI
async def main() -> None:
client = AsyncOpenAI()
# コンテキストマネージャを用いたタイムアウト設定
start, duration = time.time(), 2
logger.info(f"Timeout: {duration}秒")
try:
async with asyncio.timeout(duration):
# 対象の非同期関数
resp = await client.chat.completions.create(
messages=[
{
"role": "user",
"content": "asyncioについて3行で説明してください。",
}
],
model="gpt-4o-mini",
)
# ちょっとした処理
elapsed = time.time() - start
logger.success(f"完了:{elapsed:.2f}秒")
logger.info(resp.choices[0].message.content)
except asyncio.TimeoutError:
logger.error("タイムアウトしました。")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:06:29.902 | INFO | __main__:main:13 - Timeout: 2秒
2024-12-11 20:06:31.903 | ERROR | __main__:main:29 - タイムアウトしました。
Lock
:排他制御
-
Lock
の中に入れる処理を1つに限定する -
async with lock
のスコープ内を実行中の処理が存在する場合、他の処理はasync with lock
のところで待機する- 実行中の処理が当該のロックスコープを抜けたときに、待機中の処理がロックスコープ内に入って処理を始める
import asyncio
from loguru import logger
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletion
async def chat(msg: str, lock: asyncio.Lock) -> ChatCompletion:
logger.info(f"{msg=}")
# ロックの中(async with lockのスコープ内)には一つの処理しか入れない。
# すでに実行中の処理(APIリクエストを待っている処理)がある場合、当該処理がwithのscopeを抜けるまで待機する。
async with lock:
logger.debug(f"Send request for '{msg}'")
resp = await AsyncOpenAI().chat.completions.create(
messages=[{"role": "user", "content": msg}], model="gpt-4o-mini"
)
logger.success(f"{msg=}, resp={resp.choices[0].message.content}")
async def main() -> None:
messages = ["こんにちは", "hello", "你好", "안녕하세요", "bonjour"]
lock = asyncio.Lock()
# 同時にスケジューリングするが、ロックの中には一つの処理しか入らないので、一つの処理が終わるまで次の処理は待機する
await asyncio.gather(*[chat(msg, lock) for msg in messages])
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:07:02.070 | INFO | __main__:chat:9 - msg='こんにちは'
2024-12-11 20:07:02.070 | DEBUG | __main__:chat:12 - Send request for 'こんにちは'
2024-12-11 20:07:02.091 | INFO | __main__:chat:9 - msg='hello'
2024-12-11 20:07:02.091 | INFO | __main__:chat:9 - msg='你好'
2024-12-11 20:07:02.091 | INFO | __main__:chat:9 - msg='안녕하세요'
2024-12-11 20:07:02.092 | INFO | __main__:chat:9 - msg='bonjour'
2024-12-11 20:07:03.041 | SUCCESS | __main__:chat:16 - msg='こんにちは', resp=こんにちは!どうかされましたか?何かお手伝いできることがありますか?
2024-12-11 20:07:03.041 | DEBUG | __main__:chat:12 - Send request for 'hello'
2024-12-11 20:07:03.913 | SUCCESS | __main__:chat:16 - msg='hello', resp=Hello! How can I assist you today?
2024-12-11 20:07:03.913 | DEBUG | __main__:chat:12 - Send request for '你好'
2024-12-11 20:07:04.625 | SUCCESS | __main__:chat:16 - msg='你好', resp=你好!有什么我可以帮助你的吗?
2024-12-11 20:07:04.626 | DEBUG | __main__:chat:12 - Send request for '안녕하세요'
2024-12-11 20:07:05.445 | SUCCESS | __main__:chat:16 - msg='안녕하세요', resp=안녕하세요! 무엇을 도와드릴까요?
2024-12-11 20:07:05.445 | DEBUG | __main__:chat:12 - Send request for 'bonjour'
2024-12-11 20:07:06.469 | SUCCESS | __main__:chat:16 - msg='bonjour', resp=Bonjour ! Comment puis-je vous aider aujourd'hui ?
Semaphore
:セマフォ
- 数を指定できる排他制御
-
async with sema
のスコープ内に実行中の処理が指定数以上ある場合、他の処理はasync with sema
のところで待機する- 実行中の処理が当該のセマフォスコープを抜け、実行中の処理の総数が指定数以下になったとき、待機中の処理がセマフォスコープ内に入って処理を始める
import asyncio
from loguru import logger
from openai import AsyncOpenAI
from openai.types.chat import ChatCompletion
async def chat(msg: str, sema: asyncio.Semaphore) -> ChatCompletion:
logger.info(f"{msg=}")
# セマフォの中(async with semaのスコープ内)には指定した数以上の処理は入れない。
# すでに指定した数以上の処理(APIリクエストを待っている処理)がある場合、当該処理がwithのscopeを抜けて、セマフォの数が空くまで待機する。
async with sema:
logger.debug(f"Send request for '{msg}'")
# 同時に送信されるリクエストは最大2つまで
resp = await AsyncOpenAI().chat.completions.create(
messages=[{"role": "user", "content": msg}], model="gpt-4o-mini"
)
logger.success(f"{msg=}, resp={resp.choices[0].message.content}")
async def main() -> None:
messages = ["こんにちは", "hello", "你好", "안녕하세요", "bonjour"]
# セマフォを作成。ここでは2つの処理までしか入れないようにしている
sema = asyncio.Semaphore(2)
# 同時にスケジューリングするが、セマフォの中には指定数分の処理しか入れない
await asyncio.gather(*[chat(msg, sema) for msg in messages])
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:08:35.429 | INFO | __main__:chat:9 - msg='こんにちは'
2024-12-11 20:08:35.429 | DEBUG | __main__:chat:12 - Send request for 'こんにちは'
2024-12-11 20:08:35.450 | INFO | __main__:chat:9 - msg='hello'
2024-12-11 20:08:35.450 | DEBUG | __main__:chat:12 - Send request for 'hello'
2024-12-11 20:08:35.459 | INFO | __main__:chat:9 - msg='你好'
2024-12-11 20:08:35.459 | INFO | __main__:chat:9 - msg='안녕하세요'
2024-12-11 20:08:35.459 | INFO | __main__:chat:9 - msg='bonjour'
2024-12-11 20:08:35.992 | SUCCESS | __main__:chat:16 - msg='こんにちは', resp=こんにちは!どういったことでお手伝いできますか?
2024-12-11 20:08:35.993 | DEBUG | __main__:chat:12 - Send request for '你好'
2024-12-11 20:08:36.005 | SUCCESS | __main__:chat:16 - msg='hello', resp=Hello! How can I assist you today?
2024-12-11 20:08:36.005 | DEBUG | __main__:chat:12 - Send request for '안녕하세요'
2024-12-11 20:08:36.683 | SUCCESS | __main__:chat:16 - msg='안녕하세요', resp=안녕하세요! 어떻게 도와드릴까요?
2024-12-11 20:08:36.683 | SUCCESS | __main__:chat:16 - msg='你好', resp=你好!有什么我可以帮助你的吗?
2024-12-11 20:08:36.684 | DEBUG | __main__:chat:12 - Send request for 'bonjour'
2024-12-11 20:08:37.330 | SUCCESS | __main__:chat:16 - msg='bonjour', resp=Bonjour ! Comment puis-je vous aider aujourd'hui ?
Event
:イベント駆動
-
.wait()
でイベント発火を待機 -
.set()
でイベントを発火
import asyncio
import random
from loguru import logger
from openai import AsyncOpenAI
async def chat(msg: str, e: asyncio.Event) -> None:
logger.debug(f"Wait: {msg}")
# イベントが発火(=e.set()のcall)するまで待機
await e.wait()
logger.debug(f"Fired: {msg}")
resp = await AsyncOpenAI().chat.completions.create(
messages=[{"role": "user", "content": msg}], model="gpt-4o-mini"
)
logger.success(f"{msg} -> {resp.choices[0].message.content}")
async def main() -> None:
msgs = ["こんにちは", "hello", "你好", "안녕하세요", "bonjour"]
# タスクごとにイベントを作成
events = {msg: asyncio.Event() for msg in msgs}
# イベント発火を待つタスクを作成
tasks = {msg: asyncio.create_task(chat(msg, events[msg])) for msg in msgs}
# 全てのtaskが完了するまでループ
while any(not t.done() for t in tasks.values()):
# イベント発火するメッセージを選択
msg = random.choice(msgs)
event = events[msg]
if event.is_set():
# すでに発火済みのイベント
logger.warning(f"Already done: {msg=}, {event.is_set()=}")
else:
# イベントを発火する
event.set()
await asyncio.sleep(1)
statuses = {k: v.done() for (k, v) in tasks.items()}
logger.debug(f"Status: {statuses}")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:09:21.062 | DEBUG | __main__:chat:10 - Wait: こんにちは
2024-12-11 20:09:21.062 | DEBUG | __main__:chat:10 - Wait: hello
2024-12-11 20:09:21.062 | DEBUG | __main__:chat:12 - Fired: hello
2024-12-11 20:09:21.084 | DEBUG | __main__:chat:10 - Wait: 你好
2024-12-11 20:09:21.084 | DEBUG | __main__:chat:10 - Wait: 안녕하세요
2024-12-11 20:09:21.084 | DEBUG | __main__:chat:10 - Wait: bonjour
2024-12-11 20:09:21.630 | SUCCESS | __main__:chat:16 - hello -> Hello! How can I assist you today?
2024-12-11 20:09:22.063 | DEBUG | __main__:main:37 - Status: {'こんにちは': False, 'hello': True, '你好': False, '안녕하세요': False, 'bonjour': False}
2024-12-11 20:09:22.065 | DEBUG | __main__:chat:12 - Fired: 你好
2024-12-11 20:09:22.562 | SUCCESS | __main__:chat:16 - 你好 -> 你好!有什么我可以帮助你的吗?
2024-12-11 20:09:23.068 | DEBUG | __main__:main:37 - Status: {'こんにちは': False, 'hello': True, '你好': True, '안녕하세요': False, 'bonjour': False}
2024-12-11 20:09:23.069 | DEBUG | __main__:chat:12 - Fired: bonjour
2024-12-11 20:09:23.642 | SUCCESS | __main__:chat:16 - bonjour -> Bonjour ! Comment puis-je vous aider aujourd'hui ?
2024-12-11 20:09:24.072 | DEBUG | __main__:main:37 - Status: {'こんにちは': False, 'hello': True, '你好': True, '안녕하세요': False, 'bonjour': True}
2024-12-11 20:09:24.073 | DEBUG | __main__:chat:12 - Fired: こんにちは
2024-12-11 20:09:24.564 | SUCCESS | __main__:chat:16 - こんにちは -> こんにちは!どういったことをお手伝いできますか?
2024-12-11 20:09:25.074 | DEBUG | __main__:main:37 - Status: {'こんにちは': True, 'hello': True, '你好': True, '안녕하세요': False, 'bonjour': True}
2024-12-11 20:09:25.075 | WARNING | __main__:main:30 - Already done: msg='bonjour', event.is_set()=True
2024-12-11 20:09:26.078 | DEBUG | __main__:main:37 - Status: {'こんにちは': True, 'hello': True, '你好': True, '안녕하세요': False, 'bonjour': True}
2024-12-11 20:09:26.079 | WARNING | __main__:main:30 - Already done: msg='hello', event.is_set()=True
2024-12-11 20:09:27.079 | DEBUG | __main__:main:37 - Status: {'こんにちは': True, 'hello': True, '你好': True, '안녕하세요': False, 'bonjour': True}
2024-12-11 20:09:27.080 | DEBUG | __main__:chat:12 - Fired: 안녕하세요
2024-12-11 20:09:27.705 | SUCCESS | __main__:chat:16 - 안녕하세요 -> 안녕하세요! 어떻게 도와드릴까요?
2024-12-11 20:09:28.082 | DEBUG | __main__:main:37 - Status: {'こんにちは': True, 'hello': True, '你好': True, '안녕하세요': True, 'bonjour': True}
Condition
:イベント駆動
- 複数のタスクのトリガーを共通して管理
- 正直あまり使ったことがない
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def chat(msg: str, c: asyncio.Condition) -> None:
async with c:
logger.debug(f"Wait: {msg}")
# c.notifyの呼び出しまで待機
await c.wait()
logger.debug(f"Fired: {msg}")
resp = await AsyncOpenAI().chat.completions.create(
messages=[{"role": "user", "content": msg}], model="gpt-4o-mini"
)
logger.success(f"{msg} -> {resp.choices[0].message.content}")
async def main() -> None:
msgs = ["こんにちは", "hello", "你好", "안녕하세요", "bonjour"]
c = asyncio.Condition()
tasks = {msg: asyncio.create_task(chat(msg, c)) for msg in msgs}
while any(not t.done() for t in tasks.values()):
# 待っている順にn個ずつロックを外して通す
async with c:
c.notify(n=2)
await asyncio.sleep(5)
statuses = {k: v.done() for (k, v) in tasks.items()}
logger.debug(f"Status: {statuses}")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:10:09.443 | DEBUG | __main__:chat:9 - Wait: こんにちは
2024-12-11 20:10:09.444 | DEBUG | __main__:chat:9 - Wait: hello
2024-12-11 20:10:09.444 | DEBUG | __main__:chat:9 - Wait: 你好
2024-12-11 20:10:09.444 | DEBUG | __main__:chat:9 - Wait: 안녕하세요
2024-12-11 20:10:09.444 | DEBUG | __main__:chat:9 - Wait: bonjour
2024-12-11 20:10:14.445 | DEBUG | __main__:main:29 - Status: {'こんにちは': False, 'hello': False, '你好': False, '안녕하세요': False, 'bonjour': False}
2024-12-11 20:10:14.447 | DEBUG | __main__:chat:12 - Fired: こんにちは
2024-12-11 20:10:15.328 | SUCCESS | __main__:chat:16 - こんにちは -> こんにちは!今日はどんなことをお話ししましょうか?
2024-12-11 20:10:15.328 | DEBUG | __main__:chat:12 - Fired: hello
2024-12-11 20:10:15.851 | SUCCESS | __main__:chat:16 - hello -> Hello! How can I assist you today?
2024-12-11 20:10:19.448 | DEBUG | __main__:main:29 - Status: {'こんにちは': True, 'hello': True, '你好': False, '안녕하세요': False, 'bonjour': False}
2024-12-11 20:10:19.449 | DEBUG | __main__:chat:12 - Fired: 你好
2024-12-11 20:10:20.065 | SUCCESS | __main__:chat:16 - 你好 -> 你好!有什么我可以帮助你的吗?
2024-12-11 20:10:20.066 | DEBUG | __main__:chat:12 - Fired: 안녕하세요
2024-12-11 20:10:20.715 | SUCCESS | __main__:chat:16 - 안녕하세요 -> 안녕하세요! 어떻게 도와드릴까요?
2024-12-11 20:10:24.450 | DEBUG | __main__:main:29 - Status: {'こんにちは': True, 'hello': True, '你好': True, '안녕하세요': True, 'bonjour': False}
2024-12-11 20:10:24.451 | DEBUG | __main__:chat:12 - Fired: bonjour
2024-12-11 20:10:25.019 | SUCCESS | __main__:chat:16 - bonjour -> Bonjour ! Comment puis-je vous aider aujourd'hui ?
2024-12-11 20:10:29.453 | DEBUG | __main__:main:29 - Status: {'こんにちは': True, 'hello': True, '你好': True, '안녕하세요': True, 'bonjour': True}
Queue
:メッセージング
- タスク間のメッセージング用
- これが一番使う
import asyncio
from loguru import logger
from openai import AsyncOpenAI
async def chat_handler(q: asyncio.Queue) -> None:
while True:
# キューからメッセージを取得。メッセージがない場合は待機する。
msg = await q.get()
logger.debug(f"Received: {msg}")
resp = await AsyncOpenAI().chat.completions.create(
messages=[{"role": "user", "content": msg}], model="gpt-4o-mini"
)
logger.success(f"{msg} -> {resp.choices[0].message.content}")
async def main() -> None:
msgs = ["こんにちは", "hello", "你好", "안녕하세요", "bonjour"]
q = asyncio.Queue()
# キューからメッセージを受け取って処理するタスクを作成
asyncio.create_task(chat_handler(q))
for msg in msgs:
logger.debug(f"Put {msg} in the queue!")
# キューにメッセージを追加
q.put_nowait(msg)
await asyncio.sleep(0) # 他の処理も入れるようにする
# キューが空になるまで待機
while q.qsize() != 0:
await asyncio.sleep(1)
logger.debug(f"Queue size: {q.qsize()}")
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:11:41.619 | DEBUG | __main__:main:24 - Put こんにちは in the queue!
2024-12-11 20:11:41.619 | DEBUG | __main__:chat_handler:11 - Received: こんにちは
2024-12-11 20:11:41.639 | DEBUG | __main__:main:24 - Put hello in the queue!
2024-12-11 20:11:41.641 | DEBUG | __main__:main:24 - Put 你好 in the queue!
2024-12-11 20:11:41.642 | DEBUG | __main__:main:24 - Put 안녕하세요 in the queue!
2024-12-11 20:11:41.642 | DEBUG | __main__:main:24 - Put bonjour in the queue!
2024-12-11 20:11:42.313 | SUCCESS | __main__:chat_handler:15 - こんにちは -> こんにちは!どういったことをお手伝いできますか?
2024-12-11 20:11:42.313 | DEBUG | __main__:chat_handler:11 - Received: hello
2024-12-11 20:11:42.645 | DEBUG | __main__:main:32 - Queue size: 3
2024-12-11 20:11:42.940 | SUCCESS | __main__:chat_handler:15 - hello -> Hello! How can I assist you today?
2024-12-11 20:11:42.941 | DEBUG | __main__:chat_handler:11 - Received: 你好
2024-12-11 20:11:43.431 | SUCCESS | __main__:chat_handler:15 - 你好 -> 你好!有什么我可以帮助你的吗?
2024-12-11 20:11:43.432 | DEBUG | __main__:chat_handler:11 - Received: 안녕하세요
2024-12-11 20:11:43.647 | DEBUG | __main__:main:32 - Queue size: 1
2024-12-11 20:11:44.255 | SUCCESS | __main__:chat_handler:15 - 안녕하세요 -> 안녕하세요! 어떻게 도와드릴까요?
2024-12-11 20:11:44.255 | DEBUG | __main__:chat_handler:11 - Received: bonjour
2024-12-11 20:11:44.649 | DEBUG | __main__:main:32 - Queue size: 0
TaskGroup
:例外
- 例外ハンドルをしやすくするためのもの
import asyncio
from loguru import logger
from openai import AsyncOpenAI
class InvalidQuestionError(Exception): ...
class OverLengthQuestionError(Exception): ...
async def ask(question: str) -> str:
if question == "アウトな質問":
raise InvalidQuestionError(question)
if question == "文字数オーバーな質問":
raise OverLengthQuestionError(question)
resp = await AsyncOpenAI().chat.completions.create(
messages=[{"role": "user", "content": question}], model="gpt-4o-mini"
)
return resp.choices[0].message.content
async def main() -> None:
# それぞれに対してtry-catchを書く必要がある
logger.debug("asyncio.create_taskを使った場合")
task_ok1 = asyncio.create_task(ask("こんにちは"))
task_out1 = asyncio.create_task(ask("アウトな質問"))
task_over1 = asyncio.create_task(ask("文字数オーバーな質問"))
await asyncio.sleep(2)
try:
task_ok1.result()
except InvalidQuestionError:
assert False
except OverLengthQuestionError:
assert False
try:
task_out1.result()
assert False
except InvalidQuestionError as e:
logger.error(e)
except OverLengthQuestionError:
assert False
try:
task_over1.result()
assert False
except InvalidQuestionError:
assert False
except OverLengthQuestionError as e:
logger.error(e)
# まとめてtry-catchを書くこともできる
logger.debug("asyncio.TaslGroupを使った場合")
try:
async with asyncio.TaskGroup() as tg:
task_ok = tg.create_task(ask("hello"))
task_out2 = tg.create_task(ask("アウトな質問"))
task_over2 = tg.create_task(ask("文字数オーバーな質問"))
await asyncio.sleep(2)
_ = [task_ok.result(), task_out2.result(), task_over2.result()]
except* InvalidQuestionError as e:
logger.error(e.exceptions)
except* OverLengthQuestionError as e:
logger.error(e.exceptions)
if __name__ == "__main__":
asyncio.run(main())
2024-12-11 20:12:17.884 | DEBUG | __main__:main:28 - asyncio.create_taskを使った場合
2024-12-11 20:12:19.888 | ERROR | __main__:main:46 - アウトな質問
2024-12-11 20:12:19.889 | ERROR | __main__:main:56 - 文字数オーバーな質問
2024-12-11 20:12:19.889 | DEBUG | __main__:main:59 - asyncio.TaslGroupを使った場合
2024-12-11 20:12:19.906 | ERROR | __main__:main:70 - (InvalidQuestionError('アウトな質問'),)
2024-12-11 20:12:19.907 | ERROR | __main__:main:72 - (OverLengthQuestionError('文字数オーバーな質問'),)
なんちゃってリアルタイム音声対話システム
冒頭のデモ動画で動いている実装です。以下作った所感をつらつら述べます。
各種Utility
- OpenAI周りはストリーム処理の結果を扱いやすくするように、ストリーミングレスポンス(
ChatCompletionChunk
)を集積して非ストリーム処理時のフォーマット(ChatCompletion
)に変換するようなコードを書いている - 音声認識はAPI(
stt
; whisper)とローカル処理(stt_local
; faster-whisper)の両方を書いてある-
stt_local
はCPU-boundな処理なのでプロセスを分ける -
stt
はAPIリクエストでI/O-boundなのでプロセスを分けなくとも効率的な並行処理が可能であるが、コードのインターフェースを揃えるために音声認識全体を別プロセスとして扱っている(後述)
-
- 音声合成はAPI(
tts_stream
; tts-1)を使用 - 音声の取り扱いはど素人なのでClaude君にだいぶ助けてもらった
-
is_filler
やgenerate_aizuchi
はリアルタイムな対話を実現するための状態判定器- 元々
GPT-4o
にやらせていたがAPIリクエストのレスポンスタイム的に厳しく、高速で動くシンプルなパターンマッチにした - 実は発話内容云々よりも、この二つを用いた発話状態・発話ターンの遷移・管理がより自然な対話にするために重要だった
- こういう細かい状態判定処理が高速に正確に動くようになっているとより自然なリアルタイム会話に繋がる
- 以下欲しくなった状態判定処理
- このユーザー発話はフィラーなのか?
- このフィラーに対してアシスタントは反応すべきなのか?
- このフィラーに対して進行中の発話をポーズすべきなのか?
- このフィラーに対して進行中の発話をキャンセルすべきなのか?
- このフィラーに対して進行中の発話をポーズして相槌を打つべきなのか?
- などなど
- 高速、といっても最大500msくらいで判定できれば、自然な発話状態・発話ターンの遷移・管理をするには十分だと思ったのでそんなに難しい要件ではない
- 以下欲しくなった状態判定処理
- 元々
音声認識・音声合成のプロセス分離
- 音声認識・音声合成はサブプロセスで実行するようにした
- ローカル実行の場合、両者ともにCPU-boundな処理であるためプロセスを分けないと他の処理をブロッキングしてしまう
- 結局APIを使っているのでプロセスを分けずともブロッキングは発生しないのではあるが、音声処理の時間をできるだけ高速に正確にしたいと思ったのでこうした
- もう1秒2秒処理が早いだけでだいぶ体感変わる気がする・・・
GUI
- GUIはfletを使用した
- Flutterのpython wrapperで使用感としてはtcl/tk
- asyncioと相性が良い+UI用の状態管理も不要なのでちょっとしたGUI画面・操作を実装したいときに使っている
- streamlitは非同期と相性が悪いし、状態管理が辛いので除外
- gradioはしらん
User周り
- ユーザー周りの処理は主に二つ
- 録音と文字起こし
- それぞれのタスクを別ループでスケジューリングして、キューを使ってやり取りする仕組みで作った
Assistant周り
- アシスタント周りの処理も主に二つ
- 発話生成と音声合成
- User同様それぞれのタスクを別ループでスケジューリングして、キューを使ってやり取りする
- 音声合成はポーズや取り消しといったイベントもあり、これはイベントオブジェクトを使って制御した
実装
import asyncio
import io
import multiprocessing as mp
import random
import time
import uuid
import wave
from collections import deque
from dataclasses import dataclass
from enum import StrEnum
from functools import partial
from typing import Literal, AsyncGenerator, AsyncIterator, Callable, Optional, Iterable
from dotenv import load_dotenv
import flet as ft
import numpy as np
import pyaudio
from faster_whisper import WhisperModel
from loguru import logger
from openai import AsyncOpenAI, AsyncAzureOpenAI, OpenAIError
from openai.types import CompletionUsage
from openai.types.chat import (
ChatCompletion,
ChatCompletionChunk,
ChatCompletionMessageParam,
ChatCompletionMessage,
)
from openai.types.chat.chat_completion import Choice
load_dotenv()
# OpenAI系のutilities
local_whisper: WhisperModel = WhisperModel("small", device="cpu", compute_type="int8")
def create_client() -> AsyncOpenAI | AsyncAzureOpenAI:
try:
return AsyncOpenAI()
except OpenAIError:
return AsyncAzureOpenAI()
def format_messages(
prompt: str | list[ChatCompletionMessageParam], instruction: str | None = None
) -> list[ChatCompletionMessage]:
if isinstance(prompt, list):
return prompt
else:
msgs = [
{
"role": "user",
"content": prompt,
}
]
if instruction:
msgs.insert(0, {"role": "system", "content": instruction})
return msgs
async def chat(
prompt: str | Iterable[ChatCompletionMessageParam],
*,
instruction: str | None = None,
model: str = "gpt-4o",
stream: bool = False,
on_chunk: Callable[[str], None] | None = None,
) -> ChatCompletion:
if stream:
return await chat_stream(
prompt, instruction=instruction, model=model, on_chunk=on_chunk
)
else:
messages = format_messages(prompt, instruction)
resp = await create_client().chat.completions.create(
model=model, messages=messages
)
return resp
async def chat_stream(
prompt: str | Iterable[ChatCompletionMessageParam],
*,
instruction: str | None = None,
model: str = "gpt-4o",
on_chunk: Callable[[str], None] | None = None,
) -> ChatCompletion:
messages = format_messages(prompt, instruction)
stream = await create_client().chat.completions.create(
model=model, messages=messages, stream=True
)
resp = await collect_streaming_response(stream, on_chunk)
return resp
async def collect_streaming_response(
stream: AsyncIterator[ChatCompletionChunk],
on_chunk: Optional[Callable[[str], None]] = None,
) -> ChatCompletion:
"""streamの有無によらずレスポンス型を揃えて使いやすくするためのヘルパー。"""
collected_chunks = []
collected_messages = []
usage = CompletionUsage(prompt_tokens=0, completion_tokens=0, total_tokens=0)
async for chunk in stream:
collected_chunks.append(chunk)
chunk_message = chunk.choices[0].delta.content or ""
if on_chunk:
if asyncio.iscoroutinefunction(on_chunk):
await on_chunk(chunk_message)
else:
on_chunk(chunk_message)
collected_messages.append(chunk_message)
if chunk.usage:
usage = chunk.usage
if on_chunk:
if asyncio.iscoroutinefunction(on_chunk):
await on_chunk("END")
else:
on_chunk("END")
full_message_content = "".join(collected_messages)
final_choice = Choice(
index=0,
message={"role": "assistant", "content": full_message_content},
finish_reason=collected_chunks[-1].choices[0].finish_reason,
)
return ChatCompletion(
id=collected_chunks[0].id,
choices=[final_choice],
created=collected_chunks[0].created,
model=collected_chunks[0].model,
system_fingerprint=getattr(collected_chunks[0], "system_fingerprint", None),
object="chat.completion",
usage=usage,
)
# 音声系のutilities
def is_silent(audio_data: bytes, silence_threshold: int) -> bool:
"""無音検出"""
data = np.frombuffer(audio_data, dtype=np.int16)
return np.max(np.abs(data)) < silence_threshold
def convert_frames_to_bytesio(frames: list[bytes], sample_rate: int) -> io.BytesIO:
"""音声データをバイトIOに変換する。"""
byte_io = io.BytesIO()
with wave.open(byte_io, "wb") as wf:
wf.setnchannels(1)
wf.setsampwidth(2)
wf.setframerate(sample_rate)
wf.writeframes(b"".join(frames))
byte_io.seek(0)
# https://sogo.dev/posts/2024/10/whisper-from-line-messaging-api
byte_io.name = "audio.wav"
return byte_io
async def stt(frames: list[bytes], sample_rate: int) -> None:
"""音声データをテキストに変換する(whisper)。
Args:
frames: 音声データ
sample_rate: サンプリングレート
"""
# https://sogo.dev/posts/2024/10/whisper-from-line-messaging-api
audio_bytes = convert_frames_to_bytesio(frames, sample_rate)
transcript = await create_client().audio.transcriptions.create(
model="whisper-1", file=audio_bytes, response_format="text", language="ja"
)
transcript = transcript.strip()
return transcript
async def stt_local(frames: list[bytes], sample_rate: int) -> None:
"""音声データをテキストに変換する(faster-whisper)。
Args:
frames: 音声データ
sample_rate: サンプリングレート
"""
audio_bytes = convert_frames_to_bytesio(frames, sample_rate)
global local_whisper
if local_whisper is None:
local_whisper = WhisperModel("small", device="cpu", compute_type="int8")
segments, info = local_whisper.transcribe(
audio_bytes, vad_filter=True, without_timestamps=True
)
return " ".join([s.text for s in segments])
async def tts_stream(
text: str, voice: str = "echo", speed: float = 1.5, volume_factor: float = 1.0
) -> AsyncGenerator:
async with create_client().audio.speech.with_streaming_response.create(
model="tts-1",
voice=voice,
response_format="wav",
input=text,
speed=speed,
) as response:
# WAVヘッダーをスキップするためのフラグ
header_skipped = False
async for chunk in response.iter_bytes(chunk_size=2048): # chunk_sizeを合わせる
if not header_skipped:
# 最初の44バイト(WAVヘッダー)をスキップ
header_skipped = True
continue
# 音声データをnumpy配列に変換
audio_array = np.frombuffer(chunk, dtype=np.int16)
# 音量調整
amplified = audio_array * volume_factor
amplified = np.clip(amplified, -32768, 32767)
# 処理したデータを返す
yield amplified.astype(np.int16).tobytes()
def create_audio_stream(
direction: str,
sample_rate: int = 16000,
chunk_size: int = 2048,
device_index: int | None = None,
) -> pyaudio.PyAudio.Stream:
"""音声ストリームを作成する。
Args:
direction: "input" or "output"
sample_rate: サンプリングレート (16000 - 音声認識用, 44100 - 音楽用)
chunk_size: 音声チャンクサイズ (1024 - 低遅延, 2048 - 通常, 4096 - 高品質)
device_index: 入力デバイスのインデックス
"""
audio = pyaudio.PyAudio()
kwargs = {
"format": pyaudio.paInt16,
"channels": 1,
"rate": sample_rate,
"frames_per_buffer": chunk_size,
}
if direction == "input":
kwargs["input"] = True
kwargs["input_device_index"] = device_index
else:
kwargs["output"] = True
kwargs["output_device_index"] = device_index
return audio.open(**kwargs)
# その他判定器系utility(高速な動作を保ちつつ性能の向上が必要;今回は適当なルールベース)
async def is_filler(text: str, messages: list[dict]) -> bool:
"""フィラー判定。ここを正確に早くできるとより自然なリアルタイム会話になれそうな気がする。"""
return len(text) < 5
async def generate_aizuchi(text: str) -> str:
# 基本的な相槌パターン
formal = ["なるほどね", "それはそれはだねぇ", "ふむふむだねぇ", "うむうむわかるな"]
casual = ["まぁそうだねぇ", "そうそうそう", "そうなのよ", "そうだよねぇ"]
reactive = ["ほぉ", "いやいや"]
aisatsu = ["こんにち", "やっほ"]
love = ["好き", "大好き", "すき", "だいすき"]
if text.startswith(tuple(aisatsu)):
return "やっほ〜"
if len(text) < 5:
return None
if any(l in text for l in love) or text.endswith(("とか")) or "しよう" in text:
return "いいねぇ〜"
if text.endswith(("かな", "かも", "だろう")):
return random.choice(formal) + "〜" * random.randint(3, 10)
if text.endswith(("んだ", "のよ", "だよ")):
return random.choice(casual) + "〜" * random.randint(3, 10)
if text.endswith(("!", "!", "。。")):
return random.choice(reactive) + "〜" * random.randint(3, 10)
if len(text) > 40:
return random.choice(formal)
# 普通の文末(「だ」「よ」「ね」など)
return random.choice(casual)
# 音声認識(stt)はfaster-whisperをローカルで使用する場合CPU-boundとなるのでプロセスを分ける
# OpenAIのwhisperを使う場合はIO-boundな処理になるのでプロセスを分けずとも問題ないが
# わかりやすさのためにsttを別プロセスとしている
@dataclass
class SttRequest:
frames: list[bytes]
sample_rate: int
id: str | None = None
def __post_init__(self):
self.id = str(uuid.uuid4())
def stt_process(input_queue: mp.Queue, output_queue: mp.Queue) -> None:
async def _main(iq: mp.Queue, oq: mp.Queue) -> None:
logger.debug("Started stt process")
q = asyncio.Queue()
# 親→子プロセス間の通信用ループ。
async def _input_loop() -> None:
logger.debug("Started stt_process.input_loop")
while True:
if not iq.empty():
q.put_nowait(iq.get())
await asyncio.sleep(0)
# 子→親プロセス間の通信用ループ。
async def _output_loop() -> None:
logger.debug("Started stt_process.output_loop")
while True:
req: SttRequest = await q.get()
logger.debug(f"Start stt: {req.id}")
s = time.time()
def _notify_finished(fut: asyncio.Future) -> None:
# タスク完了時に親プロセスに通知を送る
oq.put_nowait((req.id, fut.result()))
logger.success(f"Finished stt in {time.time() - s:.2f}s: {req.id}")
# sttをスケジューリングして終了時に通知する
# sttはAPIリクエストなのでタスク実行することで同時実行できる
# stt_localを使う場合cpu-boundなので同時実行にならない点に注意
coro = stt(req.frames, req.sample_rate)
# coro = stt_local(req.frames, req.sample_rate)
task = asyncio.create_task(coro)
task.add_done_callback(partial(_notify_finished))
async with asyncio.TaskGroup() as tg:
try:
tg.create_task(_input_loop())
tg.create_task(_output_loop())
logger.debug("Started stt_process")
except* Exception as e:
logger.error(f"Error: {e}")
asyncio.run(_main(input_queue, output_queue))
# 音声出力はCPU-boundで他の処理をブロッキングしてしまうためプロセスを分ける
# ttsはAPIリクエストのためにIO-boundな処理になるのでプロセスを分けずとも問題ないが
# sttとの対応でわかりやすいように+ローカルで実行されるttsモデルの組み込みもしやすいように
# ttsごと別プロセスとしている
@dataclass
class TtsRequest:
action: Literal["speak", "terminated", "pause"]
text: str | None = None
id: str | None = None
def __post_init__(self):
if self.id is None:
self.id = str(uuid.uuid4())
def tts_process(input_queue: mp.Queue, output_queue: mp.Queue) -> None:
# tts_streamが非同期実装なので非同期実行する
async def _main(iq: mp.Queue, oq: mp.Queue) -> None:
logger.success("Started tts_process")
q = asyncio.Queue()
stream_queue = asyncio.Queue()
pause_lock = asyncio.Lock()
cancel_event = asyncio.Event()
async def _wait_input_queue_empty() -> None:
logger.debug("Wait input queue empty")
while not iq.empty():
await asyncio.sleep(0)
logger.debug("Input queue is empty")
async def _enable_pause_lock(pause_interval: int = 3) -> None:
# ポーズのロックをかける(${pause_interval}秒)
async with pause_lock:
oq.put_nowait(("paused", None))
logger.debug(f"Pause tts in {pause_interval} seconds ...")
# input_queueが空になるか${pause_interval}秒待機
await asyncio.sleep(pause_interval)
# await asyncio.wait_for(
# _wait_input_queue_empty(), timeout=pause_interval
# )
logger.debug("Resume tts output")
oq.put_nowait(("resumed", None))
async def _receiving_loop() -> None:
logger.success("Started tts_process.receiving_loop")
while True:
if not iq.empty():
req: TtsRequest = iq.get()
logger.debug(f"Received tts request: {req}")
match req.action:
case "terminate":
break
case "speak":
# キャンセルを可能な状態にする
cancel_event.clear()
# 出力キューに通知
q.put_nowait(req)
case "pause":
# ポーズを有効にする
# ここでawaitすると親プロセスからのキャンセルが効かなくなるのでタスクをスケジューリングする
asyncio.create_task(_enable_pause_lock())
case "cancel":
# キャンセルを通知
cancel_event.set()
await asyncio.sleep(0)
async def _scheduling_loop() -> None:
logger.success("Started tts_process.scheduling_loop")
while True:
req: TtsRequest = await q.get()
# キャンセルイベント中はスケジューリングせずに親プロセスに通知
if cancel_event.is_set():
logger.debug(f"Canceled tts: {req}")
oq.put_nowait(("cancelled", req))
continue
# スケジューリングした上で、streamオブジェクトをdecoding_loopに渡す
stream_queue.put_nowait((tts_stream(req.text), req))
async def _decoding_loop() -> None:
logger.success("Started tts_process.decoding_loop")
output_stream = create_audio_stream(
"output", device_index=None, sample_rate=24000
)
try:
while True:
# APIリクエストはスケジューリング後、順次実行されるため並行に処理される。
# 一方で、decoding_loopはリクエスト済みのストリームがキューイングされており、
# キューイングされた順で直列で行われる。
# 異なるttsの結果が同時に出力されることはなく発話順序も保たれる
stream, req = await stream_queue.get()
logger.debug(f"Started tts: {req}")
oq.put_nowait(("started", req))
s = time.time()
async for chunk in stream:
# pauseのロックを取得してから出力する = pause中はasync with pause_lockで待機させ出力を停止
async with pause_lock:
# キャンセルイベント中はttsを中断
if cancel_event.is_set():
break
output_stream.write(chunk)
elapsed = time.time() - s
if cancel_event.is_set():
logger.warning(f"Cancelled tts in {elapsed:.2f}s: {req}")
oq.put_nowait(("cancelled", req))
else:
logger.success(f"Finished tts in {elapsed:.2f}s: {req}")
oq.put_nowait(("finished", req))
finally:
output_stream.stop_stream()
output_stream.close()
async with asyncio.TaskGroup() as tg:
logger.success("audio_output_process: start")
tg.create_task(_receiving_loop())
tg.create_task(_scheduling_loop())
tg.create_task(_decoding_loop())
asyncio.run(_main(input_queue, output_queue))
class Status(StrEnum):
LISTENING = "👂 listening ..."
SPEAKING = "🔊 speaking ..."
RECORDING = "🎤 recording ..."
PAUSED = "⏸️ paused ..."
class GUI:
"""音声チャットページクラス。非同期処理との相性の良さから[flet](https://flet.dev/)を使用。"""
def __init__(self, page: ft.Page):
self.page = page
self.page.title = "Voice Chat Assistant"
self.page.vertical_alignment = ft.MainAxisAlignment.CENTER
self.page.horizontal_alignment = ft.CrossAxisAlignment.CENTER
# ステータス表示用のテキスト
self.assistant_status_text = ft.Text(size=20)
self.user_status_text = ft.Text(size=20)
# チャット履歴表示用のコンテナ
self.chat_history = ft.ListView(expand=True, spacing=10, auto_scroll=True)
# レイアウトの設定
self.page.add(
ft.Container(
content=ft.Column(
[
ft.Row(
[self.assistant_status_text, self.user_status_text],
alignment=ft.MainAxisAlignment.SPACE_BETWEEN,
),
ft.Container(
content=self.chat_history,
expand=True,
),
],
),
expand=True,
padding=20,
)
)
self.set_assistant_status("👂 listening ...")
self.set_user_status("🟢 ready")
# ページの自動更新タスクをスケジューリングする
async def auto_page_update(interval: int = 1) -> None:
while True:
await asyncio.sleep(interval)
page.update()
self._page_update_task = asyncio.create_task(auto_page_update())
self._messages = []
def set_user_status(self, status: Status) -> None:
"""ステータスを更新する"""
self.user_status_text.value = status
def set_assistant_status(self, status: Status) -> None:
"""ステータスを更新する"""
self.assistant_status_text.value = status
def add_message(
self, role: Literal["assistant", "user"], content: str, **data
) -> str:
"""チャット履歴にメッセージを追加する"""
message_text = ft.Text(
content.strip(),
size=16,
color=ft.Colors.WHITE if role == "assistant" else ft.Colors.BLACK,
key=str(uuid.uuid4()),
data={"role": role.lower(), **data},
)
message_container = ft.Container(
content=message_text,
bgcolor=(
ft.Colors.GREY_400 if role == "assistant" else ft.Colors.LIGHT_GREEN_200
),
padding=10,
border_radius=10,
width=self.page.width * 0.7,
)
row_container = ft.Container(
content=ft.Row(
[
(
message_container
if role == "assistant"
else ft.Container(expand=True)
), # assistantの場合は左寄せ
(
ft.Container(expand=True)
if role == "assistant"
else message_container
), # Youの場合は右寄せ
],
alignment=ft.MainAxisAlignment.START,
),
)
self.chat_history.controls.append(row_container)
self._messages.append({"role": role.lower(), "content": content})
# text objectのuidを取得するためにページを更新する
self.page.update()
logger.debug(
f"Added a message to chat history: {message_text.uid} / {role} /{content}"
)
return message_text.uid
def update_message(self, id: str, content: str) -> None:
"""チャット履歴のメッセージを更新する"""
if control := self.page.get_control(id):
control.value = content
def delete_message(self, id: str) -> None:
control = self.page.get_control(id)
control.parent.parent.clean()
def compile_messages(self, instruction: str | None = None) -> list[dict]:
instruction_message = (
[{"role": "system", "content": instruction}]
if instruction is not None
else []
)
chat_messages = [
{"role": t.data["role"], "content": t.value}
for t in self.page.index.values()
if isinstance(t, ft.Text)
and t.data
and t.data.get("filler") is None
and t.value != "発話中..."
]
return instruction_message + chat_messages
class User:
def __init__(
self,
gui: GUI,
sample_rate: int = 16000,
chunk_size: int = 1024,
silence_threshold: int = 2000,
silence_duration: float = 0.66,
max_recording_duration: float = 10,
input_device_index: int | None = None,
):
self._gui = gui
self._sample_rate = sample_rate
self._chunk_size = chunk_size
self._silence_threshold = silence_threshold
self._silence_duration = silence_duration
self._max_recording_duration = max_recording_duration
self._input_device_index = input_device_index
self._silence_chunks = int(silence_duration * sample_rate / chunk_size)
self._max_chunks = int(max_recording_duration * sample_rate / chunk_size)
self._min_audio_frames = 8
self._frames_queue = asyncio.Queue()
self._transcription_queue = asyncio.Queue()
self._is_speaking = False
async def input_stream(self) -> None:
async with asyncio.TaskGroup() as tg:
task_recording = tg.create_task(self._recording_loop())
task_transcribing = tg.create_task(self._transcribing_loop())
try:
while True:
# 音声認識結果を取得して配信する
transcription = await self._transcription_queue.get()
if transcription == "":
continue
logger.debug(f"Input: {transcription}")
yield transcription
except* Exception as e:
logger.error(f"Error: {e}")
logger.debug(f"Recording task status: {task_recording._state}")
logger.debug(f"Transcribing task status: {task_transcribing._state}")
async def _recording_loop(self) -> None:
"""音声入力用のループ。"""
logger.debug("Start recording ...")
input_stream = create_audio_stream(
"input", self._sample_rate, self._chunk_size, self._input_device_index
)
frames, silent_cnt, buffer, self._is_speaking = ([], 0, deque(), False)
while data := input_stream.read(self._chunk_size, exception_on_overflow=False):
self._gui.set_user_status(Status.LISTENING)
_is_silent = is_silent(data, self._silence_threshold)
if not self._is_speaking:
self._gui.set_user_status(Status.LISTENING)
# 非発話状態では全フレームをバッファーに保存
buffer.append(data)
if len(buffer) > 5:
buffer.popleft()
# 音を検知した場合発話開始としてフレームを構築
if not _is_silent:
self._is_speaking = True
frames.extend(buffer)
frames.append(data)
else:
# 発話状態ではフレームを追加していく
self._gui.set_user_status(Status.RECORDING)
frames.append(data)
if _is_silent:
silent_cnt += 1
else:
silent_cnt = 0
# フレーム数が一定以上、かつ、無音フレームが一定以上の場合、または、最大フレーム数を超えた場合にフレームをキューに追加
if (
silent_cnt > self._silence_chunks
and len(frames) > self._min_audio_frames
) or len(frames) > self._max_chunks:
self._frames_queue.put_nowait(frames)
logger.debug(f"Recorded: {len(frames)}")
frames, silent_cnt, buffer, self._is_speaking = ([], 0, deque(), False)
await asyncio.sleep(0)
async def _transcribing_loop(self) -> None:
"""音声認識用のループ。音声認識は別プロセスに分けるのでそのプロセスとの通信を行う。"""
logger.debug("Start transcribing ...")
send_queue = mp.Queue()
receive_queue = mp.Queue()
process = mp.Process(
target=stt_process,
args=(send_queue, receive_queue),
)
process.start()
async def _send_task() -> None:
# recording_loopからのフレームを受け取り、stt_processに送信する
while frames := await self._frames_queue.get():
logger.debug(f"Send frames to transcription process ...")
item = SttRequest(frames=frames, sample_rate=self._sample_rate)
send_queue.put_nowait(item)
async def _receive_task() -> None:
# stt_processからの結果を受け取り、結果をtranscription_queueに追加する
while True:
if not receive_queue.empty():
_id, transcription = receive_queue.get_nowait()
if transcription == "":
logger.debug(f"Skip empty transcription: {_id}")
else:
logger.debug(f"Received transcription: {_id}={transcription}")
self._transcription_queue.put_nowait(transcription)
await asyncio.sleep(0.1)
async with asyncio.TaskGroup() as tg:
tg.create_task(_send_task())
tg.create_task(_receive_task())
@property
def is_speaking(self) -> bool:
return self._is_speaking
class Assistant:
def __init__(
self,
gui: GUI,
separator: str = "|",
text_buffer_length: int = 20,
):
self._gui = gui
self._text: str = ""
self._text_buffer_length = text_buffer_length
# ttsプロセスを開始
self._from_tts_queue = mp.Queue()
self._to_tts_queue = mp.Queue()
self._tts_process = mp.Process(
target=tts_process,
args=(self._to_tts_queue, self._from_tts_queue),
)
self._tts_process.start()
self._task_tts_signal_handler = asyncio.create_task(self._tts_signal_handler())
self._is_paused = False
self._pause_lock = asyncio.Lock()
self._queued_requests: dict[str, TtsRequest] = {}
# 途中で音声出力できるように切れ目込みで出力してもらう
self._separator = separator
self._chat_instruction = f"""
あなたは陽気なおじさん風雑談アシスタントです。ユーザーと楽しく会話してください。
[指示]
- アシスタントメッセージは音声出力を行います。
- 1文ごとに{self._separator}を入れてください(出力の最後にも忘れずに入れてください)。
- カジュアルな会話を心がけてください。
- ユーザーの発話は音声認識結果なので誤字脱字が多く含まれます。文脈から意図を読み取って回答してください。
"""
async def speak(self, text: str) -> None:
logger.debug(f"Speak: {text}")
_id = self._gui.add_message("assistant", "発話中...")
tts_request = TtsRequest(action="speak", text=text, id=_id)
self._to_tts_queue.put_nowait(tts_request)
self._queued_requests[_id] = tts_request
async def speak_one(self, text: str, **data) -> None:
logger.debug(f"Speak: {text}")
_id = self._gui.add_message("assistant", text, **data)
tts_request = TtsRequest(action="speak", text=text, id=_id)
self._to_tts_queue.put_nowait(tts_request)
self._queued_requests[_id] = tts_request
async def pause(self) -> None:
# すでにポーズ中の場合はすぐに返す
if self._is_paused:
return
# tts processにポーズ指示を出す
self.pause_nowait()
# ポーズ状態になるのを確認
while not self._is_paused:
await asyncio.sleep(0)
# .非ポーズ状態になるまで待機
while self._is_paused:
await asyncio.sleep(0)
def pause_nowait(self) -> None:
self._to_tts_queue.put_nowait(TtsRequest(action="pause"))
async def cancel(self, sleep_before: int = 1) -> None:
await asyncio.sleep(sleep_before)
self.cancel_nowait()
def cancel_nowait(self) -> None:
self._to_tts_queue.put_nowait(TtsRequest(action="cancel"))
async def chat(self, messages: list[dict]) -> "SpeakTrigger":
last_message = messages[-1]
logger.info(f"Generate assistance response@{last_message}")
start_time = time.time()
trigger = SpeakTrigger()
trigger.streaming_task = asyncio.create_task(
chat_stream(
messages,
on_chunk=partial(trigger.on_chunk, assistant=self),
)
)
trigger.streaming_task.add_done_callback(
lambda fut: logger.success(
f"Chat stream finished in {time.time() - start_time:.2f}s: {'canceled' if fut.cancelled() else fut.result()}@{last_message}"
)
)
return trigger
async def speak_aizuchi(self, aizuchi: str) -> None:
logger.debug(f"Run aizuchi: {aizuchi}")
await asyncio.sleep(0.33)
await self.speak_one(aizuchi, filler=True)
async def _tts_signal_handler(self) -> None:
while True:
if not self._from_tts_queue.empty():
status, req = self._from_tts_queue.get_nowait()
logger.debug(f"Received tts signal: {status} / {req}")
match status:
case "started":
self._gui.set_assistant_status(Status.SPEAKING)
case "cancelled":
self._gui.delete_message(req.id)
self._pop_queued_request(req.id)
self._gui.set_assistant_status(Status.LISTENING)
case "paused":
self._gui.set_assistant_status(Status.PAUSED)
self._is_paused = True
case "resumed":
self._gui.set_assistant_status(Status.SPEAKING)
self._is_paused = False
case "finished":
self._gui.update_message(req.id, req.text)
self._pop_queued_request(req.id)
self._gui.set_assistant_status(Status.LISTENING)
await asyncio.sleep(0)
def _pop_queued_request(self, _id: str) -> None:
try:
self._queued_requests.pop(_id)
except KeyError:
...
@property
def chat_instruction(self) -> str:
return self._chat_instruction
@property
def separator(self) -> str:
return self._separator
@property
def is_speaking(self) -> bool:
return len(self._queued_requests) > 0
@property
def is_paused(self) -> bool:
return self._is_paused
@dataclass
class SpeakTrigger:
"""音声出力トリガー。ストリームのチャットテキストを受け取ってアシスタントの音声合成をトリガーする。"""
text: str = ""
buffer_length: int = 20
filler: bool = False
cancel_event: asyncio.Event | None = None
is_finished: bool = False
streaming_task: asyncio.Task | None = None
id: str = str(uuid.uuid4())
def __post_init__(self):
self.cancel_event = asyncio.Event()
async def cancel(self) -> None:
logger.debug(f"Cancel speak trigger {self.id}: {self.text}")
self.cancel_event.set()
async def on_chunk(self, chunk: str, assistant: Assistant) -> None:
if self.cancel_event.is_set():
if self.is_finished:
return
else:
if chunk == assistant.separator:
logger.warning(f"Trigger speak on cancel: {self.text}")
await assistant.speak(self.text)
self.is_finished = True
else:
self.text += chunk
else:
if chunk == assistant.separator:
if len(self.text) > self.buffer_length and self.text != "":
logger.debug(
f"Trigger speak in the middle of the sentence: {self.text}"
)
await assistant.speak(self.text)
self.text = ""
elif chunk == "END" and self.text != "":
logger.debug(f"Trigger speak at the end of the sentence: {self.text}")
await assistant.speak(self.text)
else:
self.text += chunk
class RealTimeVoiceChat:
def __init__(self, page: ft.Page):
self._gui = GUI(page)
self._user = User(self._gui)
self._assistant = Assistant(self._gui)
self._user_input_queue = asyncio.Queue()
async def run(self) -> None:
async with asyncio.TaskGroup() as tg:
try:
task_input = tg.create_task(self._user_input_streaming())
task_output = tg.create_task(self._assistant_output_streaming())
except* Exception as e:
logger.error(f"Error: {e}")
logger.info("Terminating the output process")
logger.info(f"Input stream task status: {task_input._state}")
logger.info(f"Output stream task status: {task_output._state}")
async def _user_input_streaming(self) -> None:
async for inp in self._user.input_stream():
logger.debug(f"user: {inp}")
self._gui.add_message("user", inp)
# 発話が続いている場合は待機
if self._user.is_speaking:
# 時々相槌を打つ
if random.random() <= 0.33:
asyncio.create_task(self._assistant.speak_aizuchi("うんうん"))
logger.debug("User is still speaking. Continue recording ...")
continue
await self._user_input_queue.put(inp)
async def _assistant_output_streaming(self) -> SpeakTrigger:
trigger: SpeakTrigger | None = None
while True:
inp = await self._user_input_queue.get()
# 会話履歴を取得
messages = self._gui.compile_messages(self._assistant.chat_instruction)
# 新しい入力がフィラーの場合は何もせずに次へ
if await is_filler(inp, messages):
logger.debug(f"Filler detected: {inp}")
continue
else:
# フィラーでない入力があった場合、アシスタントの発話をキャンセルする
if self._assistant.is_speaking:
logger.warning(f"Cancel current speaking due to new input: {inp}")
self._assistant.cancel_nowait()
if trigger:
await trigger.cancel()
# ユーザーもアシスタントも発話をしていなければ
if not self._assistant.is_speaking and not self._user.is_speaking:
# 最後のユーザー発話に対してクイックに相槌を打つ
if aizuchi := await generate_aizuchi(inp):
asyncio.create_task(self._assistant.speak_aizuchi(aizuchi))
messages += [{"role": "assistant", "content": aizuchi}]
trigger = await self._assistant.chat(messages)
async def main(page: ft.Page):
try:
await RealTimeVoiceChat(page).run()
except KeyboardInterrupt as e:
...
if __name__ == "__main__":
ft.app(target=main)
Discussion