🙌

【A2A/python_a2a】- コードを書きながらA2Aを理解していく①

に公開

執筆日

2025/7/17

やること

A2Aの勉強をしています。
概念は理解できたが、実装がまだできない...
python_a2aでコードを書きかながら、A2Aの理解を深めていく
今回は、以下の赤枠を差作成する

ライブラリのインストール

まずは pip でライブラリを導入します。

pip install python_a2a

Agentを定義する

天気予報のダミーデータを返すエージェントを作ってみます。

main.py
from python_a2a import A2AServer, skill, agent, run_server, TaskStatus, TaskState


@agent(
    name="天気予報エージェント",
    description="天気の予測情報を返答するエージェントです",
    version="1.0.0"
)

class WeatherAgent(A2AServer):
    def get_weather(self, location):
        """天気予報を返答します"""
        # 適当なデータです。
        return f"{location}は、晴れです!35度です!熱い"
    
    def handle_task(self, task):
        message_data = task.message or {}
        content = message_data.get("content", {})
        text = content.get("text", "") if isinstance(content, dict) else ""
        
        if "weather" in text.lower() and "in" in text.lower():
            location = text.split("in", 1)[1].strip().rstrip("?.")
            weather_text = self.get_weather(location)
            task.artifacts = [{
                "parts": [{"type": "text", "text": weather_text}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={"role": "agent", "content": {"type": "text", 
                         "text": "特定の地域の天気予報を返答してください。"}}
            )
        return task

if __name__ == "__main__":
    agent = WeatherAgent()
    run_server(agent, port=5000)
  1. 以下を実行する
python main.py
  1. http://localhost:5000/ をブラウザ上で開く
  2. 以下のようにGUIのアプリケーションが開く

Skillを追加する

@skill で、スキル情報を明示します。

main.py
from python_a2a import A2AServer, skill, agent, run_server, TaskStatus, TaskState


@agent(
    name="天気予報エージェント",
    description="天気の予測情報を返答するエージェントです",
    version="1.0.0"
)

class WeatherAgent(A2AServer):
    @skill(
        name="天気予報を取得",
        description="指定の場所の天気予報を取得します",
        tags=["weather", "forecast"]
    )
    def get_weather(self, location):
        """天気予報を返答します"""
        # 適当なデータです。
        return f"{location}は、晴れです!35度です!熱い"
    
    def handle_task(self, task):
        message_data = task.message or {}
        content = message_data.get("content", {})
        text = content.get("text", "") if isinstance(content, dict) else ""
        
        if "weather" in text.lower() and "in" in text.lower():
            location = text.split("in", 1)[1].strip().rstrip("?.")
            weather_text = self.get_weather(location)
            task.artifacts = [{
                "parts": [{"type": "text", "text": weather_text}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={"role": "agent", "content": {"type": "text", 
                         "text": "特定の地域の天気予報を返答してください。"}}
            )
        return task

if __name__ == "__main__":
    agent = WeatherAgent()
    run_server(agent, port=5000)
  1. 以下を実行する
python main.py
  1. Skillが追加されたことを確認する

エージェントを3体にしてみる

天気・時刻・ジョーク、3種類のエージェントを並列で起動し、それぞれのGUIを確認します。

main.py
from python_a2a import (
    A2AServer,
    agent,
    skill,
    run_server,
    TaskStatus,
    TaskState
)
import multiprocessing
from datetime import datetime
import random

# WeatherAgent 
@agent(
    name="天気予報エージェント",
    description="天気の予測情報を返答するエージェントです",
    version="1.0.0"
)
class WeatherAgent(A2AServer):
    @skill(
        name="天気予報を取得",
        description="指定の場所の天気予報を取得します",
        tags=["weather", "forecast"]
    )
    def get_weather(self, location):
        """天気予報を返答します"""
        return f"{location}は、晴れです!35度です!熱い"

    def handle_task(self, task):
        text = task.message.content.get("text", "") if task.message else ""
        if "weather" in text.lower() and "in" in text.lower():
            location = text.split("in", 1)[1].strip().rstrip("?.")
            weather_text = self.get_weather(location)
            task.artifacts = [{
                "parts": [{"type": "text", "text": weather_text}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={"role": "agent", "content": {"type": "text",
                         "text": "特定の地域の天気予報を返答してください。"}}
            )
        return task

# 現在時刻取得エージェント 
@agent(
    name="時刻取得エージェント",
    description="現在の時刻を返答するエージェントです",
    version="1.0.0"
)
class TimeAgent(A2AServer):
    @skill(
        name="現在時刻を取得",
        description="現在のローカル時刻を取得します",
        tags=["time", "clock"]
    )
    def get_time(self):
        """現在時刻を返します"""
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def handle_task(self, task):
        text = task.message.content.get("text", "") if task.message else ""
        if "time" in text.lower():
            time_text = self.get_time()
            task.artifacts = [{
                "parts": [{"type": "text", "text": f"現在時刻: {time_text}"}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={"role": "agent", "content": {"type": "text",
                         "text": "時刻を知りたいときは 'time' を含めてください。"}}
            )
        return task

# ジョーク提供エージェント 
@agent(
    name="ジョークエージェント",
    description="ランダムなジョークを返答するエージェントです",
    version="1.0.0"
)
class JokeAgent(A2AServer):
    @skill(
        name="ジョークを取得",
        description="面白いジョークを一つ返します",
        tags=["joke", "fun"]
    )
    def tell_joke(self):
        """ランダムなジョークを返します"""
        jokes = [
            "なぜ自転車は倒れない?答え:二(に)つあるから!",
            "プログラマーが海に行くと?『バグ』がいっぱい!",
            "数学者がバーに入ると?酒がπ倍美味しく感じる!"
        ]
        return random.choice(jokes)

    def handle_task(self, task):
        text = task.message.content.get("text", "") if task.message else ""
        if "joke" in text.lower():
            joke = self.tell_joke()
            task.artifacts = [{
                "parts": [{"type": "text", "text": joke}]
            }]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={"role": "agent", "content": {"type": "text",
                         "text": "ジョークが欲しいときは 'joke' を含めてください。"}}
            )
        return task

def start_agent(agent_cls, port):
    srv = agent_cls()
    run_server(srv, host="0.0.0.0", port=port)

if __name__ == "__main__":
    configs = [
        (WeatherAgent, 5000),
        (TimeAgent,    5001),
        (JokeAgent,    5002)
    ]
    procs = []
    for cls, port in configs:
        p = multiprocessing.Process(target=start_agent, args=(cls, port))
        p.start()
        procs.append(p)
    for p in procs:
        p.join()

以下を実行する

python main.py

http://localhost:5000/

http://localhost:5001/

http://localhost:5002/

Routerを追加し、Taskを実行する

NetworkとRouterを設定し、自動的に適切なエージェントへtaskを振り分けます。

main.py
from python_a2a import (
    A2AServer,
    agent,
    skill,
    run_server,
    TaskStatus,
    TaskState,
    AgentNetwork,
    A2AClient,
    AIAgentRouter,
    Message,
    TextContent,
    ErrorContent,
    MessageRole
)
import multiprocessing
import time
from datetime import datetime
import random

@agent(name="天気予報エージェント", description="天気の予測情報を返す", version="1.0.0")
class WeatherAgent(A2AServer):
    @skill(name="天気予報を取得", description="指定の場所の天気を返す", tags=["weather","forecast"])
    def get_weather(self, location: str) -> str:
        return f"{location} は晴れ、気温 35℃ です。"

    def handle_task(self, task):
        msg = task.message or {}
        content = msg.get("content", {}) if isinstance(msg, dict) else {}
        text = content.get("text", "") if isinstance(content, dict) else ""

        is_ja = "天気" in text and ("教えて" in text or "ください" in text)
        is_en = "weather" in text.lower() and "in" in text.lower()

        if is_ja or is_en:
            if is_ja:
                loc = text.replace("の天気を教えて", "").strip()
            else:
                loc = text.lower().split("in",1)[1].strip().rstrip("?.")
            weather = self.get_weather(loc)
            task.artifacts = [{"parts":[{"type":"text","text":weather}]}]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={
                    "role":"agent","content":{
                        "type":"text",
                        "text":"「東京の天気を教えて」のように場所を含めてください。"
                    }
                }
            )
        return task


@agent(name="時刻取得エージェント", description="現在の時刻を返す", version="1.0.0")
class TimeAgent(A2AServer):
    @skill(name="現在時刻を取得", description="ローカル時刻を返す", tags=["time","clock"])
    def get_time(self) -> str:
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    def handle_task(self, task):
        msg = task.message or {}
        content = msg.get("content", {}) if isinstance(msg, dict) else {}
        text = content.get("text", "") if isinstance(content, dict) else ""

        if "時刻" in text or "time" in text.lower():
            now = self.get_time()
            task.artifacts = [{"parts":[{"type":"text","text":f"現在時刻: {now}"}]}]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={
                    "role":"agent","content":{
                        "type":"text",
                        "text":"「今の時刻を教えて」など時間を含めてください。"
                    }
                }
            )
        return task


@agent(name="ジョークエージェント", description="ランダムなジョークを返す", version="1.0.0")
class JokeAgent(A2AServer):
    @skill(name="ジョークを取得", description="面白いジョークを返す", tags=["joke","fun"])
    def tell_joke(self) -> str:
        pool = [
            "なぜ自転車は倒れない?答え:二(に)つあるから!",
            "プログラマーが海に行くと?『バグ』がいっぱい!",
            "数学者がバーに入ると?酒がπ倍美味しく感じる!"
        ]
        return random.choice(pool)

    def handle_task(self, task):
        msg = task.message or {}
        content = msg.get("content", {}) if isinstance(msg, dict) else {}
        text = content.get("text", "") if isinstance(content, dict) else ""

        if "ジョーク" in text or "joke" in text.lower():
            joke = self.tell_joke()
            task.artifacts = [{"parts":[{"type":"text","text":joke}]}]
            task.status = TaskStatus(state=TaskState.COMPLETED)
        else:
            task.status = TaskStatus(
                state=TaskState.INPUT_REQUIRED,
                message={
                    "role":"agent","content":{
                        "type":"text",
                        "text":"「ジョークを言って」とご指定ください。"
                    }
                }
            )
        return task

def start_agent(agent_cls, port):
    run_server(agent_cls(), host="0.0.0.0", port=port)

if __name__ == "__main__":
    configs = [(WeatherAgent,5001),(TimeAgent,5002),(JokeAgent,5003)]
    procs = []
    for cls, port in configs:
        p = multiprocessing.Process(target=start_agent, args=(cls, port))
        p.start(); procs.append(p)

    time.sleep(2) 

    network = AgentNetwork("SampleNetwork")
    network.add("weather","http://localhost:5001")
    network.add("time",   "http://localhost:5002")
    network.add("joke",   "http://localhost:5003")

    llm = A2AClient("http://localhost:5000/openai")
    router = AIAgentRouter(llm_client=llm, agent_network=network)

    query = "What's the weather like in Tokyo?"
    key, conf = router.route_query(query)
    print(f"Routing to {key} (confidence={conf:.2f})")

    client = network.get_agent(key)
    msg    = Message(content=TextContent(text=query), role=MessageRole.USER)
    resp   = client.send_message(msg)

    if isinstance(resp.content, TextContent):
        print("Answer:", resp.content.text)
    elif isinstance(resp.content, ErrorContent):
        print(f"Error(code={resp.content.code}):", resp.content.message)
    elif resp.artifacts:
        for art in resp.artifacts:
            for part in art.get("parts", []):
                if part.get("type")=="text":
                    print("Answer:", part["text"])
    else:
        print("Unknown response:", resp)

    print("\nAvailable Agents:")
    for info in network.list_agents():
        print(f"- {info['name']}: {info['description']}")

    for p in procs:
        p.terminate()
結果
Starting A2A server on http://0.0.0.0:5001/a2a
Google A2A compatibility: Enabled
 * Serving Flask app 'python_a2a.server.http'
 * Debug mode: off
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5001
 * Running on http://10.75.118.222:5001
INFO:werkzeug:Press CTRL+C to quit
Starting A2A server on http://0.0.0.0:5002/a2a
Google A2A compatibility: Enabled
 * Serving Flask app 'python_a2a.server.http'
 * Debug mode: off
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5002
 * Running on http://10.75.118.222:5002
INFO:werkzeug:Press CTRL+C to quit
Starting A2A server on http://0.0.0.0:5003/a2a
Google A2A compatibility: Enabled
 * Serving Flask app 'python_a2a.server.http'
 * Debug mode: off
INFO:werkzeug:WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:5003
 * Running on http://10.75.118.222:5003
INFO:werkzeug:Press CTRL+C to quit
INFO:werkzeug:127.0.0.1 - - [17/Jul/2025 16:04:47] "GET /.well-known/agent.json HTTP/1.1" 200 -
INFO:python_a2a.client.network:Added agent 'weather' from URL: http://localhost:5001
INFO:werkzeug:127.0.0.1 - - [17/Jul/2025 16:04:49] "GET /.well-known/agent.json HTTP/1.1" 200 -
INFO:python_a2a.client.network:Added agent 'time' from URL: http://localhost:5002
INFO:werkzeug:127.0.0.1 - - [17/Jul/2025 16:04:51] "GET /.well-known/agent.json HTTP/1.1" 200 -
INFO:python_a2a.client.network:Added agent 'joke' from URL: http://localhost:5003
Routing to weather (confidence=0.30)
INFO:werkzeug:127.0.0.1 - - [17/Jul/2025 16:05:38] "POST /tasks/send HTTP/1.1" 200 -
Answer: tokyo は晴れ、気温 35℃ です。

Available Agents:
- weather: 天気の予測情報を返す
- time: 現在の時刻を返す
- joke: ランダムなジョークを返す

まとめ

python_a2aを使ったマルチエージェント環境の立ち上げから、Routerによる自動タスク振り分けまでを紹介しました。次はMessageやArtifaceなどを実装してみようかなと。

ヘッドウォータース

Discussion