🤖

DatabricksでSupervisor Agentを構築してみた

に公開

こんにちは、IVRyでデータエンジニアとして働いている松田健司(@ken_3ba)です。趣味はビリヤードで、プロの試合にも出ているぐらい割とガチでやっています。

今年に入って、ビリヤードの世界大会で日本人の女性と男性がそれぞれ優勝しました!ビリヤードはマイナースポーツなのでご存知ない方も多いかもしれませんが、これは本当にすごいことでとても感動しました!

そして、優勝するとビリヤード台の上でパフォーマンスをするという慣例があるのですが、その優勝した日本人プロの方は靴を脱いで台に上がり、日本人らしさを感じました笑

優勝のポスター(引用元: https://www.billiards-days.com/20260210-1/)

さて、本日のビリヤードの話はこのへんで切り上げて本題に入ります!

はじめに

今回は、業務効率化のためにDatabricks上でLangGraphベースのSupervisor Agentを構築した話をします。設計からMLflowでの登録・評価までを紹介します。

半年前までAI Agentを触ったことすらなかった自分でも、Databricksを使えばかなり簡単に構築できました。「自分でもAIエージェントを作ってみたい!」という方、ぜひこの記事を参考にチャレンジしてみてください!

なぜAgentを作ったのか

IVRyはAIによる電話自動応答サービスを提供しており、社内でも営業活動にアイブリーを利用しています。そこから得られるデータを活用し、セールスチームの業務を支援する社内サービスを実験的に開発しています。

たとえば、「アポイント電話の終話直後に次の提案資料が自動生成されている」、といった体験を目指しています。今回はその第一歩として、複数のデータソースから情報を集め、ボタン1つで資料のドラフトを作成できるサービスを作りました。

AI Agentでセールス活動の効率化
引用: Databricks After Party 2025 LTスライド

弊社ではあらゆるデータがDatabricksに集まっているため、Databricks上でAI Agentを構築しました。実験的な開発ということもあり、Databricksとの組み合わせ事例が多く参考にしやすかったLangGraphをAgentフレームワークとして採用しています。

Agentの設計

Supervisor Agentにした理由

Supervisor Agentパターンとは、親Agentがリクエストを解析し、適切なSub Agentに処理を委譲するアーキテクチャです。各Sub Agentが結果を返し、Supervisorがそれを統合して回答します。

Multi-Agentの構成パターンには、以下のようなものがあります。

パターン 説明
ネットワーク型 すべてのAgentが互いに自由に通信でき、どのAgentも次にどのAgentを呼ぶか決定できる
Supervisor型 各Agentが単一のSupervisor Agentと通信し、Supervisorが次の呼び出しAgentを決定する
Supervisor(ツール型) Agentをツールとして扱い、Supervisor Agentがツール呼び出し可能なLLMを使って、どのAgentを呼ぶかおよび引数を決定する
階層型 Supervisorの上位にさらに別のSupervisorを置くことで、階層構造を作り管理を効率化する
カスタム型 特定のニーズに応じてAgent同士の接続を自由に定義する

Multi-Agentの構成パターン
引用: Multi-Agentシステム概要 - Qiita

今回はSupervisor(ツール型)(以降、本記事では単にSupervisorと呼びます)を採用しました。Sub Agentを@toolでツール化し、SupervisorのLLMがどのSub Agentを呼ぶかを自律的に判断する構成です。

採用した理由は以下です。

  1. 関心の分離: 各Sub Agentが専門領域に特化し、プロンプトの複雑さを抑えられる
  2. 拡張性: 新しいSub Agentを追加するだけで機能追加できる
  3. デバッグのしやすさ: どのAgentで何が行われたか明確になる

アーキテクチャの全体像

実際に構築したSupervisor Agentは以下のような構成です。

Supervisor Agentの構成

  • Supervisor Agent: リクエストを受け取り、Sub Agentにルーティングし、結果を統合して返す
  • 企業情報取得Agent: 企業の基本情報を検索・取得
  • 議事録取得Agent: ミーティングの議事録を取得・要約
  • 提案資料作成Agent: 収集した情報から提案資料のドラフトを生成

データはUnity Catalogに格納し、UDFs経由でアクセスします。

やってみた

UDFsの作成

Agentが自由にSQLを発行するとスキーマの誤認識や不要なデータ取得が起きるため、DatabricksのUDFsでデータアクセスを制限しています。

以下のようにSQLでUnity Catalog上に関数を作れます。まだTerraformで管理できないので、今後のアップデートが待ち遠しいですね。

CREATE OR REPLACE FUNCTION dummy_catalog.dummy_schema.search_company(
    company_name STRING COMMENT '検索する企業名(部分一致)'
)
RETURNS TABLE(
    account_id STRING COMMENT 'アカウントID',
    name STRING COMMENT '企業名',
    industry STRING COMMENT '業種',
    location STRING COMMENT '所在地'
)
COMMENT '企業名で企業情報を検索します。'
RETURN
    SELECT
        account_id,
        name,
        industry,
        location
    FROM dummy_catalog.dummy_schema.companies
    WHERE name LIKE CONCAT('%', company_name, '%')
    ORDER BY name
    LIMIT 20

Agent定義の作成

各Sub AgentはPythonファイルとして定義します。以下は企業情報取得Agentの例です。create_react_agentにUDFsをツールとして渡すだけでAgentが作れます。

  1. ModelConfig()MLflow登録時の設定値を取得
  2. UCFunctionToolkit — UDFsをAgentのツールとして利用可能にする。データアクセスをUDFs経由に制限できる
  3. create_react_agent — LLMとツールでReActパターンのAgentを作成
  4. set_model — MLflowのモデルとして登録可能にする
import mlflow
from langchain_community.chat_models import ChatDatabricks
from langchain_community.tools.databricks import UCFunctionToolkit
from langgraph.prebuilt import create_react_agent

# Step 1: MLflow登録時に渡される設定値を取得
model_config = mlflow.models.ModelConfig()
LLM_ENDPOINT = model_config.get("llm_endpoint")
WAREHOUSE_ID = model_config.get("warehouse_id")

UC_FUNCTIONS = [
    "dummy_catalog.dummy_schema.search_company",
]

SYSTEM_PROMPT = """あなたは企業情報取得エージェントです。
入力された企業名から企業情報を検索し、以下のJSON形式で返してください。
{"account_id": "アカウントID", "name": "企業名", "industry": "業種", "location": "所在地"}
見つからない場合はnullを返し、理由を記載してください。
"""

# Step 2: UDFsをAgentのツールとして利用可能にする

toolkit = UCFunctionToolkit(warehouse_id=WAREHOUSE_ID, function_names=UC_FUNCTIONS)
tools = toolkit.get_tools()

# Step 3: LLMとツールを組み合わせてReActパターンのAgentを作成

llm = ChatDatabricks(endpoint=LLM_ENDPOINT, temperature=0.1)
agent = create_react_agent(llm, tools, prompt=SYSTEM_PROMPT)

# Step 4: このAgentをMLflowのモデルとして登録可能にする

mlflow.models.set_model(agent)

MLflowでのSub Agent登録

作成したAgent定義ファイルをMLflowでUnity Catalogに登録します。まずトレーシングを有効化します。

import mlflow
from mlflow.models import infer_signature

CATALOG = "dummy_catalog"
SCHEMA = "dummy_schema"
LLM_ENDPOINT = "databricks-meta-llama-3-3-70b-instruct"
WAREHOUSE_ID = "xxxxxxxxxxxxxxxx"

mlflow.langchain.autolog(log_traces=True)

次に、Agent定義ファイルをMLflowに記録し、Unity Catalogに登録する関数を定義します。

  1. log_model — AgentのPythonファイルをMLflowに記録する。まだ外部から参照できない「下書き」状態
  2. register_model — Unity Catalogに正式登録し、models:/catalog.schema.model_name のURIで参照可能にする
  3. エイリアス設定@latest などの別名を付け、モデル更新時に呼び出し側のコード変更を不要にする
def register_agent(
    catalog: str,
    schema: str,
    model_name: str,
    definition_file: str,
    llm_endpoint: str,
    warehouse_id: str,
    input_example: dict,
    extra_config: dict | None = None,
) -> dict:
    """Agent定義ファイルをUnity Catalogに登録する。"""

    model_config = {
        "llm_endpoint": llm_endpoint,
        "warehouse_id": warehouse_id,
    }
    if extra_config:
        model_config.update(extra_config)

    # Step 1: AgentをMLflowに記録
    with mlflow.start_run(run_name=model_name):
        model_info = mlflow.langchain.log_model(
            lc_model=os.path.join(os.getcwd(), definition_file),
            name="agent",
            input_example=input_example,
            model_config=model_config,
        )

    # Step 2: Unity Catalogに登録
    registered_model_name = f"{catalog}.{schema}.{model_name}"
    mlflow.set_registry_uri("databricks-uc")
    registered_model = mlflow.register_model(
        model_uri=model_info.model_uri,
        name=registered_model_name,
    )

    # Step 3: エイリアスを設定
    client = mlflow.tracking.MlflowClient()
    client.set_registered_model_alias(
        name=registered_model_name,
        alias="latest",
        version=registered_model.version,
    )

    return {
        "model_uri": model_info.model_uri,
        "registered_model_name": registered_model_name,
        "version": registered_model.version,
    }

あとはこの関数を呼び出すだけで、各Sub Agentを登録できます。

result = register_agent(
    catalog=CATALOG,
    schema=SCHEMA,
    model_name="company_search_agent",
    definition_file="company_search_agent_definition.py",
    llm_endpoint=LLM_ENDPOINT,
    warehouse_id=WAREHOUSE_ID,
    input_example={
        "messages": [{"role": "user", "content": "〇〇株式会社を検索してください"}]
    },
)
print(f"Agent 登録完了! Version: {result['version']}")

MLflowでのSupervisor Agent登録

次にSupervisor Agentを登録します。ポイントはSub Agentを@toolでツール化し、Supervisorから呼び出す点です。

  1. load_agent_model — Unity Catalog登録済みのSub Agentをエイリアスでロード。Sub Agent更新時もSupervisor側の変更不要
  2. @tool — 各Sub AgentをLangChainのツールとして定義。SupervisorのLLMがどのSub Agentを呼ぶか自律的に判断する
  3. create_react_agent — Supervisor自身もReActパターンで作成。Sub Agentと同じ仕組みでアーキテクチャが統一される
import mlflow
from langchain_community.chat_models import ChatDatabricks
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent

model_config = mlflow.models.ModelConfig()
LLM_ENDPOINT = model_config.get("llm_endpoint")
CATALOG = model_config.get("catalog")
SCHEMA = model_config.get("schema")

mlflow.set_registry_uri("databricks-uc")


# Step 1: Unity Catalogから登録済みSub Agentをロード
def load_agent_model(agent_name: str):
    model_uri = f"models:/{CATALOG}.{SCHEMA}.{agent_name}@latest"
    return mlflow.langchain.load_model(model_uri)


# Step 2: 各Sub Agentをツールとして定義
@tool
def call_company_search(company_name: str) -> str:
    """企業名から企業情報を検索します。"""
    agent = load_agent_model("company_search_agent")
    result = agent.invoke(
        {"messages": [{"role": "user", "content": f"{company_name}を検索してください"}]}
    )
    return result["messages"][-1].content


@tool
def call_meeting_notes(company_name: str) -> str:
    """企業に関連するミーティングの議事録を取得します。"""
    agent = load_agent_model("meeting_notes_agent")
    result = agent.invoke(
        {"messages": [{"role": "user", "content": f"{company_name}の議事録を取得してください"}]}
    )
    return result["messages"][-1].content


@tool
def call_document_generator(context: str) -> str:
    """収集した情報をもとに提案資料のドラフトを生成します。"""
    agent = load_agent_model("document_generator_agent")
    result = agent.invoke(
        {"messages": [{"role": "user", "content": f"以下の情報から資料を作成してください:\n{context}"}]}
    )
    return result["messages"][-1].content


SYSTEM_PROMPT = """あなたはSupervisor(司令塔)エージェントです。
ユーザーの質問を解釈し、適切なSub Agentを呼び出して回答を統合します。

[利用可能なツール]
1. call_company_search: 企業名から企業情報を検索
2. call_meeting_notes: 企業に関連する議事録を取得
3. call_document_generator: 収集した情報から提案資料を生成

[ワークフロー]
1. call_company_search で企業情報を取得
2. call_meeting_notes で関連する議事録を取得
3. call_document_generator で資料のドラフトを生成
"""

# Step 3: Supervisor AgentをReActパターンで作成
tools = [call_company_search, call_meeting_notes, call_document_generator]
llm = ChatDatabricks(endpoint=LLM_ENDPOINT, temperature=0.1)
agent = create_react_agent(llm, tools, prompt=SYSTEM_PROMPT)
mlflow.models.set_model(agent)

Sub Agentと同様にregister_agentで登録します。extra_configでSub Agentのバージョン情報を渡し、定義ファイル内からModelConfig経由で参照します。

result_supervisor = register_agent(
    catalog=CATALOG,
    schema=SCHEMA,
    model_name="proposal_supervisor_agent",
    definition_file="proposal_supervisor_agent_definition.py",
    llm_endpoint=LLM_ENDPOINT,
    warehouse_id=WAREHOUSE_ID,
    input_example={
        "messages": [{"role": "user", "content": "〇〇株式会社の提案資料を作成してください"}]
    },
    extra_config={
        "catalog": CATALOG,
        "schema": SCHEMA,
        "company_search_version": result_company_search["version"],
        "meeting_notes_version": result_meeting_notes["version"],
        "document_generator_version": result_document_generator["version"],
    },
)
print(f"Supervisor Agent 登録完了! Version: {result_supervisor['version']}")

Agentを使用する

登録したAgentはmlflow.langchain.load_modelでロードするだけで実行できます。

import mlflow

mlflow.set_registry_uri("databricks-uc")

# Unity CatalogからAgentをロード
model_uri = "models:/dummy_catalog.dummy_schema.proposal_supervisor_agent@latest"
agent = mlflow.langchain.load_model(model_uri)

# 実行
result = agent.invoke({
    "messages": [{"role": "user", "content": "〇〇株式会社の提案資料を作成してください"}]
})
print(result["messages"][-1].content)

実行すると、以下の流れで処理されます。

  1. Supervisorが質問を解析し、必要なSub Agentを判断
  2. 企業情報取得Agentが企業情報を検索
  3. 議事録取得Agentが関連する議事録を取得
  4. 資料作成Agentが提案資料のドラフトを生成
  5. Supervisorが結果を統合して回答

MLflowを使ってみて便利だったこと

特に便利だった機能を紹介します。

MLflow Tracing

mlflow.langchain.autolog() を有効にすると、Agentの処理フローが自動でトレーシングされます。どのSub Agentがどんな結果を返したか可視化され、デバッグに役立ちます。

MLflow Tracing
引用: DatabricksにおけるMLflow Tracing

評価

MLflowの評価機能で、モデルごとの出力を並べて比較できます。人間の目で実際の回答を見比べられるので、プロンプトやモデルの改善サイクルを回しやすくなります。

MLflow Evaluate
引用: MLflow 2.4のmlflow.evaluateとアーティファクトビューでLLMの評価が捗る件

モデルのバージョン管理とデプロイ

MLflowでモデルをバージョン管理でき、Databricks Asset Bundlesを使えばModel Servingとして簡単にAPIデプロイできます。

モデルのバージョン管理
引用: Workspace Model Registry の例

Agent Bricksへの期待

DatabricksではAgent Bricksがすでに海外リージョンで公開されており、より簡単にAgentを作成できるようになります。日本リージョンへの展開が待ち遠しいです。
https://docs.databricks.com/aws/ja/generative-ai/agent-bricks/

まとめ

業務効率化のためにDatabricks上でLangGraphベースのSupervisor Agentを構築し、設計からMLflowでの登録・評価までを紹介しました。

Databricksを使えば、Agent定義からMLflowでの登録・評価・デプロイまで一気通貫で行えます。Databricks Appsと組み合わせれば社内アプリも簡単に構築でき、こうしたトータルソリューションがDatabricksの魅力です。


IVRyではキャリア登録やカジュアル面談の機会をご用意しています。ご興味のある方はぜひ以下よりお申し込みください。

https://herp.careers/v1/ivry/wmZiOSAmZ4SQ
https://www.notion.so/209eea80adae800483a9d6b239281f1b?pvs=21

GitHubで編集を提案
IVRyテックブログ

Discussion