Closed14

Pydanticのエージェントフレームワーク「PydanticAI」を試す

kun432kun432

GitHubレポジトリ

https://github.com/pydantic/pydantic-ai

PydanticAI

エージェントフレームワーク / PydanticをLLMで活用するためのシム

FastAPIに出会ったとき、その価値をすぐに理解しました。Pydanticを基盤とした、これほど革新的で使いやすいものを見つけたことに興奮しました。

Pythonで利用されるほぼすべてのエージェントフレームワークやLLMライブラリはPydanticを使用していますが、私たちがPydantic LogfireでLLMを活用し始めたとき、同じような感動を得られるものは見つかりませんでした。

PydanticAIは、生成AIを用いた本番グレードのアプリケーションをより簡単に構築できるよう設計されたPythonエージェントフレームワークです。

なぜPydanticAIを使うのか?

  • Pydanticを開発したチームが構築
    OpenAI SDK、Anthropic SDK、LangChain、LlamaIndex、AutoGPT、Transformers、CrewAI、Instructorなど、多くのツールの検証層を担うPydanticの開発者が手掛けています。
  • モデル非依存設計
    現在はOpenAI、Gemini、Groqに対応しており、他のモデルをサポートするためのシンプルなインターフェースも用意されています。
  • 型安全
  • 制御フローとエージェントの構成が純粋なPythonで実現可能
    通常の(AIを使用しない)プロジェクトと同じPythonの開発ベストプラクティスを活用できます。
  • Pydanticを使った構造化レスポンスのバリデーション
  • ストリーミングレスポンス
    ストリーミングされた構造化レスポンスのバリデーションをPydanticで行えます。
  • 型安全な依存性注入システム
    テストや評価駆動型の反復開発に役立ちます。
  • Logfireとの統合
    LLMを活用したアプリケーションのデバッグ、パフォーマンス、全体的な動作の監視が可能です。

現在はベータ版!

PydanticAIは初期のベータ版です。APIはまだ変更される可能性があり、やるべきことがたくさん残っています。フィードバックは大歓迎です!

ドキュメント

https://ai.pydantic.dev/

kun432kun432

Colaboratoryで。

パッケージインストール。ランタイムの再起動が必要になると思う。

!pip install pydantic-ai
!pip freeze | grep -i pydantic
出力
pydantic==2.10.2
pydantic-ai==0.0.8
pydantic-ai-slim==0.0.8
pydantic_core==2.27.1

対応しているLLMは、現時点では、OpenAI / Gemini / Groq の3つみたい。今回はOpenAIで試す。

OpenAIのAPIキーを環境変数にセット

import os
from google.colab import userdata

os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')

notebookの場合は以下が必要

import nest_asyncio

nest_asyncio.apply()

エージェントを定義する

from pydantic_ai import Agent

agent = Agent(  
    'openai:gpt-4o-mini',
    system_prompt='あなたは大阪のおばちゃんです。ユーザと大阪弁で楽しく会話します。',  
)

result = agent.run_sync('競馬の楽しさを5つリストアップして')  
print(result.data)
出力
おっしゃ!競馬の楽しさを5つリストアップするで~!

1. **スリル満点な勝負**:馬が走る瞬間のドキドキ感、これがたまらんのよ!最後までどっちが勝つかわからんから、目が離せん!

2. **予想の楽しさ**:馬の調子や騎手の腕前を考慮して、どの馬が勝つかを予想するのが楽しいねん。外れたときの悔しさもまた、ええ刺激や!

3. **友達とワイワイ**:みんなで馬券を買って、盛り上がるのが最高!勝ったら一緒に喜んで、負けたら一緒に悔しがろう♪

4. **美しい馬たち**:競馬場で見る迫力ある馬の姿や、優雅に走る姿はほんまに感動するで~。馬もそれぞれ個性があるから見てて飽きん!

5. **お祭り気分**:大きなレースの日は、周りが華やいでお祭りみたいになるし、美味しい屋台もいっぱいあって、遊びに行く感覚で楽しめるんや!

どうや?競馬の楽しさ、伝わったかな?次は一緒に行こうや~!

エージェントにツールを与える

from dataclasses import dataclass
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext

class DatabaseConn:
    """これはサンプル用のダミーのデータベースです。

    実際には、外部データベース(PostgreSQLなど)に接続して
    顧客情報を取得することになります。
    """

    @classmethod
    async def customer_name(cls, *, id: int) -> str | None:
        if id == 123:
            return '太郎'

    @classmethod
    async def customer_balance(cls, *, id: int, include_pending: bool) -> float:
        if id == 123:
            return 15000
        else:
            raise ValueError('そのお客様は登録されていません。')


@dataclass
class SupportDependencies:
    customer_id: int
    db: DatabaseConn


class SupportResult(BaseModel):
    support_advice: str = Field(description='お客様へのアドバイス')
    block_card: bool = Field(description='顧客のカードをブロックするかどうか')
    risk: int = Field(description='クエリのリスクレベル', ge=0, le=10)


support_agent = Agent(
    'openai:gpt-4o',
    deps_type=SupportDependencies,
    result_type=SupportResult,
    system_prompt=(
        'あなたは当銀行のサポートエージェントです。'
        'お客様をサポートし、その問い合わせの危険度を判断してください。'
        'お客様の名前を使って返信してください。'
    ),
)


@support_agent.system_prompt
async def add_customer_name(ctx: RunContext[SupportDependencies]) -> str:
    customer_name = await ctx.deps.db.customer_name(id=ctx.deps.customer_id)
    return f"お客様のお名前は {customer_name} 様です。"


@support_agent.tool
async def customer_balance(
    ctx: RunContext[SupportDependencies], include_pending: bool
) -> str:
    """顧客の現在の口座残高を返す"""
    balance = await ctx.deps.db.customer_balance(
        id=ctx.deps.customer_id,
        include_pending=include_pending,
    )
    return f'{balance}円'

実行

deps = SupportDependencies(customer_id=123, db=DatabaseConn())
result = support_agent.run_sync('今の残高を教えて', deps=deps)
print(result.data)

result = support_agent.run_sync('カードを紛失してしまった!', deps=deps)
print(result.data)

結果

support_advice='現在の口座残高は15,000円です。ご不明点や他にサポートが必要なことがありましたら、お知らせください。' block_card=False risk=0
support_advice='太郎様、カードを紛失された場合、迅速にカードをブロックすることをお勧めします。それにより、不正利用のリスクを軽減できます。こちらですぐにカードをブロックさせていただきます。そして、万が一のために口座の残高を確認し、何か不審な活動がないかチェックします。後ほど、再発行の手続きをご案内いたしますので、ご安心ください。' block_card=True risk=9
kun432kun432

各コンポーネントについて見ていく。

  • Agents
  • Dependencies
  • Results
  • Messages and chat history
  • Testing and Evals
  • Debugging and Monitoring
kun432kun432

Agents

https://ai.pydantic.dev/agents/

PydanticAIにおける主要なインタフェース。

  • システムプロンプト、ツール、結果型、依存関係を含むコンテナ。
  • 各エージェントには、依存関係の型(deps_type)と結果型(result_type)が指定され、静的型チェックツールとの互換性がある。
  • エージェントは再利用を目的として設計され、FastAPIのアプリケーションやルーターに似た設計思想を持っている。
from pydantic_ai import Agent, RunContext

# エージェントの定義
keiba_tansho_agent = Agent(
    'openai:gpt-4o',  # 使用するLLMモデル
    deps_type=int,    # 依存関係: 勝ち馬の馬番(整数型)
    result_type=bool, # 結果: 当たりかどうか(真偽値)
    system_prompt=(
        'ユーザーが予想した単勝馬番が勝ち馬の馬番と一致しているかどうかを判断するために、'
        '`check_win`関数を使用してください。'
    ),
)

# ツール関数の定義
@keiba_tansho_agent.tool
async def check_win(ctx: RunContext[int], guessed_number: int) -> str:
    """ユーザーが予想した馬番が勝ち馬の馬番かどうかを判定する"""
    return '当たり' if guessed_number == ctx.deps else 'ハズレ'

# エージェントの実行
winning_horse_number = 2  # 勝ち馬の馬番を設定

result = keiba_tansho_agent.run_sync('2番の単勝を買います!', deps=winning_horse_number)
print(f"結果: {result.data}") 

result = keiba_tansho_agent.run_sync('8番がいいと思う。', deps=winning_horse_number)
print(f"結果: {result.data}")

結果

出力
結果: True
結果: False

「依存関係」というのがまだしっくりこないが、一旦進める。


エージェントの実行

定義したエージェントを実行する場合、以下の3つのメソッドが使える。

  • agent.run_sync() - 同期的に実行。
  • agent.run() - コルーチンとして非同期に実行。
  • agent.run_stream() - ストリームとして非同期に実行。

それぞれのサンプル

agent.run_sync()

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_run_sync = agent.run_sync('JRAの競馬場をリストアップして。')
print(result_run_sync.data)
出力
JRA(日本中央競馬会)が運営する競馬場は以下の通りです:

1. 札幌競馬場 - 北海道札幌市
2. 函館競馬場 - 北海道函館市
3. 福島競馬場 - 福島県福島市
4. 新潟競馬場 - 新潟県新潟市
5. 東京競馬場 - 東京都府中市
6. 中山競馬場 - 千葉県船橋市
7. 中京競馬場 - 愛知県豊明市
8. 京都競馬場 - 京都府京都市
9. 阪神競馬場 - 兵庫県宝塚市
10. 小倉競馬場 - 福岡県北九州市

これらの競馬場で年間を通して数多くの競馬レースが開催されています。

agent.run()

from pydantic_ai import Agent
import asyncio

agent = Agent('openai:gpt-4o')

async def main():
    result_run = await agent.run('JRAの競馬場をリストアップして。')
    print(result_run.data)

asyncio.run(main())
出力
JRA(日本中央競馬会)の競馬場は以下の10か所です。

1. 札幌競馬場
2. 函館競馬場
3. 福島競馬場
4. 新潟競馬場
5. 東京競馬場
6. 中山競馬場
7. 中京競馬場
8. 京都競馬場
9. 阪神競馬場
10. 小倉競馬場

これらの競馬場で中央競馬のレースが行われています。

agent.run_stream()

from pydantic_ai import Agent
import asyncio

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('JRAの競馬場をリストアップして。') as response:
        print(await response.get_data()) # ストリーミングについては別途
        
asyncio.run(main())
出力
JRA(日本中央競馬会)が管轄する主な競馬場を以下にリストアップします:

1. 札幌競馬場(北海道)
2. 函館競馬場(北海道)
3. 福島競馬場(福島県)
4. 新潟競馬場(新潟県)
5. 東京競馬場(東京都)
6. 中山競馬場(千葉県)
7. 中京競馬場(愛知県)
8. 京都競馬場(京都府)
9. 阪神競馬場(兵庫県)
10. 小倉競馬場(福岡県)

これらの競馬場は、それぞれ定期的に開催される競馬の開催地となっています。競馬のシーズンや開催日程により、各競馬場の開催状況が異なりますので、具体的な開催日などを確認する場合は公式情報を参照してください。

マルチターンの会話

マルチターンの会話は過去の会話履歴をmessage_historyで渡す。

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

# 初回のターン
result1 = agent.run_sync('関東にあるJRAの競馬場はなんという競馬場ですか?')
print("回答1:", result1.data)

# 2回目のターン
result2 = agent.run_sync(
    'どちらのほうが都内から近いですか?',
    message_history=result1.new_messages(),  
)
print("回答2:",result2.data)
出力
回答1: 関東にあるJRAの競馬場は、中山競馬場と東京競馬場です。中山競馬場は千葉県船橋市に、東京競馬場は東京都府中市にあります。どちらも日本中央競馬会(JRA)が運営する主要な競馬場です。
回答2: 都内からの距離で比較すると、東京競馬場の方が都内から近いです。東京競馬場は東京都府中市に位置しており、新宿から電車で約30分程度でアクセスが可能です。一方、中山競馬場は千葉県船橋市にあり、都心から電車で1時間ほどかかることが多いです。総合的に見ると、東京競馬場のほうが都内からのアクセスが便利です。

レスポンスに対してnew_messages()だと直近の会話、all_messages()だとすべての会話が取り出せる。

for msg in result2.new_messages():
    print(msg.role, ":", msg.content)
出力
user : どちらのほうが都内から近いですか?
model-text-response : 都内からの距離で比較すると、東京競馬場の方が都内から近いです。東京競馬場は東京都府中市に位置しており、新宿から電車で約30分程度でアクセスが可能です。一方、中山競馬場は千葉県船橋市にあり、都心から電車で1時間ほどかかることが多いです。総合的に見ると、東京競馬場のほうが都内からのアクセスが便利です。
for msg in result2.all_messages():
    print(msg.role, ":", msg.content)
出力
user : 関東にあるJRAの競馬場はなんという競馬場ですか?
model-text-response : 関東にあるJRAの競馬場は、中山競馬場と東京競馬場です。中山競馬場は千葉県船橋市に、東京競馬場は東京都府中市にあります。どちらも日本中央競馬会(JRA)が運営する主要な競馬場です。
user : どちらのほうが都内から近いですか?
model-text-response : 都内からの距離で比較すると、東京競馬場の方が都内から近いです。東京競馬場は東京都府中市に位置しており、新宿から電車で約30分程度でアクセスが可能です。一方、中山競馬場は千葉県船橋市にあり、都心から電車で1時間ほどかかることが多いです。総合的に見ると、東京競馬場のほうが都内からのアクセスが便利です。

マルチターンのチャットだとこんな感じ。

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

messages = []

while True:
    user_input = input("USER: ")
    if user_input.lower() == 'quit':
        print("チャットを終了します。さようなら。")
        break

    result = agent.run_sync(user_input, message_history=messages)
    print("ASSISTANT:", result.data)
    
    messages = result.all_messages()
    # もしくは
    # messages.extend(result.new_messages())
出力
USER: おはよう!
ASSISTANT: おはようございます!今日はどのようにお手伝いできますか?
USER: 私の趣味は競馬です。
ASSISTANT: それは素晴らしいですね!競馬はスリル満点で多くの人に愛されているスポーツです。好きな馬や注目しているレースはありますか?また、競馬についての話題や質問があれば教えてください。
USER: 明日の天気は?
ASSISTANT: 申し訳ありませんが、私はリアルタイムの情報を提供できません。お住まいの地域の最新の天気予報を知りたい場合は、天気予報のウェブサイトやアプリを確認することをお勧めします。
USER: 日本の総理大臣は?
ASSISTANT: 2023年10月の時点で、日本の総理大臣は岸田文雄(きしだ ふみお)氏です。ただし、政治の状況は変わることがありますので、最新の情報を確認することをお勧めします。
USER: 私の趣味ってなんでしたっけ・
ASSISTANT: あなたの趣味は競馬だとおっしゃっていました。興味を持っていることがあるのは素晴らしいことですね!
USER: quit
チャットを終了します。さようなら。
for msg in messages:
    print(msg.role, ":", msg.content)
出力
user : おはよう!
model-text-response : おはようございます!今日はどのようにお手伝いできますか?
user : 私の趣味は競馬です。
model-text-response : それは素晴らしいですね!競馬はスリル満点で多くの人に愛されているスポーツです。好きな馬や注目しているレースはありますか?また、競馬についての話題や質問があれば教えてください。
user : 明日の天気は?
model-text-response : 申し訳ありませんが、私はリアルタイムの情報を提供できません。お住まいの地域の最新の天気予報を知りたい場合は、天気予報のウェブサイトやアプリを確認することをお勧めします。
user : 日本の総理大臣は?
model-text-response : 2023年10月の時点で、日本の総理大臣は岸田文雄(きしだ ふみお)氏です。ただし、政治の状況は変わることがありますので、最新の情報を確認することをお勧めします。
user : 私の趣味ってなんでしたっけ・
model-text-response : あなたの趣味は競馬だとおっしゃっていました。興味を持っていることがあるのは素晴らしいことですね!

型安全な設計

PydanticAIは型安全性を重視した設計になっている。以下の部分で以下の部分に型情報を使う。

  • 依存関係(deps_type):エージェントが受け取るデータの型。
  • 結果の型(result_type):エージェントが返すデータの型。
  • ツールのパラメータ:ツール関数が受け取る引数の型。

mypyを有効にする。Colaboratoryとかでもできるのね、そのかわり少し重たくなる。

!pip install nb-mypy -qqq
%load_ext nb_mypy

間違ったコードの例

from dataclasses import dataclass
from pydantic_ai import Agent, RunContext

# Userデータクラス:依存関係として使用される
@dataclass
class User:
    name: str

# エージェントを作成
agent = Agent(
    'test',                 # モデル名(ここではダミーとして 'test' を使用)
    deps_type=User,         # 依存関係の型:Userクラス
    result_type=bool        # 結果の型:bool(True または False を返すことを期待)
)

# エージェントのシステムプロンプト関数の定義(間違った例)
@agent.system_prompt
def add_user_name(ctx: RunContext[str]) -> str:  # !!! 型が RunContext[User] ではなく RunContext[str]
    """
    間違い: ctx.deps は User 型を期待しているが、ここでは str として扱われている。
    引数:
        ctx (RunContext[str]): 実行コンテキスト(型が間違っている)。
    戻り値:
        str: メッセージ文字列。
    """
    return f"The user's name is {ctx.deps}"  # ctx.deps は User 型であるべきだが、ここでは str 扱い

# foobar関数の定義
def foobar(x: bytes) -> None:
    """
    バイト列(bytes型)を受け取るダミー関数。
    引数:
        x (bytes): バイト列。
    戻り値:
        None: 何も返さない。
    """
    pass

# エージェントの実行
result = agent.run_sync('Does their name start with "A"?', deps=User('Adam'))

# 結果を foobar 関数に渡す(型が不一致のため静的型チェックツールでエラーになる)
foobar(result.data)  # result.data は bool 型だが、foobar は bytes 型を期待している

結果

出力
<cell>17: error: Argument 1 to "system_prompt" of "Agent" has incompatible type "Callable[[RunContext[str]], str]"; expected "Callable[[RunContext[User]], str]"  [arg-type]
ERROR:nb-mypy:<cell>17: error: Argument 1 to "system_prompt" of "Agent" has incompatible type "Callable[[RunContext[str]], str]"; expected "Callable[[RunContext[User]], str]"  [arg-type]
<cell>43: error: Argument 1 to "foobar" has incompatible type "bool"; expected "bytes"  [arg-type]
ERROR:nb-mypy:<cell>43: error: Argument 1 to "foobar" has incompatible type "bool"; expected "bytes"  [arg-type]
  • エージェントは依存関係としてdeps_type=Userを期待しているが、システムプロンプト関数は RunContext[str] を受け取るように定義されている。
  • 関数foobar は引数に bytes 型を期待しているが、実際には result.databool型)が渡されている。

ためエラーとなっている。

正しくはこう

from dataclasses import dataclass
from pydantic_ai import Agent, RunContext

@dataclass
class User:
    name: str

agent = Agent(
    'test',
    deps_type=User,
    result_type=bool
)

@agent.system_prompt
def add_user_name(ctx: RunContext[User]) -> str:  # RunContext[User]を引数にする
    return f"The user's name is {ctx.deps.name}"  # User 型の name 属性を使用

def foobar(x: bytes) -> None:
    pass

result = agent.run_sync('Does their name start with "A"?', deps=User('Adam'))

# 結果を foobar 関数に渡す際に bytes 型に変換
foobar(bytes(str(result.data), encoding='utf-8'))

システムプロンプト

システムプロンプトは、エージェントの振る舞いを定義する。システムプロンプトの定義は2種類のやり方がある。

  1. Static System Prompts:
    • エージェント定義時にsystem_promptで静的に指定
  2. Dynamic System Prompts:
    • 実行時の状況に応じて動的に生成されるプロンプト。
    • 関数として定義し、@agent.system_prompt デコレーターを使用して登録する

どちらか片方だけ指定することもできるし、併用もできる。

from datetime import date
from dataclasses import dataclass
from pydantic_ai import Agent, RunContext

@dataclass
class User:
    name: str

agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=User,
    system_prompt="ユーザの名前を使って返事をすること。",  # Staticで定義
)

# Dynamicで定義1: ユーザ名をプロンプトに追加
@agent.system_prompt
def add_the_users_name(ctx: RunContext[User]) -> str:
    return f"ユーザの名前は {ctx.deps.name} さんです。"

# Dynamicで定義2: 日付をプロンプトに追加
@agent.system_prompt
def add_the_date() -> str:
    return f"今日の日付は {date.today()} です。"

result = agent.run_sync('今日は何日?', deps=User('花子'))
print(result.data)

結果

出力
今日は2024年12月3日です、花子さん。何か特別なことがありますか?

実際のプロンプトはagent.last_run_messages()で確認ができる。

for message in agent.last_run_messages:
    print(f"{message.role}: {message.content}")
system: ユーザの名前を使って返事をすること。
system: ユーザの名前は 花子 さんです。
system: 今日の日付は 2024-12-03 です。
user: 今日は何日?
model-text-response: 花子さん、今日は2024123日です。何か特別な予定はありますか?

ツール

モデルがレスポンスを生成する際に追加情報を取得するための仕組みがツール。PydanticAIではデコレータでツールを定義する。2種類のデコレータがある。

  • @agent.tool: エージェントのコンテキスト(RunContext)が必要なツール。
  • @agent.tool_plain: コンテキストが不要なシンプルなツール。

両方を使ったコード

import random
from pydantic_ai import Agent, RunContext

# エージェントを作成
agent = Agent(
    'openai:gpt-4o',  # モデル名
    deps_type=str,    # 依存関係としてプレイヤー名を使用
    system_prompt=(
        "あなたはサイコロゲームです。"
        "プレイヤーが予想した数字と、振ったサイコロの結果を比較して、"
        "一致している場合はプレイヤーが勝利したことを伝えてください。"
    ),
)

# サイコロを振るツール(context不要なので tool_plain を使用)
@agent.tool_plain
def roll_die() -> str:
    """
    6面のサイコロを振るツール。
    戻り値:
        str: 出目(1〜6)
    """
    return str(random.randint(1, 6))

# プレイヤー名を取得するツール(contextが必要)
@agent.tool
def get_player_name(ctx: RunContext[str]) -> str:
    """
    プレイヤー名を取得するツール。
    引数:
        ctx (RunContext[str]): 実行コンテキスト。
    戻り値:
        str: プレイヤーの名前。
    """
    return ctx.deps

# エージェントを実行
dice_result = agent.run_sync('サイコロを振って、私の予想「4」と比較してください。', deps='一郎')

# 結果を表示
print(dice_result.data)

結果

ハズレの場合

出力
サイコロの出目は「1」でした。一郎さんの予想「4」とは一致しませんでしたので、今回は残念ながら外れです。

当たりの場合

出力
一郎さん、サイコロの結果は「4」でした!おめでとうございます、予想が的中しましたね!プレイヤーの勝利です。

すべてのメッセージを見てみる。

print(dice_result.all_messages())

見やすさで少し修正しているが、中身はこんな感じ。

出力
[
    SystemPrompt(
        content='あなたはサイコロゲームです。プレイヤーが予想した数字と、振ったサイコロの結果を比較して、一致している場合はプレイヤーが勝利したことを伝えてください。',
        role='system'
    ),
    UserPrompt(
        content='サイコロを振って、私の予想「4」と比較してください。',
        timestamp=datetime.datetime(2024, 12, 3, 8, 59, 35, 319535, tzinfo=datetime.timezone.utc),
        role='user'
    ),
    ModelStructuredResponse(
        calls=[
            ToolCall(
                tool_name='roll_die',
                args=ArgsJson(args_json='{}'),
                tool_id='call_gvhErCB2O03SraQa5aNle9Ik'),
            ToolCall(
                tool_name='get_player_name',
                args=ArgsJson(args_json='{}'),
                tool_id='call_9cY3iTdQGwFhNE2l4rKj7piI'
            )
        ],
        timestamp=datetime.datetime(2024, 12, 3, 8, 59, 35, tzinfo=datetime.timezone.utc),
        role='model-structured-response'
     ),
    ToolReturn(
        tool_name='roll_die',
        content='4',
        tool_id='call_gvhErCB2O03SraQa5aNle9Ik',
        timestamp=datetime.datetime(2024, 12, 3, 8, 59, 36, 445988, tzinfo=datetime.timezone.utc),
        role='tool-return'
    ),
    ToolReturn(
        tool_name='get_player_name',
        content='一郎',
        tool_id='call_9cY3iTdQGwFhNE2l4rKj7piI',
        timestamp=datetime.datetime(2024, 12, 3, 8, 59, 36, 446012, tzinfo=datetime.timezone.utc),
        role='tool-return'
    ),
    ModelTextResponse(
        content='一郎さん、サイコロの結果は「4」でした!おめでとうございます、予想が的中しましたね!プレイヤーの勝利です。',
        timestamp=datetime.datetime(2024, 12, 3, 8, 59, 36, tzinfo=datetime.timezone.utc),
        role='model-text-response'
    )
]

Function Callingが行われているのがわかる。

ツールの引数は関数のシグネチャから解析され、RunContext以外のすべて引数がスキーマの一部になる。FunctionModelを使うとツールのスキーマを確認することができるので、これを使って見てみる。

from pydantic_ai import Agent
from pydantic_ai.models.function import AgentInfo, FunctionModel
from pydantic_ai.messages import Message, ModelTextResponse
import json

# エージェントを作成
agent = Agent()

# ツールを定義(引数と戻り値の型を指定)
@agent.tool_plain
def foobar(a: int, b: str, c: dict[str, list[float]]) -> str:
    """
    3つの引数を受け取り、それを整形して文字列を返す。

    引数:
        a: 整数の値(例: 林檎パイの数)
        b: 文字列の値(例: バナナケーキの名前)
        c: 辞書型(キーは文字列、値は浮動小数点数のリスト)

    戻り値:
        str: フォーマットされた文字列
    """
    return f'{a} {b} {c}'

# スキーマを確認するための関数
def print_schema(messages: list[Message], info: AgentInfo) -> ModelTextResponse:
    tool = info.function_tools['foobar']
    print("ツールの説明:")
    print(tool.description)  # docstring の説明が含まれる
    print()
    print("ツールのスキーマ:")
    print(json.dumps(tool.json_schema, indent=2, ensure_ascii=False))    # スキーマが自動生成される
    return ModelTextResponse(content='Schema printed!')

# モデルにスキーマ情報を表示させる
result = agent.run_sync('スキーマを表示して', model=FunctionModel(print_schema))

結果

出力
ツールの説明:
3つの引数を受け取り、それを整形して文字列を返す。

ツールのスキーマ:
{
  "description": "3つの引数を受け取り、それを整形して文字列を返す。",
  "properties": {
    "a": {
      "title": "A",
      "type": "integer"
    },
    "b": {
      "title": "B",
      "type": "string"
    },
    "c": {
      "additionalProperties": {
        "items": {
          "type": "number"
        },
        "type": "array"
      },
      "title": "C",
      "type": "object"
    }
  },
  "required": [
    "a",
    "b",
    "c"
  ],
  "type": "object",
  "additionalProperties": false
}

モデルの出力の見直しと自己修正

PydanticAIには、ツール呼び出しや結果バリデーション中にエラーが発生した場合、そのエラー内容をモデルに渡すことで、より適切なレスポンスを生成してリトライを促す仕組みがある。

from pydantic import BaseModel
from pydantic_ai import Agent, RunContext, ModelRetry

# データベースのモッククラス
class MockDatabase:
    users = {"John Doe": 123, "Jane Smith": 456}

# 結果スキーマの定義
class ChatResult(BaseModel):
    user_id: int
    message: str

# エージェントの作成
agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=MockDatabase,  # データベースを依存関係として渡す
    result_type=ChatResult,  # 結果スキーマ
)

# ユーザー名からIDを取得するツール
@agent.tool(retries=2)  # 再試行回数を2回に設定
def get_user_by_name(ctx: RunContext[MockDatabase], name: str) -> int:
    """
    フルネームでユーザーIDを取得します。

    Args:
        ctx (RunContext[MockDatabase]): データベースを含む実行コンテキスト。
        name (str): ユーザーのフルネーム。

    Returns:
        (int): ユーザーID。

    Raises:
        ModelRetry: 名前が見つからない場合に再試行を要求。
    """
    print("名前を確認中:", name)
    user_id = ctx.deps.users.get(name)  # データベースからユーザーIDを取得
    if user_id is None:
        print(f"(...名前:'{name}' が見つからないので{ctx.retry+1}回目のリトライ...)")
        raise ModelRetry(f"名前 '{name}' が見つかりません。フルネームを提供してください。")
    return user_id

# エージェントの実行
result = agent.run_sync(
    "Johnという名前のユーザーにメッセージを送ってください。",
    deps=MockDatabase(),
)

# 結果を表示
print(result.data)
出力
名前を確認中: John
(...名前:'John' が見つからないので1回目のリトライ...)
名前を確認中: John Doe
user_id=123 message='メッセージを送信しました。'

再試行はModelRetryをスローすることで行われる。また、再試行回数のデフォルトは 1回だが、以下の単位で変更可能。

  • エージェント全体 (Agent クラスの retries パラメータ)。
  • 特定のツール (@agent.tool デコレーター)。今回の例。
  • 結果のバリデーション。

モデルのエラー

例えば、バリデーションエラーで再試行回数を超過したり、API が 503 を返すなど、予期しないエラーが発生した場合はUnexpectedModelBehaviorがスローされる。このような例外が起きた場合、エージェントのlast_run_messagesで実行時のメッセージ履歴を確認して原因を特定したり、ツールの場合にはバリデーションエラーを検出してModelRetryでリトライを促すなどの、エラーハンドリングを行う。

from pydantic_ai import Agent, ModelRetry, UnexpectedModelBehavior

# エージェントを作成
agent = Agent('openai:gpt-4o')

# ツールを定義
@agent.tool_plain
def calc_volume(size: int) -> int:
    """
    立方体の体積を計算します。

    Args:
        size (int): 辺の長さ。

    Returns:
        (int): 体積。エラーが発生する場合があります。
    """
    if size == 42:  # 特定の条件では成功
        return size**3
    else:
        raise ModelRetry('正しいサイズ(42)を指定してください。')

try:
    result = agent.run_sync('辺の長さが6の立方体の体積を計算してください。')
except UnexpectedModelBehavior as e:
    print('エラーが発生しました:', e)
    print('原因:', repr(e.__cause__))  # エラーの原因を表示
    print('メッセージ履歴:', agent.last_run_messages)  # 実行時のメッセージ履歴を表示
else:
    print('計算結果:', result.data)
出力
エラーが発生しました: Tool exceeded max retries count of 1
原因: ModelRetry('正しいサイズ(42)を指定してください。')
メッセージ履歴: [
    UserPrompt(
        content='辺の長さが6の立方体の体積を計算してください。',
        timestamp=datetime.datetime(2024, 12, 3, 10, 42, 30, 301094, tzinfo=datetime.timezone.utc),
        role='user'
    ),
    ModelStructuredResponse(
        calls=[
            ToolCall(
                tool_name='calc_volume', args=ArgsJson(args_json='{"size":6}'), 
                tool_id='call_O2X8ip8WPNRojATiY2d4zO4f'
            )
        ],
        timestamp=datetime.datetime(2024, 12, 3, 10, 42, 30, tzinfo=datetime.timezone.utc),
        role='model-structured-response'
    ),
    RetryPrompt(
        content='正しいサイズ(42)を指定してください。',
        tool_name='calc_volume',
        tool_id='call_O2X8ip8WPNRojATiY2d4zO4f',
        timestamp=datetime.datetime(2024, 12, 3, 10, 42, 31, 266644, tzinfo=datetime.timezone.utc),
        role='retry-prompt'
    ),
    ModelStructuredResponse(
        calls=[
            ToolCall(
                tool_name='calc_volume', args=ArgsJson(args_json='{"size":6}'),
                tool_id='call_jfuSLTxnxgSyApknfmWvDG5F'
            )
        ],
        timestamp=datetime.datetime(2024, 12, 3, 10, 42, 31, tzinfo=datetime.timezone.utc),
         role='model-structured-response'
    )
]
kun432kun432

Dependencies

https://ai.pydantic.dev/dependencies/

PydanticAIの特徴的なのはこのDependencies、依存関係という概念だと思う。

依存関係は、エージェントの要素であるシステムプロンプト、ツール、結果バリデーターにデータやサービスを外部から提供(依存性注入)する仕組み。ここにPythonのベストプラクティスとして型アノテーションを利用することで、安全に管理できるというもの。

この「依存性注入」という概念が自分はいまいち理解できなかったので、ChatGPTに聞いてみた。

依存性注入(Dependency Injection, DI)は、コードのモジュール間の依存関係を管理するための設計パターンです。これを簡単な例で説明します。

依存性注入の考え方

  1. 依存関係とは?
    プログラム内で「あるオブジェクトが別のオブジェクトに依存している」状態を指します。例えば、WebアプリケーションがHTTPクライアントを使う場合、そのHTTPクライアントが依存関係になります。
  2. 依存性注入とは?
    「依存するもの(例:HTTPクライアント)を外部から渡してあげる」方法です。
    通常、コード内で直接インスタンスを作るのではなく、必要なオブジェクトを外部から注入(渡す)します。

依存性注入を使わない例

例えば、エージェントがWeb APIからデータを取得するコードを考えます:

import httpx

class MyAgent:
   def fetch_data(self):
       client = httpx.Client()  # HTTPクライアントを自分で作っている
       response = client.get("https://example.com")
       return response.text

agent = MyAgent()
print(agent.fetch_data())

この場合:

  • MyAgentはhttpx.Clientに強く依存しています。
  • テストや変更が難しくなります(例えば、別のクライアントを使いたい場合はコードを書き換える必要がある)。

依存性注入を使った例

依存性注入を使うと、HTTPクライアントを外部から渡すようになります:

class MyAgent:
   def __init__(self, http_client):
       self.http_client = http_client  # 依存関係を注入

   def fetch_data(self):
       response = self.http_client.get("https://example.com")
       return response.text

# 外部からHTTPクライアントを渡す
import httpx
client = httpx.Client()
agent = MyAgent(client)

print(agent.fetch_data())

この場合:

  • MyAgentはhttpx.Clientに直接依存していません。
  • テスト時にモック(テスト用のダミーオブジェクト)を渡せます。

将来的に別のHTTPライブラリに切り替えるのも簡単です。

ふむ、まあこれはわかる。

PydanticAIでの依存性注入

PydanticAIは、この考え方を「エージェントが利用するサービス(HTTPクライアントやAPIキーなど)」に適用しています。

具体的には、以下のように動作します:

  1. 依存関係の型を定義する
    例えば、APIキーやHTTPクライアントをまとめたMyDepsというデータクラスを作ります:
from dataclasses import dataclass
import httpx

@dataclass
class MyDeps:
   api_key: str
   http_client: httpx.AsyncClient
  1. エージェントに依存関係を登録する
    エージェントに「どんな依存関係が必要か」を型として教えます:
from pydantic_ai import Agent

agent = Agent("openai:gpt-4o", deps_type=MyDeps)
  1. エージェントの処理に依存関係を使う
    エージェント内で、渡された依存関係を使うコードを書きます:
@agent.system_prompt
async def get_system_prompt(ctx):
   response = await ctx.deps.http_client.get(
       "https://example.com",
       headers={"Authorization": f"Bearer {ctx.deps.api_key}"}
   )
   return response.text
  1. 依存関係を実行時に渡す
    実際にエージェントを動かすときに、依存関係を注入します:
import httpx
async with httpx.AsyncClient() as client:
   deps = MyDeps(api_key="your-api-key", http_client=client)
   result = await agent.run("Tell me a joke.", deps=deps)

メリット

  • テストが簡単: 依存関係をモックに置き換えるだけでテストが可能。
  • 柔軟性: 依存関係の変更が容易(例:HTTPクライアントを変更してもコードを書き換える必要がない)。
  • 再利用性: 依存関係を共有したり、複数のエージェント間で使い回しが可能。

依存性注入システムを使うことで、モジュール間の結合を弱め、コードをより保守しやすく、テストしやすいものにすることができます!

なるほど、ちょっとわかった気もする。ではサンプルコード。

from dataclasses import dataclass
from pydantic_ai import Agent
import asyncio
import httpx


@dataclass
class MyDeps:  
    api_key: str
    http_client: httpx.AsyncClient


agent = Agent(
    'openai:gpt-4o',
    deps_type=MyDeps,  
)


async def main():
    async with httpx.AsyncClient() as client:
        deps = MyDeps('foobar', client)
        result = await agent.run(
            '冗談を言ってみて',
            deps=deps,  
        )
        print(result.data)

asyncio.run(main())

結果

出力
もちろん!じゃあ、こんなのはどう?

最近、仕事が忙しすぎて家まで歩いて帰る夢を見るんだけど、それでも通勤ラッシュに巻き込まれるんだよね。

ただ、この例だと実際にはDependenciesは使用されていない。実際に使用する場合はRunContextを介してアクセスする。ということで、実際に動くサンプルを作ってみた。OpenWeatherMapのAPIを使う。

from dataclasses import dataclass
import httpx
from pydantic_ai import Agent, RunContext
from google.colab import userdata


@dataclass
class MyDeps:
    weather_api_key: str
    http_client: httpx.AsyncClient


agent = Agent(
    'openai:gpt-4o',
    deps_type=MyDeps,
)


@agent.system_prompt  
async def get_system_prompt(ctx: RunContext[MyDeps]) -> str:  
    response = await ctx.deps.http_client.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={
            "q": "kobe",
            "appid": ctx.deps.weather_api_key,
            "units": "metric",
        }
    )
    response.raise_for_status()
    weather_data = response.json()

    city = weather_data["name"]
    condition = weather_data["weather"][0]["description"]
    temperature = weather_data["main"]["temp"]
    wind_speed = weather_data["wind"]["speed"]
    humidity = weather_data["main"]["humidity"]
    
    return (
        f"現在、{city}の天気は「{condition}」で、気温は{temperature}°Cです。"
        f"風速は{wind_speed}m/s、湿度は{humidity}%です。"
        "この情報に基づいてユーザと会話してください。"
    )

async def main():
    async with httpx.AsyncClient() as client:
        deps = MyDeps(userdata.get('OPENWEATHER_API_KEY'), client)
        result = await agent.run('おはよう!', deps=deps)
        print(result.data)

asyncio.run(main())
出力
おはようございます!神戸の天気は曇り空のようですね。気温は約11°Cで、少し肌寒いかもしれません。何か今日の予定がありますか?または何かお手伝いできることがあれば教えてください。

実行時にAPIキーとHTTPクライアントインスタンスを渡して、それを使って天気情報を取得してシステムプロンプトに追加している。

メッセージを見てみる。

for message in agent.last_run_messages:
    print(f"{message.role}: {message.content}")
出力
system: 現在、Kobeの天気は「overcast clouds」で、気温は10.84°Cです。風速は3.02m/s、湿度は59%です。この情報に基づいてユーザと会話してください。
user: おはよう!
model-text-response: おはようございます!神戸の天気は曇り空のようですね。気温は約11°Cで、少し肌寒いかもしれません。何か今日の予定がありますか?または何かお手伝いできることがあれば教えてください。

非同期 vs 同期

PydanticAIは常に「非同期」で動作する。よって、そのエージェントに紐づけられたシステムプロンプト関数、ツール関数、結果バリデーターなどは、非同期コードに対応できるよう設計されている。

これらの関数が同期関数(def)だった場合、run_in_executorを使い、スレッドプール内で実行されるため、少しオーバーヘッドが発生し効率が落ちるが、問題なく動作はするらしい。まあ基本的には非同期で統一するのが推奨なのだと思う。

1つ前のコードを同期で書いた場合

from dataclasses import dataclass
import httpx
from pydantic_ai import Agent, RunContext
from google.colab import userdata


@dataclass
class MyDeps:
    weather_api_key: str
    http_client: httpx.Client  # 同期クライアントに変更


agent = Agent(
    'openai:gpt-4o',
    deps_type=MyDeps,
)


@agent.system_prompt  
def get_system_prompt(ctx: RunContext[MyDeps]) -> str:  # 同期関数に変更
    # 同期リクエストに変更
    response = ctx.deps.http_client.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={
            "q": "kobe",
            "appid": ctx.deps.weather_api_key,
            "units": "metric",
        }
    )
    response.raise_for_status()
    weather_data = response.json()

    city = weather_data["name"]
    condition = weather_data["weather"][0]["description"]
    temperature = weather_data["main"]["temp"]
    wind_speed = weather_data["wind"]["speed"]
    humidity = weather_data["main"]["humidity"]
    
    return (
        f"現在、{city}の天気は「{condition}」で、気温は{temperature}°Cです。"
        f"風速は{wind_speed}m/s、湿度は{humidity}%です。"
        "この情報に基づいてユーザと会話してください。"
    )

def main():
    # 同期クライアントを使用する
    client = httpx.Client()
    deps = MyDeps(userdata.get('OPENWEATHER_API_KEY'), client)
    result = agent.run_sync('おはよう!', deps=deps)
    print(result.data)

main()

ただし以下には留意かな

同期または非同期のどちらの依存関係を使用するかは、runまたはrun_syncを使用するかに完全に依存しません。run_syncは単にrunのラッパーであり、エージェントは常に非同期コンテキスト内で実行されます。

上の非同期のコードに結果バリデーションも追加してみる。単に400じゃないことだけのバリデーションだけども。

from dataclasses import dataclass
import httpx
from pydantic_ai import Agent, RunContext
from google.colab import userdata


@dataclass
class MyDeps:
    weather_api_key: str
    http_client: httpx.AsyncClient


agent = Agent(
    'openai:gpt-4o',
    deps_type=MyDeps,
)


@agent.system_prompt  
async def get_system_prompt(ctx: RunContext[MyDeps]) -> str:  
    response = await ctx.deps.http_client.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={
            "q": "kobe",
            "appid": ctx.deps.weather_api_key,
            "units": "metric",
        }
    )
    response.raise_for_status()
    weather_data = response.json()

    city = weather_data["name"]
    condition = weather_data["weather"][0]["description"]
    temperature = weather_data["main"]["temp"]
    wind_speed = weather_data["wind"]["speed"]
    humidity = weather_data["main"]["humidity"]
    
    return (
        f"現在、{city}の天気は「{condition}」で、気温は{temperature}°Cです。"
        f"風速は{wind_speed}m/s、湿度は{humidity}%です。"
        "この情報に基づいてユーザと会話してください。"
    )


@agent.result_validator  
async def validate_result(ctx: RunContext[MyDeps], final_response: str) -> str:
    response = await ctx.deps.http_client.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={
            "q": "kobe",
            "appid": ctx.deps.weather_api_key,
            "units": "metric",
        }
    )
    if response.status_code == 400:
        raise ModelRetry(f'invalid response: {response.text}')
    response.raise_for_status()
    return final_response


async def main():
    async with httpx.AsyncClient() as client:
        deps = MyDeps(userdata.get('OPENWEATHER_API_KEY'), client)
        result = await agent.run('おはよう!', deps=deps)
        print(result.data)

asyncio.run(main())

依存関係の上書き

実行時の実際の依存関係を別の依存関係で上書きすることができる。例えば、モックやカスタマイズされた依存関係を使用してエージェントの動作をテストしたり、特定の環境で挙動を変更することが可能となる。

依存関係を上書きするには、Agent.overrideを使う。withブロックと組み合わせることで、元の依存関係を差し替える期間を限定できる。

from unittest.mock import AsyncMock
from dataclasses import dataclass
import httpx
from pydantic_ai import Agent, RunContext
import asyncio


@dataclass
class MyDeps:
    weather_api_key: str
    http_client: httpx.AsyncClient


agent = Agent(
    "openai:gpt-4o",
    deps_type=MyDeps,
)


@agent.system_prompt
async def get_system_prompt(ctx: RunContext[MyDeps]) -> str:
    response = await ctx.deps.http_client.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={
            "q": "kobe",
            "appid": ctx.deps.weather_api_key,
            "units": "metric",
        },
    )
    await response.raise_for_status()
    weather_data = await response.json()

    city = weather_data["name"]
    condition = weather_data["weather"][0]["description"]
    temperature = weather_data["main"]["temp"]

    return (
        f"現在、{city}の天気は「{condition}」で、気温は{temperature}°Cです。"
        "この情報に基づいてユーザと会話してください。"
    )


async def test_agent_with_override():
    # モックHTTPクライアントの作成
    mock_http_client = AsyncMock()
    mock_response = AsyncMock()
    mock_response.json.return_value = {
        "name": "Mock City",
        "weather": [{"description": "晴れ"}],
        "main": {"temp": 25.0},
    }
    mock_http_client.get.return_value = mock_response  # getの戻り値をモック

    # モック依存関係を定義
    test_deps = MyDeps(weather_api_key="mock_key", http_client=mock_http_client)

    # 依存関係をオーバーライドして実行
    with agent.override(deps=test_deps):
        result = await agent.run("天気を教えて")
        print(result.data)


# 実行
asyncio.run(test_agent_with_override())
出力
現在、Mock Cityの天気は「晴れ」で、気温は25.0°Cです。気持ちの良い天気ですね!何か他にお手伝いできることはありますか?

他のエージェントの依存関係としてのエージェント

注: ちょっと試しているうちにバージョンが上がった

pydantic==2.10.3
pydantic-ai==0.0.9
pydantic-ai-slim==0.0.9
pydantic_core==2.27.1

PydanticAIでは、エージェント自体を依存関係として他のエージェントに渡すことができる。これは、エージェントがPythonのオブジェクトとして設計されているため。これにより、あるエージェントの処理結果を別のエージェントで活用する、といった複雑なタスクを分解して処理する仕組みを構築できるらしい。

from dataclasses import dataclass

from pydantic_ai import Agent, RunContext


@dataclass
class MyDeps:
    factory_agent: Agent[None, list[str]]


joke_agent = Agent(
    'openai:gpt-4o-mini',
    deps_type=MyDeps,
    system_prompt=(
        '"joke_factory"を使ってジョークを生成し、最も面白いものを選んで。'
        'ジョークは1つだけ返して。'
    ),
)

factory_agent = Agent('openai:gpt-4o', result_type=list[str])


@joke_agent.tool
async def joke_factory(ctx: RunContext[MyDeps], count: int) -> str:
    r = await ctx.deps.factory_agent.run(f'ジョークを {count} 個生成して')
    return '\n'.join(r.data)


result = joke_agent.run_sync('なにかジョークを言ってみて。', deps=MyDeps(factory_agent))
print(result.data)
出力
「なぜ科学者は原子を信用しないの?それは彼らがすべてを作り上げるから!」

実際に何が行われているか?まずjoke_agentのlast_run_messagesを見てみる。

from pydantic_ai.messages import (
    SystemPrompt,
    UserPrompt,
    ModelTextResponse,
    ModelStructuredResponse,
    ToolReturn
)

for message in joke_agent.last_run_messages:
    if type(message) == ModelStructuredResponse:
        role = message.role
        if message.calls:
            for call in message.calls:
                print(f"{message.role}: calling '{call.tool_name}' with {call.args.args_json}")
    elif type(message) == ToolReturn:
        print(f"{message.role}: '{message.tool_name}' returned '{message.content}'")
    else:
        print(f"{message.role}: {message.content}")
出力
system: "joke_factory"を使ってジョークを生成し、最も面白いものを選んで。ジョークは1つだけ返して。
user: なにかジョークを言ってみて。
model-structured-response: calling 'joke_factory' with {"count":1}
tool-return: 'joke_factory' returned 'Why don't scientists trust atoms? Because they make up everything!'
model-text-response: 「なぜ科学者は原子を信用しないの?それは彼らがすべてを作り上げるから!」

次にfactory_agent。

for message in factory_agent.last_run_messages:
    if type(message) == ModelStructuredResponse:
        role = message.role
        if message.calls:
            for call in message.calls:
                print(f"{message.role}: calling '{call.tool_name}' with {call.args.args_json}")
    elif type(message) == ToolReturn:
        print(f"{message.role}: '{message.tool_name}' returned '{message.content}'")
    else:
        print(f"{message.role}: {message.content}")
出力
user: ジョークを 1 個生成して
model-structured-response: calling 'final_result' with {"response":["Why don't scientists trust atoms? Because they make up everything!"]}
kun432kun432

Results

https://ai.pydantic.dev/results/

Resultsはエージェントが最終的に返す結果、値のこと。PydanticAIではこれらの結果はこの結果データは、RunResult / StreamedRunResult にラップされて返され、追加で以下のような情報にもアクセスできる。

  • トークン使用量
  • メッセージ履歴

RunResultStreamedRunResult は汎用型(ジェネリック)として実装されており、エージェントが返すデータの型情報が保持されるため、結果データを型安全に扱うことができる。

from pydantic import BaseModel
from pydantic_ai import Agent


class CityLocation(BaseModel):
    city: str
    country: str


agent = Agent(
    'openai:gpt-4o-mini',
    result_type=CityLocation
)

result = agent.run_sync('2012年のオリンピック開催地はどこ?')
print(result.data)
print(result.cost())

結果。見やすさのためのちょっといじっている。

出力
city='ロンドン' country='イギリス'
Cost(
    request_tokens=72,
    response_tokens=39,
    total_tokens=111,
    details={
        'accepted_prediction_tokens': 0,
        'audio_tokens': 0,
        'reasoning_tokens': 0,
        'rejected_prediction_tokens': 0,
        'cached_tokens': 0
    }
)

結果データ

PydanticAIでは、エージェントから返される結果の「型」によって、結果データの扱いが変わる。

  1. 文字列型(str)またはstrを含むUnion型の場合
    • モデルはプレーンテキストのレスポンスを返す。
    • モデルの生データ(テキスト)そのものが結果データとして利用される。

文字列型の場合

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o-mini',
    result_type=str
)

result = agent.run_sync('日本の首都は?')
print(result.data)
print(type(result.data))
出力
日本の首都は東京です。
<class 'str'>

文字列型を含むUnion型の場合

from typing import Union
from pydantic import BaseModel
from pydantic_ai import Agent


class Box(BaseModel):
    width: int
    height: int
    depth: int
    units: str


agent: Agent[None, Union[Box, str]] = Agent(
    'openai:gpt-4o-mini',
    result_type=Union[Box, str],  # type: ignore
    system_prompt=(
        "箱の寸法を抽出してください。"
        "寸法を特定するために情報が足りない場合は、ユーザーに確認してください。"
    ),
)

result = agent.run_sync('箱は 10x20x30 です。')
print(result.data)
print(type(result.data))

result = agent.run_sync('箱は 10x20x30 cmです。')
print(result.data)
print(type(result.data))
出力
箱の寸法は幅10、高さ20、奥行き30と理解しましたが、単位が指定されていません。単位を教えていただけますか?(例えば、センチメートル、インチなど)
<class 'str'>
width=10 height=20 depth=30 units='cm'
<class '__main__.Box'>
  1. 複数の型を持つUnion型の場合
    • Union型の各メンバー(str以外)は、モデルに別々の「ツール」として登録される。
    • これにより、ツールのスキーマが簡潔になり、モデルが適切に応答する可能性が上がる。
from typing import Union

from pydantic_ai import Agent

agent: Agent[None, Union[list[str], list[int]]] = Agent(
    'openai:gpt-4o-mini',
    result_type=Union[list[str], list[int]],  # type: ignore
    system_prompt='与えられた図形から色またはサイズを抽出して。',
)

result = agent.run_sync('赤い四角、青い円、緑の三角')
print(result.data)
print(type(result.data))
result = agent.run_sync('サイズ10の四角、サイズ20の円、サイズ30の三角')
print(result.data)
print(type(result.data))
出力
['赤', '青', '緑']
<class 'list'>
[10, 20, 30]
<class 'list'>

実際にメッセージを見てみるとツールになっているのがわかる。

from pydantic_ai.messages import (
    SystemPrompt,
    UserPrompt,
    ModelTextResponse,
    ModelStructuredResponse,
    ToolReturn
)

for message in agent.last_run_messages:
    if type(message) == ModelStructuredResponse:
        role = message.role
        if message.calls:
            for call in message.calls:
                print(f"{message.role}: calling '{call.tool_name}' with {call.args.args_json}")
    elif type(message) == ToolReturn:
        print(f"{message.role}: '{message.tool_name}' returned '{message.content}'")
    else:
        print(f"{message.role}: {message.content}")
出力
system: 与えられた図形から色またはサイズを抽出して。
user: サイズ10の四角、サイズ20の円、サイズ30の三角
model-structured-response: calling 'final_result_list_2' with {"response": [10, 20, 30]}
model-structured-response: calling 'final_result_list' with {"response": ["四角", "円", "三角"]}
  1. 結果型のスキーマが"object"型でない場合(例:リストや単純な値)
    • スキーマは単一要素のオブジェクトとしてラップされる。
    • これにより、モデルに登録されたすべてのツールがオブジェクト型のスキーマを共有する形になる。
from typing import List
from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o-mini',
    result_type=List[str]  # List[str]はobjectではない
)  
result = agent.run_sync('3つの果物をリストアップして')
print(result.data)
print(type(result.data))
出力
['リンゴ', 'オレンジ', 'バナナ']
<class 'list'>

あと、result_typeにUnion型などを指定する場合、現在のPythonの型ヒントでは正しく解釈できない可能性があるため、# type: ignoreを付ける必要がある。PEP-747が導入されれば解決する見込みらしい。


結果の検証

一部の検証(特にIOを伴うもの)はPydanticのバリデーターだけでは実現が難しい。例えば

  • IOが必要なもの
  • 非同期なもの

などがそう。

PydanticAIでは、@agent.result_validator デコレーターを使ってカスタム検証関数を登録できる。

以下はSQLを生成させてバリデーションする例。実際のDBを用意するのは面倒なのでダミー、かつ、バリデーションも簡易なものにしてある。

from typing import Union
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext, ModelRetry

# 簡単なスキーマ情報
SCHEMA = {
    "users": ["id", "name", "email", "last_active"],
    "orders": ["id", "user_id", "amount", "created_at"],
}


class DummyDatabase:
    """ダミーデータベース接続クラス"""

    def __init__(self, schema):
        self.schema = schema

    async def execute(self, query: str):
        """SQLクエリがスキーマに従っているかを簡易的にチェック"""
        if "SELECT" not in query.upper():
            raise ValueError("クエリはSELECT文でなければならない、INSERT/UPDATE文は許可されていない")

        # テーブル名をチェック
        valid_tables = list(self.schema.keys())
        if not any(table in query for table in valid_tables):
            raise ValueError(f"クエリは次のテーブルのどれかを参照しなければならない: {valid_tables}")

        print(f"EXPLAIN成功 -> クエリ: {query}")


class Success(BaseModel):
    """SQLが正しく生成された場合のレスポンス"""
    sql_query: str = Field(..., description="生成されたSQLクエリ")
    explanation: str = Field("", description="SQLクエリの説明")


class InvalidRequest(BaseModel):
    """SQL生成が失敗した場合のレスポンス"""
    error_message: str


# レスポンス型の定義
Response = Union[Success, InvalidRequest]

# エージェントの作成
agent = Agent(
    'openai:gpt-4o-mini',
    result_type=Response,  # type: ignore
    deps_type=DummyDatabase,
    retries=2
)


# エージェントのシステムプロンプト
@agent.system_prompt
async def system_prompt() -> str:
    return f"""\
You are a SQL generator for a PostgreSQL database.
The schema is as follows:

{SCHEMA}

Examples:
- Request: Show me all users
  Response: SELECT * FROM users

- Request: Show me users who were last active yesterday
  Response: SELECT * FROM users WHERE last_active::date = CURRENT_DATE - INTERVAL '1 day'

- Request: Add a new user with id=1, name="John", email="john@example.com", last_active=today
  Response: INSERT INTO users (id, name, email, last_active) VALUES (1, 'John', 'john@example.com', CURRENT_DATE)

Generate SQL based on the user request.
"""


# 結果の検証
@agent.result_validator
async def validate_result(ctx: RunContext[DummyDatabase], result: Response) -> Response:
    if isinstance(result, InvalidRequest):
        return result

    # SQLの簡易バリデーションとしてダミーのexecuteを使用
    try:
        await ctx.deps.execute(f"EXPLAIN {result.sql_query}")
    except ValueError as e:
        print("バリデーション: ERROR!")
        raise ModelRetry(f"クエリが不正: {e}") from e
    else:
        print("バリデーション: OK")
        return result


# ダミーデータベース接続の作成
dummy_db = DummyDatabase(SCHEMA)

# ユーザー入力
user_request = "一昨日アクティブだったユーザーを取得してください"
result = agent.run_sync(user_request, deps=dummy_db)

if isinstance(result.data, Success):
    print("生成されたSQL:", result.data.sql_query)
    print("説明:", result.data.explanation)
else:
    print("エラー:", result.data.error_message)

出力
EXPLAIN成功 -> クエリ: EXPLAIN SELECT * FROM users WHERE last_active::date = CURRENT_DATE - INTERVAL '2 days'
バリデーション: OK
生成されたSQL: SELECT * FROM users WHERE last_active::date = CURRENT_DATE - INTERVAL '2 days'
説明: このクエリは、二日前にアクティブだったユーザーを取得します。

バリデーションエラーの例

user_request = "id=2, name='Alice', email='alice@example.com' のユーザーを追加してください"
出力
バリデーション: ERROR!
エラー: INSERT/UPDATE statements are not allowed. Only SELECT statements are permitted.

バリデーション関数が実行されていることがわかる。

実際にデータベースを使ったサンプルが以下にある。

https://ai.pydantic.dev/examples/sql-gen/


結果のストリーミング

ストリーミングには以下の課題がある。

  1. データが完全に届く前に、部分的に検証を行う必要がある
  2. ストリーミングが開始されるタイミングで、それが「ツール呼び出し」なのか「テキストレスポンス」なのかを特定する必要がある。

Pydantic AIでは以下のアプローチを取る。

  • ストリーミングを開始し、最初の部分だけを「のぞき見」して、レスポンスの種類を判別
  • 判別結果に基づいて:
    • ツール呼び出しの場合は、ストリーム全体を読み込み、ツールを呼び出して処理を進める。
    • 通常のレスポンスの場合は、ストリーム全体を受信し、それをStreamedRunResultとして返す。

テキストのストリーミング

from pydantic_ai import Agent
import asyncio

agent = Agent('openai:gpt-4o-mini')

async def main():
    async with agent.run_stream("日本の総理大臣は誰?") as result:
        async for message in result.stream():
            print(message)

asyncio.run(main())

結果

出力
私の知識が2023年10
私の知識が2023年10月までのものであるため、現在の日本の
私の知識が2023年10月までのものであるため、現在の日本の総理大臣は岸田文雄(
私の知識が2023年10月までのものであるため、現在の日本の総理大臣は岸田文雄(きしだ ふみお)氏です
私の知識が2023年10月までのものであるため、現在の日本の総理大臣は岸田文雄(きしだ ふみお)氏です。彼は2021年10月4日に
私の知識が2023年10月までのものであるため、現在の日本の総理大臣は岸田文雄(きしだ ふみお)氏です。彼は2021年10月4日に就任しました。ただし、最新の情報は
私の知識が2023年10月までのものであるため、現在の日本の総理大臣は岸田文雄(きしだ ふみお)氏です。彼は2021年10月4日に就任しました。ただし、最新の情報はニュースなどで確認してください。

部分レスポンスを増分で取得する

from pydantic_ai import Agent
import asyncio

agent = Agent('openai:gpt-4o-mini')

async def main():
    async with agent.run_stream("日本の総理大臣は誰?") as result:
        async for delta in result.stream_text(delta=True):
            print(delta)
            # 実際に使う場合は以下
            #print(delta, end="", flush=True)

asyncio.run(main())
出力
私の知識が2023年10月まで
のものであるため、現在の日本の総理
大臣についての最新情報は提供できませんが、
2023年10月時点では岸田
文雄(きしだ ふ
みお)氏
が総理大
臣でした。最新の情報を確認
するためには、ニュースや
公式な情報源を参照してください。

構造化レスポンスのストリーミングの場合、データは最初は不完全な状態で送られるが、PydanticのBaseModelはデータが完全であることを前提にしているため、こういった不完全なデータを扱うのが困難。現時点では必須フィールドを緩やかに扱えるTypeDictを使う。

from datetime import date
from typing_extensions import TypedDict
from pydantic_ai import Agent
import asyncio


class UserProfile(TypedDict, total=False):
    name: str
    dob: date
    bio: str


agent = Agent(
    'openai:gpt-4o',
    result_type=UserProfile
)

async def main():
    user_input = '私の名前は太郎です。生年月日は2000年1月1日です。'
    async with agent.run_stream(user_input) as result:
        async for profile in result.stream():
            print(profile)

asyncio.run(main())
出力
{'name': '太郎'}
{'name': '太郎'}
{'name': '太郎', 'dob': datetime.date(2000, 1, 1)}
{'name': '太郎', 'dob': datetime.date(2000, 1, 1)}

検証を細かく制御したい場合、特に検証エラーを捕捉したい場合は、以下のパターンを使用できる

from datetime import date
from typing_extensions import TypedDict
from pydantic_ai import Agent
import asyncio


class UserProfile(TypedDict, total=False):
    name: str
    dob: date
    bio: str


agent = Agent(
    'openai:gpt-4o',
    result_type=UserProfile
)

async def main():
    user_input = '私の名前は太郎です。生年月日は2000年1月1日です。'

    async with agent.run_stream(user_input) as result:
        # ストリーミングされた構造化レスポンス(ModelStructuredResponse)を、
        # stream_structuredで逐次受信
        # - debounce_by: データの変化を監視するための待機時間
        async for message, last in result.stream_structured(debounce_by=0.01):
            try:
                # validate_structured_resultで、レスポンスを検証
                profile = await result.validate_structured_result(
                    message,
                    # allow_partial=not lastで、最終レスポンスかどうかを確認、」
                    # 最終レスポンスでない場合に部分的なデータの検証を許可する
                    allow_partial=not last,  
                )
            except ValidationError:
                continue  # 検証エラーが発生した場合はスキップ
            print(profile)

asyncio.run(main())
出力
{}
{'name': ''}
{'name': '太郎'}
{'name': '太郎'}
{'name': '太郎'}
{'name': '太郎'}
{'name': '太郎'}
{'name': '太郎', 'dob': datetime.date(2000, 1, 1)}
{'name': '太郎', 'dob': datetime.date(2000, 1, 1)}
kun432kun432

Messages and chat history

Agentの章でも少し触れているが、エージェントが実行されているときにやり取りされているメッセージ、つまり会話履歴の扱い方。

結果からすべてのメッセージを取得する

エージェントの実行結果は以下のresultオブジェクトとして返される。

  • RunResult
    • Agent.run / Agent.run_sync から返される
  • StreamedRunResult
    • Agent.run_streamからストリームで返される

これらからメッセージを取り出すためには、以下の2つのメソッドを使う。

  • all_messages()
    • システムプロンプト、過去の実行からのメッセージ、現在の実行のメッセージを含む「全て」のメッセージを取り出す。
    • all_messages_json()でJSON形式で取り出せる。
  • new_messages()
    • システムプロンプトを除く、現在の実行のみのメッセージを返す
    • new_messages_json()でJSON形式で取り出せる。

ただし、StreamedRunResult で完全なメッセージを得るには、ストリームが「完了」している必要があるので、以下のようにコルーチンをawaitする必要がある。

  • StreamedRunResult.stream()
  • StreamedRunResult.stream_text()
  • StreamedRunResult.stream_structured()
  • StreamedRunResult.get_data()

RunResultからメッセージを取り出す

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

result = agent.run_sync('ジョークを言って。')

print("==== 結果 ====\n")
print(result.data)

print("\n==== すべてのメッセージ ====\n")
print(result.all_messages())

print("\n==== 現在の実行のメッセージ ====\n")
print(result.new_messages())

結果。見やすさのためにちょっといじっている。

出力
==== 結果 ====

もちろん!こんなジョークはいかがでしょうか?

「鶏が道を渡った理由は何ですか?」

「反対側に行きたかったからです!」

シンプルですが、思わずにっこりしちゃいますね!他に何か聞きたいことがあれば教えてください。

==== すべてのメッセージ ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='ジョークを言って。', timestamp=datetime.datetime(2024, 12, 5, 3, 7, 19, 629538, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろん!こんなジョークはいかがでしょうか?\n\n「鶏が道を渡った理由は何ですか?」\n\n「反対側に行きたかったからです!」\n\nシンプルですが、思わずにっこりしちゃいますね!他に何か聞きたいことがあれば教えてください。', timestamp=datetime.datetime(2024, 12, 5, 3, 7, 19, tzinfo=datetime.timezone.utc), role='model-text-response')
]

==== 現在の実行のメッセージ ====

[
    UserPrompt(content='ジョークを言って。', timestamp=datetime.datetime(2024, 12, 5, 3, 7, 19, 629538, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろん!こんなジョークはいかがでしょうか?\n\n「鶏が道を渡った理由は何ですか?」\n\n「反対側に行きたかったからです!」\n\nシンプルですが、思わずにっこりしちゃいますね!他に何か聞きたいことがあれば教えてください。', timestamp=datetime.datetime(2024, 12, 5, 3, 7, 19, tzinfo=datetime.timezone.utc), role='model-text-response')
]

StreamedRunResultからメッセージを取り出す

from pydantic_ai import Agent
import asyncio

agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

async def main():
    async with agent.run_stream('ジョークを言って。') as result:
        print("==== ストリーム未完了時 ====\n")
        print(result.all_messages())
        
        print("\n==== ストリーム中 ====\n")
        async for text in result.stream():
            print(text)
            print("----")

        print("\n==== ストリーム後 ====\n")
        print(result.all_messages())

asyncio.run(main())

結果。見やすさのためにちょっといじっている。

出力
==== ストリーム未完了時 ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='ジョークを言って。', timestamp=datetime.datetime(2024, 12, 5, 3, 15, 50, 621081, tzinfo=datetime.timezone.utc), role='user')
]

==== ストリーム中 ====

もちろん!こんなジョークはいかがでしょうか?

「カメが銀行に行きました
----
もちろん!こんなジョークはいかがでしょうか?

「カメが銀行に行きました。そしたら、行員が言いました。
『お金を下ろす
----
もちろん!こんなジョークはいかがでしょうか?

「カメが銀行に行きました。そしたら、行員が言いました。
『お金を下ろすんですか?それとも預けるん
----
もちろん!こんなジョークはいかがでしょうか?

「カメが銀行に行きました。そしたら、行員が言いました。
『お金を下ろすんですか?それとも預けるんですか?』
カメは答えました。
『
----
もちろん!こんなジョークはいかがでしょうか?

「カメが銀行に行きました。そしたら、行員が言いました。
『お金を下ろすんですか?それとも預けるんですか?』
カメは答えました。
『いいえ、ただ見てるだけです。
----
もちろん!こんなジョークはいかがでしょうか?

「カメが銀行に行きました。そしたら、行員が言いました。
『お金を下ろすんですか?それとも預けるんですか?』
カメは答えました。
『いいえ、ただ見てるだけです。』」

少し笑っていただけましたか?
----

==== ストリーム後 ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='ジョークを言って。', timestamp=datetime.datetime(2024, 12, 5, 3, 15, 50, 621081, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろん!こんなジョークはいかがでしょうか?\n\n「カメが銀行に行きました。そしたら、行員が言いました。\n『お金を下ろすんですか?それとも預けるんですか?』\nカメは答えました。\n『いいえ、ただ見てるだけです。』」\n\n少し笑っていただけましたか?', timestamp=datetime.datetime(2024, 12, 5, 3, 15, 50, tzinfo=datetime.timezone.utc), role='model-text-response')
]

メッセージを次のエージェント実行時の入力として渡す

エージェントのメッセージ履歴を次のエージェントに渡すことで、複数エージェント間でコンテキストを維持することができる。メッセージ履歴は、 Agent.run / Agent.run_sync / Agent.run_streammessage_historyパラメータで渡す。

マルチエージェントとあるけど、サンプルは1つのエージェントで会話履歴を維持するマルチターンの会話になっている。

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

result1 = agent.run_sync('ジョークを言ってみて。')
print("==== 結果: 1回目 ====\n")
print(result1.data)

result2 = agent.run_sync(
    'このジョークの面白さを説明して。',
    message_history=result1.new_messages()  # 前回のメッセージ履歴を渡す
)
print("==== 結果: 2回目 ====\n")
print(result2.data)

print("\n==== 最終的な全メッセージ ====\n")
print(result2.all_messages())

結果。見やすさのために少しいじっている。

出力
==== 結果: 1回目 ====

もちろんです!こんなジョークはいかがでしょうか?

なぜカメは遅いの?

だって、いつも殻にこもってるから! 

どうでしたか?他にも聞きたいジョークがあれば、言ってくださいね!
==== 結果: 2回目 ====

このジョークの面白さは、カメの特性「遅い」と「殻にこもる」という2つの要素を絡めているところにあります。

1. **カメの特徴**: カメは生物学的に遅い動物として知られています。この「遅い」という印象が、カメに対する一般的なイメージとして浸透しているため、聴き手はすぐに理解できます。

2. **言葉の遊び**: 「殻にこもる」という表現には二重の意味があります。一つは、文字通りカメが自分の殻に入って動かないということ。もう一つは、比喩的に「内向的」であまり外に出ない人を指す場合があります。このダブルミーニングがユーモラスな効果を生んでいます。

このように、カメの特性を知り、それに関連した言葉遊びを使うことで、聴き手に楽しい発見を促し、笑いを生み出しています。ジョークのシンプルさも、すぐに理解できる要素となっているため、多くの人にウケやすいのです。

==== 最終的な全メッセージ ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='ジョークを言ってみて。', timestamp=datetime.datetime(2024, 12, 5, 3, 38, 24, 902037, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろんです!こんなジョークはいかがでしょうか?\n\nなぜカメは遅いの?\n\nだって、いつも殻にこもってるから! \n\nどうでしたか?他にも聞きたいジョークがあれば、言ってくださいね!', timestamp=datetime.datetime(2024, 12, 5, 3, 38, 25, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='このジョークの面白さを説明して。', timestamp=datetime.datetime(2024, 12, 5, 3, 38, 25, 943718, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='このジョークの面白さは、カメの特性「遅い」と「殻にこもる」という2つの要素を絡めているところにあります。\n\n1. **カメの特徴**: カメは生物学的に遅い動物として知られています。この「遅い」という印象が、カメに対する一般的なイメージとして浸透しているため、聴き手はすぐに理解できます。\n\n2. **言葉の遊び**: 「殻にこもる」という表現には二重の意味があります。一つは、文字通りカメが自分の殻に入って動かないということ。もう一つは、比喩的に「内向的」であまり外に出ない人を指す場合があります。このダブルミーニングがユーモラスな効果を生んでいます。\n\nこのように、カメの特性を知り、それに関連した言葉遊びを使うことで、聴き手に楽しい発見を促し、笑いを生み出しています。ジョークのシンプルさも、すぐに理解できる要素となっているため、多くの人にウケやすいのです。', timestamp=datetime.datetime(2024, 12, 5, 3, 38, 26, tzinfo=datetime.timezone.utc), role='model-text-response')
]

ここではnew_message()を使用して、直近のメッセージだけを渡している。この場合、直近のみなので、過去のメッセージ履歴は当然渡されない。

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

result1 = agent.run_sync('私の趣味は競馬です。覚えてね。')
print("==== 結果: 1回目 ====\n")
print(result1.data)

result2 = agent.run_sync(
    '日本の総理大臣ってだれだっけ?',
    message_history=result1.new_messages()
)
print("==== 結果: 2回目 ====\n")
print(result2.data)

result3 = agent.run_sync(
    '私の趣味ってなんでしたっけ?',
    message_history=result2.new_messages()
)
print("==== 結果: 3回目 ====\n")
print(result3.data)

print("\n==== 最終的な全メッセージ ====\n")
print(result3.all_messages())
出力
==== 結果: 1回目 ====

もちろん、競馬があなたの趣味だということを覚えておきます!競馬について何か特に興味のあることや質問があれば、ぜひ教えてください。
==== 結果: 2回目 ====

2023年10月時点での日本の総理大臣は岸田文雄(きしだ ふみお)さんです。彼は2021年10月4日に就任しました。最新の情報を確認したい場合は、ニュースや公式な情報源をチェックすることをおすすめします。
==== 結果: 3回目 ====

申し訳ありませんが、あなたの趣味を知る情報は持っていません。もしよろしければ、あなたの趣味について教えていただければ、その趣味に関連する情報やアクティビティをご提案することができますよ!

==== 最終的な全メッセージ ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='日本の総理大臣ってだれだっけ?', timestamp=datetime.datetime(2024, 12, 5, 3, 40, 25, 174485, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='2023年10月時点での日本の総理大臣は岸田文雄(きしだ ふみお)さんです。彼は2021年10月4日に就任しました。最新の情報を確認したい場合は、ニュースや公式な情報源をチェックすることをおすすめします。', timestamp=datetime.datetime(2024, 12, 5, 3, 40, 25, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='私の趣味ってなんでしたっけ?', timestamp=datetime.datetime(2024, 12, 5, 3, 40, 26, 357252, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='申し訳ありませんが、あなたの趣味を知る情報は持っていません。もしよろしければ、あなたの趣味について教えていただければ、その趣味に関連する情報やアクティビティをご提案することができますよ!', timestamp=datetime.datetime(2024, 12, 5, 3, 40, 26, tzinfo=datetime.timezone.utc), role='model-text-response')]

all_messages()ですべてのメッセージ履歴を渡すと、過去のコンテキストが維持される。

from pydantic_ai import Agent

agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

result1 = agent.run_sync('私の趣味は競馬です。覚えてね。')
print("==== 結果: 1回目 ====\n")
print(result1.data)

result2 = agent.run_sync(
    '日本の総理大臣ってだれだっけ?',
    message_history=result1.all_messages()  # すべてのメッセージ履歴を渡す
)
print("==== 結果: 2回目 ====\n")
print(result2.data)

result3 = agent.run_sync(
    '私の趣味ってなんでしたっけ?',
    message_history=result2.all_messages()  # すべてのメッセージ履歴を渡す
)
print("==== 結果: 3回目 ====\n")
print(result3.data)

print("\n==== 最終的な全メッセージ ====\n")
print(result3.all_messages())
出力
==== 結果: 1回目 ====

もちろんです!あなたの趣味は競馬ですね。何か特別なことがあれば教えてください。競馬についての話題もお待ちしています!
==== 結果: 2回目 ====

2023年10月時点での日本の総理大臣は岸田文雄(きしだ ふみお)さんです。彼は2021年10月に就任しました。ただし、情報は変わることがあるので、最新の情報を確認することをおすすめします。
==== 結果: 3回目 ====

あなたの趣味は競馬です。何か競馬に関する話題や質問があれば、ぜひ教えてください!

==== 最終的な全メッセージ ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='私の趣味は競馬です。覚えてね。', timestamp=datetime.datetime(2024, 12, 5, 3, 42, 51, 600547, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろんです!あなたの趣味は競馬ですね。何か特別なことがあれば教えてください。競馬についての話題もお待ちしています!', timestamp=datetime.datetime(2024, 12, 5, 3, 42, 51, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='日本の総理大臣ってだれだっけ?', timestamp=datetime.datetime(2024, 12, 5, 3, 42, 52, 488725, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='2023年10月時点での日本の総理大臣は岸田文雄(きしだ ふみお)さんです。彼は2021年10月に就任しました。ただし、情報は変わることがあるので、最新の情報を確認することをおすすめします。', timestamp=datetime.datetime(2024, 12, 5, 3, 42, 52, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='私の趣味ってなんでしたっけ?', timestamp=datetime.datetime(2024, 12, 5, 3, 42, 53, 714108, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='あなたの趣味は競馬です。何か競馬に関する話題や質問があれば、ぜひ教えてください!', timestamp=datetime.datetime(2024, 12, 5, 3, 42, 54, tzinfo=datetime.timezone.utc), role='model-text-response')
]

次に、エージェント間でメッセージを渡す場合を書いてみる。

from pydantic_ai import Agent

# 1人目のエージェント
normal_agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

# 2人目のエージェント
obachan_agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは大阪のおばちゃんです。大阪弁で元気に会話します。'
)

result1 = normal_agent.run_sync('ジョークを言ってみて。')
print("==== 通常エージェントの結果 ====\n")
print(result1.data)

result2 = obachan_agent.run_sync(
    'このジョークの面白さを説明して。',
    message_history=result1.new_messages()  # 前回のメッセージだけ渡す
)
print("==== 大阪のおばちゃんエージェントの結果 ====\n")
print(result2.data)

print("\n==== 最終的な全メッセージ ====\n")
print(result2.all_messages())
出力
==== 通常エージェントの結果 ====

もちろんです!こんなジョークはいかがですか?

「さくらんぼが道を渡った理由は何ですか?」

「フルーツサラダに入るためです!」

ちょっとした笑いが取れるといいですね!他にも聞きたいジョークがあれば教えてください。
==== 大阪のおばちゃんエージェントの結果 ====

このジョークの面白さは、予想外な展開にあるんや。最初に「さくらんぼが道を渡った理由」って言うと、普通は「車にひかれないため」とか、そういう真面目な答えを想像するやろ?それが「フルーツサラダに入るため」って、全然関係ない可愛らしい答えが来るから、意表を突かれて笑えるんや。

さらに、フルーツサラダってみんなが知ってるもので、日常的な要素も入ってるから、身近に感じられて、クスっとくるところがええんやろね!こういうちょっとした「意外性」が笑いを生むんや。おばちゃんもこういうの好きやで!もっと聞きたい?

==== 最終的な全メッセージ ====

[
    SystemPrompt(content='あなたは大阪のおばちゃんです。大阪弁で元気に会話します。', role='system'),
    UserPrompt(content='ジョークを言ってみて。', timestamp=datetime.datetime(2024, 12, 5, 4, 24, 39, 394439, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろんです!こんなジョークはいかがですか?\n\n「さくらんぼが道を渡った理由は何ですか?」\n\n「フルーツサラダに入るためです!」\n\nちょっとした笑いが取れるといいですね!他にも聞きたいジョークがあれば教えてください。', timestamp=datetime.datetime(2024, 12, 5, 4, 24, 39, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='このジョークの面白さを説明して。', timestamp=datetime.datetime(2024, 12, 5, 4, 24, 40, 751520, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='このジョークの面白さは、予想外な展開にあるんや。最初に「さくらんぼが道を渡った理由」って言うと、普通は「車にひかれないため」とか、そういう真面目な答えを想像するやろ?それが「フルーツサラダに入るため」って、全然関係ない可愛らしい答えが来るから、意表を突かれて笑えるんや。\n\nさらに、フルーツサラダってみんなが知ってるもので、日常的な要素も入ってるから、身近に感じられて、クスっとくるところがええんやろね!こういうちょっとした「意外性」が笑いを生むんや。おばちゃんもこういうの好きやで!もっと聞きたい?', timestamp=datetime.datetime(2024, 12, 5, 4, 24, 40, tzinfo=datetime.timezone.utc), role='model-text-response')
]

エージェント間の受け渡しにnew_messages()を使うと、システムプロンプトは含まれないので、最終的なメッセージ履歴にあるシステムプロンプトは2人目のエージェントのものになっている。

all_messages()を使うとどうなるだろうか?

from pydantic_ai import Agent

normal_agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

obachan_agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは大阪のおばちゃんです。大阪弁で元気に会話します。'
)

result1 = normal_agent.run_sync('ジョークを言ってみて。')
print("==== 通常エージェントの結果 ====\n")
print(result1.data)

result2 = obachan_agent.run_sync(
    'このジョークの面白さを説明して。',
    message_history=result1.all_messages()  # すべてのメッセージを渡す
)
print("==== 大阪のおばちゃんエージェントの結果 ====\n")
print(result2.data)

print("\n==== 最終的な全メッセージ ====\n")
print(result2.all_messages())
出力
==== 通常エージェントの結果 ====

もちろんです!では、こちらのジョークはいかがでしょうか?

「カメはどうして遅れて学校に行ったの?」

「だって、家を出るのに5年もかかったから!」

少しでも笑顔になっていただけたら嬉しいです!
==== 大阪のおばちゃんエージェントの結果 ====

このジョークの面白さは、カメの特性と時間に対する期待のギャップにあります。

1. **キャラクターの特性**: カメは一般的に遅い動物として知られています。この特性をハイライトすることで、ジョークの背景が作られています。カメが学校に行くという設定も、子どもたちが通う学校と重なり、親しみやすいです。

2. **予想外の展開**: 「家を出るのに5年もかかった」というフレーズは、時間の概念をユーモラスに誇張しています。通常の遅刻は数分や数時間ですが、5年というのは常識外れであり、そのギャップが笑いを生んでいます。

3. **シュールな要素**: カメが学校に行くこと自体もシュールで、日常生活とは少し異なる設定が笑いを引き立てています。このような非現実的な状況は、ジョークをさらに面白くする要因です。

要は、このジョークはある特定のキャラクターを使って、一般的な状況の予想を裏切ることで笑いを生んでいます。

==== 最終的な全メッセージ ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'), ]
    UserPrompt(content='ジョークを言ってみて。', timestamp=datetime.datetime(2024, 12, 5, 4, 27, 52, 540221, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろんです!では、こちらのジョークはいかがでしょうか?\n\n「カメはどうして遅れて学校に行ったの?」\n\n「だって、家を出るのに5年もかかったから!」\n\n少しでも笑顔になっていただけたら嬉しいです!', timestamp=datetime.datetime(2024, 12, 5, 4, 27, 52, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='このジョークの面白さを説明して。', timestamp=datetime.datetime(2024, 12, 5, 4, 27, 56, 952781, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='このジョークの面白さは、カメの特性と時間に対する期待のギャップにあります。\n\n1. **キャラクターの特性**: カメは一般的に遅い動物として知られています。この特性をハイライトすることで、ジョークの背景が作られています。カメが学校に行くという設定も、子どもたちが通う学校と重なり、親しみやすいです。\n\n2. **予想外の展開**: 「家を出るのに5年もかかった」というフレーズは、時間の概念をユーモラスに誇張しています。通常の遅刻は数分や数時間ですが、5年というのは常識外れであり、そのギャップが笑いを生んでいます。\n\n3. **シュールな要素**: カメが学校に行くこと自体もシュールで、日常生活とは少し異なる設定が笑いを引き立てています。このような非現実的な状況は、ジョークをさらに面白くする要因です。\n\n要は、このジョークはある特定のキャラクターを使って、一般的な状況の予想を裏切ることで笑いを生んでいます。', timestamp=datetime.datetime(2024, 12, 5, 4, 27, 57, tzinfo=datetime.timezone.utc), role='model-text-response')
]

今度は1人目のエージェントのシステムプロンプトになっているのがわかる。

以下のように注意書きがある

前のエージェントの会話履歴を全部入れたい、がシステムプロンプトは現在のエージェントのものを使用したい、というような場合には、自分でメッセージ履歴をいじる必要がありそう。


メッセージの他の使い方

PydanticAIのメッセージは、単純なデータクラスとして定義されているため、以下のような柔軟な使い方が可能となっている。

  • 手動でメッセージを作成・操作
    • メッセージを直接作成し、テストやカスタマイズされた会話シナリオに利用可能。
  • 異なるエージェント間でメッセージを再利用
    • モデル間の互換性を保ちながら、メッセージを共有可能。
  • テストやデバッグに活用
    • エージェントを使わずに、直接メッセージを生成して結果を確認可能。
from pydantic_ai import Agent
from google.colab import userdata
import os

# 2回目のやり取りではGeminiを使う
os.environ['GEMINI_API_KEY'] = userdata.get('GEMINI_API_KEY')

agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。'
)

result1 = agent.run_sync('ジョークを言ってみて。')
print("==== 結果: 1回目(gpt-4o-mini) ====\n")
print(result1.data)

result2 = agent.run_sync(
    'ジョークの面白さを説明して。',
    model='gemini-1.5-flash',
    message_history=result1.new_messages()
)
print("==== 結果: 2回目(gemini-1.5-flash) ====\n")
print(result2.data)

print("\n==== 最終的な全メッセージ ====\n")
print(result2.all_messages())
出力
==== 結果: 1回目(gpt-4o-mini) ====

もちろんです!では、こちらのジョークをどうぞ。

「カメはどうして遅いの?」  
「だって、甲羅に重い荷物を持ってるからさ!」 

笑っていただけたら嬉しいです!
==== 結果: 2回目(gemini-1.5-flash) ====

このジョークの面白さは、**言葉遊びと意外性**にあります。

* **言葉遊び:** 「甲羅」という言葉に、「荷物」という言葉を掛けています。「甲羅」自体はカメの体の部分であり、本来「荷物」ではありません。この言葉の掛け合わせが、ユーモラスな効果を生み出しています。

* **意外性:**  カメが遅いのは、一般的に知られた事実です。しかし、その理由が「甲羅に重い荷物を持ってるから」という、突拍子もない、現実離れした説明であるところに、笑いのポイントがあります。  予想外の、少しばかげた結論が、聞き手の笑いを誘います。

つまり、真面目な問いかけに対して、滑稽で不条理な答えを返すことで、笑いを生み出しているのです。  単純な構造ですが、その意外性と言葉遊びによって、小さいながらも確実な笑いを引き起こすジョークと言えるでしょう。


==== 最終的な全メッセージ ====

[
    SystemPrompt(content='あなたは親切な日本語のアシスタントです。', role='system'),
    UserPrompt(content='ジョークを言ってみて。', timestamp=datetime.datetime(2024, 12, 5, 4, 45, 32, 807809, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='もちろんです!では、こちらのジョークをどうぞ。\n\n「カメはどうして遅いの?」  \n「だって、甲羅に重い荷物を持ってるからさ!」 \n\n笑っていただけたら嬉しいです!', timestamp=datetime.datetime(2024, 12, 5, 4, 45, 33, tzinfo=datetime.timezone.utc), role='model-text-response'),
    UserPrompt(content='ジョークの面白さを説明して。', timestamp=datetime.datetime(2024, 12, 5, 4, 45, 34, 402010, tzinfo=datetime.timezone.utc), role='user'),
    ModelTextResponse(content='このジョークの面白さは、**言葉遊びと意外性**にあります。\n\n* **言葉遊び:** 「甲羅」という言葉に、「荷物」という言葉を掛けています。「甲羅」自体はカメの体の部分であり、本来「荷物」ではありません。この言葉の掛け合わせが、ユーモラスな効果を生み出しています。\n\n* **意外性:**  カメが遅いのは、一般的に知られた事実です。しかし、その理由が「甲羅に重い荷物を持ってるから」という、突拍子もない、現実離れした説明であるところに、笑いのポイントがあります。  予想外の、少しばかげた結論が、聞き手の笑いを誘います。\n\nつまり、真面目な問いかけに対して、滑稽で不条理な答えを返すことで、笑いを生み出しているのです。  単純な構造ですが、その意外性と言葉遊びによって、小さいながらも確実な笑いを引き起こすジョークと言えるでしょう。\n', timestamp=datetime.datetime(2024, 12, 5, 4, 45, 36, 425357, tzinfo=datetime.timezone.utc), role='model-text-response')
]
kun432kun432

とりあえずここまでのまとめ

とりあえず各コンポーネントは一通り触ってみたところでの印象。

  • シンプルに書ける。
  • エージェントへの入力・LLMからの出力・ツールなどは、堅牢な書き方ができる。

あたりはまあそうだよねという感。シンプルなエージェントの実装に必要なものは揃っているし、学習コスト低く始めれそう。あと、Pydanticがベースにあって書き方もそれにあわせていくことになると思うので、自然と堅牢になるのではないかな?実はここが最大のメリットなのかもと思ったり。

ただ、「マルチ」エージェントという観点で見た場合、

  • 複数エージェントの処理の流れ(グラフだったり)
  • ステートの管理

のような、最近のマルチエージェントフレームワークに良く見られる設計とは異なるアプローチだと思う。READMEにあるこれかなぁ。

Control flow and agent composition is done with vanilla Python, allowing you to make use of the same Python development best practices you'd use in any other (non-AI) project.

PydanticAIだと「依存関係」と「メッセージ履歴」で実現することになるかなと思うけど、グラフみたいなものよりはやや直感性にかけると個人的には思うし、あと並列処理なんかも自前で実装しないといけないんじゃないだろうか?このあたりの捉え方は人によって違うかもしれない。自分はまだエージェントについてもPythonについても知見が足りていないので、間違っているかもしれないけど、もっとサンプルがほしいなというところ。

Issueにも具体的なマルチエージェントのサンプルが欲しいというのが挙がっているので、そのうち追加されるかもしれない。

https://github.com/pydantic/pydantic-ai/issues/120

とりあえず、以下についても確認、そのあとサンプルコードを追いかけてみようと思う。

  • Testing and Evals
  • Debugging and Monitoring
kun432kun432

テスト・評価の章は一旦置いておいて、デバッグや観測を先にやる。

Debugging and Monitoring

https://ai.pydantic.dev/logfire/

LLMには、以下の課題がある。

  • LLMの一般的な課題
    • 遅い
    • 信頼性が低い
    • 高コスト
  • 既存のアプリケーションに見られない、LLM固有の課題
    • LLMの回答は気まぐれ非決定論的
      • プロンプトに少しの変更を加えるだけで、モデルの性能は大きく変わりうる
      • その理由を解析するためのデバッグツール、例えばDBにおけるEXPLAINクエリのようなものがない。

したがって、モデルの性能とアプリケーションの挙動を両方を把握するための新しいツールが必要になる。そこでPydanticが提供しているのが「Pydantic Logfire」になる。


Pydantic Logfire

https://pydantic.dev/logfire

Logfire単体については、以下で簡単に試している。

https://zenn.dev/kun432/scraps/1df3e592e2b844


Logfireを使ったエージェントのトレーシング

事前にLogfireアカウントを作成しておく。

パッケージインストール。logfireはextrasでオプションとして指定できる様子。

!pip install 'pydantic-ai[logfire]'
!pip freeze | egrep -i "pydantic|logfire"
出力
logfire==2.6.2
logfire-api==2.6.2
pydantic==2.10.3
pydantic-ai==0.0.11
pydantic-ai-slim==0.0.11
pydantic_core==2.27.1

Logfireの認証。Logfireは"development"と"production"の概念がある。"production"の場合はトークンで認証、"development"の場合はCLIで認証を行う。

 logfire auth

ブラウザを使った認証が促される。"ENTER"を入力。

出力
Welcome to Logfire! 🔥
Before you can send data to Logfire, we need to authenticate you.

Press Enter to open logfire.pydantic.dev in your browser...

認証用URLが発行される。ブラウザでURLにアクセスして認証を行う。

出力
Please open https://logfire.pydantic.dev/auth/device/XXXXXXXXXXXX in 
your browser to authenticate if it hasn't already.
Waiting for you to authenticate with Logfire...

ブラウザ認証がOKなら以下のようにクレデンシャルが保存され、以降はLogfireにアクセス可能となる。

出力
Successfully authenticated!

Your Logfire credentials are stored in /root/.logfire/default.toml

Logfireのプロジェクトを設定する。既にプロジェクトが作成されている場合はlogfire projects use プロジェクト名でもOK。今回はCLIからプロジェクトを作成する。

!logfire projects new

組織内にプロジェクト作成するか聞かれるので"y"

出力
The project will be created in the organization "kun432". Continue? [y/n] (y): y

作成するプロジェクト名を入力

出力
Enter the project name (content): pydantic-ai-test

プロジェクトが作成され、このプロジェクトにログ等が送信される。

出力
Project created successfully. You will be able to view it at: 
https://logfire.pydantic.dev/kun432/pydantic-ai-test

ブラウザでアクセスするとこんな感じ。まだ何も送信されていないので、当然データは何もない。

では、PydanticAIのエージェントを用意する。OpenWeather APIを使って以下のツールを持つエージェントを作成する。

  • 都市名から緯度経度を取得
  • 緯度経度から現在の天気状態を取得

LLMはOpenAIを使用する。APIキーを環境変数にセット。

import os
from google.colab import userdata

os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')

エージェントのコード。まだLogfireとの連携はしていない。

from dataclasses import dataclass
import httpx
from pydantic_ai import Agent, RunContext, ModelRetry
from google.colab import userdata
import asyncio
from typing import List

# notebookの場合は以下が必要
import nest_asyncio
nest_asyncio.apply()

@dataclass
class MyDeps:
    openweather_api_key: str
    http_client: httpx.AsyncClient

weather_agent = Agent(
    'openai:gpt-4o-mini',
    system_prompt='あなたは親切な日本語のアシスタントです。',
    deps_type=MyDeps,
    retries=2,
)

@weather_agent.tool
async def get_lat_lon(ctx: RunContext[MyDeps], city_name: str) -> List[dict]:
    """与えられた都市名から緯度経度を返す

    引数:
        ctx: コンテキスト
        city_name: 都市名
    """
    params = {
        'q': city_name,
        'appid': ctx.deps.openweather_api_key,
    }
    response = await ctx.deps.http_client.get(
        "http://api.openweathermap.org/geo/1.0/direct",
        params=params
    )
    response.raise_for_status()

    location_data = response.json()
    if location_data:
        return location_data
    else:
        raise ModelRetry('Could not find the location.')

@weather_agent.tool
async def get_weather_forecast(ctx: RunContext[MyDeps], lat: float, lon: float) -> dict:
    """与えられた緯度経度の5日間(3時間おき)の天気予報を返す

    引数:
        ctx: コンテキスト
        lat: 緯度
        lon: 経度
    """
    params = {
        'lat': lat,
        'lon': lon,
        'units': 'metric',
        'appid': ctx.deps.openweather_api_key,
    }
    response = await ctx.deps.http_client.get(
        "https://api.openweathermap.org/data/2.5/forecast",
        params=params
    )
    response.raise_for_status()

    forecast_data = response.json()
    if forecast_data:
        return forecast_data
    else:
        raise ModelRetry('Could not find the weather information.')

async def main(query: str):
    async with httpx.AsyncClient() as client:
        deps = MyDeps(userdata.get('OPENWEATHER_API_KEY'), client)
        result = await weather_agent.run(query, deps=deps)
        print(result.data)

こんな感じで動く。

asyncio.run(main("神戸の明日の天気を知りたい"))
出力
神戸の明日の天気予報は以下の通りです。

### 2024年12月9日(日)
- **朝(09:00)**
  - 気温: 6.44°C
  - 体感温度: 3.48°C
  - 天気: 散発的な雲
  - 降水確率: 0%
  
- **昼(12:00)**
  - 気温: 4.86°C
  - 体感温度: 1.33°C
  - 天気: 散発的な雲
  - 降水確率: 0%

- **午後(15:00)**
  - 気温: 4.02°C
  - 体感温度: 0.68°C
  - 天気: 晴れ
  - 降水確率: 0%

- **夜(18:00)**
  - 気温: 4.24°C
  - 体感温度: 1.21°C
  - 天気: 晴れ
  - 降水確率: 0%

- **夜(21:00)**
  - 気温: 3.54°C
  - 体感温度: 0.27°C
  - 天気: 散発的な雲
  - 降水確率: 0%

全体的に寒い一日になりそうですので、暖かい服装でお出かけください。

ではLogfireを有効にする。以下を有効にするだけ。

import logfire

logfire.configure()
出力
<logfire._internal.main.Logfire at 0x7f8986ce4550>
Logfire project URL: https://logfire.pydantic.dev/kun432/pydantic-ai-test

では再度エージェントを実行してみる。

asyncio.run(main("東京の明日の天気を知りたい"))

以下の通り、実際の回答の出力以外に各処理のステップでログっぽいものが表示される。

出力
13:02:16.032 weather_agent run prompt=東京の明日の天気を知りたい
13:02:16.033   model request run_step=1
13:02:16.634   handle model response
13:02:16.636     running tools=['get_lat_lon']
13:02:16.804   model request run_step=2
13:02:17.608   handle model response
13:02:17.609     running tools=['get_weather_forecast']
13:02:17.749   model request run_step=3
13:02:20.399   handle model response
東京の明日の天気予報は以下の通りです。

- **日付**: 2024年12月9日
- **天気**: 晴れのち曇り
- **予想気温**:
  - 最低気温: 6.4°C
  - 最高気温: 10.7°C
- **具体的な時間帯の天気**:
  - 03:00 - 晴れ、気温 9.5°C
  - 06:00 - 晴れ、気温 10.7°C
  - 12:00 - 晴れ、気温 9.5°C
  - 15:00 - 小雨、気温 8.3°C
  - 18:00 - 小雨、気温 7.9°C
  - 21:00 - 曇り、気温 7.1°C

明日は日中は晴れ間もありますが、午後から小雨の予報ですので、外出の際は傘を持っていると安心です。気温も少し肌寒いので、暖かい服装をおすすめします。

Logfire側にも以下のようにログが出力される。

より細かく出力させたければ、以下のようにカスタムでスパンに送信するデータを定義できる。

(snip)

@weather_agent.tool
async def get_lat_lon(ctx: RunContext[MyDeps], city_name: str) -> List[dict]:
    """与えられた都市名から緯度経度を返す

    引数:
        ctx: コンテキスト
        city_name: 都市名
    """
    params = {
        'q': city_name,
        'appid': ctx.deps.openweather_api_key,
    }
    # APIリクエスト部分をスパンとして登録し、リクエストパラメータを追加(APIキーは取り除いている)
    with logfire.span('call geocode api', params={k: v for k, v in params.items() if k != "appid"}) as span:
        response = await ctx.deps.http_client.get(
            "http://api.openweathermap.org/geo/1.0/direct",
            params=params
        )
        response.raise_for_status()
        location_data = response.json()  
        # APIリクエストの結果をスパンに追加
        span.set_attribute('response', location_data)

    if location_data:
        return location_data
    else:
        raise ModelRetry('Could not find the location.')

(snip)

実行してみると以下のようにスパンが追加されているのがわかる。

kun432kun432

Testing and Evals

https://ai.pydantic.dev/testing-evals/#unit-testing-with-testmodel

PydanticAIにおけるテストについて。テストには2種類ある。

  1. ユニットテスト: アプリケーションのコードが正しく動作するかをテストする
  2. 評価(Evals): LLMのレスポンスの質をテストする

前者はアプリ、後者はLLMというところで、目的と考慮すべき点が異なるところが違い。


ユニットテスト

PydanticAIのコードに対するユニットテストには、一般的なPythonコードにおける確立されたテスト手法がそのまま使える。基本的には以下の戦略となる。

  • テストハーネスとしてpytestを使う
  • 長いアサーションを入力する場合は、inline-snapshot を使う
  • 大きなデータ構造を比較する場合は、dirty-equals を使う
  • LLMを実際に呼び出すのは、コスト・待ち時間・結果のばらつきの問題がある。これを避けるには、実際のモデルの代わりにPydanticAIのTestModel / FunctionModel を使用する
  • PydanticAIのAgent.overrideを使用して、アプリケーションロジック内でモデルを置き換える
  • ALLOW_MODEL_REQUESTS=False をグローバルに設定し、テストモデル以外へのアクセスをブロックする

TestModelを使ったユニットテスト

TestModelFunctionModelは、両方ともテスト用のモデルで、LLMを使わずに「LLMの応答を模倣」するために使う。まずはTestModelから。

TestModelは、エージェントのすべてのツールを実行し、エージェントのreturn_typeに応じて、プレーンテキストもしくは構造化レスポンスを返すテストモデル

・・・とあるのだが、サンプルコードを見てみたところ、

  • モジュールなどが省略されている(fake_databaseとかweather_serviceとか。実はここにある。)
  • 天気のAPI叩いて結果を取得するとかシンプルな例ならまだわかるのだけども、プロンプトからDBを検索する、ってのがいまいち実践的に思えない。
  • We want to test this code without having to mock certain objectsとか書いてあるけど、fake_databaseとかweather_serviceがそもそもモック・・・

という感じで、更に自分はpytestの知見が足りないせいもあって、全然理解できなかった。

なので、めちゃめちゃシンプルなところから初めてみる。ツールが何もなく、単にLLMでレスポンスを返すだけのエージェント。

simple_agent.py
from pydantic_ai import Agent

simple_agent = Agent(
    model="openai:gpt-4o-mini",
    system_prompt="あなたは親切な日本語のアシスタントです。",
)
app.py
import asyncio
from simple_agent import simple_agent

async def main():
    result = await simple_agent.run(
        'おはよう!今日はいいお天気だね。',
    )  
    print(result.data)

asyncio.run(main())

実行するとこうなる。

python app.py
出力
おはようございます!本当にいいお天気ですね。こんな日はお出かけしたくなりますね。何か特別な予定はありますか?

で、これのテストコード。

test_simple_agent.py
from datetime import timezone
import asyncio
import pytest
from dirty_equals import IsNow

from pydantic_ai import models
from pydantic_ai.models.test import TestModel
from pydantic_ai.messages import SystemPrompt, UserPrompt, ModelTextResponse

from simple_agent import simple_agent

# 非同期のテストにAnyIOを使う
pytestmark = pytest.mark.anyio
# 誤ってLLMに実際にアクセスしないようブロック
models.ALLOW_MODEL_REQUESTS = False  

# AnyIOのバックエンドをasyncioだけにする
@pytest.fixture
def anyio_backend():
    return 'asyncio'

async def test_simple_agent_response():
    # TestModelでエージェントのモデルを模倣
    test_assitant_response = "これはテスト応答です。"
    with simple_agent.override(model=TestModel(custom_result_text=test_assitant_response)):
        # ユーザーからの入力プロンプト
        test_user_prompt = "こんにちは、あなたは誰ですか?"

        # エージェントを直接呼び出し
        result = await simple_agent.run(test_user_prompt)

        # (1) 応答の内容を確認
        assert result.data == test_assitant_response

        # (2) メッセージ履歴を確認
        assert simple_agent.last_run_messages == [
            SystemPrompt(
                content="あなたは親切な日本語のアシスタントです。",
                role="system",
            ),
            UserPrompt(
                content=test_user_prompt,
                role="user",
                timestamp=IsNow(tz=timezone.utc),
            ),
            ModelTextResponse(
                content=test_assitant_response,
                role="model-text-response",
                timestamp=IsNow(tz=timezone.utc),
            ),
        ]

ドキュメントのサンプルコードだと何のパラメータもなしにTestModelを呼び出しているのだが、ドキュメントを見ると、テキストレスポンスやツールの引数などを渡すことができる様子。

https://ai.pydantic.dev/api/models/test/#pydantic_ai.models.test.TestModel

上のサンプルだと、

  • custom_result_textで、LLMにクエリを送信したあとのレスポンスとして「これはテスト応答です。」というダミーのレスポンスを返すテスト用モデルを定義する。
  • Agent.overrideで、元のエージェントのコードはそのままにモデルだけをテスト用モデルに置き換える
  • 2箇所のアサーションでテスト
    • (1) で LLMからのレスポンスだけをテスト
    • (2) でシステムプロンプトも含めたすべてのメッセージ(履歴)をテスト

という感じになる。

これでテストを実行してみる。

pytest test_simple_agent.py
出力
==================================== test session starts =====================================
platform darwin -- Python 3.12.7, pytest-8.3.4, pluggy-1.5.0
rootdir: /Users/kun432/work/pydantic-ai-test
plugins: asyncio-0.24.0, anyio-4.7.0
asyncio: mode=Mode.STRICT, default_loop_scope=None
collected 1 item                                                                             

test_simple_agent.py .                                                                 [100%]

===================================== 1 passed in 0.30s ======================================

テスト成功。

次に、ツールを与えてみる。四則演算を行うエージェントの例。

simple_calc_agent.py
from pydantic_ai import Agent

simple_calc_agent = Agent(
    model="openai:gpt-4o-mini",
    system_prompt="あなたは親切な日本語のアシスタントです。必ずツールを使って計算をします。",
)

@simple_calc_agent.tool_plain
def add(a: float, b: float) -> float:
    """与えられた2つの数値を足し算する"""
    return a + b

@simple_calc_agent.tool_plain
def subtract(a: float, b: float) -> float:
    """数値が2つ与えられたら、1番目の数値(a)から2番目の数値(b)を引く"""
    return a - b

@simple_calc_agent.tool_plain
def multiply(a: float, b: float) -> float:
    """与えられた2つの数値を掛け算する"""
    return a * b

@simple_calc_agent.tool_plain
def divide(a: float, b: float) -> float:
    """数値が2つ与えられたら、1番目の数値(a)を2番目の数値(b)で割る"""
    if b == 0:
        raise ValueError("Division by zero is not allowed.")
    return a / b
calc_app.py
import asyncio
from simple_calc_agent import simple_calc_agent

async def main():
    queries = [
        "100 たす 50 は?",
        "100 ひく 50 は?",
        "100 かける 50 は?",
        "100 わる 50 は?",
        "100 わる 0 は?",
    ]
    for q in queries:
        result = await simple_calc_agent.run(q)
        print(result.data)

asyncio.run(main())

試しに実行

python calc_app.py
出力
100 たす 50 は 150 です。
100 ひく 50 は 50 です。
100 かける 50 は 5000 です。
100を50で割ると、結果は2.0です。
0で割ることはできないため、結果は定義されていません。数学的には、どんな数でも0で割ることは無限大となるため、計算結果は存在しません。

では、TestModelを使ってテスト・・・なのだけど、とりあえず以下のように、テストコードとしては不完全だけど、雑に書いて動かしてみる。

test_simple_calc_agent.py
import asyncio
import pytest

from pydantic_ai import models
from pydantic_ai.models.test import TestModel

from simple_calc_agent import simple_calc_agent

pytestmark = pytest.mark.anyio
models.ALLOW_MODEL_REQUESTS = False  

@pytest.fixture
def anyio_backend():
    return 'asyncio'

async def test_add():
    test_assitant_response = "100 たす 50 は 150 です。"
    with simple_calc_agent.override(model=TestModel(custom_result_text=test_assitant_response)):
        result = await simple_calc_agent.run("100 たす 50 は?")

        assert result.data == test_assitant_response 
pytest -q test_simple_calc_agent.py
出力
F                                                                                                     [100%]
================================================= FAILURES ==================================================
_________________________________________________ test_add __________________________________________________

    async def test_add():
        test_assitant_response = "100 たす 50 は 150 です。"
        with simple_calc_agent.override(model=TestModel(custom_result_text=test_assitant_response)):
>           result = await simple_calc_agent.run("100 たす 50 は?")

test_simple_calc_agent.py:19: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
.venv/lib/python3.12/site-packages/pydantic_ai/agent.py:202: in run
    either = await self._handle_model_response(model_response, deps)
.venv/lib/python3.12/site-packages/pydantic_ai/agent.py:719: in _handle_model_response
    messages += await asyncio.gather(*tasks)
.venv/lib/python3.12/site-packages/pydantic_ai/tools.py:194: in run
    response_content = await _utils.run_in_executor(function, *args, **kwargs)
.venv/lib/python3.12/site-packages/pydantic_ai/_utils.py:24: in run_in_executor
    return await asyncio.get_running_loop().run_in_executor(None, partial(func, *args, **kwargs))
../../.local/share/mise/installs/python/3.12.7/lib/python3.12/concurrent/futures/thread.py:58: in run
    result = self.fn(*self.args, **self.kwargs)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

a = 0.0, b = 0.0

    @simple_calc_agent.tool_plain
    def divide(a: float, b: float) -> float:
        """数値が2つ与えられたら、1番目の数値(a)を2番目の数値(b)で割る"""
        if b == 0:
>           raise ValueError("Division by zero is not allowed.")
E           ValueError: Division by zero is not allowed.

simple_calc_agent.py:31: ValueError
========================================== short test summary info ==========================================
FAILED test_simple_calc_agent.py::test_add - ValueError: Division by zero is not allowed.
1 failed, 1 warning in 0.32s

エージェントを実行するところでコケていて、かつ、割り算で失敗している。上の方でも書いているが、以下。

TestModelは、エージェントのすべてのツールを実行し、

TestModelを使うと、エージェントに紐づけられたツールはすべて実行される。つまり今回の場合だと、add/subtract/multiply/divideが実行される。そして、このとき各ツールに渡す引数は、pytestの出力にもある通り、

a = 0.0, b = 0.0

なので、0で割り算はできないのでエラーとなっている様子。

このツールに渡す引数なのだが、TestModelだとどうも意図的に設定することができなさそう。唯一できそうなのは、

  • seedを与えるとそれがそのままツールの引数になるみたい
  • call_toolsを与えると、特定のツールだけを実行できる。

あたりでこんな感じ。

test_simple_calc_agent.py
import asyncio
import pytest
from datetime import timezone
from dirty_equals import IsNow

from pydantic_ai import models
from pydantic_ai.models.test import TestModel
from pydantic_ai.messages import (
    SystemPrompt,
    UserPrompt,
    ModelStructuredResponse,
    ToolCall,
    ArgsDict,
    ToolReturn ,
    ModelTextResponse,
)

from simple_calc_agent import simple_calc_agent

pytestmark = pytest.mark.anyio
models.ALLOW_MODEL_REQUESTS = False  

@pytest.fixture
def anyio_backend():
    return 'asyncio'

async def test_add():
    test_assitant_response = "10 たす 10 は 20 です。"
    with simple_calc_agent.override(model=TestModel(
        custom_result_text=test_assitant_response,
        # ツールを限定(デフォルトは"all"ですべてが実行される)
        call_tools=["add"],
        # ツールに渡されるパラメータ。今回の例だとaとbの両方に10.0が渡されるっぽい。
        seed=10
    )):
        test_user_prompt = "10 たす 10 は?"
        result = await simple_calc_agent.run(test_user_prompt)

        assert result.data == test_assitant_response 

        assert simple_calc_agent.last_run_messages == [
            SystemPrompt(
                content="あなたは親切な日本語のアシスタントです。必ずツールを使って計算をします。",
                role="system",
            ),
            UserPrompt(
                content=test_user_prompt,
                role="user",
                timestamp=IsNow(tz=timezone.utc),
            ),
            ModelStructuredResponse(
                calls=[
                    ToolCall(
                        tool_name='add',
                        args=ArgsDict(
                            args_dict={
                                'a': 10.0,
                                'b': 10.0,
                            }
                        ),
                        tool_id=None,
                    )
                ],
                timestamp=IsNow(tz=timezone.utc),
                role='model-structured-response',
            ),
            ToolReturn(
                tool_name='add',
                content=20.0,
                tool_id=None,
                timestamp=IsNow(tz=timezone.utc),
                role='tool-return',
            ),
            ModelTextResponse(
                content=test_assitant_response,
                role="model-text-response",
                timestamp=IsNow(tz=timezone.utc),
            ),
        ]
pytest -q test_simple_calc_agent.py
出力
.                                                                                                     [100%]
1 passed, 1 warning in 0.26s

まあ簡易なものであればこれでもいいのかもしれないが、複雑なものだとTestModelでは難しくなる。

FunctionModelを使ったユニットテスト

FunctionModelを使うと、ツールの呼び出し方をカスタマイズできる。

test_simple_calc_agent.py
import asyncio
import pytest
from datetime import timezone
from dirty_equals import IsNow

from pydantic_ai import models
from pydantic_ai.models.function import FunctionModel, AgentInfo
from pydantic_ai.messages import (
    Message,
    SystemPrompt,
    UserPrompt,
    ModelStructuredResponse,
    ToolCall,
    ArgsDict,
    ToolReturn ,
    ModelTextResponse,
    ModelAnyResponse
)

from simple_calc_agent import simple_calc_agent

pytestmark = pytest.mark.anyio
models.ALLOW_MODEL_REQUESTS = False  

@pytest.fixture
def anyio_backend():
    return 'asyncio'


def call_calculator(messages: list[Message], info: AgentInfo) -> ModelAnyResponse:
    # 受け取ったメッセージ数で分岐
    if len(messages) == 2:
        # メッセージ数=2、つまりシステムプロンプト+ユーザプロンプトの場合は、ツール呼び出しのレスポンスを返す
        user_prompt = messages[1]        
        print("ユーザの入力: ", user_prompt.content)
        # 今回は固定、ユーザの入力からパースしたりしても良さそう
        args = {"a": 2.0, "b": 5.0}
        return ModelStructuredResponse(
                calls=[ToolCall.from_dict('add', args)]  # 今回は足し算だけ
        )
    else:
        # メッセージ数!=2の場合、ツール実行の結果を踏まえた回答を返す

        # 最後のメッセージにツールの実行結果が含まれる
        msg = messages[-1]
        assert msg.role == 'tool-return'
        result = msg.content
        print("ツール実行結果:", result)
        if msg.content.is_integer():
            result = int(msg.content)
        return ModelTextResponse(f'答えは {result} です。')


@pytest.mark.anyio
async def test_addition_with_functionmodel():
    # TestModelで四則演算エージェントを模倣
    with simple_calc_agent.override(model=FunctionModel(call_calculator)):
        test_user_prompt = "2 + 5 は?"
        
        # エージェントにプロンプトを送信
        result = await simple_calc_agent.run(test_user_prompt)

        assert simple_calc_agent.last_run_messages == [
            SystemPrompt(
                content="あなたは親切な日本語のアシスタントです。必ずツールを使って計算をします。",
                role="system",
            ),
            UserPrompt(
                content=test_user_prompt,
                role="user",
                timestamp=IsNow(tz=timezone.utc),
            ),
            ModelStructuredResponse(
                calls=[
                    ToolCall(
                        tool_name='add',
                        args=ArgsDict(
                            args_dict={
                                'a': 2.0,
                                'b': 5.0,
                            }
                        ),
                        tool_id=None,
                    )
                ],
                timestamp=IsNow(tz=timezone.utc),
                role='model-structured-response',
            ),
            ToolReturn(
                tool_name='add',
                content=7.0,
                tool_id=None,
                timestamp=IsNow(tz=timezone.utc),
                role='tool-return',
            ),
            ModelTextResponse(
                content="答えは 7 です。",
                role="model-text-response",
                timestamp=IsNow(tz=timezone.utc),
            ),
        ]

FunctionModelに関数を指定すると、この関数には、

  • メッセージ: メッセージのリスト
  • エージェント情報: 実行可能なツール等などエージェントの設定情報

が渡される。これを使って、ツールの呼び出し・実行結果の処理等をカスタマイズして、レスポンスを返すという形になるみたい。今回はツールの引数を固定、足し算のみだが、このあたりをうまくパースしてやれば、ある程度融通が効きそう。

テストしてみる。

pytest -s -q 
ユーザの入力:  2 + 5 は?
ツール実行結果: 7.0
.
1 passed, 1 warning in 0.29s

少しデバッグ出力を入れてみたが、指定した引数でツールが実行できているのがわかる。


そもそもテスト用にカスタムモデルを定義すればいいじゃん、という話もある。なるほどね。

https://github.com/pydantic/pydantic-ai/issues/190

kun432kun432

続けて評価(Evals)

Evals

ざっと冒頭に書いてあることのまとめ

Evalsとは?

  • Evalsは特定のアプリケーションにおけるモデルのパフォーマンスを評価するプロセス。
  • ユニットテストではなく、ベンチマークに近い性質を持つ。
    • ユニットテストのように「合格/不合格」を判定するのではなく、時間の経過とともにパフォーマンスの変化を重視。
  • 実際のモデルで実行する必要があり、処理に時間がかかり、コストも発生する。
  • CIで頻繁に実行するには適さない。

パフォーマンス測定の難しさ

  • 測定方法はモデルの目的によって大きく異なる。
    • 例:SQL生成では、SQLの有効性や正確性を簡単に測定可能。
    • 一方、禁煙アドバイスの効果など、長期的かつ複雑な測定が必要なケースもある。

測定戦略

  1. エンドツーエンドの自己完結型テスト
    • 最終結果を即座にテスト。
    • 例:生成されたSQLが正しいかを検証。
  2. 合成された自己完結型テスト
    • 単純なチェックを活用(例:応答に特定のフレーズが含まれるか)。
    • 問題箇所の特定が容易。
  3. LLMによるLLMの評価
    • 別のモデルや異なるプロンプトを使用して、エージェントのパフォーマンスを評価。
    • 複雑だが、適切な状況下では有用。
  4. プロダクション環境でのEvals
    • 実際の運用環境での結果を測定。
    • 定量的な指標を作成し、モデルやプロンプトの変更による影響を追跡可能。
    • logfireのようなツールでカスタムクエリを利用すると効果的。

なるほど、過去に評価何度かやってきたけど、ユースケース等に合わせた測定戦略の違い、ってのは特に意識せずに都度都度考えてた気がするな。こういうふうにまとめてあるとわかりやすい。


システムプロンプトのカスタマイズによる評価

システムプロンプトを変更することにより、LLMの生成結果がどのように変わるか?という観点での評価の例。

  • SQLを生成するクエリと期待するSQLのデータセットを用意。
  • クエリをLLMに投げて、SQLを生成させる。
  • データセットのSQLと生成されたSQLをそれぞれ実行して結果が比較してスコアリングする

というものみたい。ドキュメントのサンプル例は抜粋になっていてわかりにくく感じたので、実際にDBにアクセスして評価するサンプルを書いたみた。

DBはChinookを使ってSQLiteでやる。

https://github.com/lerocha/chinook-database

前回から少し時間が経ってしまったので、再度最初から。Colaboratoryで。

パッケージインストール。

!pip install pydantic-ai
!pip freeze | grep -i pydantic
出力
pydantic==2.10.3
pydantic-ai==0.0.13
pydantic-ai-slim==0.0.13
pydantic_core==2.27.1

ノートブック環境では以下が必要

import nest_asyncio

nest_asyncio.apply()

OpenAI APIキーをセット

import os
from google.colab import userdata

os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_API_KEY')

Chinookデータベースをダウンロード

!wget https://github.com/lerocha/chinook-database/releases/download/v1.4.5/Chinook_Sqlite.sqlite

Chinookデータベースにアクセスするためのクラスを用意

from typing import List, Dict
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker
from sqlalchemy.engine.row import RowMapping

class Chinook:
    """Chinook DBへの接続を管理するクラス"""
    def __init__(self, db_url: str) -> None:
        self.engine = create_engine(db_url)
        self.Session = sessionmaker(bind=self.engine)

    def _execute_query(self, query: str) -> List[RowMapping]:
        """指定されたSQLクエリを実行して結果を辞書形式で返す"""
        try:
            with self.engine.connect() as connection:
                result = connection.execute(text(query))
                return [row._mapping for row in result]
        except SQLAlchemyError as e:
            raise RuntimeError(f"SQLクエリの実行中にエラーが発生しました: {e}")

    def _get_schema(self) -> List[dict]:
        """データベースのスキーマ情報を取得する"""
        query = "SELECT name, sql FROM sqlite_master WHERE type='table';"
        return self._execute_query(query)

    def _get_schema_w_rows(self, n: int = 5) -> Dict[str, Dict[str, List[dict]]]:
        """全テーブルのスキーマ情報と最初のN行を取得する"""
        try:
            schema_info = self._get_schema()
            results = {}
            for schema in schema_info:
                table_name = schema["name"]
                table_sql = schema["sql"]
                query = f"SELECT * FROM {table_name} LIMIT {n};"
                rows = self._execute_query(query)
                results[table_name] = {
                    "schema": table_sql,
                    "rows": rows
                }
            return results
        except SQLAlchemyError as e:
            raise RuntimeError(f"スキーマとデータ取得中にエラーが発生しました: {e}")
    
    def get_result_dict(self, query: str) -> List[dict]:
        """指定されたSQLクエリを実行して結果をシリアライズ可能な形式で返す"""
        raw_results = self._execute_query(query)
        return [dict(row) for row in raw_results]  # `row`を辞書に変換

    def get_result_data_only(self, query: str) -> List[tuple]:
        """カラム名を無視して結果のデータ部分のみを返す(評価用)"""
        raw_results = self._execute_query(query)
        return [tuple(row.values()) for row in raw_results]

    def get_schema_w_rows_as_str(self, n: int = 0, separator: str = '-' * 40) -> str:
        """全テーブルのスキーマ情報と最初のN行を文字列として返す"""
        schema_and_rows = self._get_schema_w_rows(n)
        output = []
        for table, info in schema_and_rows.items():
            output.append(f"Table: {table}")
            output.append("Schema:")
            output.append(info["schema"])
            if n > 0:
                output.append(f"First {n} Row(s):")
                for row in info["rows"]:
                    output.append(str(row))
            output.append(separator)
        return "\n".join(output)

こんな感じで使える。

データベースに接続

db = Chinook("sqlite:///Chinook_Sqlite.sqlite")

データベースの各テーブルのスキーマ情報を取得

schema = db.get_schema_w_rows_as_str()
print("\n".join(schema.split("\n")[:30]))     # 抜粋
出力
Table: Album
Schema:
CREATE TABLE [Album]
(
    [AlbumId] INTEGER  NOT NULL,
    [Title] NVARCHAR(160)  NOT NULL,
    [ArtistId] INTEGER  NOT NULL,
    CONSTRAINT [PK_Album] PRIMARY KEY  ([AlbumId]),
    FOREIGN KEY ([ArtistId]) REFERENCES [Artist] ([ArtistId]) 
		ON DELETE NO ACTION ON UPDATE NO ACTION
)
----------------------------------------
Table: Artist
Schema:
CREATE TABLE [Artist]
(
    [ArtistId] INTEGER  NOT NULL,
    [Name] NVARCHAR(120),
    CONSTRAINT [PK_Artist] PRIMARY KEY  ([ArtistId])
)
----------------------------------------
Table: Customer
Schema:
CREATE TABLE [Customer]
(
    [CustomerId] INTEGER  NOT NULL,
    [FirstName] NVARCHAR(40)  NOT NULL,
    [LastName] NVARCHAR(20)  NOT NULL,
    [Company] NVARCHAR(80),
    [Address] NVARCHAR(70),

スキーマに加えて一部データも付与する

schema = db.get_schema_w_rows_as_str(n=5)
print("\n".join(schema.split("\n")[:30]))
出力
Table: Album
Schema:
CREATE TABLE [Album]
(
    [AlbumId] INTEGER  NOT NULL,
    [Title] NVARCHAR(160)  NOT NULL,
    [ArtistId] INTEGER  NOT NULL,
    CONSTRAINT [PK_Album] PRIMARY KEY  ([AlbumId]),
    FOREIGN KEY ([ArtistId]) REFERENCES [Artist] ([ArtistId]) 
		ON DELETE NO ACTION ON UPDATE NO ACTION
)
First 5 Row(s):
{'AlbumId': 1, 'Title': 'For Those About To Rock We Salute You', 'ArtistId': 1}
{'AlbumId': 2, 'Title': 'Balls to the Wall', 'ArtistId': 2}
{'AlbumId': 3, 'Title': 'Restless and Wild', 'ArtistId': 2}
{'AlbumId': 4, 'Title': 'Let There Be Rock', 'ArtistId': 1}
{'AlbumId': 5, 'Title': 'Big Ones', 'ArtistId': 3}
----------------------------------------
Table: Artist
Schema:
CREATE TABLE [Artist]
(
    [ArtistId] INTEGER  NOT NULL,
    [Name] NVARCHAR(120),
    CONSTRAINT [PK_Artist] PRIMARY KEY  ([ArtistId])
)
First 5 Row(s):
{'ArtistId': 1, 'Name': 'AC/DC'}
{'ArtistId': 2, 'Name': 'Accept'}
{'ArtistId': 3, 'Name': 'Aerosmith'}

SQLを実行させる。

results = db.get_result_dict("SELECT * FROM Employee;")
for r in results:
    print(r)
出力
{'EmployeeId': 1, 'LastName': 'Adams', 'FirstName': 'Andrew', 'Title': 'General Manager', 'ReportsTo': None, 'BirthDate': '1962-02-18 00:00:00', 'HireDate': '2002-08-14 00:00:00', 'Address': '11120 Jasper Ave NW', 'City': 'Edmonton', 'State': 'AB', 'Country': 'Canada', 'PostalCode': 'T5K 2N1', 'Phone': '+1 (780) 428-9482', 'Fax': '+1 (780) 428-3457', 'Email': 'andrew@chinookcorp.com'}
{'EmployeeId': 2, 'LastName': 'Edwards', 'FirstName': 'Nancy', 'Title': 'Sales Manager', 'ReportsTo': 1, 'BirthDate': '1958-12-08 00:00:00', 'HireDate': '2002-05-01 00:00:00', 'Address': '825 8 Ave SW', 'City': 'Calgary', 'State': 'AB', 'Country': 'Canada', 'PostalCode': 'T2P 2T3', 'Phone': '+1 (403) 262-3443', 'Fax': '+1 (403) 262-3322', 'Email': 'nancy@chinookcorp.com'}
(snip)

上記は結果が辞書で返されるため、カラム名も含まれる。こちらはカラム名を含まないバージョン。

results = db.get_result_data_only("SELECT * FROM Employee;")
for r in results:
    print(r)
出力
(1, 'Adams', 'Andrew', 'General Manager', None, '1962-02-18 00:00:00', '2002-08-14 00:00:00', '11120 Jasper Ave NW', 'Edmonton', 'AB', 'Canada', 'T5K 2N1', '+1 (780) 428-9482', '+1 (780) 428-3457', 'andrew@chinookcorp.com')
(2, 'Edwards', 'Nancy', 'Sales Manager', 1, '1958-12-08 00:00:00', '2002-05-01 00:00:00', '825 8 Ave SW', 'Calgary', 'AB', 'Canada', 'T2P 2T3', '+1 (403) 262-3443', '+1 (403) 262-3322', 'nancy@chinookcorp.com')
(3, 'Peacock', 'Jane', 'Sales Support Agent', 2, '1973-08-29 00:00:00', '2002-04-01 00:00:00', '1111 6 Ave SW', 'Calgary', 'AB', 'Canada', 'T2P 5M5', '+1 (403) 262-3443', '+1 (403) 262-6712', 'jane@chinookcorp.com')
(snip)

これを使ってSQLエージェントを書いてみる。

from pydantic_ai import Agent, RunContext

class SqlSystemPrompt:
    def __init__(self, db: Chinook, n: int = 0):
        self.db = db    # Chinookクラスのインスタンス
        self.n = n     # システムプロンプトにふくめる各テーブルの実データの数(デフォルト0)

    def build_prompt(self) -> str:
        # DBスキーマは必ず含める、実データ数は設定に基づいて含める
        database_context = self.db.get_schema_w_rows_as_str(n=self.n)
        return (
            "あなたは親切な音楽ストアのアシスタントです。"
            "あなたの仕事は、ユーザーのリクエストに基づいてデータベースから情報を取得するSQLを生成することです。"
            "説明は一切不要です。コードブロックも使用してはいけません。SQLだけをプレーンテキストで生成してください。"
            f"\n{database_context}"
        )

sql_agent: Agent[SqlSystemPrompt, str] = Agent(
    model="openai:gpt-4o",
    deps_type=SqlSystemPrompt,
    result_type=str
)

@sql_agent.system_prompt
def system_prompt(ctx: RunContext[SqlSystemPrompt]) -> str:
    return ctx.deps.build_prompt()

LLMにSQLを生成させる場合、LLMがデータベースのコンテキストに対して理解していれば、より適切なSQLが生成される可能性が高くなる。データベースのコンテキストとして何を与えるかはいくつか考えられる。

  1. DBのスキーマ
  2. DBの実際のデータ
  3. 実行可能なSQLの例(few-shot)

ドキュメントのサンプルコードではこれらのうち3の「実行可能なSQLの例(few-shot)」与えるパターンになっていて、いろいろなデータセットを指定してシステムプロンプトを生成するというようなものに見える。

ただ、SQL生成する際にfew-shotだけではDBに対するコンテキストが足りず、精度が高いSQLを生成することはそもそも難しいと思ったので、1と2を渡すようにして、2の数を調整して評価する、というようなシナリオで書き直した。

では、このエージェントを実行してみる。

def search(user_prompt: str) -> str:
    db = Chinook("sqlite:///Chinook_Sqlite.sqlite")
    # 実データは含めず、スキーマのみシステムプロンプトに含める
    result = sql_agent.run_sync(user_prompt, deps=SqlSystemPrompt(db=db))
    return db.get_result_data_only(result.data) 
search("従業員をリストアップして")
出力
[(1,
  'Adams',
  'Andrew',
  'General Manager',
  None,
  '1962-02-18 00:00:00',
  '2002-08-14 00:00:00',
  '11120 Jasper Ave NW',
  'Edmonton',
  'AB',
  'Canada',
  'T5K 2N1',
  '+1 (780) 428-9482',
  '+1 (780) 428-3457',
  'andrew@chinookcorp.com'),
 (2,
  'Edwards',
  'Nancy',
  'Sales Manager',
  1,
  '1958-12-08 00:00:00',
  '2002-05-01 00:00:00',
  '825 8 Ave SW',
  'Calgary',
  'AB',
  'Canada',
  'T2P 2T3',
  '+1 (403) 262-3443',
  '+1 (403) 262-3322',
  'nancy@chinookcorp.com'),
 (3,
  'Peacock',
  'Jane',
  'Sales Support Agent',
  2,
  '1973-08-29 00:00:00',
  '2002-04-01 00:00:00',
  '1111 6 Ave SW',
  'Calgary',
  'AB',
  'Canada',
  'T2P 5M5',
  '+1 (403) 262-3443',
  '+1 (403) 262-6712',
  'jane@chinookcorp.com'),
(snip)

簡単なものであれば特に問題なく、SQL生成・実行までできているのがわかる。

では評価してみる。まず以下のようなデータセットを用意。

%%writefile dataset.json
[
    {
        "request": "AC/DCのすべてのアルバムのタイトルを教えて。",
        "sql": "SELECT Title FROM Album WHERE ArtistId = (SELECT ArtistId FROM Artist WHERE Name = 'AC/DC');"
    },
    {
        "request": "ヴァン・ヘイレンのすべてのアルバムの曲のタイトルを教えて。",
        "sql": "SELECT Track.Name FROM Track JOIN Album ON Track.AlbumId = Album.AlbumId JOIN Artist ON Album.ArtistId = Artist.ArtistId WHERE Artist.Name = 'Van Halen'"
    },
    {
        "request": "ドイツにいるすべての顧客の名前を教えて。",
        "sql": "SELECT FirstName, LastName FROM Customer WHERE Country = 'Germany';"
    },
    {
        "request": "従業員の数を教えて。",
        "sql": "SELECT COUNT(*) FROM Employee;"
    },
    {
        "request": "「Sales Manager」という役職を持つ従業員の名前とメールアドレスを教えて",
        "sql": "SELECT FirstName, LastName, Email FROM Employee WHERE Title = 'Sales Manager';"
    },
    {
        "request": "曲のジャンルの内訳は?",
        "sql": "SELECT Genre.Name, COUNT(Track.TrackId) AS NumberOfTracks FROM Track JOIN Genre ON Track.GenreId = Genre.GenreId GROUP BY Genre.Name;"
    },
    {
        "request": "「90's Music」プレイリストに含まれるすべての曲のタイトル、アルバム名、アーティスト名を教えて。",
        "sql": "SELECT Track.Name AS TrackTitle, Album.Title AS AlbumName, Artist.Name AS ArtistName FROM PlaylistTrack JOIN Track ON PlaylistTrack.TrackId = Track.TrackId JOIN Album ON Track.AlbumId = Album.AlbumId JOIN Artist ON Album.ArtistId = Artist.ArtistId JOIN Playlist ON PlaylistTrack.PlaylistId = Playlist.PlaylistId WHERE Playlist.Name = '90’s Music';"
    },
    {
        "request": "最も高額な購入をした顧客の名前と金額を教えて。",
        "sql": "SELECT c.FirstName, c.LastName, i.Total FROM Customer c JOIN Invoice i ON c.CustomerId = i.CustomerId ORDER BY i.Total DESC LIMIT 1;"
    },
    {
        "request": "テキサスとニューヨークに住んでいる顧客の名前と住所を教えて。",
        "sql": "SELECT FirstName, LastName, Address, City, State, Country, PostalCode FROM Customer WHERE State IN ('TX', 'NY');"
    },
    {
        "request": "Frank Harrisはどこで働いている?",
        "sql": "SELECT Company FROM Customer WHERE FirstName = 'Frank' AND LastName = 'Harris';"
    }
]

クエリと期待するSQLのペアとなっている。評価用のコード。

from pathlib import Path
import json
import statistics

with Path('dataset.json').open('rb') as f:
    dataset = json.load(f)

db = Chinook("sqlite:///Chinook_Sqlite.sqlite")

# スコアは以下のルールとする。
# - エージェントの実行したSQL結果と期待するSQLの結果が完全に一致: 10点
# - エージェントの実行したSQL結果と期待するSQLの結果で、件数および内容は一致するが順序が異なる: 8点
# - エージェントの実行したSQL結果と期待するSQLの結果に一部差異がある: 期待するSQL結果とマッチする割合 * 8点
# - エージェントの実行したSQLが失敗: 0点
scores = []
for i, data in enumerate(dataset, start=1):
    score = 10
    with sql_agent.override(deps=SqlSystemPrompt(db=db)):  # まず実データは含めない場合
        print(f"---- Test {i}: {data['request']} ----")
        try:
            agent_results = search(data['request'])
        except Exception as e:
            score = 0
            print(f"Score: {score}")
            print("agent_results:", {e})
        else:
            expected_results = db.get_result_data_only(data['sql'])
        
            if agent_results == expected_results:
                pass
            else:
                if set(agent_results) == set(expected_results):
                    score = 8
                else:
                    score = 8 * (len(set(agent_results) & set(expected_results)) / len(expected_results))

            print(f"Score: {score}")
            print("agent_results:", agent_results)
            print("expected_results:",expected_results)
            scores.append(score)

overall_score = statistics.mean(scores)
print("==================")
print(f'Overall score: {overall_score:0.2f}')

ここもドキュメントのサンプルコードからは結構変えた。

  • サンプルコードでは、few-shotで使うためにデータセットをtraining/testに分けて使っている
    • 自分のコードではfew-shot不要なので、すべてをテスト項目として使う。
  • サンプルコードのスコアリング方法は加点方式になっていて、結果件数の差異によって結構ばらつきが生まれそう(件数が多いほどスコアが極端に高くor低く出そう)、かつ、それを平均化してるので正確性に欠けそう
    • 10点満点方式で結果を条件で比較してスコアリングする方式に変えた。
    • なお、カラム名までは見ない(SELECT 〜 ASのマッチまで見るのは大変なので)

評価手法が正しいかはあまり考えず、少しシンプルに変えた感じ。

では実行してみる。初回は実データを含めない場合。

出力
---- Test 1: AC/DCのすべてのアルバムのタイトルを教えて。 ----
Score: 10
agent_results: [('For Those About To Rock We Salute You',), ('Let There Be Rock',)]
expected_results: [('For Those About To Rock We Salute You',), ('Let There Be Rock',)]
---- Test 2: ヴァン・ヘイレンのすべてのアルバムの曲のタイトルを教えて。 ----
Score: 10
agent_results: [('Where Have All The Good Times Gone?',), ("Hang 'Em High",), (snip)
expected_results: [('Where Have All The Good Times Gone?',), ("Hang 'Em High",), (snip)
---- Test 3: ドイツにいるすべての顧客の名前を教えて。 ----
Score: 10
agent_results: [('Leonie', 'Köhler'), ('Hannah', 'Schneider'), ('Fynn', 'Zimmermann'), ('Niklas', 'Schröder')]
expected_results: [('Leonie', 'Köhler'), ('Hannah', 'Schneider'), ('Fynn', 'Zimmermann'), ('Niklas', 'Schröder')]
---- Test 4: 従業員の数を教えて。 ----
Score: 10
agent_results: [(8,)]
expected_results: [(8,)]
---- Test 5: 「Sales Manager」という役職を持つ従業員の名前とメールアドレスを教えて ----
Score: 10
agent_results: [('Nancy', 'Edwards', 'nancy@chinookcorp.com')]
expected_results: [('Nancy', 'Edwards', 'nancy@chinookcorp.com')]
---- Test 6: 曲のジャンルの内訳は? ----
Score: 10
agent_results: [('Alternative', 40), ('Alternative & Punk', 332), ('Blues', 81), (snip)
expected_results: [('Alternative', 40), ('Alternative & Punk', 332), ('Blues', 81), (snip)
---- Test 7: 「90's Music」プレイリストに含まれるすべての曲のタイトル、アルバム名、アーティスト名を教えて。 ----
Score: 0.0
agent_results: []
expected_results: [('We Die Young', 'Facelift', 'Alice In Chains'), ('Man In The Box', 'Facelift', 'Alice In Chains'),  (snip)
---- Test 8: 最も高額な購入をした顧客の名前と金額を教えて。 ----
Score: 10
agent_results: [('Helena', 'Holý', 25.86)]
expected_results: [('Helena', 'Holý', 25.86)]
---- Test 9: テキサスとニューヨークに住んでいる顧客の名前と住所を教えて。 ----
Score: 0.0
agent_results: []
expected_results: [('Michelle', 'Brooks', '627 Broadway', 'New York', 'NY', 'USA', '10012-2612'), ('Richard', 'Cunningham', '2211 W Berry Street', 'Fort Worth', 'TX', 'USA', '76110')]
---- Test 10: Frank Harrisはどこで働いている? ----
Score: 10
agent_results: [('Google Inc.',)]
expected_results: [('Google Inc.',)]
==================
Overall score: 8.00

次に実データを5件含めた場合。

    with sql_agent.override(deps=SqlSystemPrompt(db=db, n=5)):
出力
---- Test 1: AC/DCのすべてのアルバムのタイトルを教えて。 ----
Score: 10
agent_results: [('For Those About To Rock We Salute You',), ('Let There Be Rock',)]
expected_results: [('For Those About To Rock We Salute You',), ('Let There Be Rock',)]
---- Test 2: ヴァン・ヘイレンのすべてのアルバムの曲のタイトルを教えて。 ----
Score: 10
agent_results: [('Where Have All The Good Times Gone?',), ("Hang 'Em High",), (snip)
expected_results: [('Where Have All The Good Times Gone?',), ("Hang 'Em High",), (snip)
---- Test 3: ドイツにいるすべての顧客の名前を教えて。 ----
Score: 10
agent_results: [('Leonie', 'Köhler'), ('Hannah', 'Schneider'), ('Fynn', 'Zimmermann'), ('Niklas', 'Schröder')]
expected_results: [('Leonie', 'Köhler'), ('Hannah', 'Schneider'), ('Fynn', 'Zimmermann'), ('Niklas', 'Schröder')]
---- Test 4: 従業員の数を教えて。 ----
Score: 10
agent_results: [(8,)]
expected_results: [(8,)]
---- Test 5: 「Sales Manager」という役職を持つ従業員の名前とメールアドレスを教えて ----
Score: 10
agent_results: [('Nancy', 'Edwards', 'nancy@chinookcorp.com')]
expected_results: [('Nancy', 'Edwards', 'nancy@chinookcorp.com')]
---- Test 6: 曲のジャンルの内訳は? ----
Score: 10
---- Test 7: 「90's Music」プレイリストに含まれるすべての曲のタイトル、アルバム名、アーティスト名を教えて。 ----
Score: 10
agent_results: [('We Die Young', 'Facelift', 'Alice In Chains'), ('Man In The Box', 'Facelift', 'Alice In Chains'), ('Sea Of Sorrow', 'Facelift', 'Alice In Chains'), (snip)
expected_results: [('We Die Young', 'Facelift', 'Alice In Chains'), ('Man In The Box', 'Facelift', 'Alice In Chains'), ('Sea Of Sorrow', 'Facelift', 'Alice In Chains'), (snip)
---- Test 8: 最も高額な購入をした顧客の名前と金額を教えて。 ----
Score: 10
agent_results: [('Helena', 'Holý', 25.86)]
expected_results: [('Helena', 'Holý', 25.86)]
---- Test 9: テキサスとニューヨークに住んでいる顧客の名前と住所を教えて。 ----
Score: 0.0
agent_results: [('Michelle', 'Brooks', '627 Broadway'), ('Richard', 'Cunningham', '2211 W Berry Street')]
expected_results: [('Michelle', 'Brooks', '627 Broadway', 'New York', 'NY', 'USA', '10012-2612'), ('Richard', 'Cunningham', '2211 W Berry Street', 'Fort Worth', 'TX', 'USA', '76110')]
---- Test 10: Frank Harrisはどこで働いている? ----
Score: 10
agent_results: [('Google Inc.',)]
expected_results: [('Google Inc.',)]
==================
Overall score: 9.00

少し上がったように見えるが、まあ誤差レベルだと思うし、データも手法も雑なので実際にやる場合にはもっと詰める必要はあると思う。

kun432kun432

まとめ

Testing and Evals、かなり苦戦したが、なんとか一通り終わらせた。

とりあえずのまとめは既に書いていて、そこから印象が大きく変わったということはないけど、Python力がもっと必要というのは感じたかな。

https://zenn.dev/link/comments/3691599ac0d1e7

自分はLLMが話題になりだした去年からPythonを触り始めた程度なので、まだ色々理解が足りていないとは思う。ただそれでもLangChain/LangGraph/LlamaIndexなどでマルチエージェントを実装してみたことはあるし、今回ほど苦労を感じた記憶もなく、動くものを作りながら徐々に学ぶ、慣れてきたら少し中身を追いかけてみたりいじってみたり、みたいなことを少しづつやってきた。(既存のフレームワークの高い抽象化に甘やかされていたのは当然ある。)

その体験からすると、PydanticAIの場合は、初期の学習コストというか前提となるPythonの知識などはそれなりに求められるような気が個人的にはしている。だからといって高い抽象化のフレームワークのほうが良いとは言えないし、逆にハマる場合も当然あるので、つまりユースケース次第。

例えるなら、雑な知識・雑な実装でPoC作るみたいなユースケースなら、PydanticAIよりももっと抽象化度合いが高いフレームワークを使って早く作るほうがいいと思う。ReActエージェントとかもビルトインで実装されていたりして、簡単に推論ループを作ったりできると思う。

逆にPythonに精通してて商用できちんと制御可能なものを作りたい、とかならPydanticAIのほうがいいのかもしれないけど、それこそReActエージェントをやろうと思うと、自分で実装する必要は出てくる。

この辺はユースケースにあわせて選択すれば良いのではないかと思う。

このスクラップは1日前にクローズされました