Open6

LiveKit Agents v1.x 色々お試しメモ

kun432kun432

ルーム入室時に挨拶させる

Voice Agent Quickstartだとこうなっている。抜粋。

class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="あなたは親切な日本語のAI音声アシスタントです。")


async def entrypoint(ctx: agents.JobContext):
    await ctx.connect()

    session = AgentSession(
        stt=openai.STT(model="whisper-1", language="ja"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(model="tts-1", voice="coral"),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )

    await session.start(
        room=ctx.room,
        agent=Assistant(),
    )

    await session.generate_reply(
        instructions="ユーザーに挨拶し、支援を申し出てください。"
    )

entrypointの中で、エージェントセッション開始後に手動で発話させている。このときの発話はgenerate_replyを使って、LLMに生成させている。

Pipelineノードを使うとエージェントそのものに処理を追加できる。on_enterをエージェント側に追加することもできる。上と挙動としては同じになる。

class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="あなたは親切な日本語のAI音声アシスタントです。")

    async def on_enter(self):
        await session.generate_reply(
            instructions="ユーザーに挨拶し、支援を申し出てください。"
        )


async def entrypoint(ctx: agents.JobContext):
    await ctx.connect()

    session = AgentSession(
        stt=openai.STT(model="whisper-1", language="ja"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(model="tts-1", voice="coral"),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )

    await session.start(
        room=ctx.room,
        agent=Assistant(),
    )

session.say()を使えば、LLMを使わずに、テキストでそのままSTTに発話させる。

class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="あなたは親切な日本語のAI音声アシスタントです。")

    async def on_enter(self) -> None:
        await self.session.say("こんにちは!今日はどのようなご用件ですか?")

entrypointの中と、エージェントのon_enterで両方指定した場合はどうなるか?

class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="あなたは親切な日本語のAI音声アシスタントです。")

    async def on_enter(self) -> None:
        await self.session.say("on_enterで呼ばれました。")


async def entrypoint(ctx: agents.JobContext):
    await ctx.connect()

    session = AgentSession(
        stt=openai.STT(model="whisper-1", language="ja"),
        llm=openai.LLM(model="gpt-4o-mini"),
        tts=openai.TTS(model="tts-1", voice="coral"),
        vad=silero.VAD.load(),
        turn_detection=MultilingualModel(),
    )

    await session.start(
        room=ctx.room,
        agent=Assistant(),
    )

    await session.say("entrypointで呼ばれました。")

「entrypointで呼ばれました。」→「on_enterで呼ばれました。」となる。

マルチエージェント構成などで、エージェントが切り替わって、切り替わり後のエージェントに発話させる場合などはエージェントクラス側で持たせたほうがいいと思う。接続直後に必ず固定で発話させたいとかならentrypointでやるのが良さそう。

kun432kun432

イベント周りの変更

v0.xのVoicePipelineAgentでは、ユーザ・エージェントの各アクションや処理ステップで「イベント」が発行されていた。

https://docs.livekit.io/agents/v0/voice-agent/voice-pipeline/#emitted-events

イベント 説明
user_started_speaking ユーザーが発話を開始した時。
user_stopped_speaking ユーザーが発話を停止した時。
agent_started_speaking エージェントが発話を開始した時。
agent_stopped_speaking エージェントが発話を停止した時。
user_speech_committed ユーザーの発話内容がチャットコンテキストにコミットされた時。
agent_speech_committed エージェントの発話内容がチャットコンテキストにコミットされた時。
agent_speech_interrupted エージェントが会話中に中断された時。
function_calls_collected 実行されるべき関数の完全なセットが受信された時。
function_calls_finished すべての関数呼び出しが実行された時。
metrics_collected メトリクスが収集された時。

上記は、VoicePipelineAgentのパイプラインの各コンポーネント、STT・LLM・TTSが行われたそれぞれのタイミングの前後で発火する。またv0.Xではこれらは全てVoicePipelineAgentのデコレータメソッドon()で全てのイベントを取得できた。

    agent = VoicePipelineAgent(...)

    @agent.on("metrics_collected")
    def _on_metrics_collected(mtrcs: metrics.AgentMetrics):
        metrics.log_metrics(mtrcs)

v1.0ではこれらがPipelineノードを使ってカスタマイズできるようになっている。

https://docs.livekit.io/agents/build/nodes/

ノード 説明
on_enter() エージェントがセッションに入った時。
on_exit() エージェントがセッションから退出した時。
on_user_turn_completed() ユーザーのターンが完了した時。
transcription_node() エージェントのLLMの出力の書き起こし処理時。
stt_node() エージェントのSTT処理時(パイプラインのみ)。
llm_node() エージェントの LLM 処理時ステップ(パイプラインのみ)。
tts_node() エージェントの TTS 処理時ステップ(パイプラインのみ)。
realtime_audio_output_node() エージェントの音声出力ステップ時(Realtimeのみ)。

v0.Xからのマイグレガイドにもあるように、

https://docs.livekit.io/agents/start/v0-migration/

user_started_speaking and user_stopped_speaking events are no longer emitted. They've been combined into a single user_state_changed event.

となっていて、v0.Xでは各ステップの「前後」でイベント発生していたものが、v1.0では全体パイプラインの前後などは増えているが、個々のステップの前後は取れなくなっているように思える。

また、v1.0の各Pipelineノードは、Agentクラスのメソッドとして実装することで、挙動をオーバーライドすることが目的になっているため、v0.Xのような「イベント」とはやや異なる使い方になる。

class MyAgent(Agent):
    async def tts_node(self, text: AsyncIterable[str], model_settings: ModelSettings):
        # デフォルトのTTSをオーバーライドして、テキストの前処理を行う
        return Agent.default.tts_node(self, tokenize.utils.replace_words(text), model_settings)

従来のイベントに相当するものは、AgentSession側にデコレータメソッドon()があった。

    session = AgentSession(...)

    @session.on("agent_state_changed")
    ...
    
    @session.on("user_state_changed")    
    ...

    @session.on("metrics_collected")
    ...

    @session.on("speech_created")
    ...

    @session.on("close")
    ...

    @session.on("error")
    ...

    @session.on("conversation_item_added")
    ...

    @session.on("user_input_transcribed")
    ...

    @session.on("function_tools_executed")
    ...

それぞれのタイミングまでは確認していない。

イベントとPipelineノードでそれぞれ目的が異なる上、v0.Xとはイベントの内容も異なる、という点に注意が必要。

あとLiveKitサーバ側のルームで発生するイベントも @ctx.room.onみたいな感じで拾えるようだが、これはv0.Xもv1.0も同じだし、ちょっとエージェントのレイヤーとは異なると思う。

https://docs.livekit.io/home/client/events/

kun432kun432

コンテキスト・会話履歴

「会話履歴」については、ピンポイントなドキュメントが見つかりにくいが、この辺(トピックの主題が全部違う・・・。会話履歴はLLMを使う場合に重要なので別途章立てしてほしい・・・)

https://docs.livekit.io/agents/build/agents-handoffs/#context-preservation

https://docs.livekit.io/agents/observability/data/#session-transcripts-and-reports

https://docs.livekit.io/agents/build/testing/#loading-conversation-history

kun432kun432

Windsurfに、以下のAgentsと初心者向けスターターキットのソースコードと、LiveKitドキュメントのMCPを渡して、わかりやすく説明してもらった。(正解かどうかは保証しない)

https://github.com/livekit-examples/agent-starter-python

https://github.com/livekit/agents

https://docs.livekit.io/home/get-started/mcp-server/

LiveKit Agents の会話履歴(Conversation History)管理メモ

この文書は、このレポジトリに含まれる LiveKit Agents(Python)の実装を前提に、会話履歴がどのように保持・更新されるかを、コードと公式ドキュメントに沿ってまとめたものです。

対象の主なコード:

関連ドキュメント(公式):

1. まず結論(どういう設計か)

LiveKit Agents では、会話履歴を「テキストのログ」ではなく、会話のアイテムの列として扱います。

  • ひとまず推論(言語モデル)に渡すときは、履歴をコピーしてそこに今回の入力などを追加します。
  • その後「確定した」と判断できたものだけを、セッションの履歴(全体)と、エージェントの履歴にコミットします。

この二段構えによって、割り込みや取り消しが起きても、履歴が壊れにくくなっています。

2. 用語(やさしく説明 → 用語名)

ここでは、必要な専門語だけを短く整理します。

会話履歴の入れ物

会話履歴は「メッセージやツールの実行などを、時間順に並べたリスト」です。

この入れ物が ChatContext(チャットコンテキスト / ChatContext)です。

会話の1つ1つの要素

ChatContext の中に入るのは、ChatItem という種類の要素です。代表的には次のものがあります。

  • ChatMessage(発話。roleuser / assistant / system / developer
  • FunctionCall(ツールの呼び出し)
  • FunctionCallOutput(ツール結果)
  • AgentHandoff(エージェントの切り替え)

ポイントは、ツール実行も同じ履歴に入ることです。

3. 履歴を「誰が」持つか(2つある)

LiveKit Agents の実装を見ると、履歴は主に2か所にあります。

3.1 エージェントが持つ履歴(そのエージェントの履歴)

  • Agent の内部に self._chat_ctx があり、外からは agent.chat_ctx で見えます。
  • agent.chat_ctx読み取り専用です。直接いじるとエラーになります(意図的な設計)。

履歴を安全に差し替える正規ルートは Agent.update_chat_ctx() です。

  • セッション開始前(まだ動いていない): self._chat_ctx = chat_ctx.copy(...)
  • セッション中(動作中): activity 経由で更新(リアルタイム動作を壊しにくい)

3.2 セッションが持つ履歴(セッション全体の履歴)

AgentSession の内部にも self._chat_ctx があり、session.history で取得できます。

公式ドキュメントでも、session.history は「セッションの完全な会話」として扱われます。

4. コンポーネント構成(どこに何があるか)

下の図は、会話履歴に関係する主要コンポーネントを、コード上の責務に合わせてまとめたものです。

読み方としては、

  • AgentSession は「全体の進行役」で、セッションの履歴 session.history を持ちます。
  • Agent は「人格・道具・(そのエージェントの)履歴」を持ちます。
  • AgentActivity は「1ターンの処理の中心」で、推論用の一時コンテキストを作ってモデルに渡し、確定したら履歴に反映します。

5. いつ履歴が増えるか(推論用と確定用を分ける)

ここが会話履歴の肝です。

5.1 推論用(まだ確定ではない)

agent_activity.py_pipeline_reply_task_impl() では、次の流れがあります。

  • 受け取った chat_ctxchat_ctx.copy() する
  • 今回の new_message があれば、コピーした方に insert して推論に含める
  • 指示文(instructions)があれば、コピーした方に差し込む
  • そのコピーを perform_llm_inference(... chat_ctx=chat_ctx ...) に渡す

この時点では、セッションの履歴も、エージェントの履歴も増えません

5.2 確定用(コミット)

同じ処理の中で、条件を満たしたときだけ「確定したメッセージ」を履歴に追加します。

例(ユーザー入力):

  • new_message is not None and speech_handle.scheduled のとき
    • self._agent._chat_ctx.insert(new_message)
    • self._session._conversation_item_added(new_message)

例(アシスタント出力):

  • 音声出力などが終わって「送ったテキスト」が確定した段階で
    • self._agent._chat_ctx.add_message(role="assistant", ...)
    • self._session._conversation_item_added(msg)

この作りのおかげで、割り込みがあったときに

  • 「推論は走ったが、会話としては成立しなかった」

のようなケースでも、履歴が必要以上に汚れにくくなります。

6. 1ターンの流れ(シーケンス図)

音声会話を想定した、典型的な1ターンの流れです。

7. 履歴の並び順(なぜ insert があるか)

ChatContext.insert() は、各アイテムの created_at を見て、挿入位置を決めます。

このため、処理が少し前後しても、履歴の順番を「時間順」に整えやすいです。

8. 永続化(保存と復元のやり方)

結論として、SDK が勝手に外部へ保存するわけではなく、あなたが外部ストレージから読み込み、あなたが外部ストレージへ保存します

ここで言う「外部ストレージ」は、ファイルでもデータベースでも構いません。
分かりやすい例として「JSONファイルに保存する」サンプルを使います。

import asyncio
import json
import logging
from pathlib import Path

from dotenv import load_dotenv
from livekit import rtc
from livekit.agents import (
   Agent,
   AgentServer,
   AgentSession,
   ChatContext,
   JobContext,
   JobProcess,
   cli,
   inference,
   room_io,
)
from livekit.agents.llm.chat_context import (
   AgentHandoff,
   ChatMessage,
   FunctionCall,
   FunctionCallOutput,
)
from livekit.plugins import noise_cancellation, silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel

logger = logging.getLogger("agent")

load_dotenv(".env.local")

HISTORY_FILE = Path("/tmp/agent_history.json")
_ITEM_MAP = {
   "message": ChatMessage,
   "function_call": FunctionCall,
   "function_call_output": FunctionCallOutput,
   "agent_handoff": AgentHandoff,
}


class Assistant(Agent):
   def __init__(self) -> None:
       super().__init__(
           instructions="""You are a helpful voice AI assistant. The user is interacting with you via voice, even if you perceive the conversation as text.
           You eagerly assist users with their questions by providing information from your extensive knowledge.
           Your responses are concise, to the point, and without any complex formatting or punctuation including emojis, asterisks, or other symbols.
           You are curious, friendly, and have a sense of humor.""",
       )


def load_chat_ctx_from_json(path: Path) -> ChatContext:
   if not path.exists():
       return ChatContext.empty()

   data = json.loads(path.read_text())
   return ChatContext.from_dict(data)


async def apply_initial_history(agent: Agent, path: Path) -> None:
   chat_ctx = load_chat_ctx_from_json(path)
   if chat_ctx.items:
       await agent.update_chat_ctx(chat_ctx)


async def save_session_history(session: AgentSession, path: Path) -> None:
   existing = load_chat_ctx_from_json(path)
   merged = existing.copy()
   merged.merge(session.history.copy())

   data = merged.to_dict(exclude_audio=False, exclude_timestamp=False)
   tmp_path = path.with_suffix(path.suffix + ".tmp")
   tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2))
   tmp_path.replace(path)

server = AgentServer()


def prewarm(proc: JobProcess):
   proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session()
async def my_agent(ctx: JobContext):
   initial_chat_ctx = load_chat_ctx_from_json(HISTORY_FILE)

   agent = Assistant()
   if initial_chat_ctx.items:
       await agent.update_chat_ctx(initial_chat_ctx)

   session = AgentSession(
       stt=inference.STT(model="assemblyai/universal-streaming", language="en"),
       llm=inference.LLM(model="openai/gpt-4.1-mini"),
       tts=inference.TTS(
           model="cartesia/sonic-3", voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc"
       ),
       turn_detection=MultilingualModel(),
       vad=ctx.proc.userdata["vad"],
       preemptive_generation=True,
   )

   async def _write_history() -> None:
       await save_session_history(session, HISTORY_FILE)

   ctx.add_shutdown_callback(lambda: asyncio.create_task(_write_history()))

   await session.start(
       agent=agent,
       room=ctx.room,
       room_options=room_io.RoomOptions(
           audio_input=room_io.AudioInputOptions(
               noise_cancellation=lambda params: noise_cancellation.BVCTelephony()
               if params.participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP
               else noise_cancellation.BVC(),
           ),
       ),
   )

   await ctx.connect()


if __name__ == "__main__":
   cli.run_app(server)

8.1 外部から読み込んで開始する(初期履歴の注入)

開始時にやることは単純で、「外部から履歴を読み込み、ChatContext に戻して、agent.update_chat_ctx(...) でエージェントへ反映する」です。

サンプルでは、次の順番になっています。

  1. JSON を読み込む
  2. ChatContext.from_dict(...)ChatContext に戻す
  3. await agent.update_chat_ctx(chat_ctx) を呼ぶ

このとき、Agentchat_ctx 引数に渡すのではなく、update_chat_ctx() を呼んでいるのは、「セッション開始前も開始後も、同じ入口で履歴を差し替えたい」意図だと読み取れます。

別の例として、OpenAI の Realtime Model の読み込み例もあります。

こちらは JSON から復元する代わりに、アプリ側で持っている [{role, content}, ...]ChatContext に追加して、Agent(..., chat_ctx=chat_ctx) として渡しています。

8.2 終了時に外部へ保存する(セッション履歴の永続化)

終了時は、次の方針が分かりやすいです。

  1. 既存の外部履歴を読み込む(ある場合)
  2. 今回の session.history とマージする
  3. to_dict(...) でJSON向きの形にする
  4. 外部へ書く(.tmp へ書いてから置き換えると壊れにくい)

agent_with_history.py では、ctx.add_shutdown_callback(...) を使って「終了時に保存処理を走らせる」形になっています。保存内容は「既存ファイルの履歴」と「今回の session.history」をマージしたものをto_dict(exclude_audio=False, exclude_timestamp=False) に通したものです。

ここで「セッション履歴 session.history」を保存対象にしているのは、そのセッションで実際に起きた会話の完全な記録を取りたいからです。

補足として、公式ドキュメントでも session.history が保存の入口として説明されています。

8.3 読み込み→会話→保存の全体の流れ(図)

この図のとおり、

  • 「外部→(開始時に)Agentへ注入」
  • 「会話中は session.history が更新され続ける」
  • 「最後に(既存+今回)をマージして外へ出す」

と覚えると整理しやすいです。

9. 監視・連携の入口(イベント)

公式ドキュメントの AgentSession の説明にもある通り、セッションはイベントを出します。

  • conversation_item_added: 履歴にメッセージが追加されたとき
  • user_input_transcribed: 文字起こしができたとき

保存や可視化をしたい場合は、

  • セッション中は conversation_item_added を購読して蓄積
  • セッション終了時に session.history をまとめて保存

という作りにすると、理解しやすく、壊れにくいです。

10. もう少しだけ(履歴を短くする仕組み)

履歴が長くなりすぎると、推論に渡す入力が大きくなります。

ChatContext には、履歴を扱いやすくするための道具があります。

  • truncate(max_items=...): 末尾 N 件に縮める
  • ただし先頭がツール呼び出しで始まらないように調整する
  • system の指示文は残す
  • _summarize(...): 古い会話を要約して圧縮する(LLM を使う)

どれを使うべきかは要件次第ですが、まずは truncate が分かりやすいです。

付録: 参照リンク


ポイントは以下の部分だと思う。

この二段構えによって、割り込みや取り消しが起きても、履歴が壊れにくくなっています。

このあたりは音声ならではの部分で、これにマルチエージェントの要素も合わさって、少し仕組みが見えにくくなっているのだと思う。