🏝️

A2A リモートエージェント対応のマルチエージェントシステム

に公開

はじめに

下記の記事では、「ネット記事の作成業務」を例とした、マルチエージェントシステムのアーキテクチャーを解説しました。

この中で、リモート環境にデプロイしたエージェントを A2A で利用する方法として、次の図のアーキテクチャーを紹介しました。この記事では、この環境を実際に構築する手順を紹介します。


A2A リモートエージェントによるマルチエージェントシステムの構成

環境準備

Vertex AI workbench のノートブック上で実装しながら説明するために、まずは、ノートブックの実行環境を用意します。新しいプロジェクトを作成したら、Cloud Shell のコマンド端末を開いて、必要な API を有効化します。

gcloud services enable \
  aiplatform.googleapis.com \
  notebooks.googleapis.com \
  cloudresourcemanager.googleapis.com \
  run.googleapis.com \
  cloudbuild.googleapis.com

続いて、Workbench のインスタンスを作成します。

PROJECT_ID=$(gcloud config list --format 'value(core.project)')
gcloud workbench instances create agent-development \
  --project=$PROJECT_ID \
  --location=us-central1-a \
  --machine-type=e2-standard-2

クラウドコンソールのナビゲーションメニューから「Vertex AI」→「Workbench」を選択すると、作成したインスタンス agent-development があります。インスタンスの起動が完了するのを待って、「JUPYTERLAB を開く」をクリックしたら、「Python 3(ipykernel)」の新規ノートブックを作成します。

この後は、ノートブックのセルでコードを実行していきます。

まず、Agent Development Kit (ADK) と A2A SDK のパッケージをインストールします。

%pip install --user \
    google-adk==1.11.0 \
    a2a-sdk==0.3.1

インストール時に表示される ERROR: pip's dependency resolver does not currently take into... というエラーは無視してください。

インストールしたパッケージを利用可能にするために、次のコマンドでカーネルを再起動します。

import IPython
app = IPython.Application.instance()
_ = app.kernel.do_shutdown(True)

再起動を確認するポップアップが表示されるので [Ok] をクリックします。

続いて、必要なモジュールをインポートして、実行環境の初期設定を行います。

import copy, json, os, uuid
import vertexai
from vertexai import agent_engines

from google.genai.types import Part, Content
from google.adk.agents.llm_agent import LlmAgent
from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner

from google.adk.agents.callback_context import CallbackContext
from google.adk.models import LlmResponse, LlmRequest
from google.adk.agents.sequential_agent import SequentialAgent

[PROJECT_ID] = !gcloud config list --format 'value(core.project)'
[PROJECT_NUMBER] = !gcloud projects describe {PROJECT_ID} --format='value(projectNumber)'
LOCATION = 'us-central1'

vertexai.init(project=PROJECT_ID, location=LOCATION,
              staging_bucket=f'gs://{PROJECT_ID}')

os.environ['GOOGLE_CLOUD_PROJECT'] = PROJECT_ID
os.environ['GOOGLE_CLOUD_LOCATION'] = LOCATION
os.environ['GOOGLE_GENAI_USE_VERTEXAI'] = 'True'

また、ノートブック上でエージェントを利用するための簡易アプリのクラスを定義します。

class LocalApp:
    def __init__(self, agent, app_name='default_app', user_id='default_user'):
        self._agent = agent
        self._app_name = app_name
        self._user_id = user_id
        self._runner = Runner(
            app_name=self._app_name,
            agent=self._agent,
            artifact_service=InMemoryArtifactService(),
            session_service=InMemorySessionService(),
            memory_service=InMemoryMemoryService(),
        )
        self._session = None
        
    async def stream(self, query):
        if not self._session:
            self._session = await self._runner.session_service.create_session(
                app_name=self._app_name,
                user_id=self._user_id,
                session_id=uuid.uuid4().hex,
            )
        content = Content(role='user', parts=[Part(text=query)])
        async_events = self._runner.run_async(
            user_id=self._user_id,
            session_id=self._session.id,
            new_message=content,
        )
        result = []
        async for event in async_events:
            if DEBUG:
                print(f'====\n{event}\n====')
            if (event.content and event.content.parts):
                response = '\n'.join([p.text for p in event.content.parts if p.text])
                if response:
                    print(response)
                    result.append(response)
        return result

リモートエージェントの定義と Agent Engine へのデプロイ

今回の構成では、次の 4 つのエージェントをリモートエージェントとして使用します。

  • research_agent1:調査レポートの調査項目を選定する
  • research_agent2:選定した項目に基づいて調査レポートを作成する
  • writer_agent:記事を作成する
  • review_agent:記事をレビューする

前回の記事と同じコードでこれらのエージェントを定義した後に、Agent Engine にデプロイして、リモートエージェントとして呼び出せるようにします。まずは、それぞれのエージェントを定義します。

research_agent1

instruction = '''
あなたの役割は、記事の執筆に必要な情報を収集して調査レポートにまとめる事です。
指定されたテーマの記事を執筆する際に参考となるトピックを5項目程度のリストにまとめます。
後段のエージェントがこのリストに基づいて、調査レポートを作成します。

* 出力形式
日本語で出力。
'''

research_agent1 = LlmAgent(
    name='research_agent1',
    model='gemini-2.5-flash',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(テーマ選定)',
    instruction=instruction,
)

research_agent2

instruction = '''
あなたの役割は、記事の執筆に必要な情報を収集して調査レポートにまとめる事です。
前段のエージェントは、5項目程度の調査対象トピックを指定します。

* 出力形式
日本語で出力。
調査レポートは、トピックごとに客観的情報をまとめます。各トピックについて、5文以上の長さで記述すること。
'''

research_agent2 = LlmAgent(
    name='research_agent2',
    model='gemini-2.5-flash',
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(レポート作成)',
    instruction=instruction,
)

writer_agent

instruction = '''
あなたの役割は、特定のテーマに関する気軽な読み物記事を書くことです。
記事の「テーマ」と、その内容に関連する「調査レポート」が与えられるので、
調査レポートに記載の客観的事実に基づいて、信頼性のある読み物記事を書いてください。

レビュアーによる修正ポイントが指摘された際は、直前に書いた記事を指摘に従って書き直してください。

**出力条件**
- トピックに関してある程度の基礎知識がある読者を前提として、数分で気軽に読める内容にしてください。
- 比較的カジュアルで語りかける口調の文章にします。
- 思考過程は出力せずに、最終結果だけを出力します。
- 記事タイトルは付けないで、次の構成で出力します。各セクションタイトルは、内容に合わせてアレンジしてください。
0. 導入:セクションタイトルを付けないで、この記事を読みたくなる導入を1〜2文にまとめます。
1. 概要:トピックの全体像をまとめて簡単に説明します。
2. 最新情報:特に注目したい新しい情報を取り上げます。
3. 実践:トピックに関して、読者自身がやってみるとよさそうな事を1つ紹介します。
4. まとめ

- 各セクションのタイトルはマークダウンヘッダ ## 付けます。必要に応じて箇条書きのマークダウンを使用します。
- それ以外のマークダウンによる装飾は行いません。
'''

writer_agent = LlmAgent(
    name='writer_agent',
    model='gemini-2.5-flash',
    description='特定のテーマに関する読み物記事を書くエージェント',
    instruction=instruction,
)

review_agent

instruction = f'''
あなたの役割は、読み物記事をレビューして、記事の条件にあった内容にするための改善コメントを与える事です。

* 記事の条件
- 記事は、はじめに40文字程度のタイトルがあること。
 今日から役立つ生活情報があって「すぐに読まなきゃ」と読者が感じるタイトルにすること。
 タイトルはマークダウンヘッダ # をつけること。
- タイトルの直後に「なぜいまこのテーマを取り上げるのか」をまとめた導入を加えて、読者にこの記事を読む動機づけを与えます。
- 各セクションのサブタイトルには、絵文字を用いて親しみやすさを出すこと。
- 読者が今日から実践できる具体例が3つ以上紹介されていること。

* 出力形式
- 日本語で出力。
- はじめに、記事の良い点を説明します。
- 次に、修正ポイントを箇条書きで出力します。
'''

review_agent = LlmAgent(
    name='review_agent',
    model='gemini-2.5-flash',
    description='読み物記事をレビューするエージェント',
    instruction=instruction,
)

次のコードで、これらのエージェントを Agent Engine にデプロイします。

agents = {
    'research_agent1': research_agent1,
    'research_agent2': research_agent2,
    'writer_agent': writer_agent,
    'review_agent': review_agent,
}

deployed_agents = [agent.display_name for agent in agent_engines.list()]

for agent_name in agents.keys():
    if not agent_name in deployed_agents:
        remote_agent = agent_engines.create(
            agent_engine=agents[agent_name],
            display_name=f'{agent_name}',
            requirements=['google-adk==1.11.0'],
        )
    else:
        resource_name = [
            agent.name for agent in agent_engines.list()
            if agent.display_name == agent_name
        ][0]
        remote_agent = agent_engines.update(
            resource_name=resource_name,
            agent_engine=agents[agent_name],
            display_name=f'{agent_name}',
            requirements=['google-adk==1.11.0'],
        )

ここでは、各エージェントのディスプレイネーム(display_name オプション)に research_agent1 などのエージェント名をセットしており、同名のエージェントがすでにデプロイ済みの場合は、新規にデプロイするのではなく、デプロイ済みのエージェントを更新するようにしています。

デプロイされたエージェントには、ディスプレイネームとは別に、ユニークな Agent ID が割り当てられます。次のコマンドで、それぞれの Agent ID を確認します。

agent_list = list(agent_engines.list())
for agent_name in agents.keys():
    resource_name = [
        agent.name for agent in agent_list
        if agent.display_name == agent_name
    ][0]
    print(f'{agent_name}: {resource_name}')

[出力結果]

research_agent1: 3354750713832931328
research_agent2: 4183413045269102592
writer_agent: 180557401466863616
review_agent: 4784643595523063808

表示される Agent ID は、環境によって異なります。

A2A サーバーのデプロイ

リモートエージェントのそれぞれについて、A2A プロトコルによるアクセスを中継する A2A サーバーをデプロイします。ここでは、Cloud Run の環境にデプロイするので、デプロイ処理の際に必要となる、コンテナイメージのリポジトリを作成しておきます。

REPO_NAME='cloud-run-source-deploy'
REPO=f'{LOCATION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}'
!gcloud artifacts repositories create {REPO_NAME} \
     --repository-format docker --location {LOCATION}

続いて、次のコードで、A2A サーバーのデプロイに必要なファイルを作成します。

  • main.py:A2A サーバーを実装した関数
  • Dockerfile:コンテナイメージのビルドに使用する Dockerfile
  • requirements.txt:依存ライブラリーを記述したファイル
%%bash
cat <<EOF >main.py
import json, os
import vertexai
from vertexai import agent_engines

from google.adk.events import Event
from google.adk.sessions import VertexAiSessionService

from a2a.server.agent_execution import AgentExecutor
from a2a.server.tasks import TaskUpdater, InMemoryTaskStore
from a2a.types import (
    Task, TextPart, UnsupportedOperationError,
    AgentCapabilities, AgentCard, AgentSkill,
)
from a2a.utils.errors import ServerError
from a2a.server.apps import A2AStarletteApplication
from a2a.server.request_handlers import DefaultRequestHandler

PROJECT_ID = os.environ['PROJECT_ID']
LOCATION = os.environ['REGION']
AGENT_ID = os.environ['AGENT_ID']
SERVICE_URL = os.environ['SERVICE_URL']

vertexai.init(project=PROJECT_ID, location=LOCATION)
SESSION_SERVICE = VertexAiSessionService(
    project = PROJECT_ID,
    location = LOCATION,
)

class AdkAgent:
    SUPPORTED_CONTENT_TYPES = ['text', 'text/plain']

    def __init__(self, remote_agent):
        self._remote_agent = remote_agent

    async def stream(self, updater, user_input):
        # Create a new session and restore past events.
        session = self._remote_agent.create_session(
            user_id='default_user'
        )
        app_name = session['appName']
        user_id = session['userId']
        session_id = session['id']
        session = await SESSION_SERVICE.get_session(
            app_name=app_name,
            user_id=user_id,
            session_id=session_id,
        )
        for event in json.loads(user_input): # past events
            await SESSION_SERVICE.append_event(session, Event(**event))

        # Send query to remote agent.
        events = self._remote_agent.stream_query(
            user_id=user_id,
            session_id=session_id,
            message='', # Sequential agents receive no user messages.
        )
        text_parts = []        
        for event in events:
            if text_parts: # not a last part
                updater.new_agent_message(text_parts)
            text_parts = []
            if 'content' in event and 'parts' in event['content']:
                for part in event['content']['parts']:
                    if 'text' in part:
                        text_parts.append(TextPart(text=part['text']))
        # treat last part as an artifact
        await updater.add_artifact(text_parts)
        remote_agent.delete_session(
            user_id=user_id,
            session_id=session_id,
        )


class AdkAgentExecutor(AgentExecutor):
    def __init__(self, adk_agent: AdkAgent):
        self.adk_agent = adk_agent

    async def cancel(self, request, event_queue):
        raise ServerError(
            error=UnsupportedOperationError('Cancel operation is not supported.')
        )
        
    async def execute(self, context, event_queue):
        updater = TaskUpdater(
            event_queue,
            context.task_id,
            context.context_id
        )
        if not context.current_task:
            await updater.submit()
        await updater.start_work()

        user_input = context.get_user_input()
        await self.adk_agent.stream(updater, user_input)
        await updater.complete()


def create_agent_card(
    base_url, supported_content_types, agent_name, agent_description):
    
    capabilities = AgentCapabilities(streaming=True)
    skill = AgentSkill(
        id=f"{agent_name.lower().replace(' ', '_')}_skill",
        name=f'{agent_name} Skill',
        description=agent_description,
        tags=[tag.strip() for tag in agent_name.lower().split()],
        examples=[f'Interact with {agent_name}'],
    )

    return AgentCard(
        name=agent_name,
        description=agent_description,
        url=base_url,
        version='1.0.0',
        defaultInputModes=supported_content_types,
        defaultOutputModes=supported_content_types,
        capabilities=capabilities,
        skills=[skill],
    )


remote_agent = agent_engines.get(AGENT_ID)
adk_agent = AdkAgent(remote_agent=remote_agent)

agent_card = create_agent_card(
    base_url=SERVICE_URL,
    supported_content_types=AdkAgent.SUPPORTED_CONTENT_TYPES,
    agent_name=remote_agent.name,
    agent_description=remote_agent.name,
)
    
request_handler = DefaultRequestHandler(
    agent_executor=AdkAgentExecutor(adk_agent),
    task_store=InMemoryTaskStore(),
)

server_app_builder = A2AStarletteApplication(
    agent_card=agent_card, http_handler=request_handler
)

app = server_app_builder.build()
EOF

cat <<EOF >Dockerfile
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
EOF

cat <<EOF >requirements.txt
uvicorn
starlette
google-adk==1.11.0
google-cloud-aiplatform==1.108.0
a2a-sdk==0.3.1
EOF

次のコードを実行すると、4 つのリモートエージェントのそれぞれに対応した A2A サーバーがデプロイされます。

agents = [
    'research_agent1',
    'research_agent2',
    'writer_agent',
    'review_agent',
]

deployed_agents = {
    agent.display_name: agent.name for agent in agent_engines.list()
}

for agent_name in agents:
    AGENT_ID = deployed_agents[agent_name]
    print(f'## {agent_name}: {AGENT_ID}')

    SERVICE_NAME=f'a2a-server-{agent_name}'.replace('_', '-')
    SERVICE_URL=f'https://{SERVICE_NAME}-{PROJECT_NUMBER}.{LOCATION}.run.app'

    !mkdir -p {agent_name}
    !cp main.py requirements.txt Dockerfile {agent_name}/
    !cd {agent_name} && gcloud builds submit --tag {REPO}/{SERVICE_NAME}
    !gcloud run deploy {SERVICE_NAME} \
       --image {REPO}/{SERVICE_NAME} \
       --platform managed \
       --region {LOCATION} \
       --no-allow-unauthenticated \
       --update-env-vars "\
PROJECT_ID={PROJECT_ID},REGION={LOCATION},AGENT_ID={AGENT_ID},\
SERVICE_URL={SERVICE_URL}" 2>&1 | cat

プロキシーエージェントの作成

これでリモートエージェントの準備ができたので、次は、これらを呼び出すプロキシーエージェントを定義します。プロキシーエージェントは、表面的には通常のエージェントと同じ LlmAgent オブジェクトですが、実際には、コールバック関数を用いて、リモートエージェントにリクエストを送信して、その応答結果を返します。

そこで、まずは、A2A クライアントとしてリモートエージェントを呼び出すコールバック関数を用意します。

import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import MessageSendParams, SendStreamingMessageRequest
import google.auth.transport.requests
import google.oauth2.id_token 


async def a2a_remote_call(query, a2a_server_url):
    auth_req = google.auth.transport.requests.Request()
    id_token = google.oauth2.id_token.fetch_id_token(auth_req, a2a_server_url)
    headers = {
        'Authorization': f'Bearer {id_token}',
        'Content-Type': 'application/json'
    }
    payload = {
        'message': {
            'role': 'user',
            'parts': [{'kind': 'text', 'text': query}],
            'messageId': uuid.uuid4().hex,
        },
    }
    request = SendStreamingMessageRequest(
        id=uuid.uuid4().hex,
        params=MessageSendParams(**payload),
    )

    async with httpx.AsyncClient(headers=headers, timeout=100) as httpx_client:
        agent_card = await A2ACardResolver(
            base_url=a2a_server_url, httpx_client=httpx_client
        ).get_agent_card()
        client = A2AClient(httpx_client, agent_card, url=a2a_server_url)
        stream_response = client.send_message_streaming(request)
        result = []
        async for chunk in stream_response:
            chunk = json.loads(chunk.root.model_dump_json(exclude_none=True))
            if chunk['result']['kind'] == 'artifact-update':
                text_messages = []
                for part in chunk['result']['artifact']['parts']:
                    if 'text' in part:
                        text_messages.append(part['text'])
                result.append(Part(text='\n'.join(text_messages)))
    
    return result


def get_call_a2a_agent(agent_name):
    service_name=f'a2a-server-{agent_name}'.replace('_', '-')
    a2a_server_url=f'https://{service_name}-{PROJECT_NUMBER}.{LOCATION}.run.app'

    async def call_a2a_agent(
        callback_context: CallbackContext, llm_request: LlmRequest
    ) -> Content:

        # Convert past events into JSON serializable format.
        events = callback_context._invocation_context.session.events
        events = [event.model_dump() for event in events]
        for event in events:
            for key, item in event.items():
                if isinstance(item, set):
                    event[key] = list(item)

        parts = await a2a_remote_call(json.dumps(events), a2a_server_url)
        return LlmResponse(
            content=Content(role='model', parts=parts) 
        )
    return call_a2a_agent

たとえば、get_call_a2a_agent('research_agent1') を実行すると、research_agent1 を呼び出すコールバック関数が得られます。

これを用いて、4 つのリモートエージェントのそれぞれに対応するプロキシーエージェントを用意します。

research_agent1 = LlmAgent(
    name='research_agent1',
    model='gemini-2.5-flash', # not used
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(テーマ選定)',
    before_model_callback=get_call_a2a_agent('research_agent1'),
)

research_agent2 = LlmAgent(
    name='research_agent2',
    model='gemini-2.5-flash', # not used
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント(レポート作成)',
    before_model_callback=get_call_a2a_agent('research_agent2'),
)

writer_agent = LlmAgent(
    name='writer_agent',
    model='gemini-2.5-flash', # not used
    description='特定のテーマに関する読み物記事を書くエージェント',
    before_model_callback=get_call_a2a_agent('writer_agent'),
)

review_agent = LlmAgent(
    name='review_agent',
    model='gemini-2.5-flash', # not used
    description='読み物記事をレビューするエージェント',
    before_model_callback=get_call_a2a_agent('review_agent'),
)

ここでは、before_model_callback オプションに先ほどのコールバックを指定しており、これにより、エージェントが受け取ったリクエストは、LLM に入力される直前にコールバック関数が横取りして、リモートエージェントに送信します。そして、リモートエージェントから得られた結果を LLM が生成した結果であるかのように返します。

業務フローエージェントの定義

ここまでの準備ができたら、プロキシーエージェントを組み合わせた Sequential Agent を構成して、これらを sub_agents に登録した Root Agent(業務フローエージェント)を定義します。4 つのエージェントがプロキシーエージェントに置き換わっている以外は、前回の記事で作った Root Agent(業務フローエージェント)と同じ構成です。

def get_print_agent(text):
    def before_model_callback(
        callback_context: CallbackContext, llm_request: LlmRequest
    ) -> LlmResponse:
        return LlmResponse(
            content=Content(
                role='model', parts=[Part(text=text)],
            )
        )
    return LlmAgent(
        name='print_agent',
        model='gemini-2.0-flash', # not used
        description='',
        instruction = '',
        before_model_callback=before_model_callback,
    )

research_agent = SequentialAgent(
    name='research_agent',
    sub_agents=[
        get_print_agent('\n---\n## リサーチエージェントが調査レポートを作成します。\n---\n'),
        get_print_agent('\n## 調査対象のトピックを選定します。\n'),
        copy.deepcopy(research_agent1),
        get_print_agent('\n## 選定したトピックに基づいて、調査レポートを作成します。\n'),
        copy.deepcopy(research_agent2),
        get_print_agent('\n#### 調査レポートが準備できました。記事の作成に取り掛かってもよいでしょうか?\n'),
    ],
    description='記事の執筆に必要な情報を収集してレポートにまとめるエージェント',
)

write_and_review_agent = SequentialAgent(
    name='write_and_review_agent',
    sub_agents=[
        get_print_agent('\n---\n## ライターエージェントが記事を執筆します。\n---\n'),
        copy.deepcopy(writer_agent),
        get_print_agent('\n---\n## レビューエージェントが記事をレビューします。\n---\n'),
        copy.deepcopy(review_agent),
       get_print_agent('\n#### レビューに基づいて記事の修正を依頼しますか?\n'),
    ],
    description='記事を作成、レビューする。',
)

root_agent = LlmAgent(
    name='article_generation_flow',
    model='gemini-2.0-flash',
    instruction = '''
何ができるか聞かれた場合は、以下の処理をすることをわかりやすくフレンドリーな文章にまとめて返答してください。

- ユーザーが指定したテーマの記事を作成する業務フローを実行する。
- はじめに、テーマに関する調査レポートを作成する。
- その後、ライターエージェントとレビューエージェントが協力して、編集方針に則した記事を作成する。

ユーザーが記事のテーマを指定した場合は、次のフローを実行します。

1. そのテーマの記事の作成に取り掛かる旨を伝えて、research_agent に転送して、調査レポートを依頼します。
2. ユーザー記事の作成を支持したら、write_and_review_agent に転送して、記事の作成とレビューを依頼します。
3. ユーザーが記事の修正を希望する場合は、write_and_review_agent に転送します。

**条件**
research_agent のニックネームは、リサーチエージェント
write_and_review_agent のネックネームは、ライターエージェントとレビューエージェント

''',
    sub_agents=[
        copy.deepcopy(research_agent),
        copy.deepcopy(write_and_review_agent),
    ],
    description='記事を作成する業務フローを実行するエージェント'
)

実行例

業務フローエージェントが完成したので、実際に使ってみましょう。ここでは、先に用意した簡易アプリで実行します。

client = LocalApp(root_agent)
DEBUG = False

query = '''
こんにちは。何ができますか?
'''
result = await client.stream(query)

[出力結果]

こんにちは!私は、あなたが指定したテーマに関する記事を作成するお手伝いをします。

具体的には、まずリサーチエージェントがテーマに関する調査レポートを作成します。
その後、ライターエージェントとレビューエージェントが協力して、編集方針に沿った記事を作成します。

記事のテーマを指定していただければ、すぐに作成に取り掛かります。ご希望のテーマがあれば教えてください!

続けて、記事のテーマを入力すると、まずは、調査レポートが作成されます。

query = '''
「夏の節電対策」をテーマに記事を作成してください。
'''
result = await client.stream(query)

[出力結果]

かしこまりました。「夏の節電対策」をテーマにした記事の作成ですね。

まず、リサーチエージェントに調査レポートを依頼します。

---
## リサーチエージェントが調査レポートを作成します。
---
...(以下省略)...

この後は、前回の実行例と同様の流れで、記事の作成を進めることができます。もちろん、ここで定義した業務フローエージェントを Agent Engine にデプロイしてリモートエージェントとして使用することも可能です。

今回実装した内容をまとめたノートブックが下記にあるので、こちらも参考にしてください。

まとめ

この記事では、A2A のリモートエージェントを用いたマルチエージェントシステムを構築する手順を紹介しました。基本的には、前回の記事で作成したマルチエージェントシステムにおいて、エージェントの最小単位となる個々の LlmAgent をプロキシーエージェントに置き換えて、A2A プロトコルでリモートエージェントにアクセスするように変更した形になります。

なお、Agent Engine で稼働するリモートエージェントの前段にデプロイした A2A サーバーと、プロキシーエージェントのコールバック関数に含まれる A2A クライアントのコードについては、詳しい説明はあえて割愛しました。ADK および Agent Engine の A2A 対応は、急速に開発が進められている状況ですので、この部分のコードの詳細は今後のバージョンで大きく変わる可能性があります。まずは、冒頭の図に示したアーキテクチャーを押さえておくとよいでしょう。

Google Cloud Japan

Discussion