PydanticAI の新機能:pydantic-graph で AI エージェントのワークフローを作る
はじめに
先月の Advent Calendar の記事で、PydanticAI という Pydantic の開発チームから公開された AI エージェントライブラリの紹介をしました。
そして先日の v0.0.19 のリリースで、pydantic-graph
というグラフ API が追加され、いわゆる Agentic Workflow が実装できるようになりました。
機能としては LangGraph 的なものと想定されますが、PydanticAI は使いやすかったので、こちらも触って見たいと思います。
pydantic-graph
pydantic-graph
は、Pydantic の思想に基づいて型ヒントを使ってノードとエッジを定義する非同期グラフとステートマシンのライブラリで、複雑なワークフローのモデリングが可能です。
install
pydantic-graph は、PydanticAI の一部として開発されていますが、PydanticAI に依存せず単独で使用できます。
uv add pydantic-graph
PydanticAI と組み合わせて AI Agent を実装する場合は追加でインストールする必要があります。
uv add pydantic-ai
pydantic-graph の主要コンポーネント
pydantic-graph
の主な構成要素は、GraphRunContext、End、Nodes、Graph の 4 つです。
GraphRunContext
グラフの実行コンテキストを提供し、グラフの状態と依存関係を保持し、ノードの実行時に渡されます。StateT
という型でグラフの状態をジェネリックに表現します。
End
グラフの実行が終了することを示す戻り値で、RunEndT
という型でグラフの戻り値をジェネリックに表現します。
Nodes
グラフ内で実行される処理の単位で、BaseNode
のサブクラスとして定義されます。
- 通常
dataclass
として定義され、ノードの実行に必要なパラメータをフィールド定義 - Node のビジネスロジックは、
run
メソッド内に実装 -
run
メソッドの戻り値のアノテーションは、pydantic-graph
がノードの出力エッジを決定するために使用
また Node は、state
、deps
、graph return type
の 3 つのジェネリックパラメータを持ちます。
-
state
: グラフの状態の型を指定(デフォルト:None
) -
deps
: グラフの依存関係の型を指定(デフォルト:None
) -
graph return type
: ノードがEnd
を返す場合にのみ適用され、グラフの戻り値の型を指定(デフォルト:Never
)
Graph
実行グラフ自体であり、BaseNode
のサブクラスであるノードクラスの集合で構成されます。Graph は、state
、deps
、graph return type
の 3 つのクラスパラメータを持ちます。
-
state
: グラフの状態の型を指定 -
deps
: グラフの依存関係の型を指定 -
graph return type
: グラフの実行の戻り値の型を指定
これらの要素を組み合わせることで、複雑なワークフローをモデル化し、実行することができます。pydantic-graph
は、非同期処理をサポートし、型ヒントを活用することで、より堅牢で保守性の高いグラフベースのアプリケーションを開発できます。
サンプルコード
イメージを掴むために、ドキュメント内のサンプルコードを実行してみます。以下は自動販売機を pydantic-graph でワークフロー化したものです。この例ではユーザーが硬貨を投入し、商品を選択して購入するプロセスをワークフロー化しています。
"""vending_machine.py"""
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
from rich.prompt import Prompt
@dataclass
class MachineState:
user_balance: int = 0
product: str | None = None
@dataclass
class InsertCoin(BaseNode[MachineState]):
async def run(self, ctx: GraphRunContext[MachineState]) -> CoinsInserted:
return CoinsInserted(int(Prompt.ask("お金を入れてください。")))
@dataclass
class CoinsInserted(BaseNode[MachineState]):
amount: int
async def run(
self, ctx: GraphRunContext[MachineState]
) -> SelectProduct | Purchase:
ctx.state.user_balance += self.amount
if ctx.state.product is not None:
return Purchase(ctx.state.product)
else:
return SelectProduct()
@dataclass
class SelectProduct(BaseNode[MachineState]):
async def run(self, ctx: GraphRunContext[MachineState]) -> Purchase:
return Purchase(Prompt.ask("Select product"))
PRODUCT_PRICES = {
"天然水": 100,
"コーラ": 120,
"ポテトチップス": 150,
"チョコレート": 200,
}
@dataclass
class Purchase(BaseNode[MachineState, None, None]):
product: str
async def run(
self, ctx: GraphRunContext[MachineState]
) -> End | InsertCoin | SelectProduct:
if price := PRODUCT_PRICES.get(self.product):
ctx.state.product = self.product
if ctx.state.user_balance >= price:
ctx.state.user_balance -= price
return End(None)
else:
diff = price - ctx.state.user_balance
print(
f"お金が足りません。{self.product}を買うには{diff}円足りません。"
)
return InsertCoin()
else:
print(f"その商品はありません。{self.product}を選んでください。")
return SelectProduct()
vending_machine_graph = Graph(
nodes=[InsertCoin, CoinsInserted, SelectProduct, Purchase]
)
async def main() -> None:
state = MachineState()
await vending_machine_graph.run(start_node=InsertCoin(), state=state)
print(f"購入成功 商品={state.product} 残高={state.user_balance}")
if __name__ == "__main__":
asyncio.run(main())
状態管理
自動販売機の状態は、MachineState
というデータクラスで管理され、ユーザーの投入金額(user_balance
)と選択した商品(product
)を保持します。このデータクラスは、GraphRunContext
の state
パラメータで管理されます。このパラメータは、グラフの実行コンテキストに渡され、ノードの実行時に使用されます。
@dataclass
class MachineState:
user_balance: int = 0
product: str | None = None
ノード
自動販売機の各動作は、BaseNode
を継承したInsertCoin
、CoinsInserted
、SelectProduct
、Purchase
という 4 つのノードで構成されます。
-
InsertCoin
: ユーザに硬貨投入を促し、投入金額を受け取る -
CoinsInserted
: 投入金額をMachineState
のuser_balance
に加算 -
SelectProduct
: ユーザーに商品選択を促す -
Purchase
: 選択された商品の価格を確認し、残高が十分であれば購入を完了し、不足している場合は硬貨の追加投入を求める
各ノードのロジックは、run
メソッドを実装しており、その戻り値の型はグラフの次のノードを示します。
制御フロー
各ノードの分岐、終了等の制御フローもrun
メソッド内で実装し、戻り値で次の遷移するノードを指定することができます。
-
CoinsInserted
- すでに商品が選択されている場合は
Purchase
ノードに遷移 - 商品が選択されていない場合は
SelectProduct
ノードに遷移
@dataclass class CoinsInserted(BaseNode[MachineState]): amount: int async def run( self, ctx: GraphRunContext[MachineState] ) -> SelectProduct | Purchase: ctx.state.user_balance += self.amount if ctx.state.product is not None: return Purchase(ctx.state.product) else: return SelectProduct()
- すでに商品が選択されている場合は
-
Purchase
- 選択された商品の価格を確認し、残高が十分であればワークフローの終了(
End
) - 残高が不足している場合は
InsertCoin
ノードに遷移 - 選択された商品が存在しない場合は
SelectProduct
ノードに遷移
@dataclass class Purchase(BaseNode[MachineState, None, None]): product: str async def run( self, ctx: GraphRunContext[MachineState] ) -> End | InsertCoin | SelectProduct: if price := PRODUCT_PRICES.get(self.product): ctx.state.product = self.product if ctx.state.user_balance >= price: ctx.state.user_balance -= price return End(None) else: diff = price - ctx.state.user_balance print( f"お金が足りません。{self.product}を買うには{diff}円足りません。" ) return InsertCoin() else: print(f"その商品はありません。{self.product}を選んでください。") return SelectProduct()
- 選択された商品の価格を確認し、残高が十分であればワークフローの終了(
Graph
vending_machine_graph
は、Graph
クラスのインスタンスで、ノードのリストとしてInsertCoin
、CoinsInserted
、SelectProduct
、Purchase
を定義しています。グラフはrun
メソッドで実行し、この際に初期状態と開始ノードを指定する必要があります。
# グラフの定義
vending_machine_graph = Graph(
nodes=[InsertCoin, CoinsInserted, SelectProduct, Purchase]
)
# グラフの実行(初期状態:state, 開始ノード:InsertCoin)
state = MachineState()
await vending_machine_graph.run(start_node=InsertCoin(), state=state)
Agentic Workflow
ここでようやく LLM と連携し、AI Agent を PydanticAI と pydantic-graph で実装します。先日 Anthropic 社から効果的な AI エージェントの開発方法に関する記事で紹介されれているワークフローのうち、Evaluator-optimizer のワークフローを pydantic-graph で実装してみます。
from __future__ import annotations as _annotations
import asyncio
from dataclasses import dataclass, field
import logfire
from dotenv import load_dotenv
from pydantic import BaseModel
from pydantic_ai import Agent
from pydantic_ai.format_as_xml import format_as_xml
from pydantic_ai.messages import ModelMessage
from pydantic_graph import BaseNode, End, Graph, GraphRunContext
logfire.configure()
load_dotenv()
@dataclass
class State:
product_name: str
write_agent_messages: list[ModelMessage] = field(default_factory=list)
retry: int = 0
content_generator_agent = Agent(
"openai:gpt-4o-mini",
result_type=str,
system_prompt="あなたはお菓子メーカーのマーケティング担当の社員です。",
)
@dataclass
class Generator(BaseNode[State]):
content_feedback: str | None = None
async def run(self, ctx: GraphRunContext[State]) -> Feedback:
if self.content_feedback:
prompt = (
f"フィードバック内容を受けて、内容を修正してください。\n"
f"{format_as_xml(ctx.state.product_name)}\n"
f"Feedback: {self.content_feedback}"
)
else:
prompt = f"{ctx.state.product_name}のマーケティング戦略を考えてください。"
result = await content_generator_agent.run(
prompt,
message_history=ctx.state.write_agent_messages,
)
ctx.state.write_agent_messages += result.all_messages()
ctx.state.retry += 1
print(result.data)
return Feedback(content=result.data)
class ContentRequiresWrite(BaseModel):
feedback: str
class ContentOK(BaseModel):
pass
feedback_agent = Agent[None, ContentRequiresWrite | ContentOK](
"openai:gpt-4o-mini",
result_type=ContentRequiresWrite | ContentOK, # type: ignore
system_prompt=(
"マーケティング戦略に対して、以下の観点に基づいたフィードバックをください。\n"
"1.現状分析(市場規模、競合状況、ユーザーのニーズなど)\n"
"2.マーケティング目標の設定\n"
"3.ターゲット層の定義とアプローチ案の提案\n"
),
)
@dataclass
class Feedback(BaseNode[State, None, str]):
content: str
async def run(
self,
ctx: GraphRunContext[State],
) -> Generator | End[str]:
if ctx.state.retry > 2:
return End(self.content)
prompt = format_as_xml(
{"product_name": ctx.state.product_name, "content": self.content}
)
result = await feedback_agent.run(prompt)
if isinstance(result.data, ContentRequiresWrite):
print(result.data.feedback)
return Generator(content_feedback=result.data.feedback)
else:
return End(self.content)
async def main() -> None:
state = State(product_name="チョコレート")
feedback_graph = Graph(nodes=(Generator, Feedback))
content, _ = await feedback_graph.run(Generator(), state=state)
print(content)
if __name__ == "__main__":
asyncio.run(main())
ここでは、Generator
ノードでマーケティング戦略を生成し、Feedback
ノードでその内容を評価し、問題がある場合はフィードバックしています。Feedback
ノードは、Generator
ノードの出力を受け取り、フィードバックを行った後、再度Generator
ノードに戻るか、グラフを終了するかを決定します。なお試行回数が増えるとトークン数が増えてコストが高くなるので、試行回数に上限を設けています。
最終的に得られたアウトプットは以下のようになります。
フィードバックを考慮して、チョコレートのマーケティング戦略を修正しました。以下に新しい内容を示します。
### 1. 現状分析
チョコレート市場は現在、健康志向が高まる一方で、プレミアム市場やユニークなフレーバーに注力するブランドも多く、競争が激化しています。競合他社の強みや弱みを詳細に分析し、自社の差別化要素を明確にすることが必要です。また、市場調査をさらに進め、消費者の具体的な購入動機や嗜好の傾向を把握し、データに基づいた戦略を立案しましょう。
### 2. マーケティング目標
ブランド認知度を30%向上させるとともに、具体的な売上目標(例えば、1年で売上を15%増加させる)を設定します。また、プロモーションごとの効果を測定するために、KPIを明確に設定し、以下のような評価基準を取り入れることで進捗を把握しやすくなります:
- リピート購入率
- SNSでのエンゲージメント率
- キャンペーン参加者数
### 3. ターゲット層の定義とアプローチ提案
ターゲット層は健康志向の20代後半から40代初めの男女ですが、さらにセグメンテーションを行い、以下のように属性や行動パターンに基づいた細分化を行います:
- **フィットネス愛好者**: トレーニング後のスナックとして位置付け、プロテイン入りのチョコレートを提案。
- **ストレス解消を求める層**: ストレス軽減効果を打ち出したフレーバーを提案し、リラクセーション効果を訴求。
キャンペーンは、ソーシャルメディアを活用し、ユーザー生成コンテンツ(UGC)の促進を重視します。具体的なインセンティブとして、キャンペーン参加者にはオリジナル商品や割引クーポンを提供し、参加率を高めます。例えば、「#私のチョコレートレシピ」キャンペーンを実施し、投稿者の中から優秀なレシピを選出することで、消費者の関心を集めつつ参加意欲を喚起します。
これらの戦略を組み合わせることで、ターゲット層との接点を増やし、ブランドと消費者の関係を強化することを目指します。
まとめ
pydantic-graph
を使用することで、ワークフローをモデル化し、実行することができました。
PydanticAI と同様にシンプルな記述ができる点は魅力の一つです。
LLM ワークフローの比較対象として LangGraph が挙げられると思いますが、LangGraph で用意されているチェックポイント機能やステートの永続化機能は今のところ pydantic-graph では提供されていなさそうです。ただまだ開発が始まったばかりなので、今後のアップデートで追加されることを期待したいと思います。
Discussion