🙌

OpenAI Realtime API の 非同期 Function Calling :長時間ツール処理中でも会話が止まらないチャット体験

に公開

はじめに

前回の記事では、Azure OpenAI Realtime APIを使って音声チャットアプリを構築しました。今回は、その続編として、GA版(gpt-realtime)で追加された非同期のFunction Calling機能を実装してみました。

https://github.com/DXC-Lab-Linkage/ai-agent-sample-hub/tree/main/azure_realtimeapi_async_fc

非同期Function Callingとは?

OpenAI Realtime APIのGA版では、Function Callingに重要な改善が加えられました。

長時間実行される Function Calling によってセッションの流れが中断されることはなくなり、モデルは結果の待機中であってもスムーズな会話を継続できます。

つまり、以下のようなシナリオが可能になり、今回のアプリではこのシナリオを実現しています。

非同期Function Callingの流れ(図解)

Realtime APIの非同期Function Callingでは、長時間処理が発生するツール呼び出しをバックグラウンドで実行しつつ、別のツールを並行して処理することが可能です。
以下は、ユーザーがまず時間のかかる調査を依頼し、その処理中に天気を尋ねるケースを図示したものです。

このシーケンスでは、search_database の処理が完了するまでの間に、get_weather のような別のツールを呼び出して並行的に応答できる点がポイントです。
Realtime APIは複数のFunction Callを独立したタスクとして処理できるため、会話が中断されることなく自然な対話体験を実現できます。

サンプルアプリでは、軽量ツールであるget_weather も 非同期に実装していますが、実際のアプリでは必要に応じて同期/非同期を使い分けてください。

実装方法

ここから実装方法について説明します。

1. ツールの定義

まず、2種類のツールを定義します。

  • get_weather: 即座に応答する軽量なツール
  • search_database: 20秒かかる長時間実行ツール

天気予報ツール(軽量)

async def get_weather(location: str, date: Optional[str] = None) -> dict:
    """天気予報を取得する関数"""
    await asyncio.sleep(0.5)  # API呼び出しをシミュレート
    
    weather_patterns = {
        "東京": {"weather": "晴れ", "temp": "25", "humidity": "60"},
        "大阪": {"weather": "曇り", "temp": "23", "humidity": "65"},
    }
    
    location_normalized = location.replace("市", "").replace("都", "").strip()
    weather_info = weather_patterns.get(
        location_normalized,
        {"weather": "晴れのち曇り", "temp": "24", "humidity": "62"}
    )
    
    return {
        "location": location,
        "date": date if date else "今日",
        "weather": weather_info["weather"],
        "temperature": weather_info["temp"],
        "humidity": weather_info["humidity"],
        "forecast": f"{location}の天気は{weather_info['weather']}、気温は{weather_info['temp']}度です。"
    }

データベース検索ツール(長時間実行)

async def search_database(query: str, category: Optional[str] = None) -> dict:
    """
    データベースを検索する関数(長時間実行のシミュレーション)
    
    非同期Function Callingの動作確認用に、意図的に20秒かかるように設定
    """
    wait_time = 20
    logger.info(f"🔍 データベース検索開始: {query} (約{wait_time}秒かかります)")
    await asyncio.sleep(wait_time)
    
    results = [
        {
            "id": 1,
            "title": f"{query}に関する最新の研究論文",
            "summary": "2025年の最新データに基づく包括的な分析結果です。",
            "relevance": "95%"
        },
        {
            "id": 2,
            "title": f"{query}の実践的なガイドライン",
            "summary": "業界標準のベストプラクティスをまとめたドキュメントです。",
            "relevance": "88%"
        },
    ]
    
    category_text = f" (カテゴリ: {category})" if category else ""
    
    return {
        "query": query,
        "category": category,
        "total_results": len(results),
        "results": results,
        "summary": f"「{query}{category_text}に関して{len(results)}件の結果が見つかりました。検索には{wait_time}秒かかりました。"
    }

2. セッション設定とInstruction設計

Realtime APIのセッション初期化時に、ツール定義と共にInstructions(システムプロンプト)を設定します。
ここで重要なのは、非同期Function Callingの動作を最大限活かすためのInstructions設計です。

await connection.session.update(
    session={
        "modalities": ["text", "audio"],
        "instructions": (
            "Always respond in Japanese. "
            "ユーザーとは日本語で会話してください。\n\n"
            
            # ツールの使用方法を明示
            "天気に関する質問があれば、get_weatherツールを使用して正確な情報を提供してください。"
            "データベース検索の依頼があれば、search_databaseツールを使用してください。\n\n"
            
            # ★ 非同期実行の動作を促す重要な指示 ★
            "重要: search_databaseツールは時間がかかる処理です。"
            "ツールの実行中でも、ユーザーとの会話は継続してください。"
            "例えば「検索を開始しました。結果が出るまで少し時間がかかりますが、"
            "他に何かお手伝いできることはありますか?」のように応答できます。"
        ),
        "tools": [
            # ... ツール定義
        ],
        "tool_choice": "auto",
    }
)

Realtime公式ガイドで推奨されているTool Calling設定

Realtime APIのFunction Callingでは、OpenAIの公式ガイドでも、ツール呼び出しをより信頼性高く・自然に行うための詳細なプロンプト設計が推奨されています。

今回のアプリではすべてを実装しているわけではありませんが、ガイドで特に重要なポイントであるツールの説明の明示、非同期実行中も会話を継続する指示の部分を中心に取り入れています。今回はそれに加えて独自に実行時間の記載も追加しました。

OpenAIの公式ガイドでは、gpt-realtimeモデルの高い指示追従性を前提として、以下のようなポイントが強調されています。

ポイント1. Toolsセクションの明示的な設計

ツール情報はプロンプト内に専用セクション(# Tools など)を設け、

  • どのツールをいつ使うか
  • 引数の取り方
  • ツール呼び出し中に話す内容(前文/preamble)
  • エラー時の挙動

などを明確に記述します。

ポイント2. ツール選択とプロンプトの整合性

プロンプトに記述したツールの説明と、実際にセッションへ渡すツールリストが一致していることが重要です。

ポイント3. ツール呼び出し時中の待ち時間を自然な会話でカバーする (レイテンシマスキング)

ツール実行中に「少々お待ちください」などの短いセリフをツール呼び出しと同時に出力するよう指示することで、ユーザーに待ち時間を感じさせず、自然な対話体験を実現できます。こうしたツール呼び出し前の短い発話を preamble(プリアンブル) と呼びます。

ポイント4. ツール呼び出しのルール明示

複数ツールがある場合、どのタイミングで・どの条件で呼ぶか/呼ばないかを明示することで、誤った呼び出しを防ぎ、精度と体験を向上させます。

ポイント5. 高度な設定:ツールの分類

ツールを PROACTIVE / CONFIRMATION FIRST / PREAMBLES などに分類し、それぞれの動作方針を記述することも可能です。
ガイドには、一般的なルールとして、不必要な確認ループを避けるために「ツールを呼び出す際は、ユーザーの確認を求めず、積極的(Proactive)に行動する」という指示を追加するのがよいと述べられています。また、もしモデルがツール呼び出しを急ぎすぎる場合は、「proactive」のような強い言葉を和らげることで調整できるというヒントも紹介されています。

分類 ふるまいの特徴 典型的なシナリオ モデルへの指示例
PROACTIVE 確認なしですぐにツールを呼び出す 天気や株価など、即時に取得できる情報 「このツールを呼び出すときは、ユーザーへの確認を行わず、すぐに呼び出してください。」
CONFIRMATION FIRST 呼び出し前にユーザーに確認を取る 返金やアカウント変更など、重要な操作 「このツールを呼び出す前に、ユーザーの意図を明確に確認してください。」
PREAMBLES 呼び出し前に短い一言を発話してからツールを実行 時間がかかる検索・API呼び出しなど 「このツールを呼び出す前に、『確認しています』などの短い一言を発話してから呼び出してください。」

ポイント6. 一般的なツール(Common Tools)の活用推奨

ガイドでは、特定の一般的なツール(例:answer(question: string)、escalate_to_human()、finish_session())の名前、署名、説明を参考に、類似の振る舞いを求めるユースケースではそれらを近しい形で定義することで、モデルの信頼性を最大化できると推奨されています。

この他にも、ツールの失敗/エラー処理の明示、さらに高度なパターンとして「Responder-Thinker Architecture」と「Rephrase Supervisor Tool」などもガイドされています。

3. ツール定義の詳細

ツール定義では、descriptionに実行時間の情報を含めるという工夫をしました。

"tools": [
    {
        "type": "function",
        "name": "get_weather",
        "description": "指定された地域の天気予報を取得します。ユーザーが天気について質問したら、このツールを使用してください。",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "天気を調べる地域名(例: 東京、大阪、札幌)"
                },
                "date": {
                    "type": "string",
                    "description": "天気を調べる日付(例: 今日、明日、2025-10-05)。指定がない場合は今日の天気を返します。",
                }
            },
            "required": ["location"]
        }
    },
    {
        "type": "function",
        "name": "search_database",
        "description": (
            "データベースから情報を検索します。この処理には5-8秒程度かかります。"
            "ユーザーが何かを調べてほしいと言ったら、このツールを使用してください。"
            "例: 「AIについて調べて」「機械学習の情報を検索して」など"
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "query": {
                    "type": "string",
                    "description": "検索クエリ(例: 人工知能、機械学習、データサイエンス)"
                },
                "category": {
                    "type": "string",
                    "description": "検索カテゴリ(オプション。例: 技術、ビジネス、研究)",
                }
            },
            "required": ["query"]
        }
    }
],
"tool_choice": "auto",

descriptionに実行時間を明記することで、モデルは

  • ツール呼び出し前にユーザーに「時間がかかります」と伝える
  • 実行中に他の質問を促す
  • 適切なタイミングで結果を報告する

といった判断をより正確に行えるようになります。

4. Function Callイベントの処理

Realtime APIからは、Function Call関連のイベントが送信されるので、それらをハンドリングする処理を実装します。

async def _receive_events_loop(connection: AsyncRealtimeConnection):
    """Azure Realtime APIからのイベントを受信・処理"""
    async for event in connection:
        event_type = getattr(event, "type", "")
        
        # Function Call引数の差分を受信
        elif event_type == "response.function_call_arguments.delta":
            await _handle_function_call_arguments_delta(event)
        
        # Function Call引数の受信完了
        elif event_type == "response.function_call_arguments.done":
            await _handle_function_call_arguments_done(event)

5. 非同期実行の実装(重要!)

次に、ツールを呼び出す実装をします。
ここが非同期Function Callingの最も重要なポイントです。

通常のツール呼び出しの場合は以下のように実装します。

❌ ブロッキングする実装(NGパターン)

async def _handle_function_call_arguments_done(event):
    # 同期的に実行すると、20秒間イベントループがブロックされる
    result = await _execute_function_call(name, arguments)  # ← ここで20秒待つ
    await connection.conversation.item.create(...)
    await connection.response.create()

しかしこの実装では、search_databaseが20秒かかる間、他のイベント(例: 天気の質問)を処理できません。

✅ 非ブロッキングの実装(OKパターン)

async def _handle_function_call_arguments_done(event):
    """Function Call引数の受信完了を処理"""
    call_id = getattr(event, "call_id", None)
    name = getattr(event, "name", None)
    arguments = getattr(event, "arguments", "")
    
    # 非同期でFunction Callを実行(ブロッキングしない)
    asyncio.create_task(
        _execute_function_call_async(connection, call_id, name, arguments)
    )
    
    # すぐにreturnして、他のイベントを処理できるようにする

async def _execute_function_call_async(
    connection: AsyncRealtimeConnection,
    call_id: str,
    name: str,
    arguments: str
):
    """Function Callを非同期で実行(ブロッキングしない)"""
    
    # ChainlitのStepを開始
    step = cl.Step(name=f"🔧 {name}", type="tool")
    await step.send()
    
    try:
        # 引数を表示
        args_dict = json.loads(arguments)
        args_display = "\n".join([f"- **{k}**: {v}" for k, v in args_dict.items()])
        step.input = f"**引数:**\n{args_display}"
        await step.update()
        
        # Function Callを実行(ここで20秒かかるが、他のタスクはブロックされない)
        result = await _execute_function_call(name, arguments)
        
        # 結果を表示
        result_dict = json.loads(result)
        if name == "search_database" and "summary" in result_dict:
            step.output = f"✅ {result_dict['summary']}\n\n**検索結果:**"
            for idx, item in enumerate(result_dict.get("results", []), 1):
                step.output += f"\n{idx}. **{item['title']}** (関連度: {item['relevance']})"
        
        await step.update()
        
        # Function Call結果を会話に追加
        await connection.conversation.item.create(
            item={
                "type": "function_call_output",
                "call_id": call_id,
                "output": result,
            }
        )
        
        # 応答生成フラグをチェック
        # すでに生成中でない場合のみ新しい応答を生成
        if not cl.user_session.get(KEY_IS_GENERATING):
            cl.user_session.set(KEY_IS_GENERATING, True)
            await connection.response.create(
                response={
                    "modalities": ["text", "audio"],
                    "instructions": (
                        "Function callの結果を使って、ユーザーに分かりやすく回答してください。"
                    ),
                }
            )
        
    except Exception as e:
        step.output = f"❌ エラー: {str(e)}"
        step.is_error = True
        await step.update()

ポイント

  1. asyncio.create_task()を使用: 関数を非同期タスクとして実行し、すぐにreturnする
  2. イベントループをブロックしない: 長時間実行中も、他のイベント(音声入力、別のツール呼び出し)を処理できる
  3. 独立したタスクとして実行: 各Function Callは独立したタスクとして並行実行される

6. ツール実行状況の可視化

ChainlitのStep機能を使うと、ツールの実行過程がより視覚的に分かりやすくなります。
ツール実行の可視化部分も非同期対応で実装するのが重要なポイントです。
async with cl.Stepを使用すると完了するまで次の処理が進まないため、非同期Function Callingの利点を十分に視覚化できないため、Stepも非同期で実行するようにします。
async withの代わりに、明示的にsend()update()を呼び出せば、処理がブロッキングされません。

step = cl.Step(name=f"🔧 {name}", type="tool")
await step.send()  # 即座に表示

# 引数を設定
step.input = "引数の内容"
await step.update()

# 実行後、結果を設定
step.output = "実行結果"
await step.update()

UIでは以下のように表示されます:

Instructions設計のベストプラクティス

非同期Function Callingをうまく稼働させるには、Instructionsで「モデルにどう指示するか」が重要ということがわかりました。
コツは以下の通りです。

1. 階層的な説明

instructions = (
    # レベル1: 基本的な動作
    "ユーザーとは日本語で会話してください。\n\n"
    
    # レベル2: ツールの使用方法
    "天気に関する質問 → get_weatherツール\n"
    "データベース検索 → search_databaseツール\n\n"
    
    # レベル3: 特殊な動作(非同期)
    "重要: search_databaseは時間がかかります。\n"
    "実行中も会話を継続してください。\n\n"
    
    # レベル4: 具体例
    "例: 「検索を開始しました。他にご質問は?」"
)

2. 肯定的な表現

# ✅ 良い例: 何をすべきか明確
"ツールの実行中でも、ユーザーとの会話は継続してください。"

# ❌ 悪い例: 否定形は曖昧
"ツールの実行中に黙らないでください。"

3. 具体的な応答例

# ✅ 良い例: 具体的な応答パターンを示す
"例えば「検索を開始しました。結果が出るまで少し時間がかかりますが、
他に何かお手伝いできることはありますか?」のように応答できます。"

# ❌ 悪い例: 抽象的
"適切に応答してください。"

まとめ

OpenAI Realtime API (GA版) の非同期Function Calling機能により以下の機能が実装できました。

  • ✅ 長時間実行ツールでも会話が中断されない
  • ✅ 複数のツールを並行して実行できる
  • ✅ ユーザー体験が大幅に向上

特に、外部API呼び出し、データ分析など、時間のかかる処理を伴うチャットボットでは、この機能が非常に有効です。

MCPでも似たような非同期実行機能やさらに高度な機能があるようなので、今後はMCPを試してみたいと思います。

参考リンク


DXC Lab

Discussion