【実装解説】Google ADKで「動的ParallelAgent」パターンを作る
GoogleのAgent Development Kit(ADK)を使っていると、こんな課題にぶつかりませんか?
「実行時にタスクの種類や数が決まるんだけど、どう並列処理すればいい?」
そんな時に使えるのが、
「Custom Agentがその場でParallelAgentを組み立てて、session.stateでデータをやりとりする」
という設計パターンです。
この記事では、この手法を
✅ なぜ必要か
✅ 最小コード例
✅ 実行方法と実行結果
✅ 実装のコツと落とし穴
✅ 応用アイデア
の順で、わかりやすく解説していきます!
🤔なぜ「動的ParallelAgent」が必要なのか?
Google ADKには、ワークフロー制御用に
-
SequentialAgent
(順番に実行)⏩ -
ParallelAgent
(並列に実行)🔀 -
LoopAgent
(繰り返し実行)🔄
といった部品が用意されています。
中でも ParallelAgent
は、渡された子エージェントを同時実行するだけ という超シンプルな仕組みで、
特にI/O待ちが長いタスクを一気に高速化するのに向いています。
しかし、普通に使うと 「最初に渡す子エージェントのリストが固定」 という制約があります。
これだと、
- タスクの内容や数がその場で決まる
- 実行する子エージェントを動的に変えたい
みたいなケースに対応できません。
そこで活躍するのが、
Custom Agentで動的にParallelAgentを組み立てるというアプローチです!
🗒️【コード例】60行で作る動的ファンアウトパターン
まずは、最小のサンプルコードを見てみましょう。
import random, secrets
from typing import ClassVar, List
from google.adk.events import Event, EventActions
from google.adk.agents import BaseAgent, ParallelAgent, SequentialAgent
from google.genai import types
# ---------- 子エージェント --------------------------------------------------
# タスクを受け取って平方計算するWorker
class Worker(BaseAgent):
def __init__(self, *, name: str, run_id: str):
super().__init__(name=name); self._run_id = run_id
async def _run_async_impl(self, ctx):
n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
result = n * n
yield Event(
author=self.name,
content=types.Content(role=self.name,
parts=[types.Part(text=f"{n}² = {result}")]),
actions=EventActions(
state_delta={f"result:{self._run_id}:{self.name}": result})
)
# ---------- 親エージェント --------------------------------------------------
# タスク配布とParallelAgent動的生成を担当するPlanner
class PlannerAndRunner(BaseAgent):
POOL: ClassVar[List[str]] = ["w0", "w1", "w2"]
async def _run_async_impl(self, ctx):
run_id = secrets.token_hex(2) # 名前空間を分離
picked = random.sample(self.POOL,
k=random.randint(1, len(self.POOL)))
# state にタスクを配布
task_delta = {f"task:{run_id}:{name}": random.randint(1, 9)
for name in picked}
yield Event(
author=self.name,
content=types.Content(role=self.name,
parts=[types.Part(text=f"Run {run_id} tasks {task_delta}")]),
actions=EventActions(state_delta={"current_run": run_id, **task_delta})
)
# 並列実行する子を生成
parallel = ParallelAgent(
name=f"block_{run_id}",
sub_agents=[Worker(name=n, run_id=run_id) for n in picked]
)
async for ev in parallel.run_async(ctx):
yield ev
# ---------- 集約エージェント ------------------------------------------------
# 結果を集計するAggregator
class Aggregator(BaseAgent):
async def _run_async_impl(self, ctx):
run_id = ctx.session.state.get("current_run")
vals = [v for k, v in ctx.session.state.items()
if run_id and k.startswith(f"result:{run_id}:")]
yield Event(
author=self.name,
content=types.Content(role=self.name,
parts=[types.Part(text=f"Sum = {sum(vals)}")]),
actions=EventActions(escalate=True)
)
# ---------- ルート ----------------------------------------------------------
root_agent = SequentialAgent(
name="root",
sub_agents=[PlannerAndRunner(name="planner"), Aggregator(name="collector")]
)
これだけで、タスク内容に応じて「誰を何人」でも並列実行できる仕組みが完成します!
🏃【実行方法】このコードを動かすには?
このサンプルは、Google ADKの標準コマンド adk run
を使って実行します。
$ adk run .
Log setup complete: /tmp/agents_log/agent.20250427_122520.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root, type exit to exit.
実行後は、ターミナル上で適当な入力(たとえばa
やb
)をすると、タスクが実行されます。
📒【実行結果】出力例
タスク配布内容がコンソールに表示されるので、流れがとても追いやすくなっています。
user: a
[planner]: Run 84e9 tasks {'task:84e9:w0': 3, 'task:84e9:w1': 5}
[w1]: 5² = 25
[w0]: 3² = 9
[collector]: Sum = 34
user: b
[planner]: Run 35d1 tasks {'task:35d1:w1': 6, 'task:35d1:w0': 7, 'task:35d1:w2': 2}
[w1]: 6² = 36
[w0]: 7² = 49
[w2]: 2² = 4
[collector]: Sum = 89
user:
-
planner
がそのターンでどのWorkerにタスクを振ったか、毎回ログでわかる -
Worker
たちは自分のタスクを処理して結果を出力 -
collector
がそのターンの合計を計算して出力
という流れです。
🧩実装のキモと落とし穴
動的ParallelAgentを作る上で、抑えておきたいポイントをまとめます。
1. Single-Parent Rule
ADKには 「エージェントは複数の親を持てない」 という制約があります。
同じWorker
インスタンスを使い回すと、即エラーになります。
➡ 毎ターン必ず新しいインスタンスを作り直すこと!
2. session.state設計
子エージェントは同じsession.state
を共有します。
なので、キーはなるべく
task:{run_id}:{worker_name}
みたいに名前空間付きにして、衝突を防ぎましょう。
3. Pydanticの罠とClassVar
ADKのエージェントは内部でPydanticモデルになっているため、
定数リスト(POOL
)には必ずClassVar
を付けないとビルドエラーになります。
from typing import ClassVar
class PlannerAndRunner(BaseAgent):
POOL: ClassVar[list[str]] = ["w0", "w1", "w2"]
📊他の方法と比べたときの強み
手法 | 課題・弱点 |
---|---|
固定ParallelAgent | 事前に子リストが固定、動的変更できない |
transfer_to_agent()使用 | 柔軟だがシリアル実行、かつLLM誤選択リスクあり |
asyncio.gather手書き | ADKのログ・state管理外になりトレーサビリティ低下 |
➡ この方法なら
- ✅ 並列化
- ✅ 柔軟なタスク振り分け
- ✅ ADKのログ&UI追跡
をすべて両立できます。
🌟応用アイデア
目的 | アイデア例 |
---|---|
タスク種別を増やす |
Worker に演算種別を持たせる |
集計をLLMにさせる |
Aggregator を LlmAgent に変える |
親をLLMドリブンにする |
LlmAgent で子エージェントを選出 |
Vertex AIデプロイ | 外部トリガでroot agent起動 |
さらに発展させれば、タスク種別の自動判別&動的並列実行みたいな高度なワークフローも作れます
📝まとめ
- Custom Agentで動的にParallelAgentを組み立てると、タスク内容に応じて自由に子エージェントを並列実行できる
- Single-Parent Rule や state設計に注意すれば、実装は意外とシンプル
動的ワークロードを持つエージェント設計では、このパターンを「標準テンプレート」として使えると思っています。(もしもっと良い方法があれば教えてください!)
🔽さらに深掘りしたい人向けリンク集
Discussion