LangGraph: Send, Commandを利用した動的なワークフロー構築
LangGraphは、AIエージェントのワークフローを構築するのに便利なフレームワークです。
特に、Send, Commandを利用した動的なワークフロー構築は、複雑なエージェントシステムを作るときに便利です。
本記事では、ワークフローの構築コード例およびSendの挙動をご紹介します。
シンプルなワークフロー
LangGraphを触ったことがない方向けに、シンプルなワークフローのコード例をご紹介します。
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
def add_message(existing: List[str], message: str) -> List[str]:
return existing + [message]
# Stateを宣言
class State(TypedDict):
value: Annotated[List[str], add_message]
def create_node(state: State, name: str) -> State:
return {
"value": name
}
# Stateを引数としてGraphを初期化
workflow = StateGraph(State)
# ノードを追加
workflow.add_node("node1", lambda state: create_node(state, "node1"))
workflow.add_node("node2", lambda state: create_node(state, "node2"))
# エッジを追加
workflow.add_edge(START, "node1")
workflow.add_edge("node1", "node2")
workflow.add_edge("node2", END)
# グラフをコンパイル
graph = workflow.compile()
# グラフを表示
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))
LangGraphではStateを利用して、ノード間でデータを連携します。上記ではStateをTypedDictで定義しています。各プロパティの更新方法をAnnotated[型, 更新関数]で指定できます。今回はvaluesプロパティはリストで定義され、stringをわたすと末尾に追加されるようになっています。
ノードでは主に処理を定義します。LLMによる応答生成やデータベースアクセスなどのツール実行などを定義します。今回は簡単のためただノード名を返しています。
エッジはノード間の遷移を定義します。今回はSTARTからnode1へ、node1からnode2へ、node2からENDへ遷移します。Start/ENDはそれぞれ処理の開始位置/終了位置を表します。
定義したワークフローを画像にして出力することで、ワークフローの構造を確認できます。
Command
Commandはノード内で次のノードを指定するために利用できます。
例えば以下のような分岐するワークフローを考えてみましょう。
import random
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
def add_message(existing: List[str], message: str) -> List[str]:
return existing + [message]
# Stateを宣言
class State(TypedDict):
value: Annotated[List[str], add_message]
def create_root_node(state: State) -> Command:
# ランダムにノードを選択
if random.random() > 0.5:
name = "node1"
else:
name = "node2"
return Command(
goto=name,
update={ "value": "root" }
)
def create_node(state: State, name: str) -> Command:
return Command(
goto=END,
update={ "value": name }
)
# Stateを引数としてGraphを初期化
workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["node1", "node2"])
workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=[END])
workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=[END])
workflow.add_edge(START, "root")
graph = workflow.compile()
result = graph.invoke({"value": "start"})
print(result)
root nodeでは、random.random()の値に応じてnode1かnode2に遷移します。このように条件付きの遷移をしたい場合にCommandが便利です。生成AIが分岐を判断する場合やツールを実行する/しないなどの分岐をするにCommandが役立つでしょう。
Send
Sendは並列に実行するノードを指定するために利用できます。
例えば、以下のようなワークフローを考えてみましょう。
import copy
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
def add_message(existing: List[str], message: str) -> List[str]:
return existing + [message]
# Stateを宣言
class State(TypedDict):
value: Annotated[List[str], add_message]
def create_root_node(state: State) -> Command:
send_nodes = []
for name in ["node1", "node2"]:
state_ = copy.deepcopy(state)
state_["value"].append(name)
send_nodes.append(Send(
name,
state_,
))
return Command(
goto=send_nodes,
update={ "value": "root" }
)
def create_node(state: State, name: str) -> Command:
return Command(
goto="integrator",
update={ "value": "|".join(state["value"]) + "|" + name }
)
def create_integrator_node(state: State) -> Command:
return Command(
goto=END,
update={ "value": "|".join(state["value"]) + "|" + "integrator" }
)
# Stateを引数としてGraphを初期化
workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["node1", "node2"])
workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=["integrator"])
workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])
workflow.add_edge(START, "root")
graph = workflow.compile()
result = graph.invoke({"value": "start"})
print(result)
上記の処理ではまずrootノードから始まります。rootノードではnode1とnode2の両方に処理が渡されます。その際に元のstateにそれぞれ"node1"と"node2"が追加されます。node1とnode2の処理の後はintegratorノードに遷移します。integratorノードではvalueを結合した文字列を追加してENDに遷移します。
実行結果を見てみましょう。
{'value': ['start', 'root', 'start|node1|node1', 'start|node2|node2', 'start|root|start|node1|node1|start|node2|node2|integrator']}
なるほど、Stateの更新には注意が必要なようです。
まず、最終的なstateの値は初期値およびupdateで指定した値が格納されています。
しかし、Sendで渡されたノードでは別のstateが見えているようです。例えばnode1,node2には初期値に加えそれぞれのnameを追加したstateが渡されています。そのため、node1, node2で見えるstateは別の値となっています。
しかし、integratorではnode1, node2のstateは受け継がず、updateで更新した値のみが渡されています。
つまり、グローバルに更新したいものはupdateで更新、個別のノードで渡すものを変えたい場合は、deepcopyしたstateの値を更新してわたすとよさそうです。
各分岐で長さが違う場合
もう少し複雑な処理を考えてみましょう。
import copy
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
def add_message(existing: List[str], message: str) -> List[str]:
return existing + [message]
# Stateを宣言
class State(TypedDict):
value: Annotated[List[str], add_message]
def create_root_node(state: State) -> Command:
send_nodes = []
for name in ["node1_1", "node2"]:
state_ = copy.deepcopy(state)
state_["value"].append(name)
send_nodes.append(Send(
name,
state_,
))
return Command(
goto=send_nodes,
update={ "value": "root" }
)
def create_node(state: State, name: str) -> Command:
if name == "node1_1":
return Command(
goto=Send("node1_2", state),
# goto="node1_2",
update={ "value": ".".join(state["value"]) + "." + name }
)
else:
return Command(
goto="integrator",
update={ "value": ".".join(state["value"]) + "." + name }
)
def create_integrator_node(state: State) -> Command:
return Command(
goto=END,
update={ "value": "|".join(state["value"]) + "|" + "integrator" }
)
# Stateを引数としてGraphを初期化
workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["node1_1", "node2"])
workflow.add_node("node1_1", lambda state: create_node(state, "node1_1"), destinations=["node1_2"])
workflow.add_node("node1_2", lambda state: create_node(state, "node1_2"), destinations=["integrator"])
workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])
workflow.add_edge(START, "root")
graph = workflow.compile()
result = graph.invoke({"value": "start"})
print(result)
実行結果は以下のようになります。
{'value': ['start', 'root', 'start.node1_1.node1_1', 'start.node2.node2', 'start|root|start.node1_1.node1_1|start.node2.node2|integrator', 'start.node1_1.node1_2', 'start|root|start.node1_1.node1_1|start.node2.node2|start|root|start.node1_1.node1_1|start.node2.node2|integrator|start.node1_1.node1_2|integrator']}
処理の順番は、root -> node1_1 -> node2 -> node2のintegrater -> node1_2 -> node1_2のintegrater となっているようです。
つまり、Sendで並列化したノードが直後に同じノードにルーティングされない場合は別の処理として進んでいくようです。
サブグラフを使った場合
分岐の長さが違う場合、うまく結合ができませんでした。それではSubGraphをSendに渡すことで複数の処理をまとめてみるとどうでしょうか?
import copy
from typing import List, TypedDict, Annotated, Any
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
def add_message(existing: List[str], message: List[str]) -> List[str]:
return existing + message
def last_name_reducer(existing: str | None, message: str | List[str]) -> str:
if isinstance(message, list):
if not message:
return existing if existing is not None else ""
return message[-1]
else:
return message
# Stateを宣言
class State(TypedDict):
value: Annotated[List[str], add_message]
name: Annotated[str, last_name_reducer]
def create_root_node(state: State) -> Command:
print("root", state)
send_nodes = []
for name in ["node1", "node2"]:
state_ = copy.deepcopy(state)
state_["value"].append(name)
state_["name"] = name
send_nodes.append(Send(
"subgraph",
state_,
))
return Command(
goto=send_nodes,
update={ "value": ["root"] }
)
def create_subgraph_root_node(state: State) -> Command:
print("subgraph root", state)
if state["name"] == "node1":
return Command(
goto="node1",
update={"value": ["subroot1"]}
)
elif state["name"] == "node2":
return Command(goto="node2", update={"value": ["subroot2"]})
else:
raise ValueError(f"Invalid name: {state['name']}")
def create_node(state: State, name: str) -> Command:
new_value_list = [name]
print(f"node({name})", state)
if state["name"] == "node1":
return Command(
goto="node1_2",
update={ "value": new_value_list, "name": name }
)
else: # name == "node2" のパスを想定
return Command(
goto=END,
update={ "value": new_value_list }
)
def create_integrator_node(state: State) -> Command:
print("integrator", state)
# 最終的な文字列を生成
final_string = "integrator"
return Command(
goto=END,
update={ "value": [final_string] }
)
# Stateを引数としてGraphを初期化
sub_workflow = StateGraph(State)
sub_workflow.add_node("root", create_subgraph_root_node, destinations=["node1", "node2"])
sub_workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=["node1_2"])
sub_workflow.add_node("node1_2", lambda state: create_node(state, "node1_2"), destinations=[END])
sub_workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=[END])
sub_workflow.add_edge(START, "root")
subgraph = sub_workflow.compile()
workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["subgraph"])
workflow.add_node("subgraph", subgraph, destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])
workflow.add_edge(START, "root")
workflow.add_edge("subgraph", "integrator")
graph = workflow.compile()
result = graph.invoke({"value": ["start"], "name": ""})
print(result)
実行結果:
root {'value': ['start'], 'name': ''}
subgraph root {'value': ['start', 'node1'], 'name': 'node1'}
node(node1) {'value': ['start', 'node1', 'subroot1'], 'name': 'node1'}
node(node1_2) {'value': ['start', 'node1', 'subroot1', 'node1'], 'name': 'node1'}
node(node1_2) {'value': ['start', 'node1', 'subroot1', 'node1', 'node1_2'], 'name': 'node1_2'}
subgraph root {'value': ['start', 'node2'], 'name': 'node2'}
node(node2) {'value': ['start', 'node2', 'subroot2'], 'name': 'node2'}
integrator {'value': ['start', 'root', 'start', 'node1', 'subroot1', 'node1', 'node1_2', 'node1_2', 'start', 'node2', 'subroot2', 'node2'], 'name': 'node2'}
{'value': ['start', 'root', 'start', 'node1', 'subroot1', 'node1', 'node1_2', 'node1_2', 'start', 'node2', 'subroot2', 'node2', 'integrator'], 'name': 'node2'}
なるほど、処理自体はうまくintegratorに渡されているようです。
しかし、subgraph終了時にstate["value"]がまとめて渡されるため、valueの値が余計に追加されてしまっています。
それではsubGraph用のプロパティを用意して、処理をさせてみましょう。
import copy
from typing import List, TypedDict, Annotated, Any
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
def add_message(existing: List[str], message: List[str]) -> List[str]:
return existing + message
def replace_message(existing: List[str], message: List[str]) -> List[str]:
return message
def last_name_reducer(existing: str | None, message: str | List[str]) -> str:
if isinstance(message, list):
if not message:
return existing if existing is not None else ""
return message[-1]
else:
return message
# Stateを宣言
class State(TypedDict):
value: Annotated[List[str], add_message]
value_in_subgraph: Annotated[List[str], replace_message]
name: Annotated[str, last_name_reducer]
def create_root_node(state: State) -> Command:
print("root", state)
send_nodes = []
for name in ["node1", "node2"]:
state_ = copy.deepcopy(state)
state_["value_in_subgraph"] = state_["value"] + [name]
state_["name"] = name
state_["value"] = []
send_nodes.append(Send(
"subgraph",
state_,
))
return Command(
goto=send_nodes,
update={ "value": ["root"] }
)
def create_subgraph_root_node(state: State) -> Command:
print("subgraph root", state)
if state["name"] == "node1":
return Command(
goto="node1",
update={"value_in_subgraph": state["value_in_subgraph"] + ["subroot1"]}
)
elif state["name"] == "node2":
return Command(goto="node2", update={"value_in_subgraph": state["value_in_subgraph"] + ["subroot2"]})
else:
raise ValueError(f"Invalid name: {state['name']}")
def create_node(state: State, name: str) -> Command:
new_value_list = [name]
print(f"node({name})", state)
if state["name"] == "node1":
return Command(
goto="node1_2",
update={ "value_in_subgraph": state["value_in_subgraph"] + new_value_list, "name": name }
)
else: # name == "node2" のパスを想定
return Command(
goto=END,
update={ "value": new_value_list , "value_in_subgraph": [] }
)
def create_integrator_node(state: State) -> Command:
print("integrator", state)
# 最終的な文字列を生成
final_string = "integrator"
return Command(
goto=END,
# update には文字列ではなく、リストでラップして渡す
update={ "value": [final_string] }
)
# Stateを引数としてGraphを初期化
sub_workflow = StateGraph(State)
sub_workflow.add_node("root", create_subgraph_root_node, destinations=["node1", "node2"])
sub_workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=["node1_2"])
sub_workflow.add_node("node1_2", lambda state: create_node(state, "node1_2"), destinations=[END])
sub_workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=[END])
sub_workflow.add_edge(START, "root")
subgraph = sub_workflow.compile()
workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["subgraph"])
workflow.add_node("subgraph", subgraph, destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])
workflow.add_edge(START, "root")
workflow.add_edge("subgraph", "integrator")
graph = workflow.compile()
result = graph.invoke({"value": ["start"], "value_in_subgraph": [], "name": ""})
print("Final result", result)
実行結果:
root {'value': ['start'], 'value_in_subgraph': [], 'name': ''}
subgraph root {'value': [], 'value_in_subgraph': ['start', 'node1'], 'name': 'node1'}
node(node1) {'value': [], 'value_in_subgraph': ['start', 'node1', 'subroot1'], 'name': 'node1'}
node(node1_2) {'value': [], 'value_in_subgraph': ['start', 'node1', 'subroot1', 'node1'], 'name': 'node1'}
node(node1_2) {'value': [], 'value_in_subgraph': ['start', 'node1', 'subroot1', 'node1', 'node1_2'], 'name': 'node1_2'}
subgraph root {'value': [], 'value_in_subgraph': ['start', 'node2'], 'name': 'node2'}
node(node2) {'value': [], 'value_in_subgraph': ['start', 'node2', 'subroot2'], 'name': 'node2'}
integrator {'value': ['start', 'root', 'node1_2', 'node2'], 'value_in_subgraph': [], 'name': 'node2'}
Final result {'value': ['start', 'root', 'node1_2', 'node2', 'integrator'], 'value_in_subgraph': [], 'name': 'node2'}
サブグラフ終了時に余計な値を削除しておくことで、integratorに渡す値を想定通りにすることができました。
制御が難しいため推奨はできませんが、複雑なワークフローを構築する必要がある場合に、この情報がお役に立てば幸いです。
また、より簡単に上記のワークフローを定義できる場合は教えていただけると幸いです。
お読みいただきありがとうございました。
Discussion