🧑‍✈️

A2Aを用いたエージェント間連携の実装

に公開

こんにちは。インフォメーション・ディベロプメント、AIスマートソリューション部の松尾です。このたび、弊社のコンサルティング事業本部にて、テックブログを運用することになりました。初回は、A2A (Agent2Agent Protocol) の概要と、Google Cloud上での実装について解説します。なお、A2Aと相互補完の関係にあるMCP (Model Context Protocol) につきましては、以前、弊社公式サイトのコラムにて、解説させていただきました。こちらも合わせて参照いただけますと幸いです。

1. A2Aとは?

A2Aとは、AIエージェント同士が連携してタスクを実行するための共通プロトコルとなります。Googleによって、2025年4月に発表されました。

https://cloud.google.com/blog/ja/products/ai-machine-learning/a2a-a-new-era-of-agent-interoperability?hl=ja

2025年6月には、A2AをLinux Foundationに寄贈することが発表されており、今後はLinux Foundation配下のAgent2Agentプロジェクトとして活動するようです。

https://developers.googleblog.com/ja/google-cloud-donates-a2a-to-linux-foundation/

Agent2Agentプロジェクトは、Amazon Web Services、Cisco、Google、Microsoft、Salesforce、SAP、ServiceNowの7社にて設立されております。このことからも今後はマルチクラウドやSaaS間でのエージェント連携が身近になってくるのかもしれませんね。

2. A2Aの実装

それでは、A2Aを用いたエージェントを実装してみましょう。今回は何かの資料を作成するエージェントを実装してみます。Google Cloud上に独立した2つのプロジェクトとして実作業を担当する資料作成エージェントと市場調査エージェントを作成します(実際には、これがAWSとAzureなどのマルチクラウド間やSaaS間の接続になると想定しています)。エージェントはCloud Run上で実行するようにしています。そしてこれらのエージェントに指示するオーケストレーションエージェントをオンプレミス環境に構築します。なお、Cloud Runのデプロイそのものについては解説しておりませんので、ご了承ください。

Google Cloudに実装したA2A

エージェントの実装が異なってもA2Aは機能するということを確認するために、各エージェントは以下の異なるフレームワークを用いて実装しています。

  • オーケストレーションエージェント: ADK (Agent Development Kit)
  • 資料作成エージェント、市場調査エージェント: LangGraph

また、A2A部分の実装そのものは、以下情報を参考にしています。

Agent2Agentプロジェクトが公開しているクイックスタートのNotebook↓
https://github.com/a2aproject/a2a-samples/blob/main/notebooks/a2a_quickstart.ipynb

CODELAB (エージェント間(A2A)プロトコルの概要: Cloud Run と Agent Engine での Gemini を使用した購買コンシェルジュとリモート販売エージェントのやり取り)↓
https://codelabs.developers.google.com/intro-a2a-purchasing-concierge

2.1. 資料作成エージェント / 市場調査エージェント

資料作成エージェントと市場調査エージェントはLangGraphで実装しています。2つのエージェントにロジックの違いはありませんので(システムプロンプトやクラス名等が違うのみ)、資料作成エージェントを用いて説明します。資料作成エージェントのクラス構成は以下の通りとなります。

document_agent.py
├── DocumentCreatorAgent         # 資料作成エージェントのコア部分 (LangGraph)
├── DocumentAgentExecutor        # A2AのAgentExecutorインターフェースの実装
└── create_document_agent_app()  # A2Aエージェントのアプリとして実行するための実装

必要なライブラリと環境変数の定義は以下の通りとなります。

import os

from langchain_core.messages import HumanMessage
from langchain_google_vertexai import ChatVertexAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill, Task, TaskState, TaskStatus
from a2a.utils import new_task
from a2a.utils.artifact import new_text_artifact

# 環境設定
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
os.environ["GOOGLE_CLOUD_LOCATION"] = "asia-northeast1"

2.1.1. DocumentCreatorAgent

エージェントのコア部分となるクラスとなります。__init__()で使用するモデルやシステムプロンプトを指定します。その中で、LangGraphのcreate_react_agentを用いてエージェントを作成します。本来はここでWeb検索等の使用するツールを指定するのですが、今回はツールの実装は割愛しています(実際は、市場調査エージェントがWeb検索ツールを呼び出せるようにする必要があります)。

class DocumentCreatorAgent:
    """LangGraphベースの資料作成エージェント"""

    def __init__(self):
        # ChatVertexAIモデルの初期化
        self.llm = ChatVertexAI(
            model_name="gemini-2.5-pro",
            temperature=0.3,
            max_output_tokens=8192,
        )

        # システムプロンプト
        self.system_prompt = """
        あなたは資料作成の専門エージェントです。以下の機能を提供します:

        1. **資料構成作成**: テーマに基づいて資料の構成を設計
        2. **コンテンツ生成**: 各セクションの詳細な内容を生成
        3. **フォーマット調整**: PowerPointやWord形式に適した構造化

        入力された情報を元に、以下の形式で資料を作成してください:
        
        ## 資料タイトル
        ### 目次
        ### 各セクションの詳細内容
        
        市場データが提供された場合は、それを効果的に資料に組み込んでください。
        図表やグラフの提案も含めてください。
        
        常に高品質で構造化された資料を作成し、ユーザーの要求に応じてカスタマイズしてください。
        """

        # LangGraphエージェントの作成(ツールなしのシンプル版)
        self.agent = create_react_agent(self.llm, tools=[], prompt=self.system_prompt)  # 現在はツールなし

        # メモリセーバー
        self.memory = MemorySaver()

DocumentCreatorAgentには、エージェントを呼び出してレスポンスを受け取るメソッド(invoke())も実装しています。

    def invoke(self, query: str, context_id: str) -> str:
        """エージェントを実行"""
        try:
            # 設定
            config = {"configurable": {"thread_id": context_id}}

            # メッセージを送信
            response = self.agent.invoke({"messages": [HumanMessage(content=query)]}, config=config)

            # レスポンスから最後のメッセージを取得
            if response and "messages" in response:
                messages = response["messages"]
                if messages:
                    # 最後のメッセージからコンテンツを取得
                    last_message = messages[-1]
                    if hasattr(last_message, "content"):
                        return last_message.content
                    else:
                        return str(last_message)

            return "資料作成が完了しました。"

        except Exception as e:
            return f"エラーが発生しました: {str(e)}"

DocumentCreatorAgentのインスタンスを作成します。

# エージェントインスタンスの作成
document_agent = DocumentCreatorAgent()

2.1.2. DocumentAgentExecutor

A2AフレームワークのAgentExecutorインターフェースを実装するクラスです。self.agentDocumentCreatorAgentのインスタンスを格納して、executeで実行しています。実行結果は、A2Aのライブラリで提供されている各種クラスに格納することになります。具体的には、Artifactに実行結果を格納して、Taskを用いてオーケストレーションエージェントと情報のやり取りをします。

class DocumentAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = document_agent

    async def cancel(self, task_id: str) -> None:
        """タスクの実行をキャンセル"""
        pass

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """LangGraphエージェントでリクエストを処理"""
        query = context.get_user_input()
        task = context.current_task or new_task(context.message)

        try:
            # LangGraphエージェントを実行
            result = self.agent.invoke(query, str(task.id))

            # 結果をArtifactに変換
            artifact = new_text_artifact(
                name="Document Creation Result",
                text=result,
                description="Document creation and content generation result",
            )

            # 完了タスクとしてキューに追加
            completed_task = Task(
                id=task.id,
                contextId=task.contextId if hasattr(task, "contextId") else getattr(task, "context_id", str(task.id)),
                status=TaskStatus(state=TaskState.completed),
                artifacts=[artifact],
            )

            await event_queue.enqueue_event(completed_task)

        except Exception as e:
            print(f"Document Agent Error: {e}")
            # エラータスクとしてキューに追加
            error_artifact = new_text_artifact(
                name="Error",
                text=f"エラーが発生しました: {str(e)}",
                description="Error that occurred during document creation",
            )

            error_task = Task(
                id=task.id,
                contextId=task.contextId if hasattr(task, "contextId") else getattr(task, "context_id", str(task.id)),
                status=TaskStatus(state=TaskState.failed),
                artifacts=[error_artifact],
            )

            await event_queue.enqueue_event(error_task)

2.1.3. create_document_agent_app()

エージェントをA2A準拠のHTTPサーバーとして実行するための実装です。A2AStarletteApplicationクラスのインスタンスを生成します。これにより、WebフレームワークにStarletを用いて、Uvicornなどで実行できるようになります。その際にAgentCardを指定する必要があります。これはエージェントの名刺のようなもので、そのエージェントがどのようなことを実行できるかを記述したものとなります。オーケストレーションエージェントは各エージェントのAgentCardの内容を確認して、タスクを割り振りします。AgentCardはHTTPサーバーの/.well-known/agent.jsonとして公開されます。

def create_document_agent_app():
    capabilities = AgentCapabilities(streaming=True)
    # AgentCardのurlはCloud RunのサービスURLとなるため、Cloud Runデプロイ後に環境変数を設定
    agent_host_url = os.getenv("HOST_OVERRIDE")

    agent_card = AgentCard(
        name="Document Creator Agent",
        description="資料作成とコンテンツ生成を行うエージェント",
        url=agent_host_url,
        version="1.0.0",
        defaultInputModes=["text", "text/plain"],
        defaultOutputModes=["text", "text/plain"],
        capabilities=capabilities,
        skills=[
            AgentSkill(
                id="create_document",
                name="資料作成",
                description="テーマに基づいて構造化された資料を作成",
                tags=["document", "presentation", "content"],
                examples=[
                    "AIの市場動向について資料を作成して",
                    "新製品のプレゼン資料を作成",
                    "競合分析レポートを作成",
                ],
            )
        ],
    )

    executor = DocumentAgentExecutor()
    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    return A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)


if __name__ == "__main__":
    import uvicorn

    app = create_document_agent_app()
    uvicorn.run(app.build(), host="0.0.0.0", port=8080)

なお、AgentCardurlには、HTTP(s)のエンドポイントURLを指定するのですが、Cloud Runにデプロイした後でないとURLが分からないため、Cloud Runに設定した環境変数(HOST_OVERRIDE)より取得するようにしています。この環境変数は、Cloud Runデプロイ後に手動で設定しています。

Cloud Runの環境変数

資料作成エージェントと市場調査エージェントのコード全体は以下の通りです。

document_agent.py (資料作成エージェント)
import os

from langchain_core.messages import HumanMessage
from langchain_google_vertexai import ChatVertexAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill, Task, TaskState, TaskStatus
from a2a.utils import new_task
from a2a.utils.artifact import new_text_artifact

# 環境設定
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
os.environ["GOOGLE_CLOUD_LOCATION"] = "asia-northeast1"


class DocumentCreatorAgent:
    """LangGraphベースの資料作成エージェント"""

    def __init__(self):
        # ChatVertexAIモデルの初期化
        self.llm = ChatVertexAI(
            model_name="gemini-2.5-pro",
            temperature=0.3,
            max_output_tokens=8192,
        )

        # システムプロンプト
        self.system_prompt = """
        あなたは資料作成の専門エージェントです。以下の機能を提供します:

        1. **資料構成作成**: テーマに基づいて資料の構成を設計
        2. **コンテンツ生成**: 各セクションの詳細な内容を生成
        3. **フォーマット調整**: PowerPointやWord形式に適した構造化

        入力された情報を元に、以下の形式で資料を作成してください:
        
        ## 資料タイトル
        ### 目次
        ### 各セクションの詳細内容
        
        市場データが提供された場合は、それを効果的に資料に組み込んでください。
        図表やグラフの提案も含めてください。
        
        常に高品質で構造化された資料を作成し、ユーザーの要求に応じてカスタマイズしてください。
        """

        # LangGraphエージェントの作成(ツールなしのシンプル版)
        self.agent = create_react_agent(self.llm, tools=[], prompt=self.system_prompt)  # 現在はツールなし

        # メモリセーバー
        self.memory = MemorySaver()

    def invoke(self, query: str, context_id: str) -> str:
        """エージェントを実行"""
        try:
            # 設定
            config = {"configurable": {"thread_id": context_id}}

            # メッセージを送信
            response = self.agent.invoke({"messages": [HumanMessage(content=query)]}, config=config)

            # レスポンスから最後のメッセージを取得
            if response and "messages" in response:
                messages = response["messages"]
                if messages:
                    # 最後のメッセージからコンテンツを取得
                    last_message = messages[-1]
                    if hasattr(last_message, "content"):
                        return last_message.content
                    else:
                        return str(last_message)

            return "資料作成が完了しました。"

        except Exception as e:
            return f"エラーが発生しました: {str(e)}"


# エージェントインスタンスの作成
document_agent = DocumentCreatorAgent()


class DocumentAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = document_agent

    async def cancel(self, task_id: str) -> None:
        """タスクの実行をキャンセル"""
        pass

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """LangGraphエージェントでリクエストを処理"""
        query = context.get_user_input()
        task = context.current_task or new_task(context.message)

        try:
            # LangGraphエージェントを実行
            result = self.agent.invoke(query, str(task.id))

            # 結果をArtifactに変換
            artifact = new_text_artifact(
                name="Document Creation Result",
                text=result,
                description="Document creation and content generation result",
            )

            # 完了タスクとしてキューに追加
            completed_task = Task(
                id=task.id,
                contextId=task.contextId if hasattr(task, "contextId") else getattr(task, "context_id", str(task.id)),
                status=TaskStatus(state=TaskState.completed),
                artifacts=[artifact],
            )

            await event_queue.enqueue_event(completed_task)

        except Exception as e:
            print(f"Document Agent Error: {e}")
            # エラータスクとしてキューに追加
            error_artifact = new_text_artifact(
                name="Error",
                text=f"エラーが発生しました: {str(e)}",
                description="Error that occurred during document creation",
            )

            error_task = Task(
                id=task.id,
                contextId=task.contextId if hasattr(task, "contextId") else getattr(task, "context_id", str(task.id)),
                status=TaskStatus(state=TaskState.failed),
                artifacts=[error_artifact],
            )

            await event_queue.enqueue_event(error_task)


def create_document_agent_app():
    capabilities = AgentCapabilities(streaming=True)
    # AgentCardのurlはCloud RunのサービスURLとなるため、Cloud Runデプロイ後に環境変数を設定
    agent_host_url = os.getenv("HOST_OVERRIDE")

    agent_card = AgentCard(
        name="Document Creator Agent",
        description="資料作成とコンテンツ生成を行うエージェント",
        url=agent_host_url,
        version="1.0.0",
        defaultInputModes=["text", "text/plain"],
        defaultOutputModes=["text", "text/plain"],
        capabilities=capabilities,
        skills=[
            AgentSkill(
                id="create_document",
                name="資料作成",
                description="テーマに基づいて構造化された資料を作成",
                tags=["document", "presentation", "content"],
                examples=[
                    "AIの市場動向について資料を作成して",
                    "新製品のプレゼン資料を作成",
                    "競合分析レポートを作成",
                ],
            )
        ],
    )

    executor = DocumentAgentExecutor()
    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    return A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)


if __name__ == "__main__":
    import uvicorn

    app = create_document_agent_app()
    uvicorn.run(app.build(), host="0.0.0.0", port=8080)

market_research_agent.py (市場調査エージェント)
import os

from langchain_core.messages import HumanMessage
from langchain_google_vertexai import ChatVertexAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.prebuilt import create_react_agent

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events import EventQueue
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.types import AgentCapabilities, AgentCard, AgentSkill, Task, TaskState, TaskStatus
from a2a.utils import new_task
from a2a.utils.artifact import new_text_artifact

# 環境設定
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
os.environ["GOOGLE_CLOUD_LOCATION"] = "asia-northeast1"


class MarketResearchAgent:
    """LangGraphベースの市場調査エージェント"""

    def __init__(self):
        # ChatVertexAIモデルの初期化
        self.llm = ChatVertexAI(
            model_name="gemini-2.5-pro",
            temperature=0.3,
            max_output_tokens=8192,
        )

        # システムプロンプト
        self.system_prompt = """
        あなたは市場調査の専門エージェントです。以下の機能を提供します:

        1. **市場動向分析**: 指定された業界の最新トレンドを調査
        2. **競合分析**: 主要な競合他社の動向と戦略を分析
        3. **定量データ収集**: 市場規模、成長率、シェアなどの数値データを収集
        4. **将来予測**: データに基づいた市場の将来予測を提供

        回答は以下の形式で構造化してください:
        
        ## 市場概要
        ## 主要トレンド
        ## 競合状況
        ## 定量データ
        ## 将来予測
        
        可能な限り具体的な数値とソースを含めてください。
        専門的で詳細な分析を提供し、ビジネス意思決定に役立つ洞察を含めてください。
        """

        # LangGraphエージェントの作成(ツールなしのシンプル版)
        self.agent = create_react_agent(self.llm, tools=[], prompt=self.system_prompt)  # 現在はツールなし

        # メモリセーバー
        self.memory = MemorySaver()

    def invoke(self, query: str, context_id: str) -> str:
        """エージェントを実行"""
        try:
            # 設定
            config = {"configurable": {"thread_id": context_id}}

            # メッセージを送信
            response = self.agent.invoke({"messages": [HumanMessage(content=query)]}, config=config)

            # レスポンスから最後のメッセージを取得
            if response and "messages" in response:
                messages = response["messages"]
                if messages:
                    # 最後のメッセージからコンテンツを取得
                    last_message = messages[-1]
                    if hasattr(last_message, "content"):
                        return last_message.content
                    else:
                        return str(last_message)

            return "市場調査が完了しました。"

        except Exception as e:
            return f"エラーが発生しました: {str(e)}"


# エージェントインスタンスの作成
market_research_agent = MarketResearchAgent()


class MarketResearchAgentExecutor(AgentExecutor):
    def __init__(self):
        self.agent = market_research_agent

    async def cancel(self, task_id: str) -> None:
        """タスクの実行をキャンセル"""
        pass

    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        """LangGraphエージェントでリクエストを処理"""
        query = context.get_user_input()
        task = context.current_task or new_task(context.message)

        try:
            # LangGraphエージェントを実行
            result = self.agent.invoke(query, str(task.id))

            # 結果をArtifactに変換
            artifact = new_text_artifact(
                name="Market Research Result", text=result, description="Market research analysis result"
            )

            # 完了タスクとしてキューに追加
            completed_task = Task(
                id=task.id,
                contextId=task.contextId if hasattr(task, "contextId") else getattr(task, "context_id", str(task.id)),
                status=TaskStatus(state=TaskState.completed),
                artifacts=[artifact],
            )

            await event_queue.enqueue_event(completed_task)

        except Exception as e:
            print(f"Market Research Agent Error: {e}")
            # エラータスクとしてキューに追加
            error_artifact = new_text_artifact(
                name="Error",
                text=f"エラーが発生しました: {str(e)}",
                description="Error that occurred during market research",
            )

            error_task = Task(
                id=task.id,
                contextId=task.contextId if hasattr(task, "contextId") else getattr(task, "context_id", str(task.id)),
                status=TaskStatus(state=TaskState.failed),
                artifacts=[error_artifact],
            )

            await event_queue.enqueue_event(error_task)


def create_market_research_agent_app():
    capabilities = AgentCapabilities(streaming=True)
    # AgentCardのurlはCloud RunのサービスURLとなるため、Cloud Runデプロイ後に環境変数を設定
    agent_host_url = os.getenv("HOST_OVERRIDE")

    agent_card = AgentCard(
        name="Market Research Agent",
        description="市場調査とトレンド分析を行うエージェント",
        url=agent_host_url,
        version="1.0.0",
        defaultInputModes=["text", "text/plain"],
        defaultOutputModes=["text", "text/plain"],
        capabilities=capabilities,
        skills=[
            AgentSkill(
                id="market_research",
                name="市場調査",
                description="指定された業界の市場動向と競合分析を実施",
                tags=["market", "research", "analysis", "trends"],
                examples=[
                    "AI業界の市場動向を調査して",
                    "クラウドサービス市場の競合分析",
                    "フィンテック市場の成長予測",
                ],
            )
        ],
    )

    executor = MarketResearchAgentExecutor()
    request_handler = DefaultRequestHandler(
        agent_executor=executor,
        task_store=InMemoryTaskStore(),
    )

    return A2AStarletteApplication(agent_card=agent_card, http_handler=request_handler)


if __name__ == "__main__":
    import uvicorn

    app = create_market_research_agent_app()
    uvicorn.run(app.build(), host="0.0.0.0", port=8080)

2.2. オーケストレーションエージェント

エージェントはADK(Agent Development Kit)で実装しています。クラス構成は以下の通りです。

orchestration_agent.py
├── _NoFunctionCallWarning           # ログフィルタークラス
├── A2AToolClient                    # A2A通信クライアント
├── orchestration_agent              # ADKのAgentクラス
└── run_orchestration_demo()         # デモ実行関数

必要なライブラリと環境変数の定義は以下の通りとなります。

import asyncio
import json
import logging
import os
import uuid
from urllib.parse import urlparse

import httpx
import nest_asyncio
import requests
from dotenv import load_dotenv
from google.adk.agents import Agent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.auth.transport.requests import Request
from google.genai import types
from google.oauth2 import service_account

from a2a.client import A2AClient
from a2a.types import AgentCard, MessageSendParams, SendMessageRequest

# Set Google Cloud Configuration
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
os.environ["GOOGLE_CLOUD_PROJECT"] = "your-project"  # Replace with your project
os.environ["GOOGLE_CLOUD_LOCATION"] = "global"  # Replace with your location
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./credentials/vertex-ai-creds.json" # Replace with your credential

load_dotenv()

2.2.1. _NoFunctionCallWarning

影響がない警告を抑制するための定義となります。filter()の実行結果が抑制結果(True: 警告出力する, False: 警告出力しない)となります。

# Google GenAIの関数呼び出し警告を抑制するフィルター
class _NoFunctionCallWarning(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        if "there are non-text parts in the response:" in message:
            return False
        else:
            return True


# 警告フィルターを適用
logging.getLogger("google_genai.types").addFilter(_NoFunctionCallWarning())

2.2.2. A2AToolClient

A2Aでリモートエージェントと通信するためのクライアントとなります。リモートエージェントはCloud Runで実行するのですが、そのままだと誰でも呼び出せてしまうので、専用のサービスアカウントを作成して認証するようにしています。

class A2AToolClient:
    """A2Aクライアント実装"""

    def __init__(self, default_timeout: float = 120.0, agent_credentials_mapping: dict = None):
        self._agent_info_cache = {}
        self.default_timeout = default_timeout
        # エージェントURL -> 認証ファイルパスのマッピング
        self.agent_credentials_mapping = agent_credentials_mapping or {}

    def _get_id_token(self, url: str) -> dict:
        """指定されたURLに対するIDトークンを含むヘッダーを取得"""
        try:
            # Cloud Run のサービスURLからaudience を抽出
            parsed_url = urlparse(url)
            audience = f"{parsed_url.scheme}://{parsed_url.netloc}"
            base_url = audience  # マッピング用のベースURL

            # 1. 特定のエージェント用サービスアカウントキーを使用
            if base_url in self.agent_credentials_mapping:
                agent_creds_path = self.agent_credentials_mapping[base_url]
                if os.path.exists(agent_creds_path):
                    print(f"🔑 専用サービスアカウントを使用 ({base_url}): {agent_creds_path}")
                    credentials = service_account.IDTokenCredentials.from_service_account_file(
                        agent_creds_path, target_audience=audience
                    )
                    # IDトークンを生成
                    credentials.refresh(Request())
                    id_token = credentials.token

                    headers = {"Authorization": f"Bearer {id_token}", "Content-Type": "application/json"}
                    print(f"✅ 認証トークン(専用SA)を正常に取得: {audience}")
                    return headers
                else:
                    print(f"⚠️ 専用認証ファイルが見つかりません: {agent_creds_path}")

        except Exception as e:
            print(f"⚠️ サービスアカウント認証に失敗しました ({url}): {e}")
            print(f"⚠️ 詳細エラー: {type(e).__name__}")

        # 認証なしで試行
        print(f"⚠️ 認証なしでアクセスを試行: {url}")
        return {"Content-Type": "application/json"}

リモートエージェントをオーケストレーションエージェントに登録するメソッドです。

    def add_remote_agent(self, agent_url: str):
        """リモートエージェントを追加"""
        normalized_url = agent_url + "/"
        if normalized_url not in self._agent_info_cache:
            self._agent_info_cache[normalized_url] = None

リモートエージェントの一覧表示メソッドです。リモートエージェントのエージェントカードを取得しています。

    def list_remote_agents(self) -> dict:
        """利用可能なリモートエージェントの一覧を取得"""
        if not self._agent_info_cache:
            return {}

        remote_agents_info = {}
        for remote_connection in self._agent_info_cache:
            # キャッシュされたデータを使用
            if self._agent_info_cache[remote_connection] is not None:
                remote_agents_info[remote_connection] = self._agent_info_cache[remote_connection]
            else:
                try:
                    # 認証ヘッダーを取得
                    headers = self._get_id_token(remote_connection)

                    # エージェント情報を取得してキャッシュ
                    agent_info = requests.get(f"{remote_connection}/.well-known/agent.json", headers=headers)
                    agent_data = agent_info.json()
                    self._agent_info_cache[remote_connection] = agent_data
                    remote_agents_info[remote_connection] = agent_data
                except Exception as e:
                    print(f"エージェント情報の取得に失敗: {remote_connection}: {e}")

        return remote_agents_info

リモートエージェントにメッセージを送信するメソッドです。本来は非同期通信なのですが、同期通信にラップするメソッドを作成しています。_create_task_async()ではA2Aの機能を色々と使用していますが、ベースとしてはHTTPの処理を行っていることが分かるかと思います。

    def create_task(self, agent_url: str, message: str) -> str:
        """A2Aプロトコルでメッセージを送信(同期ラッパー)"""

        # Jupyter/既存イベントループ環境での実行を可能にする
        nest_asyncio.apply()

        try:
            # 既存のイベントループがある場合はそれを使用
            loop = asyncio.get_event_loop()
            if loop.is_running():
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(asyncio.run, self._create_task_async(agent_url, message))
                    return future.result()
            else:
                return asyncio.run(self._create_task_async(agent_url, message))
        except Exception as e:
            print(f"同期ラッパーエラー ({agent_url}): {e}")
            return f"エラー: {str(e)}"

    async def _create_task_async(self, agent_url: str, message: str) -> str:
        """A2Aプロトコルでメッセージを送信(正しいSDKベース実装)"""

        # 認証ヘッダーを取得
        headers = self._get_id_token(agent_url)

        # タイムアウト設定
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout, connect=10.0, read=self.default_timeout, write=10.0, pool=5.0
        )

        try:
            async with httpx.AsyncClient(timeout=timeout_config, headers=headers) as httpx_client:
                # エージェントカードデータを確認
                if agent_url in self._agent_info_cache and self._agent_info_cache[agent_url] is not None:
                    agent_card_data = self._agent_info_cache[agent_url]
                else:
                    # エージェントカードを取得
                    agent_card_response = await httpx_client.get(f"{agent_url}/.well-known/agent.json")
                    agent_card_response.raise_for_status()
                    agent_card_data = agent_card_response.json()
                    # キャッシュに保存
                    self._agent_info_cache[agent_url] = agent_card_data

                # AgentCardを作成
                agent_card = AgentCard(**agent_card_data)

                # A2A SDKクライアントを作成
                client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)

                # メッセージパラメータを構築
                send_message_payload = {
                    "message": {
                        "role": "user",
                        "parts": [{"kind": "text", "text": message}],
                        "messageId": uuid.uuid4().hex,
                    }
                }

                # リクエストを作成
                request = SendMessageRequest(id=str(uuid.uuid4()), params=MessageSendParams(**send_message_payload))

                # A2A SDKでメッセージを送信
                response = await client.send_message(request)

                # レスポンスからテキストを抽出
                try:
                    response_dict = response.model_dump(mode="json", exclude_none=True)
                    if "result" in response_dict and "artifacts" in response_dict["result"]:
                        artifacts = response_dict["result"]["artifacts"]
                        for artifact in artifacts:
                            if "parts" in artifact:
                                for part in artifact["parts"]:
                                    if "text" in part:
                                        return part["text"]

                    # テキストが抽出できない場合は、フォーマット済みJSONを返す
                    return json.dumps(response_dict, indent=2, ensure_ascii=False)

                except Exception as e:
                    print(f"レスポンス解析エラー: {e}")
                    return str(response)

        except Exception as e:
            import traceback

            print(f"A2A通信エラー ({agent_url}): {e}")
            print(f"エラーの詳細: {traceback.format_exc()}")
            return f"エラー: {str(e)}"

リモートエージェントをオーケストレーションエージェントから削除するメソッドです。

    def remove_remote_agent(self, agent_url: str):
        """リモートエージェントを削除"""
        normalized_url = agent_url.rstrip("/")
        if normalized_url in self._agent_info_cache:
            del self._agent_info_cache[normalized_url]

A2AToolClientをインスタンス化する部分です。リモートエージェントを実行するCloud Runは、異なる2つのGoogle Cloudプロジェクトになりますので、サービスアカウントキーも別々となります。そのため、Cloud RunのURLと対応するサービスアカウントキーのマッピングを定義しています(agent_credentials_mapping)。

# A2Aクライアントの設定
# エージェントURL -> サービスアカウントキーファイルのマッピング
agent_credentials_mapping = {
    # Document Agent (プロジェクト1)
    "https://your-document-agent.asia-northeast1.run.app": "./credentials/document-agent-creds.json",
    # Market Research Agent (プロジェクト2)
    "https://your-market-research-agent.asia-northeast1.run.app": "./credentials/market-research-agent-creds.json",
}

# A2Aクライアントの設定
a2a_client = A2AToolClient(agent_credentials_mapping=agent_credentials_mapping)

# agent_credentials_mappingからリモートエージェントを動的に追加
for agent_url in agent_credentials_mapping.keys():
    a2a_client.add_remote_agent(agent_url)

2.2.3. orchestration_agent (ADKのAgentクラス)

オーケストレーションエージェントそのものはADKで実装しています。使用できるツールとして、A2AToolClientlist_remote_agentscreate_taskを登録しています。

# オーケストレーションエージェント
orchestration_agent = Agent(
    model="gemini-2.5-pro",
    name="document_orchestrator",
    instruction="""
    あなたは資料作成プロセスを統括するオーケストレーションエージェントです。
    ユーザーから資料作成の依頼を受けた場合、以下の手順で**自律的に**処理してください:

    1. **要件分析**: ユーザーの要求を分析し、必要な情報を特定
    2. **エージェント発見**: `list_remote_agents`を使用して利用可能なエージェントを確認
    3. **自動実行計画**: 要求に応じて適切なエージェントの実行順序を決定
    4. **市場調査**: 必要に応じて市場調査エージェントに`create_task`で調査を依頼
    5. **資料作成**: 収集した情報を基に資料作成エージェントに`create_task`で資料生成を依頼
    6. **統合・最適化**: 各エージェントの成果物を統合し、最終的な資料を完成

    **重要な実行方針:**
    - ユーザーに進捗確認や追加指示を求めることなく、自律的に全プロセスを完了してください
    - 各ステップで何を実行しているかを簡潔に報告してください
    - エージェント間の連携を効率的に行い、高品質な資料を作成してください
    - エラーが発生した場合は、代替手段を自動的に試行してください

    **実行例:**
    ユーザー: "AI市場について資料を作成して"
    → あなたは自動的に:
    1. 利用可能エージェントを確認
    2. 市場調査エージェントでAI市場を調査
    3. 調査結果を資料作成エージェントに渡して資料生成
    4. 統合された最終資料をユーザーに提供

    **出力形式:**
    各ステップの進捗を報告しつつ、最終的に完成した資料をユーザーに提示してください。
    """,
    tools=[a2a_client.list_remote_agents, a2a_client.create_task],
)

2.2.4. run_orchestration_demo()

マルチエージェントのデモを行う関数です。ADKではRunnerにオーケストレーションエージェントとなるエージェントを指定します。runner.run_async()部分にてオーケストレーションエージェントを実行しています。

async def run_orchestration_demo():
    """完全自律型のオーケストレーションデモ"""

    # ADK Runnerの初期化
    runner = Runner(
        app_name=orchestration_agent.name,
        agent=orchestration_agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    # セッション作成
    session = await runner.session_service.create_session(
        app_name=orchestration_agent.name,
        user_id="demo_user",
        state={},
        session_id="demo_session_001",
    )

    # ユーザーリクエスト(様々なパターンをテスト可能)
    user_requests = [
        "AI/機械学習市場について包括的な資料を作成してください。市場規模、主要プレイヤー、トレンド分析を含めてください。",
        # "クラウドコンピューティング市場の競合分析資料を作成して",
        # "フィンテック業界の最新動向をまとめたプレゼン資料が必要です",
    ]

    for i, user_request in enumerate(user_requests, 1):
        print(f"\n🎯 デモ {i}: {user_request}")
        print("=" * 80)

        # メッセージ作成
        content = types.Content(role="user", parts=[types.Part.from_text(text=user_request)])

        # オーケストレーションエージェント実行(完全自律)
        print("🤖 オーケストレーションエージェントが自律的に処理を開始...")

        response_parts = []
        async for event in runner.run_async(user_id="demo_user", session_id=session.id, new_message=content):
            # イベントの内容を詳細に処理
            if event.content and event.content.parts:
                for part in event.content.parts:
                    # テキスト部分の処理
                    if hasattr(part, "text") and part.text:
                        if event.is_final_response():
                            response_parts.append(part.text)
                        else:
                            print(part.text, end="", flush=True)

                    # 関数呼び出し部分の処理
                    elif hasattr(part, "function_call") and part.function_call:
                        func_name = getattr(part.function_call, "name", "unknown")
                        print(f"\n🔧 実行中: {func_name}")

        # 最終結果の表示
        final_response = "\n".join(response_parts)
        print("\n\n📋 最終結果:")
        print("=" * 80)
        print(final_response)
        print("=" * 80)


if __name__ == "__main__":
    # デモ実行
    print("🚀 自律型A2Aオーケストレーションデモを開始...")
    asyncio.run(run_orchestration_demo())

オーケストレーションエージェントのコード全体は以下の通りです。

orchestration_agent.py (オーケストレーションエージェント)
import asyncio
import json
import logging
import os
import uuid
from urllib.parse import urlparse

import httpx
import nest_asyncio
import requests
from dotenv import load_dotenv
from google.adk.agents import Agent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.auth.transport.requests import Request
from google.genai import types
from google.oauth2 import service_account

from a2a.client import A2AClient
from a2a.types import AgentCard, MessageSendParams, SendMessageRequest


# Google GenAIの関数呼び出し警告を抑制するフィルター
class _NoFunctionCallWarning(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        message = record.getMessage()
        if "there are non-text parts in the response:" in message:
            return False
        else:
            return True


# 警告フィルターを適用
logging.getLogger("google_genai.types").addFilter(_NoFunctionCallWarning())

# Set Google Cloud Configuration
os.environ["GOOGLE_GENAI_USE_VERTEXAI"] = "TRUE"
os.environ["GOOGLE_CLOUD_PROJECT"] = (
    "aif-llm01-dev"  # @param {type: "string", placeholder: "[your-project-id]", isTemplate: true}
)
os.environ["GOOGLE_CLOUD_LOCATION"] = "global"  # Replace with your location
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "./credentials/vertex-ai-creds.json"

load_dotenv()


class A2AToolClient:
    """A2Aクライアント実装"""

    def __init__(self, default_timeout: float = 120.0, agent_credentials_mapping: dict = None):
        self._agent_info_cache = {}
        self.default_timeout = default_timeout
        # エージェントURL -> 認証ファイルパスのマッピング
        self.agent_credentials_mapping = agent_credentials_mapping or {}

    def _get_id_token(self, url: str) -> dict:
        """指定されたURLに対するIDトークンを含むヘッダーを取得"""
        try:
            # Cloud Run のサービスURLからaudience を抽出
            parsed_url = urlparse(url)
            audience = f"{parsed_url.scheme}://{parsed_url.netloc}"
            base_url = audience  # マッピング用のベースURL

            # 1. 特定のエージェント用サービスアカウントキーを使用
            if base_url in self.agent_credentials_mapping:
                agent_creds_path = self.agent_credentials_mapping[base_url]
                if os.path.exists(agent_creds_path):
                    print(f"🔑 専用サービスアカウントを使用 ({base_url}): {agent_creds_path}")
                    credentials = service_account.IDTokenCredentials.from_service_account_file(
                        agent_creds_path, target_audience=audience
                    )
                    # IDトークンを生成
                    credentials.refresh(Request())
                    id_token = credentials.token

                    headers = {"Authorization": f"Bearer {id_token}", "Content-Type": "application/json"}
                    print(f"✅ 認証トークン(専用SA)を正常に取得: {audience}")
                    return headers
                else:
                    print(f"⚠️ 専用認証ファイルが見つかりません: {agent_creds_path}")

        except Exception as e:
            print(f"⚠️ サービスアカウント認証に失敗しました ({url}): {e}")
            print(f"⚠️ 詳細エラー: {type(e).__name__}")

        # 認証なしで試行
        print(f"⚠️ 認証なしでアクセスを試行: {url}")
        return {"Content-Type": "application/json"}

    def add_remote_agent(self, agent_url: str):
        """リモートエージェントを追加"""
        normalized_url = agent_url + "/"
        if normalized_url not in self._agent_info_cache:
            self._agent_info_cache[normalized_url] = None

    def list_remote_agents(self) -> dict:
        """利用可能なリモートエージェントの一覧を取得"""
        if not self._agent_info_cache:
            return {}

        remote_agents_info = {}
        for remote_connection in self._agent_info_cache:
            # キャッシュされたデータを使用
            if self._agent_info_cache[remote_connection] is not None:
                remote_agents_info[remote_connection] = self._agent_info_cache[remote_connection]
            else:
                try:
                    # 認証ヘッダーを取得
                    headers = self._get_id_token(remote_connection)

                    # エージェント情報を取得してキャッシュ
                    agent_info = requests.get(f"{remote_connection}/.well-known/agent.json", headers=headers)
                    agent_data = agent_info.json()
                    self._agent_info_cache[remote_connection] = agent_data
                    remote_agents_info[remote_connection] = agent_data
                except Exception as e:
                    print(f"エージェント情報の取得に失敗: {remote_connection}: {e}")

        return remote_agents_info

    def create_task(self, agent_url: str, message: str) -> str:
        """A2Aプロトコルでメッセージを送信(同期ラッパー)"""

        # Jupyter/既存イベントループ環境での実行を可能にする
        nest_asyncio.apply()

        try:
            # 既存のイベントループがある場合はそれを使用
            loop = asyncio.get_event_loop()
            if loop.is_running():
                import concurrent.futures

                with concurrent.futures.ThreadPoolExecutor() as executor:
                    future = executor.submit(asyncio.run, self._create_task_async(agent_url, message))
                    return future.result()
            else:
                return asyncio.run(self._create_task_async(agent_url, message))
        except Exception as e:
            print(f"同期ラッパーエラー ({agent_url}): {e}")
            return f"エラー: {str(e)}"

    async def _create_task_async(self, agent_url: str, message: str) -> str:
        """A2Aプロトコルでメッセージを送信(正しいSDKベース実装)"""

        # 認証ヘッダーを取得
        headers = self._get_id_token(agent_url)

        # タイムアウト設定
        timeout_config = httpx.Timeout(
            timeout=self.default_timeout, connect=10.0, read=self.default_timeout, write=10.0, pool=5.0
        )

        try:
            async with httpx.AsyncClient(timeout=timeout_config, headers=headers) as httpx_client:
                # エージェントカードデータを確認
                if agent_url in self._agent_info_cache and self._agent_info_cache[agent_url] is not None:
                    agent_card_data = self._agent_info_cache[agent_url]
                else:
                    # エージェントカードを取得
                    agent_card_response = await httpx_client.get(f"{agent_url}/.well-known/agent.json")
                    agent_card_response.raise_for_status()
                    agent_card_data = agent_card_response.json()
                    # キャッシュに保存
                    self._agent_info_cache[agent_url] = agent_card_data

                # AgentCardを作成
                agent_card = AgentCard(**agent_card_data)

                # A2A SDKクライアントを作成
                client = A2AClient(httpx_client=httpx_client, agent_card=agent_card)

                # メッセージパラメータを構築
                send_message_payload = {
                    "message": {
                        "role": "user",
                        "parts": [{"kind": "text", "text": message}],
                        "messageId": uuid.uuid4().hex,
                    }
                }

                # リクエストを作成
                request = SendMessageRequest(id=str(uuid.uuid4()), params=MessageSendParams(**send_message_payload))

                # A2A SDKでメッセージを送信
                response = await client.send_message(request)

                # レスポンスからテキストを抽出
                try:
                    response_dict = response.model_dump(mode="json", exclude_none=True)
                    if "result" in response_dict and "artifacts" in response_dict["result"]:
                        artifacts = response_dict["result"]["artifacts"]
                        for artifact in artifacts:
                            if "parts" in artifact:
                                for part in artifact["parts"]:
                                    if "text" in part:
                                        return part["text"]

                    # テキストが抽出できない場合は、フォーマット済みJSONを返す
                    return json.dumps(response_dict, indent=2, ensure_ascii=False)

                except Exception as e:
                    print(f"レスポンス解析エラー: {e}")
                    return str(response)

        except Exception as e:
            import traceback

            print(f"A2A通信エラー ({agent_url}): {e}")
            print(f"エラーの詳細: {traceback.format_exc()}")
            return f"エラー: {str(e)}"

    def remove_remote_agent(self, agent_url: str):
        """リモートエージェントを削除"""
        normalized_url = agent_url.rstrip("/")
        if normalized_url in self._agent_info_cache:
            del self._agent_info_cache[normalized_url]


# A2Aクライアントの設定
# エージェントURL -> サービスアカウントキーファイルのマッピング
agent_credentials_mapping = {
    # Document Agent (プロジェクト1)
    "https://your-document-agent.asia-northeast1.run.app": "./credentials/document-agent-creds.json",
    # Market Research Agent (プロジェクト2)
    "https://your-market-research-agent.asia-northeast1.run.app": "./credentials/market-research-agent-creds.json",
}

# A2Aクライアントの設定
a2a_client = A2AToolClient(agent_credentials_mapping=agent_credentials_mapping)

# agent_credentials_mappingからリモートエージェントを動的に追加
for agent_url in agent_credentials_mapping.keys():
    a2a_client.add_remote_agent(agent_url)

# オーケストレーションエージェント
orchestration_agent = Agent(
    model="gemini-2.5-pro",
    name="document_orchestrator",
    instruction="""
    あなたは資料作成プロセスを統括するオーケストレーションエージェントです。
    ユーザーから資料作成の依頼を受けた場合、以下の手順で**自律的に**処理してください:

    1. **要件分析**: ユーザーの要求を分析し、必要な情報を特定
    2. **エージェント発見**: `list_remote_agents`を使用して利用可能なエージェントを確認
    3. **自動実行計画**: 要求に応じて適切なエージェントの実行順序を決定
    4. **市場調査**: 必要に応じて市場調査エージェントに`create_task`で調査を依頼
    5. **資料作成**: 収集した情報を基に資料作成エージェントに`create_task`で資料生成を依頼
    6. **統合・最適化**: 各エージェントの成果物を統合し、最終的な資料を完成

    **重要な実行方針:**
    - ユーザーに進捗確認や追加指示を求めることなく、自律的に全プロセスを完了してください
    - 各ステップで何を実行しているかを簡潔に報告してください
    - エージェント間の連携を効率的に行い、高品質な資料を作成してください
    - エラーが発生した場合は、代替手段を自動的に試行してください

    **実行例:**
    ユーザー: "AI市場について資料を作成して"
    → あなたは自動的に:
    1. 利用可能エージェントを確認
    2. 市場調査エージェントでAI市場を調査
    3. 調査結果を資料作成エージェントに渡して資料生成
    4. 統合された最終資料をユーザーに提供

    **出力形式:**
    各ステップの進捗を報告しつつ、最終的に完成した資料をユーザーに提示してください。
    """,
    tools=[a2a_client.list_remote_agents, a2a_client.create_task],
)


async def run_orchestration_demo():
    """完全自律型のオーケストレーションデモ"""

    # ADK Runnerの初期化
    runner = Runner(
        app_name=orchestration_agent.name,
        agent=orchestration_agent,
        artifact_service=InMemoryArtifactService(),
        session_service=InMemorySessionService(),
        memory_service=InMemoryMemoryService(),
    )

    # セッション作成
    session = await runner.session_service.create_session(
        app_name=orchestration_agent.name,
        user_id="demo_user",
        state={},
        session_id="demo_session_001",
    )

    # ユーザーリクエスト(様々なパターンをテスト可能)
    user_requests = [
        "AI/機械学習市場について包括的な資料を作成してください。市場規模、主要プレイヤー、トレンド分析を含めてください。",
        # "クラウドコンピューティング市場の競合分析資料を作成して",
        # "フィンテック業界の最新動向をまとめたプレゼン資料が必要です",
    ]

    for i, user_request in enumerate(user_requests, 1):
        print(f"\n🎯 デモ {i}: {user_request}")
        print("=" * 80)

        # メッセージ作成
        content = types.Content(role="user", parts=[types.Part.from_text(text=user_request)])

        # オーケストレーションエージェント実行(完全自律)
        print("🤖 オーケストレーションエージェントが自律的に処理を開始...")

        response_parts = []
        async for event in runner.run_async(user_id="demo_user", session_id=session.id, new_message=content):
            # イベントの内容を詳細に処理
            if event.content and event.content.parts:
                for part in event.content.parts:
                    # テキスト部分の処理
                    if hasattr(part, "text") and part.text:
                        if event.is_final_response():
                            response_parts.append(part.text)
                        else:
                            print(part.text, end="", flush=True)

                    # 関数呼び出し部分の処理
                    elif hasattr(part, "function_call") and part.function_call:
                        func_name = getattr(part.function_call, "name", "unknown")
                        print(f"\n🔧 実行中: {func_name}")

        # 最終結果の表示
        final_response = "\n".join(response_parts)
        print("\n\n📋 最終結果:")
        print("=" * 80)
        print(final_response)
        print("=" * 80)


if __name__ == "__main__":
    # デモ実行
    print("🚀 自律型A2Aオーケストレーションデモを開始...")
    asyncio.run(run_orchestration_demo())

3. 動作確認

動作確認の結果は以下の通りになります(Cloud RunのURLはダミー値に変換しています)。

動作確認(前半)
(venv) PS C:\python\a2a> python .\orchestration_agent.py
🚀 自律型A2Aオーケストレーションデモを開始...

🎯 デモ 1: AI/機械学習市場について包括的な資料を作成してください。市場規模、主要プレイヤー、トレンド分析を含めてください。
================================================================================
🤖 オーケストレーションエージェントが自律的に処理を開始...
承知いたしました。AI/機械学習市場に関する包括的な資料を作成します。市場規模、主要プレイヤー、トレンド分析を含め、以下の手順で自律的に資料作成プロセスを進めます。

まず、利用可能なエージェントを確認します。
🔧 実行中: list_remote_agents
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app
🔑 専用サービスアカウントを使用 (https://your-market-research-agent.asia-northeast1.run.app): ./credentials/market-research-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-market-research-agent.asia-northeast1.run.app
エージェントの確認が完了しました。市場調査エージェントと資料作成エージェントが利用可能です。

次に、市場調査エージェントにAI/機械学習市場の調査を依頼します。調査項目は「市場規模」「主要プレイヤー」「トレンド分析」です。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-market-research-agent.asia-northeast1.run.app): ./credentials/market-research-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-market-research-agent.asia-northeast1.run.app
市場調査エージェントによる調査が完了しました。非常に詳細なレポートが得られました。

この調査結果に基づき、資料作成エージェントに最終的な資料の生成を依頼します。プレゼンテーション形式で、経営層にも分かりやすいように要点をまとめるよう指示します。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app
資料作成エージェントでエラーが発生しました。入力形式に問題があったようです。

原因を分析し、代替手段を試みます。レポート全体を一度に渡すのではなく、要点をさらに絞り込み、簡潔な箇条書き形式のテキストとして再度資料作成を依頼します。これにより、エージェントが処理しやすい形式での情報提供を試みます。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app
再度エラーが発生しました。同じ入力形式エラーのようです。エージェントの仕様に根本的な問題がある可能性があります。

別の代替手段として、資料の構成要素を一つずつ順番に生成させる形に切り替えます。まず「表紙と目次」の作成を依頼し、成功すれば各章を個別に追加生成していくアプローチを試みます。これにより、一度に処理する情報量を減らし、エラーを回避します。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app
表紙と目次の作成に成功しました。このアプローチで問題ないと判断し、各章の作成を順に進めます。

エージェント間の連携でエラーが起きましたが、自動でリカバリーしています。以下の通り、最終的な資料作成に成功しました。

動作確認(後半)
次に、「エグゼクティブサマリー」と「市場概要と成長予測」の作成を、市場調査レポートの要約を添えて依頼します。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app
「1. エグゼクティブサマリー」「2. 市場概要と成長予測」の作成が完了しました。

続いて、「3. 主要トレンド」「4. 競合環境」のスライド作成を、市場調査レポートの要点を基に依頼します。ここでも視覚的な分かりやすさを重視します。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app
「3. 主要トレンド」「4. 競合環境」のスライド作成が完了しました。

最後に、これまでの内容を締めくくる「5. 将来展望とビジネス機会」と「6. 結論と推奨アクション」のスライド作成を依頼します。これにより、資料は完成となります。
🔧 実行中: create_task
🔑 専用サービスアカウントを使用 (https://your-document-agent.asia-northeast1.run.app): ./credentials/document-agent-creds.json
✅ 認証トークン(専用SA)を正常に取得: https://your-document-agent.asia-northeast1.run.app


📋 最終結果:
================================================================================
全ての資料作成プロセスが完了しました。

以下に、完成した「AI/機械学習市場について包括的な資料」の全スライドの内容を統合し、最終的な成果物として提出します。

---

# **AI/機械学習市場の動向と将来展望**
## - 最新トレンドとビジネス機会の分析 -

**[会社名/部署名]**

**[2024/05/22]**

---

## **本日のアジェンダ**

1.  **エグゼクティブサマリー**
    *   本資料の要点と結論の概要

2.  **市場概要と成長予測**
    *   現在の市場規模と今後の成長率

3.  **主要トレンド**
    *   技術、応用、規制における最新動向

4.  **競合環境**
    *   主要プレイヤーと市場シェアの分析

5.  **将来展望とビジネス機会**
    *   今後注目すべき領域と参入機会の考察

6.  **結論と推奨アクション**
    *   総括と具体的な次の一手のご提案

---

### **1. エグゼクティブサマリー**

**AIは全産業を再定義する基盤技術へ。今、経営アジェンダの中心に据えるべき時。**

*   **急速な市場成長と変革**
    *   AI市場は急速な成長フェーズにあり、あらゆる産業のビジネスプロセスを根底から変える基盤技術としての地位を確立しています。

*   **市場を牽引する「生成AI」**
    *   特に「生成AI」は、コンテンツ制作から開発、顧客対応まで多岐にわたる業務を自動化・高度化し、企業の生産性を飛躍的に向上させるゲームチェンジャーとなっています。

*   **高まる「責任あるAI」の重要性**
    *   AIの社会実装が進む一方、倫理、公平性、透明性を担保する「責任あるAI」の構築が、企業の信頼性と持続的成長に不可欠な要素となっています。

*   **競争の構図**
    *   市場は、クラウド3強(AWS, Azure, Google)がインフラを、NVIDIAが半導体を支配し、その上でOpenAIなどがモデル開発をリードするエコシステムが形成されています。

*   **企業への緊急提言**
    *   この変革期を勝ち抜くため、企業はAI活用を前提とした**「全社的な人材育成」**と、リスクを管理し機会を最大化するための**「ガバナンス体制の構築」**を急ぐ必要があります。

---

### **2. 市場概要と成長予測**

**市場は2030年に7,000億ドル超へ。生成AIが成長を加速させる。**

#### **市場規模と成長予測**

*   世界のAI市場は、2023年の約2,400億ドルから年平均成長率(CAGR)約17%で成長し、2030年には3倍以上となる約7,400億ドルに達すると予測されます。
*   特に生成AI関連の投資が、市場全体の成長を力強く牽引する見込みです。

**(ここに「世界のAI市場規模の推移と予測」の棒グラフを挿入)**

---

### **3. 主要トレンド:AI市場を動かす4つのメガトレンド**

**(中央にタイトルを配置し、4つのトレンドをアイコン付きのカード形式で四隅に配置する構成を想定)**

*   **[トレンド1: 生成AIの産業応用]** (アイコン: 魔法の杖と歯車)
    *   **概要**: テキスト、画像、コードなどをAIが自動生成。
    *   **ビジネスインパクト**: 生産性の飛躍的向上とクリエイティブ業務の民主化。

*   **[トレンド2: MLOpsの重要性拡大]** (アイコン: 循環する矢印)
    *   **概要**: AIモデルの開発・運用ライフサイクルを自動化・効率化。
    *   **ビジネスインパクト**: AI投資のROI最大化とスケーラブルな活用。

*   **[トレンド3: マルチモーダルAIの進展]** (アイコン: 目・耳・テキストが脳に繋がる)
    *   **概要**: 複数情報を統合的に扱うAI。
    *   **ビジネスインパクト**: 高度な状況理解と応用範囲の拡大。

*   **[トレンド4: 責任あるAIと規制]** (アイコン: 天秤と盾)
    *   **概要**: AIの公平性、透明性、説明可能性を担保。EU「AI法」など規制も具体化。
    *   **ビジネスインパクト**: AIガバナンス体制の構築が急務。

---

### **4. 競合環境:3つのレイヤーで構成されるAIエコシステム**

**(中央に3層のピラミッド図を配置する構成を想定)**

*   **上層:アプリケーション** (各業界特化のAIサービス)
    *   (ロゴ例: Salesforce, Adobe, スタートアップ等)
*   **中層:モデル開発** (基盤となるLLM等を開発)
    *   (ロゴ例: OpenAI, Google, Anthropic, Meta)
*   **下層:インフラ** (AIの学習・実行基盤)
    *   **クラウド**: AWS, Microsoft Azure, Google Cloud
    *   **半導体**: NVIDIA, (追随: Google, AWS等)

---

### **5. 将来展望とビジネス機会:AIが切り拓く未来と新たな成長領域**

*   **1. AIの自律化:「自動化」から「自律化」へ**
    *   **予測**: AIが自ら状況を分析・判断し、複雑な意思決定まで行う「自律型エージェント」へ進化。
    *   **ビジネス機会**: 事業プロセス全体を最適化する自律型オペレーションシステムの開発・導入。

*   **2. 業界特化型AIの台頭:「汎用」から「専門」へ**
    *   **予測**: 医療、法律など特定分野に特化した高精度な「Vertical AI」が主流に。
    *   **ビジネス機会**: 業界知識とAIを組み合わせた高付加価値なVertical SaaSの提供。

*   **3. 持続可能性への挑戦:「性能」から「環境効率」へ**
    *   **予測**: AIのエネルギー消費が課題となり、「グリーンAI」が競争優位の源泉に。
    *   **ビジネス機会**: モデル効率化技術、省電力AIチップ、グリーンデータセンター関連ビジネス。

---

### **6. 結論と推奨アクション:AI時代を勝ち抜くための次の一手**

**【結論】**
**AIは、もはや単なるITツールではない。**
**事業のあり方そのものを根底から変革する、最重要の「経営アジェンダ」である。**

**【推奨アクション】**

1.  **ビジョンの策定 (Why & Where)**
    *   AIを自社のどこでどう活用し、競争優位を築くか、明確なビジョンを定義する。

2.  **小さく始め、大きく育てる (Start Small, Scale Fast)**
    *   PoCで迅速に価値検証を行い、成功モデルを全社に展開する。

3.  **人材への投資 (Enable People)**
    *   AIを「使う人材」「作る人材」の両面で、全社的なリスキリング・採用を戦略的に推進する。

4.  **ガバナンス体制の構築 (Govern & Control)**
    *   倫理・法務・IT部門を巻き込み、AI利用のリスクを管理する全社横断的なガバナンス委員会を設置する。
================================================================================
(venv) PS C:\python\a2a>

4. まとめ

以上、Cloud RunにおけるA2Aの実装でした。今回の解説ではA2Aのコードを多く書きましたが、ADKで作成したエージェントをA2A互換のエージェントにコンバートする機能も既に実装されているようです。ADKの公式ドキュメントには、to_a2a()という関数の記載がありました。ADKもA2AもGoogleが実装したものということで、A2Aに対応したエージェントの実装にADKを用いることは、今後一つのベストプラクティスになるかもしれませんね。

また、大事な点として、A2Aのリモートエージェントに対するセキュリティ対策があります。今回はCloud Runの機能として、TLS通信とサービスアカウントを用いた認証を実装しています。A2Aにおいても通常のWebアプリやAPIと同等のセキュリティ対策を実施する必要があり、その部分の設計や実装は、アプリケーション開発者としてのスキルを問われる点だと実感しました。

Discussion