🐈

LangGraphのフロー制御について

に公開

目次

LangGraphのフロー制御について

LangGraphでは、ルートの分岐がある場合でも、どちらか一方のルートのみを通るのであれば、直列処理フローになりますので、処理フローはシンプルで理解しやすいかなと思います。

■例:

ノードaの次に分岐があり、b_1b_2どちらかのノードを経由してcノードにたどり着く。

simple-flow.drawio.png

この場合の処理フローでは、必ず

_start_ -> a -> b_1 -> c -> _end_

もしくは

_start_ -> a -> b_2 -> c -> _end_

どちらかの順でノードが実行され、stateが更新されていきます。

しかし、b_1b_2の両方を同時実行して、複数の処理を並行的に実施させた場合、複数のルートが同時に動き出します。このとき、各ノードの実行順序や、異なるルート間でのstateの競合解決・マージがどのように制御されるのかという疑問が生じます。この記事では、これらの点について調査し、理解した内容を解説します。

LangGraphにおける並行処理の際の挙動

LangGraphの公式ドキュメントによると、ノード間の遷移フローはfan-out and fan-inというメカニズムで制御されています。公式ドキュメントでは簡単な例で説明されていますが、この記事ではもう少し複雑なグラフを用いて、より詳細に深掘りしていきます。

https://langchain-ai.github.io/langgraph/how-tos/graph-api/#create-branches

サンプルコード実行時の注意点

グラフの表示はJupyter上での実行が必要

本記事のサンプルコードはJupyter Notebookで実行しています。通常のPython環境でも実行できますが、IPythonによるグラフ表示ができない点にご注意ください。

Pythonコードを#%%で囲むと、VS CodeではJupyter Notebookのセルとして認識され、エディターに表示されるRun Cellなどをクリックして実行できます。

alt text

必須パッケージのインストール手順

pip install --upgrade pip
pip install langgraph ipython

fan-out and fan-in仕組みの説明

まず、fan-out and fan-inの仕組みを理解するためには、以下のstepに関する3つの基本概念を理解する必要があります。

①. 1つのstepで実行されたノードからエッジ(矢印)が指す次のノード群が、次のstepの実行候補になります。

②. 実行候補となったノードが実際に実行されるかは、エッジの定義方法や条件によって決まります。

③. 1つのステップで複数のノードが実行された後、stateのマージが行われてから次のステップに進みます。

では、これらの概念について順番に説明していきます。

①. 1つのstepで実行されたノードからエッジ(矢印)が指す次のノード群が、次のstepの実行候補になります。

LangGraphにおけるfan-out and fan-inの基本的な仕組みを、以下のグラフで説明します。これは公式ドキュメントにも掲載されている例です。

alt text

↑上記のグラフでは、ノードaから2つのルートに分岐し、それぞれb_1cに遷移します(b_1cは並列実行)。その後、b_1のルートだけがもう1つのノードb_2を経由し、最終的に両方のルートがノードdで合流して処理が終了します。

ここでは後述の②の概念は考慮しないため、すべてのエッジ(矢印)は条件のない通常のエッジであるとします。つまり、矢印が指す先のノードは必ず実行されるものとします。

上記グラフの実際のコードは以下になります。

#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    return {"aggregate": ["a"]}


def b_1(state: State):
    return {"aggregate": ["b_1"]}


def b_2(state: State):
    return {"aggregate": ["b_2"]}


def c(state: State):
    return {"aggregate": ["c"]}


def d(state: State):
    return {"aggregate": ["d"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b_1")
builder.add_edge("a", "c")
builder.add_edge("b_1", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[]), debug=True)

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

↑上記のコードの各ノードで行っている処理は非常に単純で、自身のノード名をstate内のaggregateというリストの末尾に追加しているだけです。aggregateの中身を見ることで、どのノードがどの順番でstateを更新したかを確認できます。

また、graph.invokeを実行する際にdebugオプションを有効にしているため、実行結果から各ステップでどのノードが実行されたか、およびインプットとアウトプットのstate内容が分かります。

[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b_1 -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b_1'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b_1', 'c']}
[3:tasks] Starting 2 tasks for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
- d -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2'], ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:tasks] Starting 1 task for step 4:
- d -> {'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:writes] Finished step 4 with writes to 1 channel:
- aggregate -> ['d']
[4:checkpoint] State at the end of step 4:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd', 'd']}

上記の出力結果はそのままでは見づらいので、各ステップを図で示すと以下のようになります。

fan-in-fan-out.drawio-case1.png

↑上記のstep 3以降が注目ポイントです。

  • step 3step 2b_1cが実行されたため、矢印に従ってstep 3ではb_2dが実行されます。
  • step 4step 3b_2dが実行されたため、矢印に従ってstep 4ではb_2dが実行されます。※cを経由したルートは一歩早くdにたどり着き、その後もう次のノードがないので、ここで終了して_end_に行きます
  • step 5step 4b_2b_2ルート経由でdが実行されたため、矢印に従ってdが実行されます。

ここで、「あれ、思った通りの動きをしていないのでは?」と感じる方がいるのではないでしょうか。

その通りです。多くの方は、このグラフを見ると「b_2cの両方の実行が終わった後にdが実行される」と期待するでしょう。しかし、実際にはルール①に従うため、上記のような直感に反した動作となります。cからのパスがstep 3で先にdに到達して実行し、その後のstep 4b_1b_2からのパスが再びdを実行しているのです。

では、期待通りに「b_2cの両方の実行が終わった後にdを実行する」にはどうすればよいでしょうか。その方法は次の②で説明します。

②. 実行候補となったノードが実際に実行されるかは、エッジの定義方法や条件によって決まります。

複数ソースエッジ

ここでは、①のサンプルコードを少し修正します。

#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    return {"aggregate": ["a"]}


def b_1(state: State):
    return {"aggregate": ["b_1"]}


def b_2(state: State):
    return {"aggregate": ["b_2"]}


def c(state: State):
    return {"aggregate": ["c"]}


def d(state: State):
    return {"aggregate": ["d"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b_1")
builder.add_edge("a", "c")
builder.add_edge("b_1", "b_2")
# builder.add_edge("b_2", "d") ##ここをコマンドアウト
# builder.add_edge("c", "d") ##ここをコマンドアウト
builder.add_edge(["b_2","c"], "d") ##ここを追加
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[]), debug=True)

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

このコードから生成されるグラフの見た目は①と全く同じですが、実際の挙動は異なります。

alt text

次に実際の出力結果を見てみましょう。

[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b_1 -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b_1'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b_1', 'c']}
[3:tasks] Starting 1 task for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2']}
[4:tasks] Starting 1 task for step 4:
- d -> {'aggregate': ['a', 'b_1', 'c', 'b_2']}
[4:writes] Finished step 4 with writes to 1 channel:
- aggregate -> ['d']
[4:checkpoint] State at the end of step 4:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}

①のコードとの違いは、dを指すエッジの定義です。

# 旧: 個別に定義
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")

# 新: リストで複数を指定
builder.add_edge(["b_2","c"], "d")

このように、add_edgeの第一引数にソースノードをリスト形式で複数指定すると、リスト内のすべてのソースノード(この場合はb_2c)の処理が完了するまで待ってから、ターゲットノード(d)が実行されます。これにより、①の例のようにdが2回実行される事態を回避でき、期待通りのフローになります。

fan-in-fan-out.drawio-case2.png

コンディションナルエッジ

これまでの例では、aからb_1cへそれぞれ独立したエッジでつながっており、aの次にb_1cが並列実行されていました。これを、条件に応じてb_1cのどちらか一方のみを実行するようにしたい場合は、条件付きエッジ(Conditional Edge)で制御できます。

#%%
import operator
from typing import Annotated, Any, Literal
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    switch: Annotated[bool, "コントロールフラグ"] ##ここを注目
    aggregate: Annotated[list, operator.add]


def a(state: State):
    return {"aggregate": ["a"]}


def b_1(state: State):
    return {"aggregate": ["b_1"]}


def b_2(state: State):
    return {"aggregate": ["b_2"]}


def c(state: State):
    return {"aggregate": ["c"]}


def d(state: State):
    return {"aggregate": ["d"]}

 ##ここを注目
def judge_flow(state: State) -> Literal["b_1", "c"]:
    if state["switch"]:
        return "b_1"
    else:
        return "c"

builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_conditional_edges(source="a", path=judge_flow) #ここを注目
builder.add_edge("b_1", "b_2")
builder.add_edge(["b_2","c"], "d") #ここを注目
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[], switch=False), debug=True) #ここを注目。switchの値がTrueの場合はb_1,b_2ルート、Falseの場合はcルートが実行される

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

実際の出力結果

[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': [], 'switch': False}
[0:writes] Finished step 0 with writes to 2 channels:
- aggregate -> []
- switch -> False
[0:checkpoint] State at the end of step 0:
{'aggregate': [], 'switch': False}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': [], 'switch': False}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a'], 'switch': False}
[2:tasks] Starting 1 task for step 2:
- c -> {'aggregate': ['a'], 'switch': False}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'c'], 'switch': False}

一点注意が必要です。このコードでは、dへのエッジは builder.add_edge(["b_2","c"], "d") のままで、b_2cの両方の完了を待つ設定になっています。しかし、条件付きエッジによってb_1(b_2へ続く)かcのどちらか一方しか実行されないため、dが実行される条件は満たされず、処理はそこで終了します。

fan-in-fan-out.drawio-case3.png

同じステップ内では1つのノードは1回しか実行されない

#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    return {"aggregate": ["a"]}


def b(state: State):
    return {"aggregate": ["b"]}


def c(state: State):
    return {"aggregate": ["c"]}


def d(state: State):
    return {"aggregate": ["d"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[]), debug=True)

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

LangGraph

alt text

実際の出力結果

[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b', 'c']}
[3:tasks] Starting 1 task for step 3:
- d -> {'aggregate': ['a', 'b', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b', 'c', 'd']}

①の例では、異なるステップで同じノード(d)に到達したため、ノードはステップごとに1回ずつ、合計2回実行されました。しかし、この例のように、同じステップ内で複数の元ノード(bc)から同時に同じノード(d)に到達する場合、dは1回しか実行されません。これは、「同じステップ内では、1つのノードは1回しか実行されない」というルールがあるためです。

③. 1つのステップで複数のノードが実行された後、stateのマージが行われてから次のステップに進みます。

最後に、各ステップ間でStateがどのように変化していくかを見ていきましょう。

まず①のサンプルコードに戻ります。

#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    # The operator.add reducer fn makes this append-only
    aggregate: Annotated[list, operator.add]


def a(state: State):
    return {"aggregate": ["a"]}


def b_1(state: State):
    return {"aggregate": ["b_1"]}


def b_2(state: State):
    return {"aggregate": ["b_2"]}


def c(state: State):
    return {"aggregate": ["c"]}


def d(state: State):
    return {"aggregate": ["d"]}


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b_1")
builder.add_edge("a", "c")
builder.add_edge("b_1", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[]), debug=True)

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

出力結果

[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b_1 -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b_1'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b_1', 'c']}
[3:tasks] Starting 2 tasks for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
- d -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2'], ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:tasks] Starting 1 task for step 4:
- d -> {'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:writes] Finished step 4 with writes to 1 channel:
- aggregate -> ['d']
[4:checkpoint] State at the end of step 4:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd', 'd']}

処理フロー

fan-in-fan-out.drawio-case1.png

ここで注目してほしいのはstep 3です。

[3:tasks] Starting 2 tasks for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
- d -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2'], ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}

b_2dが並列で実行されますが、両方のノードに渡されたstateは{'aggregate': ['a', 'b_1', 'c']}であり、同一です。

ここで、直感に反する点が2つあります(これらは本質的に同じ原因です):

  • b_2の視点: ab_1b_2というルートを辿ったにもかかわらず、渡されたstateには並列ルートで実行されたcの結果が含まれています。
  • dの視点: acdというルートを辿ったにもかかわらず、渡されたstateには並列ルートで実行されたb_1の結果が含まれています。

このような挙動になる原因は、LangGraphの仕組みにあります。1つのステップ内で複数のノードが実行されると、それらのノードからの更新結果はすべてマージされて1つの新しいstateが作られます。そして、次のステップで実行されるすべてのノードには、このマージ済みの同じstateが渡されるのです。

もし同じステップで複数のノードが意図せず同じstateの値を更新してしまうと、競合が発生します。次の例を見てください。

Step内でのstate更新競合

以下のサンプルでは、aの次にbcが並列で実行され、bcの両方がstateの同じキーfield1を更新しようとします。

#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    field1: Annotated[int, "何かしらの説明文字列"]


def a(state: State):
    result = {"aggregate": ["a"]}
    print(f'Adding {result} to {state}')
    return result


def b(state: State):
    result = {"aggregate": ["b"], "field1": 1}
    print(f'Adding {result} to {state}')
    return result


def c(state: State):
    result = {"aggregate": ["c"], "field1": 2}
    print(f'Adding {result} to {state}')
    return result


def d(state: State):
    result = {"aggregate": ["d"]}
    print(f'Adding {result} to {state}')
    return result


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[]), debug=True)

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

上記コードを実行すると以下のエラーが発生します。

<略>
InvalidUpdateError: At key 'field1': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE

デフォルトの動作では、ノードから返されたstateの値は、元のstateを単純に上書きします。しかし、この例のように同じステップ内で実行された複数のノード(bc)が同じキー(field1)を更新しようとすると、LangGraphはどちらの値を採用すればよいか判断できず、更新が競合してエラーが発生します。

このstate更新の競合を解決するには、reducerと呼ばれる関数を明示的に指定する必要があります。reducerは、複数の更新値をどのように1つにまとめるか(マージするか)を定義するハンドラーです。例えば、後勝ち、先勝ち、加算など、任意のロジックを持つ関数を指定できます。

例えば、単純に後勝ち(最後にマージされる更新を優先する)にしたい場合は、以下のように修正します。

#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display

from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END

def override_previous(a, b):##ここを注目
    return b

class State(TypedDict):
    aggregate: Annotated[list, operator.add]
    field1: Annotated[int, override_previous]##ここを注目


def a(state: State):
    result = {"aggregate": ["a"]}
    print(f'Adding {result} to {state}')
    return result


def b(state: State):
    result = {"aggregate": ["b"], "field1": 1}
    print(f'Adding {result} to {state}')
    return result


def c(state: State):
    result = {"aggregate": ["c"], "field1": 2}
    print(f'Adding {result} to {state}')
    return result


def d(state: State):
    result = {"aggregate": ["d"]}
    print(f'Adding {result} to {state}')
    return result


builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()

graph.invoke(State(aggregate=[]), debug=True)

display(Image(graph.get_graph().draw_mermaid_png()))
#%%

実際の出力結果は以下になります。

[-1:checkpoint] State at the end of step -1:
{'aggregate': [], 'field1': 0}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': [], 'field1': 0}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': [], 'field1': 0}
Adding {'aggregate': ['a']} to {'aggregate': [], 'field1': 0}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a'], 'field1': 0}
[2:tasks] Starting 2 tasks for step 2:
- b -> {'aggregate': ['a'], 'field1': 0}
- c -> {'aggregate': ['a'], 'field1': 0}
Adding {'aggregate': ['b'], 'field1': 1} to {'aggregate': ['a'], 'field1': 0}
Adding {'aggregate': ['c'], 'field1': 2} to {'aggregate': ['a'], 'field1': 0}
[2:writes] Finished step 2 with writes to 2 channels:
- aggregate -> ['b'], ['c']
- field1 -> 1, 2
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b', 'c'], 'field1': 2}
[3:tasks] Starting 1 task for step 3:
- d -> {'aggregate': ['a', 'b', 'c'], 'field1': 2}
Adding {'aggregate': ['d']} to {'aggregate': ['a', 'b', 'c'], 'field1': 2}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b', 'c', 'd'], 'field1': 2}

実行結果を見ると、field1の値は2になっています。これは、reducerとして後勝ちのロジックを指定したためです。では、同じステップで並列実行されるbcのどちらが「後」になるのでしょうか。このマージの順序は、builder.add_node()でノードをグラフに追加した順番によって決まります。この例ではcbの後に追加されているため、cの更新が優先され(勝ち)、field1の値が2となります。

builder.add_node(b)
builder.add_node(c) # cが後に追加されている

ちなみに、これまでの例でaggregateフィールドがエラーにならなかったのは、State定義の時点でAnnotated[list, operator.add]のように、reducerとしてoperator.addが指定されていたからです。aggregateはリストなので、operator.addはリストの結合(list + list)として機能し、更新の競合を適切に解決していました。

Discussion