🐡

LangGraph: Send, Commandを利用した動的なワークフロー構築

に公開

LangGraphは、AIエージェントのワークフローを構築するのに便利なフレームワークです。
特に、Send, Commandを利用した動的なワークフロー構築は、複雑なエージェントシステムを作るときに便利です。
本記事では、ワークフローの構築コード例およびSendの挙動をご紹介します。

シンプルなワークフロー

LangGraphを触ったことがない方向けに、シンプルなワークフローのコード例をご紹介します。

from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END

def add_message(existing: List[str], message: str) -> List[str]:
    return existing + [message]

# Stateを宣言
class State(TypedDict):
    value: Annotated[List[str], add_message]

def create_node(state: State, name: str) -> State:
    return {
        "value": name
    }

# Stateを引数としてGraphを初期化
workflow = StateGraph(State)

# ノードを追加
workflow.add_node("node1", lambda state: create_node(state, "node1"))
workflow.add_node("node2", lambda state: create_node(state, "node2"))

# エッジを追加
workflow.add_edge(START, "node1")
workflow.add_edge("node1", "node2")
workflow.add_edge("node2", END)

# グラフをコンパイル
graph = workflow.compile()

# グラフを表示
from IPython.display import Image, display
display(Image(graph.get_graph().draw_mermaid_png()))

LangGraphではStateを利用して、ノード間でデータを連携します。上記ではStateをTypedDictで定義しています。各プロパティの更新方法をAnnotated[型, 更新関数]で指定できます。今回はvaluesプロパティはリストで定義され、stringをわたすと末尾に追加されるようになっています。
ノードでは主に処理を定義します。LLMによる応答生成やデータベースアクセスなどのツール実行などを定義します。今回は簡単のためただノード名を返しています。
エッジはノード間の遷移を定義します。今回はSTARTからnode1へ、node1からnode2へ、node2からENDへ遷移します。Start/ENDはそれぞれ処理の開始位置/終了位置を表します。
定義したワークフローを画像にして出力することで、ワークフローの構造を確認できます。

Command

Commandはノード内で次のノードを指定するために利用できます。
例えば以下のような分岐するワークフローを考えてみましょう。

import random
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

def add_message(existing: List[str], message: str) -> List[str]:
    return existing + [message]

# Stateを宣言
class State(TypedDict):
    value: Annotated[List[str], add_message]

def create_root_node(state: State) -> Command:
    # ランダムにノードを選択
    if random.random() > 0.5:
        name = "node1"
    else:
        name = "node2"
    return Command(
        goto=name,
        update={ "value": "root" }
    )

def create_node(state: State, name: str) -> Command:
    return Command(
        goto=END,
        update={ "value": name }
    )

# Stateを引数としてGraphを初期化
workflow = StateGraph(State)

workflow.add_node("root", create_root_node, destinations=["node1", "node2"])
workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=[END])
workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=[END])

workflow.add_edge(START, "root")

graph = workflow.compile()

result = graph.invoke({"value": "start"})
print(result)

root nodeでは、random.random()の値に応じてnode1かnode2に遷移します。このように条件付きの遷移をしたい場合にCommandが便利です。生成AIが分岐を判断する場合やツールを実行する/しないなどの分岐をするにCommandが役立つでしょう。

Send

Sendは並列に実行するノードを指定するために利用できます。
例えば、以下のようなワークフローを考えてみましょう。

import copy
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send

def add_message(existing: List[str], message: str) -> List[str]:
    return existing + [message]

# Stateを宣言
class State(TypedDict):
    value: Annotated[List[str], add_message]

def create_root_node(state: State) -> Command:
    send_nodes = []
    for name in ["node1", "node2"]:
        state_ = copy.deepcopy(state)
        state_["value"].append(name)
        send_nodes.append(Send(
            name,
            state_,
        ))

    return Command(
        goto=send_nodes,
        update={ "value": "root" }
    )

def create_node(state: State, name: str) -> Command:
    return Command(
        goto="integrator",
        update={ "value": "|".join(state["value"]) + "|" + name }
    )

def create_integrator_node(state: State) -> Command:
    return Command(
        goto=END,
        update={ "value": "|".join(state["value"]) + "|" + "integrator" }
    )

# Stateを引数としてGraphを初期化
workflow = StateGraph(State)

workflow.add_node("root", create_root_node, destinations=["node1", "node2"])
workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=["integrator"])
workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])

workflow.add_edge(START, "root")

graph = workflow.compile()

result = graph.invoke({"value": "start"})
print(result)

上記の処理ではまずrootノードから始まります。rootノードではnode1とnode2の両方に処理が渡されます。その際に元のstateにそれぞれ"node1"と"node2"が追加されます。node1とnode2の処理の後はintegratorノードに遷移します。integratorノードではvalueを結合した文字列を追加してENDに遷移します。

実行結果を見てみましょう。

{'value': ['start', 'root', 'start|node1|node1', 'start|node2|node2', 'start|root|start|node1|node1|start|node2|node2|integrator']}

なるほど、Stateの更新には注意が必要なようです。
まず、最終的なstateの値は初期値およびupdateで指定した値が格納されています。
しかし、Sendで渡されたノードでは別のstateが見えているようです。例えばnode1,node2には初期値に加えそれぞれのnameを追加したstateが渡されています。そのため、node1, node2で見えるstateは別の値となっています。
しかし、integratorではnode1, node2のstateは受け継がず、updateで更新した値のみが渡されています。

つまり、グローバルに更新したいものはupdateで更新、個別のノードで渡すものを変えたい場合は、deepcopyしたstateの値を更新してわたすとよさそうです。

各分岐で長さが違う場合

もう少し複雑な処理を考えてみましょう。

import copy
from typing import List, TypedDict, Annotated
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send

def add_message(existing: List[str], message: str) -> List[str]:
    return existing + [message]

# Stateを宣言
class State(TypedDict):
    value: Annotated[List[str], add_message]

def create_root_node(state: State) -> Command:
    send_nodes = []
    for name in ["node1_1", "node2"]:
        state_ = copy.deepcopy(state)
        state_["value"].append(name)
        send_nodes.append(Send(
            name,
            state_,
        ))

    return Command(
        goto=send_nodes,
        update={ "value": "root" }
    )

def create_node(state: State, name: str) -> Command:
    if name == "node1_1":
        return Command(
            goto=Send("node1_2", state),
            # goto="node1_2",
            update={ "value": ".".join(state["value"]) + "." + name }
        )
    else:
        return Command(
            goto="integrator",
            update={ "value": ".".join(state["value"]) + "." + name }
        )

def create_integrator_node(state: State) -> Command:
    return Command(
        goto=END,
        update={ "value": "|".join(state["value"]) + "|" + "integrator" }
    )

# Stateを引数としてGraphを初期化
workflow = StateGraph(State)

workflow.add_node("root", create_root_node, destinations=["node1_1", "node2"])
workflow.add_node("node1_1", lambda state: create_node(state, "node1_1"), destinations=["node1_2"])
workflow.add_node("node1_2", lambda state: create_node(state, "node1_2"), destinations=["integrator"])
workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])

workflow.add_edge(START, "root")

graph = workflow.compile()

result = graph.invoke({"value": "start"})
print(result)

実行結果は以下のようになります。

{'value': ['start', 'root', 'start.node1_1.node1_1', 'start.node2.node2', 'start|root|start.node1_1.node1_1|start.node2.node2|integrator', 'start.node1_1.node1_2', 'start|root|start.node1_1.node1_1|start.node2.node2|start|root|start.node1_1.node1_1|start.node2.node2|integrator|start.node1_1.node1_2|integrator']}

処理の順番は、root -> node1_1 -> node2 -> node2のintegrater -> node1_2 -> node1_2のintegrater となっているようです。
つまり、Sendで並列化したノードが直後に同じノードにルーティングされない場合は別の処理として進んでいくようです。

サブグラフを使った場合

分岐の長さが違う場合、うまく結合ができませんでした。それではSubGraphをSendに渡すことで複数の処理をまとめてみるとどうでしょうか?

import copy
from typing import List, TypedDict, Annotated, Any
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send

def add_message(existing: List[str], message: List[str]) -> List[str]:
    return existing + message

def last_name_reducer(existing: str | None, message: str | List[str]) -> str:
    if isinstance(message, list):
        if not message:
             return existing if existing is not None else ""
        return message[-1]
    else:
        return message

# Stateを宣言
class State(TypedDict):
    value: Annotated[List[str], add_message]
    name: Annotated[str, last_name_reducer]

def create_root_node(state: State) -> Command:
    print("root", state)
    send_nodes = []
    for name in ["node1", "node2"]:
        state_ = copy.deepcopy(state)
        state_["value"].append(name)
        state_["name"] = name
        send_nodes.append(Send(
            "subgraph",
            state_,
        ))
    return Command(
        goto=send_nodes,
        update={ "value": ["root"] }
    )

def create_subgraph_root_node(state: State) -> Command:
    print("subgraph root", state)
    if state["name"] == "node1":
        return Command(
            goto="node1",
            update={"value": ["subroot1"]}
        )
    elif state["name"] == "node2":
        return Command(goto="node2", update={"value": ["subroot2"]})
    else:
        raise ValueError(f"Invalid name: {state['name']}")

def create_node(state: State, name: str) -> Command:
    new_value_list = [name]
    print(f"node({name})", state)
    if state["name"] == "node1":
        return Command(
            goto="node1_2",
            update={ "value": new_value_list, "name": name }
        )
    else: # name == "node2" のパスを想定
        return Command(
            goto=END,
            update={ "value": new_value_list }
        )

def create_integrator_node(state: State) -> Command:
    print("integrator", state)
    # 最終的な文字列を生成
    final_string = "integrator"
    return Command(
        goto=END,
        update={ "value": [final_string] }
    )

# Stateを引数としてGraphを初期化
sub_workflow = StateGraph(State)
sub_workflow.add_node("root", create_subgraph_root_node, destinations=["node1", "node2"])
sub_workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=["node1_2"])
sub_workflow.add_node("node1_2", lambda state: create_node(state, "node1_2"), destinations=[END])
sub_workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=[END])
sub_workflow.add_edge(START, "root")
subgraph = sub_workflow.compile()

workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["subgraph"])
workflow.add_node("subgraph", subgraph, destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])
workflow.add_edge(START, "root")
workflow.add_edge("subgraph", "integrator")

graph = workflow.compile()

result = graph.invoke({"value": ["start"], "name": ""})
print(result)

実行結果:

root {'value': ['start'], 'name': ''}
subgraph root {'value': ['start', 'node1'], 'name': 'node1'}
node(node1) {'value': ['start', 'node1', 'subroot1'], 'name': 'node1'}
node(node1_2) {'value': ['start', 'node1', 'subroot1', 'node1'], 'name': 'node1'}
node(node1_2) {'value': ['start', 'node1', 'subroot1', 'node1', 'node1_2'], 'name': 'node1_2'}
subgraph root {'value': ['start', 'node2'], 'name': 'node2'}
node(node2) {'value': ['start', 'node2', 'subroot2'], 'name': 'node2'}
integrator {'value': ['start', 'root', 'start', 'node1', 'subroot1', 'node1', 'node1_2', 'node1_2', 'start', 'node2', 'subroot2', 'node2'], 'name': 'node2'}
{'value': ['start', 'root', 'start', 'node1', 'subroot1', 'node1', 'node1_2', 'node1_2', 'start', 'node2', 'subroot2', 'node2', 'integrator'], 'name': 'node2'}

なるほど、処理自体はうまくintegratorに渡されているようです。
しかし、subgraph終了時にstate["value"]がまとめて渡されるため、valueの値が余計に追加されてしまっています。

それではsubGraph用のプロパティを用意して、処理をさせてみましょう。

import copy
from typing import List, TypedDict, Annotated, Any
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send

def add_message(existing: List[str], message: List[str]) -> List[str]:
    return existing + message

def replace_message(existing: List[str], message: List[str]) -> List[str]:
    return message

def last_name_reducer(existing: str | None, message: str | List[str]) -> str:
    if isinstance(message, list):
        if not message:
             return existing if existing is not None else ""
        return message[-1]
    else:
        return message

# Stateを宣言
class State(TypedDict):
    value: Annotated[List[str], add_message]
    value_in_subgraph: Annotated[List[str], replace_message]
    name: Annotated[str, last_name_reducer]

def create_root_node(state: State) -> Command:
    print("root", state)
    send_nodes = []
    for name in ["node1", "node2"]:
        state_ = copy.deepcopy(state)
        state_["value_in_subgraph"] = state_["value"] + [name]
        state_["name"] = name
        state_["value"] = []
        send_nodes.append(Send(
            "subgraph",
            state_,
        ))
    return Command(
        goto=send_nodes,
        update={ "value": ["root"] }
    )

def create_subgraph_root_node(state: State) -> Command:
    print("subgraph root", state)
    if state["name"] == "node1":
        return Command(
            goto="node1",
            update={"value_in_subgraph": state["value_in_subgraph"] + ["subroot1"]}
        )
    elif state["name"] == "node2":
        return Command(goto="node2", update={"value_in_subgraph": state["value_in_subgraph"] + ["subroot2"]})
    else:
        raise ValueError(f"Invalid name: {state['name']}")

def create_node(state: State, name: str) -> Command:
    new_value_list = [name]
    print(f"node({name})", state)
    if state["name"] == "node1":
        return Command(
            goto="node1_2",
            update={ "value_in_subgraph": state["value_in_subgraph"] + new_value_list, "name": name }
        )
    else: # name == "node2" のパスを想定
        return Command(
            goto=END,
            update={ "value": new_value_list , "value_in_subgraph": [] }
        )

def create_integrator_node(state: State) -> Command:
    print("integrator", state)
    # 最終的な文字列を生成
    final_string = "integrator"
    return Command(
        goto=END,
        # update には文字列ではなく、リストでラップして渡す
        update={ "value": [final_string] }
    )

# Stateを引数としてGraphを初期化
sub_workflow = StateGraph(State)
sub_workflow.add_node("root", create_subgraph_root_node, destinations=["node1", "node2"])
sub_workflow.add_node("node1", lambda state: create_node(state, "node1"), destinations=["node1_2"])
sub_workflow.add_node("node1_2", lambda state: create_node(state, "node1_2"), destinations=[END])
sub_workflow.add_node("node2", lambda state: create_node(state, "node2"), destinations=[END])
sub_workflow.add_edge(START, "root")
subgraph = sub_workflow.compile()

workflow = StateGraph(State)
workflow.add_node("root", create_root_node, destinations=["subgraph"])
workflow.add_node("subgraph", subgraph, destinations=["integrator"])
workflow.add_node("integrator", create_integrator_node, destinations=[END])
workflow.add_edge(START, "root")
workflow.add_edge("subgraph", "integrator")

graph = workflow.compile()

result = graph.invoke({"value": ["start"], "value_in_subgraph": [], "name": ""})
print("Final result", result)

実行結果:

root {'value': ['start'], 'value_in_subgraph': [], 'name': ''}
subgraph root {'value': [], 'value_in_subgraph': ['start', 'node1'], 'name': 'node1'}
node(node1) {'value': [], 'value_in_subgraph': ['start', 'node1', 'subroot1'], 'name': 'node1'}
node(node1_2) {'value': [], 'value_in_subgraph': ['start', 'node1', 'subroot1', 'node1'], 'name': 'node1'}
node(node1_2) {'value': [], 'value_in_subgraph': ['start', 'node1', 'subroot1', 'node1', 'node1_2'], 'name': 'node1_2'}   
subgraph root {'value': [], 'value_in_subgraph': ['start', 'node2'], 'name': 'node2'}
node(node2) {'value': [], 'value_in_subgraph': ['start', 'node2', 'subroot2'], 'name': 'node2'}
integrator {'value': ['start', 'root', 'node1_2', 'node2'], 'value_in_subgraph': [], 'name': 'node2'}
Final result {'value': ['start', 'root', 'node1_2', 'node2', 'integrator'], 'value_in_subgraph': [], 'name': 'node2'}

サブグラフ終了時に余計な値を削除しておくことで、integratorに渡す値を想定通りにすることができました。

制御が難しいため推奨はできませんが、複雑なワークフローを構築する必要がある場合に、この情報がお役に立てば幸いです。
また、より簡単に上記のワークフローを定義できる場合は教えていただけると幸いです。

お読みいただきありがとうございました。

Discussion