🕸️

PydanticAI の新機能:pydantic-graph で AI エージェントのワークフローを作る

2025/01/23に公開

はじめに

先月の Advent Calendar の記事で、PydanticAI という Pydantic の開発チームから公開された AI エージェントライブラリの紹介をしました。

https://qiita.com/atsukish/items/a1613c77cecd41980467

そして先日の v0.0.19 のリリースで、pydantic-graphというグラフ API が追加され、いわゆる Agentic Workflow が実装できるようになりました。

https://x.com/samuel_colvin/status/1879627376990224417

https://github.com/pydantic/pydantic-ai/tree/main/pydantic_graph

機能としては LangGraph 的なものと想定されますが、PydanticAI は使いやすかったので、こちらも触って見たいと思います。

pydantic-graph

pydantic-graphは、Pydantic の思想に基づいて型ヒントを使ってノードとエッジを定義する非同期グラフとステートマシンのライブラリで、複雑なワークフローのモデリングが可能です。

install

pydantic-graph は、PydanticAI の一部として開発されていますが、PydanticAI に依存せず単独で使用できます。

uv add pydantic-graph

PydanticAI と組み合わせて AI Agent を実装する場合は追加でインストールする必要があります。

uv add pydantic-ai

pydantic-graph の主要コンポーネント

pydantic-graphの主な構成要素は、GraphRunContext、End、Nodes、Graph の 4 つです。

GraphRunContext

グラフの実行コンテキストを提供し、グラフの状態と依存関係を保持し、ノードの実行時に渡されます。StateT という型でグラフの状態をジェネリックに表現します。

End

グラフの実行が終了することを示す戻り値で、RunEndT という型でグラフの戻り値をジェネリックに表現します。

Nodes

グラフ内で実行される処理の単位で、BaseNodeのサブクラスとして定義されます。

  • 通常 dataclass として定義され、ノードの実行に必要なパラメータをフィールド定義
  • Node のビジネスロジックは、runメソッド内に実装
  • runメソッドの戻り値のアノテーションは、pydantic-graphがノードの出力エッジを決定するために使用

また Node は、statedepsgraph return type の 3 つのジェネリックパラメータを持ちます。

  • state: グラフの状態の型を指定(デフォルト:None
  • deps: グラフの依存関係の型を指定(デフォルト:None
  • graph return type: ノードがEndを返す場合にのみ適用され、グラフの戻り値の型を指定(デフォルト:Never

Graph

実行グラフ自体であり、BaseNodeのサブクラスであるノードクラスの集合で構成されます。Graph は、statedepsgraph return type の 3 つのクラスパラメータを持ちます。

  • state: グラフの状態の型を指定
  • deps: グラフの依存関係の型を指定
  • graph return type: グラフの実行の戻り値の型を指定

これらの要素を組み合わせることで、複雑なワークフローをモデル化し、実行することができます。pydantic-graphは、非同期処理をサポートし、型ヒントを活用することで、より堅牢で保守性の高いグラフベースのアプリケーションを開発できます。

サンプルコード

イメージを掴むために、ドキュメント内のサンプルコードを実行してみます。以下は自動販売機を pydantic-graph でワークフロー化したものです。この例ではユーザーが硬貨を投入し、商品を選択して購入するプロセスをワークフロー化しています。

vending_machine.py
"""vending_machine.py"""
from __future__ import annotations

import asyncio
from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from rich.prompt import Prompt


@dataclass
class MachineState:
    user_balance: int = 0
    product: str | None = None


@dataclass
class InsertCoin(BaseNode[MachineState]):
    async def run(self, ctx: GraphRunContext[MachineState]) -> CoinsInserted:
        return CoinsInserted(int(Prompt.ask("お金を入れてください。")))


@dataclass
class CoinsInserted(BaseNode[MachineState]):
    amount: int

    async def run(
        self, ctx: GraphRunContext[MachineState]
    ) -> SelectProduct | Purchase:
        ctx.state.user_balance += self.amount
        if ctx.state.product is not None:
            return Purchase(ctx.state.product)
        else:
            return SelectProduct()


@dataclass
class SelectProduct(BaseNode[MachineState]):
    async def run(self, ctx: GraphRunContext[MachineState]) -> Purchase:
        return Purchase(Prompt.ask("Select product"))


PRODUCT_PRICES = {
    "天然水": 100,
    "コーラ": 120,
    "ポテトチップス": 150,
    "チョコレート": 200,
}


@dataclass
class Purchase(BaseNode[MachineState, None, None]):
    product: str

    async def run(
        self, ctx: GraphRunContext[MachineState]
    ) -> End | InsertCoin | SelectProduct:
        if price := PRODUCT_PRICES.get(self.product):
            ctx.state.product = self.product
            if ctx.state.user_balance >= price:
                ctx.state.user_balance -= price
                return End(None)
            else:
                diff = price - ctx.state.user_balance
                print(
                    f"お金が足りません。{self.product}を買うには{diff}円足りません。"
                )
                return InsertCoin()
        else:
            print(f"その商品はありません。{self.product}を選んでください。")
            return SelectProduct()


vending_machine_graph = Graph(
    nodes=[InsertCoin, CoinsInserted, SelectProduct, Purchase]
)


async def main() -> None:
    state = MachineState()
    await vending_machine_graph.run(start_node=InsertCoin(), state=state)
    print(f"購入成功 商品={state.product} 残高={state.user_balance}")


if __name__ == "__main__":
    asyncio.run(main())

状態管理

自動販売機の状態は、MachineState というデータクラスで管理され、ユーザーの投入金額(user_balance)と選択した商品(product)を保持します。このデータクラスは、GraphRunContextstate パラメータで管理されます。このパラメータは、グラフの実行コンテキストに渡され、ノードの実行時に使用されます。

@dataclass
class MachineState:
    user_balance: int = 0
    product: str | None = None

ノード

自動販売機の各動作は、BaseNode を継承したInsertCoinCoinsInsertedSelectProductPurchase という 4 つのノードで構成されます。

  • InsertCoin : ユーザに硬貨投入を促し、投入金額を受け取る
  • CoinsInserted : 投入金額をMachineStateuser_balanceに加算
  • SelectProduct : ユーザーに商品選択を促す
  • Purchase : 選択された商品の価格を確認し、残高が十分であれば購入を完了し、不足している場合は硬貨の追加投入を求める

各ノードのロジックは、runメソッドを実装しており、その戻り値の型はグラフの次のノードを示します。

制御フロー

各ノードの分岐、終了等の制御フローもrunメソッド内で実装し、戻り値で次の遷移するノードを指定することができます。

  • CoinsInserted

    • すでに商品が選択されている場合はPurchaseノードに遷移
    • 商品が選択されていない場合はSelectProductノードに遷移
    @dataclass
    class CoinsInserted(BaseNode[MachineState]):
        amount: int
    
        async def run(
            self, ctx: GraphRunContext[MachineState]
        ) -> SelectProduct | Purchase:
            ctx.state.user_balance += self.amount
            if ctx.state.product is not None:
                return Purchase(ctx.state.product)
            else:
                return SelectProduct()
    
  • Purchase

    • 選択された商品の価格を確認し、残高が十分であればワークフローの終了(End
    • 残高が不足している場合はInsertCoinノードに遷移
    • 選択された商品が存在しない場合はSelectProductノードに遷移
    @dataclass
    class Purchase(BaseNode[MachineState, None, None]):
        product: str
    
        async def run(
            self, ctx: GraphRunContext[MachineState]
        ) -> End | InsertCoin | SelectProduct:
            if price := PRODUCT_PRICES.get(self.product):
                ctx.state.product = self.product
                if ctx.state.user_balance >= price:
                    ctx.state.user_balance -= price
                    return End(None)
                else:
                    diff = price - ctx.state.user_balance
                    print(
                        f"お金が足りません。{self.product}を買うには{diff}円足りません。"
                    )
                    return InsertCoin()
            else:
                print(f"その商品はありません。{self.product}を選んでください。")
                return SelectProduct()
    

Graph

vending_machine_graphは、Graphクラスのインスタンスで、ノードのリストとしてInsertCoinCoinsInsertedSelectProductPurchaseを定義しています。グラフはrunメソッドで実行し、この際に初期状態と開始ノードを指定する必要があります。

# グラフの定義
vending_machine_graph = Graph(
    nodes=[InsertCoin, CoinsInserted, SelectProduct, Purchase]
)

# グラフの実行(初期状態:state, 開始ノード:InsertCoin)
state = MachineState()
await vending_machine_graph.run(start_node=InsertCoin(), state=state)

Agentic Workflow

ここでようやく LLM と連携し、AI Agent を PydanticAI と pydantic-graph で実装します。先日 Anthropic 社から効果的な AI エージェントの開発方法に関する記事で紹介されれているワークフローのうち、Evaluator-optimizer のワークフローを pydantic-graph で実装してみます。

image

https://www.anthropic.com/research/building-effective-agents

evaluator_optimizer.py
from __future__ import annotations as _annotations

import asyncio
from dataclasses import dataclass, field

import logfire
from dotenv import load_dotenv
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, End, Graph, GraphRunContext

logfire.configure()
load_dotenv()


@dataclass
class State:
    product_name: str
    write_agent_messages: list[ModelMessage] = field(default_factory=list)
    retry: int = 0


content_generator_agent = Agent(
    "openai:gpt-4o-mini",
    result_type=str,
    system_prompt="あなたはお菓子メーカーのマーケティング担当の社員です。",
)


@dataclass
class Generator(BaseNode[State]):
    content_feedback: str | None = None

    async def run(self, ctx: GraphRunContext[State]) -> Feedback:
        if self.content_feedback:
            prompt = (
                f"フィードバック内容を受けて、内容を修正してください。\n"
                f"{format_as_xml(ctx.state.product_name)}\n"
                f"Feedback: {self.content_feedback}"
            )
        else:
            prompt = f"{ctx.state.product_name}のマーケティング戦略を考えてください。"

        result = await content_generator_agent.run(
            prompt,
            message_history=ctx.state.write_agent_messages,
        )
        ctx.state.write_agent_messages += result.all_messages()
        ctx.state.retry += 1
        print(result.data)
        return Feedback(content=result.data)


class ContentRequiresWrite(BaseModel):
    feedback: str


class ContentOK(BaseModel):
    pass


feedback_agent = Agent[None, ContentRequiresWrite | ContentOK](
    "openai:gpt-4o-mini",
    result_type=ContentRequiresWrite | ContentOK,  # type: ignore
    system_prompt=(
        "マーケティング戦略に対して、以下の観点に基づいたフィードバックをください。\n"
        "1.現状分析(市場規模、競合状況、ユーザーのニーズなど)\n"
        "2.マーケティング目標の設定\n"
        "3.ターゲット層の定義とアプローチ案の提案\n"
    ),
)


@dataclass
class Feedback(BaseNode[State, None, str]):
    content: str

    async def run(
        self,
        ctx: GraphRunContext[State],
    ) -> Generator | End[str]:
        if ctx.state.retry > 2:
            return End(self.content)

        prompt = format_as_xml(
            {"product_name": ctx.state.product_name, "content": self.content}
        )
        result = await feedback_agent.run(prompt)
        if isinstance(result.data, ContentRequiresWrite):
            print(result.data.feedback)
            return Generator(content_feedback=result.data.feedback)
        else:
            return End(self.content)


async def main() -> None:
    state = State(product_name="チョコレート")
    feedback_graph = Graph(nodes=(Generator, Feedback))
    content, _ = await feedback_graph.run(Generator(), state=state)
    print(content)


if __name__ == "__main__":
    asyncio.run(main())

ここでは、Generatorノードでマーケティング戦略を生成し、Feedbackノードでその内容を評価し、問題がある場合はフィードバックしています。Feedbackノードは、Generatorノードの出力を受け取り、フィードバックを行った後、再度Generatorノードに戻るか、グラフを終了するかを決定します。なお試行回数が増えるとトークン数が増えてコストが高くなるので、試行回数に上限を設けています。

最終的に得られたアウトプットは以下のようになります。

フィードバックを考慮して、チョコレートのマーケティング戦略を修正しました。以下に新しい内容を示します。

### 1. 現状分析
チョコレート市場は現在、健康志向が高まる一方で、プレミアム市場やユニークなフレーバーに注力するブランドも多く、競争が激化しています。競合他社の強みや弱みを詳細に分析し、自社の差別化要素を明確にすることが必要です。また、市場調査をさらに進め、消費者の具体的な購入動機や嗜好の傾向を把握し、データに基づいた戦略を立案しましょう。

### 2. マーケティング目標
ブランド認知度を30%向上させるとともに、具体的な売上目標(例えば、1年で売上を15%増加させる)を設定します。また、プロモーションごとの効果を測定するために、KPIを明確に設定し、以下のような評価基準を取り入れることで進捗を把握しやすくなります:
- リピート購入率
- SNSでのエンゲージメント率
- キャンペーン参加者数

### 3. ターゲット層の定義とアプローチ提案
ターゲット層は健康志向の20代後半から40代初めの男女ですが、さらにセグメンテーションを行い、以下のように属性や行動パターンに基づいた細分化を行います:
- **フィットネス愛好者**: トレーニング後のスナックとして位置付け、プロテイン入りのチョコレートを提案。
- **ストレス解消を求める層**: ストレス軽減効果を打ち出したフレーバーを提案し、リラクセーション効果を訴求。

キャンペーンは、ソーシャルメディアを活用し、ユーザー生成コンテンツ(UGC)の促進を重視します。具体的なインセンティブとして、キャンペーン参加者にはオリジナル商品や割引クーポンを提供し、参加率を高めます。例えば、「#私のチョコレートレシピ」キャンペーンを実施し、投稿者の中から優秀なレシピを選出することで、消費者の関心を集めつつ参加意欲を喚起します。

これらの戦略を組み合わせることで、ターゲット層との接点を増やし、ブランドと消費者の関係を強化することを目指します。

まとめ

pydantic-graphを使用することで、ワークフローをモデル化し、実行することができました。
PydanticAI と同様にシンプルな記述ができる点は魅力の一つです。

LLM ワークフローの比較対象として LangGraph が挙げられると思いますが、LangGraph で用意されているチェックポイント機能やステートの永続化機能は今のところ pydantic-graph では提供されていなさそうです。ただまだ開発が始まったばかりなので、今後のアップデートで追加されることを期待したいと思います。

Discussion