🤸‍♀️

【実装解説】Google ADKで「動的ParallelAgent」パターンを作る

に公開

GoogleのAgent Development Kit(ADK)を使っていると、こんな課題にぶつかりませんか?

「実行時にタスクの種類や数が決まるんだけど、どう並列処理すればいい?」

そんな時に使えるのが、
「Custom Agentがその場でParallelAgentを組み立てて、session.stateでデータをやりとりする」
という設計パターンです。

この記事では、この手法を
✅ なぜ必要か
✅ 最小コード例
✅ 実行方法と実行結果
✅ 実装のコツと落とし穴
✅ 応用アイデア
の順で、わかりやすく解説していきます!


🤔なぜ「動的ParallelAgent」が必要なのか?

Google ADKには、ワークフロー制御用に

  • SequentialAgent(順番に実行)⏩
  • ParallelAgent(並列に実行)🔀
  • LoopAgent(繰り返し実行)🔄
    といった部品が用意されています。

中でも ParallelAgent は、渡された子エージェントを同時実行するだけ という超シンプルな仕組みで、
特にI/O待ちが長いタスクを一気に高速化するのに向いています。

しかし、普通に使うと 「最初に渡す子エージェントのリストが固定」 という制約があります。
これだと、

  • タスクの内容や数がその場で決まる
  • 実行する子エージェントを動的に変えたい

みたいなケースに対応できません。

そこで活躍するのが、
Custom Agentで動的にParallelAgentを組み立てるというアプローチです!


🗒️【コード例】60行で作る動的ファンアウトパターン

まずは、最小のサンプルコードを見てみましょう。

import random, secrets
from typing import ClassVar, List
from google.adk.events import Event, EventActions
from google.adk.agents import BaseAgent, ParallelAgent, SequentialAgent
from google.genai import types

# ---------- 子エージェント --------------------------------------------------
# タスクを受け取って平方計算するWorker
class Worker(BaseAgent):
    def __init__(self, *, name: str, run_id: str):
        super().__init__(name=name); self._run_id = run_id
    async def _run_async_impl(self, ctx):
        n = ctx.session.state.get(f"task:{self._run_id}:{self.name}", 0)
        result = n * n
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                                  parts=[types.Part(text=f"{n}² = {result}")]),
            actions=EventActions(
                state_delta={f"result:{self._run_id}:{self.name}": result})
        )

# ---------- 親エージェント --------------------------------------------------
# タスク配布とParallelAgent動的生成を担当するPlanner
class PlannerAndRunner(BaseAgent):
    POOL: ClassVar[List[str]] = ["w0", "w1", "w2"]
    async def _run_async_impl(self, ctx):
        run_id = secrets.token_hex(2)                   # 名前空間を分離
        picked = random.sample(self.POOL,
                               k=random.randint(1, len(self.POOL)))
        # state にタスクを配布
        task_delta = {f"task:{run_id}:{name}": random.randint(1, 9)
                      for name in picked}
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                   parts=[types.Part(text=f"Run {run_id} tasks {task_delta}")]),
            actions=EventActions(state_delta={"current_run": run_id, **task_delta})
        )
        # 並列実行する子を生成
        parallel = ParallelAgent(
            name=f"block_{run_id}",
            sub_agents=[Worker(name=n, run_id=run_id) for n in picked]
        )
        async for ev in parallel.run_async(ctx):
            yield ev

# ---------- 集約エージェント ------------------------------------------------
# 結果を集計するAggregator
class Aggregator(BaseAgent):
    async def _run_async_impl(self, ctx):
        run_id = ctx.session.state.get("current_run")
        vals = [v for k, v in ctx.session.state.items()
                if run_id and k.startswith(f"result:{run_id}:")]
        yield Event(
            author=self.name,
            content=types.Content(role=self.name,
                   parts=[types.Part(text=f"Sum = {sum(vals)}")]),
            actions=EventActions(escalate=True)
        )

# ---------- ルート ----------------------------------------------------------
root_agent = SequentialAgent(
    name="root",
    sub_agents=[PlannerAndRunner(name="planner"), Aggregator(name="collector")]
)

これだけで、タスク内容に応じて「誰を何人」でも並列実行できる仕組みが完成します!


🏃【実行方法】このコードを動かすには?

このサンプルは、Google ADKの標準コマンド adk run を使って実行します。

$ adk run .
Log setup complete: /tmp/agents_log/agent.20250427_122520.log
To access latest log: tail -F /tmp/agents_log/agent.latest.log
Running agent root, type exit to exit.

実行後は、ターミナル上で適当な入力(たとえばab)をすると、タスクが実行されます。


📒【実行結果】出力例

タスク配布内容がコンソールに表示されるので、流れがとても追いやすくなっています。

user: a
[planner]: Run 84e9 tasks {'task:84e9:w0': 3, 'task:84e9:w1': 5}
[w1]: 5² = 25
[w0]: 3² = 9
[collector]: Sum = 34
user: b
[planner]: Run 35d1 tasks {'task:35d1:w1': 6, 'task:35d1:w0': 7, 'task:35d1:w2': 2}
[w1]: 6² = 36
[w0]: 7² = 49
[w2]: 2² = 4
[collector]: Sum = 89
user:
  • planner がそのターンでどのWorkerにタスクを振ったか、毎回ログでわかる
  • Workerたちは自分のタスクを処理して結果を出力
  • collector がそのターンの合計を計算して出力

という流れです。


🧩実装のキモと落とし穴

動的ParallelAgentを作る上で、抑えておきたいポイントをまとめます。

1. Single-Parent Rule

ADKには 「エージェントは複数の親を持てない」 という制約があります。
同じWorkerインスタンスを使い回すと、即エラーになります。

毎ターン必ず新しいインスタンスを作り直すこと!

2. session.state設計

子エージェントは同じsession.stateを共有します。
なので、キーはなるべく
task:{run_id}:{worker_name}
みたいに名前空間付きにして、衝突を防ぎましょう。

3. Pydanticの罠とClassVar

ADKのエージェントは内部でPydanticモデルになっているため、
定数リスト(POOL)には必ずClassVarを付けないとビルドエラーになります。

from typing import ClassVar
class PlannerAndRunner(BaseAgent):
    POOL: ClassVar[list[str]] = ["w0", "w1", "w2"]

📊他の方法と比べたときの強み

手法 課題・弱点
固定ParallelAgent 事前に子リストが固定、動的変更できない
transfer_to_agent()使用 柔軟だがシリアル実行、かつLLM誤選択リスクあり
asyncio.gather手書き ADKのログ・state管理外になりトレーサビリティ低下

➡ この方法なら

  • ✅ 並列化
  • ✅ 柔軟なタスク振り分け
  • ✅ ADKのログ&UI追跡

をすべて両立できます。


🌟応用アイデア

目的 アイデア例
タスク種別を増やす Worker に演算種別を持たせる
集計をLLMにさせる AggregatorLlmAgent に変える
親をLLMドリブンにする LlmAgentで子エージェントを選出
Vertex AIデプロイ 外部トリガでroot agent起動

さらに発展させれば、タスク種別の自動判別&動的並列実行みたいな高度なワークフローも作れます


📝まとめ

  • Custom Agentで動的にParallelAgentを組み立てると、タスク内容に応じて自由に子エージェントを並列実行できる
  • Single-Parent Rulestate設計に注意すれば、実装は意外とシンプル

動的ワークロードを持つエージェント設計では、このパターンを「標準テンプレート」として使えると思っています。(もしもっと良い方法があれば教えてください!)


🔽さらに深掘りしたい人向けリンク集

Discussion