Open7
LangGraphに入門する

Overview
LangGraphは、LLMを使用して、状態を持つマルチアクターアプリケーションを構築するためのライブラリです。エージェントおよびマルチエージェントのワークフローを作成するのに使用される。
LangGraphの特徴:
- サイクルを含むフローの定義
- DAGベースのソリューションとの差別化要因
- 低レベルフレームワーク
- アプリケーションのフローと状態を細かく制御可能
- 信頼性の高いエージェントの作成に重要
- 組み込みの永続性
- 高度なヒューマンインザループ機能
- メモリ機能のサポート
主要機能:
- サイクルと分岐
- アプリケーションにループと条件分岐を実装可能
- 永続性
- グラフの各ステップ後に自動的に状態を保存
- グラフ実行の一時停止と再開が可能
- エラー回復、ヒューマンインザループワークフロー、タイムトラベルなどをサポート
- ヒューマンインザループ
- エージェントが計画した次のアクションを承認または編集するためにグラフ実行を中断可能
- ストリーミングサポート
- 各ノードで生成される出力をストリーミング可能
- LangChainとの統合
- LangChainおよびLangSmithとシームレスに統合
Conceptual Guides
What does it mean to be agentic?
- agentic(エージェント的)とはLLMがアプリケーションの制御フローを決定するシステムを指す
- 明確な線引きはなく、スペクトラムとして捉えるべき
- 下記のような要素を持つ。これらはエージェント的であることの副産物
- ツール呼び出し
- アクション実行
- メモリ
- プランニング
Why LangGraph?
下記の3つの核となる原則を持っている
- 制御可能性
- 低レベルな設計で、動作を細かく制御できる
- ヒューマンインザループ
- 人間と機械の様々な対話パターンが可能
- ストリーミング優先
- エージェント的なアプリケーションは実行に時間がかかるのが多いので、ユーザに進行状況を知らせるのが重要
- 生成やツール呼び出しのイベントでサポート

Example
@tool
def search(query: str):
"""Call to surf the web."""
# This is a placeholder, but don't tell the LLM that...
if "sf" in query.lower() or "san francisco" in query.lower():
return "It's 60 degrees and foggy."
return "It's 90 degrees and sunny."
- queryを受け取り、天気予報を返すtoolを定義
def should_continue(state: MessagesState) -> Literal["tools", END]:
messages = state['messages']
last_message = messages[-1]
# If the LLM makes a tool call, then we route to the "tools" node
if last_message.tool_calls:
return "tools"
# Otherwise, we stop (reply to the user)
return END
# Define the function that calls the model
def call_model(state: MessagesState):
messages = state['messages']
response = model.invoke(messages)
# We return a list, because this will get added to the existing list
return {"messages": [response]}
- nodeを定義
- nodeはstateを受け取る
workflow = StateGraph(MessagesState)
# Define the two nodes we will cycle between
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
# Set the entrypoint as `agent`
# This means that this node is the first one called
workflow.set_entry_point("agent")
# We now add a conditional edge
workflow.add_conditional_edges(
# First, we define the start node. We use `agent`.
# This means these are the edges taken after the `agent` node is called.
"agent",
# Next, we pass in the function that will determine which node is called next.
should_continue,
)
- nodeとエントリーポイント(agent)をグラフ上に定義
workflow.add_conditional_edges(
# First, we define the start node. We use `agent`.
# This means these are the edges taken after the `agent` node is called.
"agent",
# Next, we pass in the function that will determine which node is called next.
should_continue,
)
# We now add a normal edge from `tools` to `agent`.
# This means that after `tools` is called, `agent` node is called next.
workflow.add_edge("tools", 'agent')
- edgeを定義
- edgeは
- 条件付きなら、Agentがアクションを指示した場合はツールを実行し、指示しなければ終了する
- 通常のedgeであれば、必ずAgentに戻り次に実行すべきかを決定する

Quick Start. Part 1: Build a Basic Chatbot
from typing import Annotated
from langchain_anthropic import ChatAnthropic
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
class State(TypedDict):
messages: Annotated[list, add_messages]
graph_builder = StateGraph(State)
llm = ChatAnthropic(model="claude-3-haiku-20240307")
def chatbot(state: State):
return {"messages": [llm.invoke(state["messages"])]}
# The first argument is the unique node name
# The second argument is the function or object that will be called whenever
# the node is used.
graph_builder.add_node("chatbot", chatbot)
graph_builder.set_entry_point("chatbot")
graph_builder.set_finish_point("chatbot")
graph = graph_builder.compile()
while True:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
for event in graph.stream({"messages": ("user", user_input)}):
for value in event.values():
print("Assistant:", value["messages"][-1].content)
-
messages: Annotated[list, add_messages]
という記述は以下のことを意味する- messages はリスト型である
- このリストの更新には add_messages 関数を使用する
- add_messagesはグラフの状態を更新するreducer methodである
- reducerが指定されてないStateのKeyは更新の度上書きされ、最新の値のみを保持する
def chatbot(state: State):
return {"messages": [llm.invoke(state["messages"])]}
- nodeはstateを受け取り、state内の要素を更新する
while True:
user_input = input("User: ")
if user_input.lower() in ["quit", "exit", "q"]:
print("Goodbye!")
break
for event in graph.stream({"messages": ("user", user_input)}):
for value in event.values():
print("Assistant:", value["messages"][-1].content)
- graphからmessageを受け取り、AIが回答する
- graph.streamは更新された状態を逐次的にyieldで返す
ちゃんと応答できた
Part 2: Enhancing the Chatbot with Tools
from typing import Annotated
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode, tools_condition
class State(TypedDict):
messages: Annotated[list, add_messages]
graph_builder = StateGraph(State)
tool = TavilySearchResults(max_results=2)
tools = [tool]
llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm_with_tools = llm.bind_tools(tools)
def chatbot(state: State):
return {"messages": [llm_with_tools.invoke(state["messages"])]}
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)
graph_builder.add_conditional_edges(
"chatbot",
tools_condition,
)
# Any time a tool is called, we return to the chatbot to decide the next step
graph_builder.add_edge("tools", "chatbot")
graph_builder.set_entry_point("chatbot")
graph = graph_builder.compile()
- ツールとして、TavilySearchResults(max_results=2)を紐付けている
- このツールはList[Dict[str, str]] を返す。urlとcontentがkey
- llmにtoolsをbindし、それをノードとして付与する
- llmにtoolを呼び出せるということを覚えさせる
- その後、呼び出せるtoolをnodeとして定義する
- graph_builder.add_conditional_edgesでは、通常なら、chatbot, toolを呼び出すとllmが判断したらtools_nodeに遷移させる
- tools_conditionで分岐を定義している
graphはこんなかんじ

Part 3: Adding Memory to the Chatbot
from typing import Annotated
from langchain_anthropic import ChatAnthropic
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage
from typing_extensions import TypedDict
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import StateGraph
from langgraph.graph.message import add_messages
from langgraph.prebuilt import ToolNode
class State(TypedDict):
messages: Annotated[list, add_messages]
graph_builder = StateGraph(State)
tool = TavilySearchResults(max_results=2)
tools = [tool]
llm = ChatAnthropic(model="claude-3-haiku-20240307")
llm_with_tools = llm.bind_tools(tools)
def chatbot(state: State):
return {"messages": [llm_with_tools.invoke(state["messages"])]}
graph_builder.add_node("chatbot", chatbot)
tool_node = ToolNode(tools=[tool])
graph_builder.add_node("tools", tool_node)
graph_builder.add_conditional_edges(
"chatbot",
tools_condition,
)
graph_builder.add_edge("tools", "chatbot")
graph_builder.set_entry_point("chatbot")
graph = graph_builder.compile(checkpointer=memory)
- part2までは過去の記憶を保存していない
- persistent checkpointingを使って解決する
- 使い方は簡単で
graph = graph_builder.compile(checkpointer=memory)
で渡してあげるだけ - MemorySaverはインメモリなので、DBに保存したい場合はSqliteSaver or PostgresSaverなどを使う
-
config = {"configurable": {"thread_id": "1"}}
をgraphを呼び出すときに渡すことで、記憶する
from langgraph.checkpoint.memory import MemorySaver
memory = MemorySaver()
Part 4: Human-in-the-loop
- interrupt_beforeを使うことで、ツールノード実行前に常に中断することが出来る
- 人間が介入し、提案されたツールの使用を確認または修正できます。人間が承認すると、グラフの実行が再開される
- これによるメリット
- AIの動作に対する人間の監視や介入が可能になる
- チェックポインターを使用しているため、グラフの実行を無期限に一時停止し、いつでも再開できます。
graph = graph_builder.compile(
checkpointer=memory,
# This is new!
interrupt_before=["tools"],
# Note: can also interrupt __after__ tools, if desired.
# interrupt_after=["tools"]
)

Low Level Conceptual Guide
Claudeの説明より
-
グラフの基本構造
1.1 State(状態)- グラフの現在のスナップショットを表す共有データ構造
- TypedDictまたはPydantic BaseModelで定義可能
- 例:
class State(TypedDict): foo: int, bar: list[str]
1.2 Node(ノード)
- エージェントのロジックを実装するPython関数
- 現在のStateを入力として受け取り、更新されたStateを返す
- 例:
def my_node(state: State) -> Dict[str, Any]: ...
1.3 Edge(エッジ)
- 次に実行するNodeを決定するPython関数
- 条件分岐や固定遷移を表現可能
- 例:
graph.add_edge("node_a", "node_b")
-
グラフの構築と実行
2.1 StateGraph- メインのグラフクラス
- ユーザー定義のStateオブジェクトでパラメータ化
- 例:
graph = StateGraph(State)
2.2 グラフのコンパイル
- ノードやエッジの追加後、実行前に必要な手順
- 構造チェックやランタイム引数の指定が可能
- 例:
compiled_graph = graph.compile()
2.3 グラフの実行
- コンパイル済みグラフのinvokeメソッドを使用
- 例:
result = compiled_graph.invoke(input_data)
-
高度な機能
3.1 Reducers(リデューサー)- Stateの更新方法を定義する関数
- デフォルト、Context、Messagesなど様々なタイプが存在
- 例:
bar: Annotated[list[str], add_messages]
3.2 Checkpointer(チェックポインター)
- グラフの状態を保存・復元する機能
- ヒューマンインザループワークフローを可能にする
- 例:
graph.get_state(config)
3.3 Threads(スレッド)
- 複数の独立した実行状態を管理
- マルチテナントチャットアプリケーションに有用
- 例:
config = {"configurable": {"thread_id": "a"}}
3.4 Configuration(構成)
- グラフの動作をカスタマイズするための仕組み
- モデルやシステムプロンプトの切り替えに使用
- 例:
graph = StateGraph(State, config_schema=ConfigSchema)
3.5 Breakpoints(ブレークポイント)
- 特定のノードの前後で実行を一時停止
- ヒューマンインザループの承認プロセスに有用
- 例:
graph.compile(interrupt_before=["node_a"])
3.6 Visualization(可視化)
- グラフ構造を視覚的に表現する機能
- 複雑なグラフの理解を助ける
- 例:
graph.get_graph().draw_mermaid_png()
3.7 Streaming(ストリーミング)
- グラフ実行中のデータをリアルタイムで取得
- 値、更新、デバッグ情報などのモードが存在
- 例:
for event in graph.stream_events(input_data): ...

参考資料

も気になる