🐈

Agent Development Kit (ADK) をSlackから呼び出しA2Aにしてみた。

に公開

はじめに

近年、LLM(大規模言語モデル)を活用したエージェントアプリケーションが注目を集めています。本記事では、Python+Google ADK をベースに構築されたパーソナルアシスタントエージェント「nekota-agent」の構成と実装のポイントをご紹介します。

https://x.com/furuta_katsumi/status/1924043704349900955

プロジェクト概要

  • 名前:nekota-agent
  • 説明:AI を活用したパーソナルアシスタントエージェント。日常的なタスク管理や情報収集をサポート。
  • バージョン:1.0.0
[project]
name = "nekota-agent"
version = "1.0.0"
description = "AIを活用したパーソナルアシスタントエージェント。日常的なタスク管理や情報収集をサポートします。"
requires-python = ">=3.11"
dependencies = [
    "google-adk>=0.5.0",
    "litellm>=1.70.0",
    "notion-client>=2.3.0",
    "python-dotenv>=1.1.0",
    "slack-bolt>=1.23.0",
]

ディレクトリ構成

.
├ slack_bot.py            # Slack 連携のエントリポイント
├ pyproject.toml
├ nekota/                 # メインのパーソナルアシスタント
│   └ agent.py
├ nekota_task/            # タスク管理(Notion)エージェント
│   └ agent.py
├ nekota_note/            # ノート(Obsidian)連携エージェント
│   └ agent.py
└ .env                    # 各種 API キー・トークン

コアエージェント(nekota/agent.py)

LLM モデルとして o4-mini を利用し、以下のツールとサブエージェントを登録しています。

from google.adk.models.lite_llm import LiteLlm
from google.adk.agents       import Agent
from nekota_task.agent       import root_agent as task_agent

llm_model = LiteLlm(model="openrouter/openai/o4-mini")

def get_current_date_and_time() -> str:
    # 現在日時を返すシンプルなツール
    [...]

root_agent = Agent(
    name="nekota_agent",
    model=llm_model,
    description="パーソナルアシスタントエージェント。",
    instruction="…上司の意図を汲み迅速に行動し…",
    tools=[get_current_date_and_time],
    sub_agents=[task_agent],
)
  • 日付/時刻取得ツール
  • タスク管理サブエージェント(後述)

タスク管理エージェント(nekota_task/agent.py)

Notion API を用いて「未完了のタスク一覧取得」と「タスク追加」を行うツールを提供します。

12
from notion_client import Client

notion = Client(auth=os.environ["NOTION_API_KEY"])
NOTION_DATABASE_ID = os.environ["NOTION_DATABASE_ID"]

def get_task_name() -> dict:
    # 未完了タスクを取得
    [...]

def send_task_to_tool(task: str) -> dict:
    # タスクを Notion に追加
    [...]

root_agent = Agent(
    name="nekota_task_agent",
    model=LiteLlm(model="openrouter/google/gemini-2.0-flash-001"),
    tools=[get_task_name, send_task_to_tool],
)
  • get_task_name:未完了タスクの一覧を Notion から取得
  • send_task_to_tool:新規タスクを Notion データベースに追加

ノート連携エージェント(nekota_note/agent.py)

Obsidian などのノートアプリと通信するエージェント(現状ツールは未登録)。

llm_model = LiteLlm(model="openrouter/google/gemini-2.0-flash-001")


API_URL = os.getenv("OBSIDIAN_REST_API_URL", "https://127.0.0.1:27124").rstrip('/')
API_KEY = os.getenv("OBSIDIAN_REST_API_KEY", "")

def obsidian_search(query: str) -> dict:
    """Obsidian vaultを検索し、検索結果を返します。"""
    print(query)
    try:
        url = f"{API_URL}/search/simple/"
        headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
        params = {"query": query}
        # Add: 出力するcurlコマンドを生成してログに表示
        response = requests.post(url, headers=headers, params=params, verify=False)
        response.raise_for_status()
        return {"status": "success", "results": response.json()}
    except Exception as e:
        return {"status": "error", "message": str(e)}

def obsidian_get(path: str) -> dict:
    """指定したパスのノート内容を取得して返します。"""
    try:
        path_encoded = quote(path, safe='')
        url = f"{API_URL}/vault/{path_encoded}"
        headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
        response = requests.get(url, headers=headers, verify=False)
        response.raise_for_status()
        return {"status": "success", "content": response.text}
    except Exception as e:
        return {"status": "error", "message": str(e)}

def obsidian_add(content: str) -> dict:
    """指定したマークダウンコンテンツをアクティブなノートに追加します。"""
    try:
        url = f"{API_URL}/active/"
        headers = {"Authorization": f"Bearer {API_KEY}"} if API_KEY else {}
        headers["Content-Type"] = "text/markdown
        response = requests.post(url, headers=headers, data=content, verify=False)
        response.raise_for_status()
        return {"status": "success", "response": response.text}
    except Exception as e:
        return {"status": "error", "message": str(e)}


root_agent = Agent(
    name="nekota_note_agent",
    model=llm_model,
    description="noteアプリ(obsidian)との通信を行います。",
    instruction="個人が情報を収集、整理、蓄積した情報へアクセスできます。",
    tools=[],
)

Agent同士でタスクの受け渡しもしてくれます。
https://x.com/furuta_katsumi/status/1924099390647783649


Slack Bot 連携(slack_bot.py)

Slack Bolt+Socket Mode でエージェントに会話を中継し、スレッドごとにセッション管理します。

app = App(token=SLACK_BOT_TOKEN)

# ADKのセッションとランナーを初期化
dotenv_app_name = "nekota_note_slack"
session_service = InMemorySessionService()
runner = Runner(agent=root_agent, app_name=dotenv_app_name, session_service=session_service)
# 有効なスレッドIDを管理するためのセットを追加
valid_threads = set()

@app.event("app_mention")
def handle_message(event, say):
    print("=============== app_mention ==================")
    # Bot自身のメッセージは無視
    if event.get("bot_id"):
        return
    user_id = event.get("user")
    text = event.get("text")
    # スレッドTSがあればそれを使い、なければメッセージTSをスレッドIDとする
    thread_ts = event.get("thread_ts", event.get("ts"))
    session_id = thread_ts
    # valid_threadsセットに登録
    valid_threads.add(session_id)
    # セッションを取得、存在しなければ作成
    response_text = "" 
    session_service.create_session(app_name=dotenv_app_name, user_id=user_id, session_id=session_id)
    content = types.Content(role="user", parts=[types.Part(text=text)])
    events = runner.run(user_id=user_id, session_id=session_id, new_message=content)
    for evt in events:
      if evt.content and evt.content.parts:
        for part in evt.content.parts:
          if part.text:
            response_text += part.text
    # スレッドに返信
    say(text=response_text, thread_ts=thread_ts)

@app.event("message")
def handle_thread_reply(event, say):
    print("=============== message ==================")
    # ボット自身やファイル共有系などのサブタイプは無視
    if event.get("bot_id") or event.get("subtype"):
        return

    # thread_ts があって親メッセージ(ts)と異なれば「リプライ」とみなすbot
    thread_ts = event.get("thread_ts")
    ts       = event.get("ts")
    
    if thread_ts and thread_ts != ts:
        # スレッドがapp_mentionで始まっていない場合は無視
        if thread_ts not in valid_threads:
            return
        channel = event["channel"]
        user_id = event["user"]
        text    = event.get("text", "")

        session_service.get_session(app_name=dotenv_app_name, user_id=user_id, session_id=thread_ts)
        content = types.Content(role="user", parts=[types.Part(text=text)])
        events = runner.run(user_id=user_id, session_id=thread_ts, new_message=content)
        response_text = ""
        for evt in events:
            if evt.content and evt.content.parts:
                for part in evt.content.parts:
                    if part.text:
                        response_text += part.text

        print(response_text)  
        say(text=response_text, thread_ts=thread_ts)


if __name__ == "__main__":
    # Socket Modeでアプリを起動
    handler = SocketModeHandler(app, SLACK_APP_TOKEN)
    handler.start() 
  • メンション/スレッドをトリガーに会話を継続
  • InMemorySessionService でユーザー毎・スレッド毎に状態を保持

環境構築と実行方法

  1. Python ≥3.12 の環境を用意
  2. .env ファイルに以下を設定
    • SLACK_BOT_TOKEN
    • SLACK_APP_TOKEN
    • NOTION_API_KEY
    • NOTION_DATABASE_ID
  3. 依存ライブラリのインストール
    pip install .
    
  4. Slack Bot を起動
    python slack_bot.py
    

今後の展望

  • ノートエージェント (nekota_note_agent) のツール実装
  • Web UI や CLI での対話インターフェース
  • Azure/AWS Lambda などでのサーバーレス化
  • Immemoryの揮発対応

まとめ

この「nekota-agent」は、Google ADK を通じて複数のサブエージェント(タスク管理/ノート連携)を組み合わせ、Slack 上でシームレスに対話できるパーソナルアシスタントです。今後さらに機能を拡張し、日々の情報整理・蓄積を強力にサポートしていきます。ぜひお試しください!

Discussion