LangGraphのフロー制御について
目次
LangGraphのフロー制御について
LangGraphでは、ルートの分岐がある場合でも、どちらか一方のルートのみを通るのであれば、直列処理フローになりますので、処理フローはシンプルで理解しやすいかなと思います。
■例:
ノードa
の次に分岐があり、b_1
とb_2
どちらかのノードを経由してc
ノードにたどり着く。
この場合の処理フローでは、必ず
_start_ -> a -> b_1 -> c -> _end_
もしくは
_start_ -> a -> b_2 -> c -> _end_
どちらかの順でノードが実行され、stateが更新されていきます。
しかし、b_1
とb_2
の両方を同時実行して、複数の処理を並行的に実施させた場合、複数のルートが同時に動き出します。このとき、各ノードの実行順序や、異なるルート間でのstateの競合解決・マージがどのように制御されるのかという疑問が生じます。この記事では、これらの点について調査し、理解した内容を解説します。
LangGraphにおける並行処理の際の挙動
LangGraphの公式ドキュメントによると、ノード間の遷移フローはfan-out and fan-in
というメカニズムで制御されています。公式ドキュメントでは簡単な例で説明されていますが、この記事ではもう少し複雑なグラフを用いて、より詳細に深掘りしていきます。
https://langchain-ai.github.io/langgraph/how-tos/graph-api/#create-branches
サンプルコード実行時の注意点
グラフの表示はJupyter上での実行が必要
本記事のサンプルコードはJupyter Notebookで実行しています。通常のPython環境でも実行できますが、IPythonによるグラフ表示ができない点にご注意ください。
Pythonコードを#%%
で囲むと、VS CodeではJupyter Notebookのセルとして認識され、エディターに表示されるRun Cell
などをクリックして実行できます。
必須パッケージのインストール手順
pip install --upgrade pip
pip install langgraph ipython
fan-out and fan-in
仕組みの説明
まず、fan-out and fan-in
の仕組みを理解するためには、以下のstep
に関する3つの基本概念を理解する必要があります。
①. 1つのstepで実行されたノードからエッジ(矢印)が指す次のノード群が、次のstepの実行候補になります。
②. 実行候補となったノードが実際に実行されるかは、エッジの定義方法や条件によって決まります。
③. 1つのステップで複数のノードが実行された後、stateのマージが行われてから次のステップに進みます。
では、これらの概念について順番に説明していきます。
①. 1つのstepで実行されたノードからエッジ(矢印)が指す次のノード群が、次のstepの実行候補になります。
LangGraphにおけるfan-out and fan-in
の基本的な仕組みを、以下のグラフで説明します。これは公式ドキュメントにも掲載されている例です。
↑上記のグラフでは、ノードa
から2つのルートに分岐し、それぞれb_1
とc
に遷移します(b_1
とc
は並列実行)。その後、b_1
のルートだけがもう1つのノードb_2
を経由し、最終的に両方のルートがノードd
で合流して処理が終了します。
ここでは後述の②の概念は考慮しないため、すべてのエッジ(矢印)は条件のない通常のエッジであるとします。つまり、矢印が指す先のノードは必ず実行されるものとします。
上記グラフの実際のコードは以下になります。
#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
return {"aggregate": ["a"]}
def b_1(state: State):
return {"aggregate": ["b_1"]}
def b_2(state: State):
return {"aggregate": ["b_2"]}
def c(state: State):
return {"aggregate": ["c"]}
def d(state: State):
return {"aggregate": ["d"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b_1")
builder.add_edge("a", "c")
builder.add_edge("b_1", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[]), debug=True)
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
↑上記のコードの各ノードで行っている処理は非常に単純で、自身のノード名をstate内のaggregate
というリストの末尾に追加しているだけです。aggregate
の中身を見ることで、どのノードがどの順番でstateを更新したかを確認できます。
また、graph.invoke
を実行する際にdebug
オプションを有効にしているため、実行結果から各ステップでどのノードが実行されたか、およびインプットとアウトプットのstate内容が分かります。
[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b_1 -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b_1'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b_1', 'c']}
[3:tasks] Starting 2 tasks for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
- d -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2'], ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:tasks] Starting 1 task for step 4:
- d -> {'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:writes] Finished step 4 with writes to 1 channel:
- aggregate -> ['d']
[4:checkpoint] State at the end of step 4:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd', 'd']}
上記の出力結果はそのままでは見づらいので、各ステップを図で示すと以下のようになります。
↑上記のstep 3
以降が注目ポイントです。
-
step 3
:step 2
でb_1
とc
が実行されたため、矢印に従ってstep 3
ではb_2
とd
が実行されます。 -
step 4
:step 3
でb_2
とd
が実行されたため、矢印に従ってstep 4
ではb_2
とd
が実行されます。※c
を経由したルートは一歩早くd
にたどり着き、その後もう次のノードがないので、ここで終了して_end_
に行きます -
step 5
:step 4
でb_2
、b_2
ルート経由でd
が実行されたため、矢印に従ってd
が実行されます。
ここで、「あれ、思った通りの動きをしていないのでは?」と感じる方がいるのではないでしょうか。
その通りです。多くの方は、このグラフを見ると「b_2
とc
の両方の実行が終わった後にd
が実行される」と期待するでしょう。しかし、実際にはルール①に従うため、上記のような直感に反した動作となります。c
からのパスがstep 3
で先にd
に到達して実行し、その後のstep 4
でb_1
→ b_2
からのパスが再びd
を実行しているのです。
では、期待通りに「b_2
とc
の両方の実行が終わった後にd
を実行する」にはどうすればよいでしょうか。その方法は次の②で説明します。
②. 実行候補となったノードが実際に実行されるかは、エッジの定義方法や条件によって決まります。
複数ソースエッジ
ここでは、①のサンプルコードを少し修正します。
#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
return {"aggregate": ["a"]}
def b_1(state: State):
return {"aggregate": ["b_1"]}
def b_2(state: State):
return {"aggregate": ["b_2"]}
def c(state: State):
return {"aggregate": ["c"]}
def d(state: State):
return {"aggregate": ["d"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b_1")
builder.add_edge("a", "c")
builder.add_edge("b_1", "b_2")
# builder.add_edge("b_2", "d") ##ここをコマンドアウト
# builder.add_edge("c", "d") ##ここをコマンドアウト
builder.add_edge(["b_2","c"], "d") ##ここを追加
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[]), debug=True)
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
このコードから生成されるグラフの見た目は①と全く同じですが、実際の挙動は異なります。
次に実際の出力結果を見てみましょう。
[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b_1 -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b_1'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b_1', 'c']}
[3:tasks] Starting 1 task for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2']}
[4:tasks] Starting 1 task for step 4:
- d -> {'aggregate': ['a', 'b_1', 'c', 'b_2']}
[4:writes] Finished step 4 with writes to 1 channel:
- aggregate -> ['d']
[4:checkpoint] State at the end of step 4:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
①のコードとの違いは、d
を指すエッジの定義です。
# 旧: 個別に定義
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
# 新: リストで複数を指定
builder.add_edge(["b_2","c"], "d")
このように、add_edge
の第一引数にソースノードをリスト形式で複数指定すると、リスト内のすべてのソースノード(この場合はb_2
とc
)の処理が完了するまで待ってから、ターゲットノード(d
)が実行されます。これにより、①の例のようにd
が2回実行される事態を回避でき、期待通りのフローになります。
コンディションナルエッジ
これまでの例では、a
からb_1
とc
へそれぞれ独立したエッジでつながっており、a
の次にb_1
とc
が並列実行されていました。これを、条件に応じてb_1
とc
のどちらか一方のみを実行するようにしたい場合は、条件付きエッジ(Conditional Edge)で制御できます。
#%%
import operator
from typing import Annotated, Any, Literal
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
switch: Annotated[bool, "コントロールフラグ"] ##ここを注目
aggregate: Annotated[list, operator.add]
def a(state: State):
return {"aggregate": ["a"]}
def b_1(state: State):
return {"aggregate": ["b_1"]}
def b_2(state: State):
return {"aggregate": ["b_2"]}
def c(state: State):
return {"aggregate": ["c"]}
def d(state: State):
return {"aggregate": ["d"]}
##ここを注目
def judge_flow(state: State) -> Literal["b_1", "c"]:
if state["switch"]:
return "b_1"
else:
return "c"
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_conditional_edges(source="a", path=judge_flow) #ここを注目
builder.add_edge("b_1", "b_2")
builder.add_edge(["b_2","c"], "d") #ここを注目
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[], switch=False), debug=True) #ここを注目。switchの値がTrueの場合はb_1,b_2ルート、Falseの場合はcルートが実行される
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
実際の出力結果
[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': [], 'switch': False}
[0:writes] Finished step 0 with writes to 2 channels:
- aggregate -> []
- switch -> False
[0:checkpoint] State at the end of step 0:
{'aggregate': [], 'switch': False}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': [], 'switch': False}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a'], 'switch': False}
[2:tasks] Starting 1 task for step 2:
- c -> {'aggregate': ['a'], 'switch': False}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'c'], 'switch': False}
一点注意が必要です。このコードでは、d
へのエッジは builder.add_edge(["b_2","c"], "d")
のままで、b_2
とc
の両方の完了を待つ設定になっています。しかし、条件付きエッジによってb_1
(b_2
へ続く)かc
のどちらか一方しか実行されないため、d
が実行される条件は満たされず、処理はそこで終了します。
同じステップ内では1つのノードは1回しか実行されない
#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
return {"aggregate": ["a"]}
def b(state: State):
return {"aggregate": ["b"]}
def c(state: State):
return {"aggregate": ["c"]}
def d(state: State):
return {"aggregate": ["d"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[]), debug=True)
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
LangGraph
実際の出力結果
[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b', 'c']}
[3:tasks] Starting 1 task for step 3:
- d -> {'aggregate': ['a', 'b', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b', 'c', 'd']}
①の例では、異なるステップで同じノード(d
)に到達したため、ノードはステップごとに1回ずつ、合計2回実行されました。しかし、この例のように、同じステップ内で複数の元ノード(b
とc
)から同時に同じノード(d
)に到達する場合、d
は1回しか実行されません。これは、「同じステップ内では、1つのノードは1回しか実行されない」というルールがあるためです。
③. 1つのステップで複数のノードが実行された後、stateのマージが行われてから次のステップに進みます。
最後に、各ステップ間でStateがどのように変化していくかを見ていきましょう。
まず①のサンプルコードに戻ります。
#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
# The operator.add reducer fn makes this append-only
aggregate: Annotated[list, operator.add]
def a(state: State):
return {"aggregate": ["a"]}
def b_1(state: State):
return {"aggregate": ["b_1"]}
def b_2(state: State):
return {"aggregate": ["b_2"]}
def c(state: State):
return {"aggregate": ["c"]}
def d(state: State):
return {"aggregate": ["d"]}
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b_1)
builder.add_node(c)
builder.add_node(b_2)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b_1")
builder.add_edge("a", "c")
builder.add_edge("b_1", "b_2")
builder.add_edge("b_2", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[]), debug=True)
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
出力結果
[-1:checkpoint] State at the end of step -1:
{'aggregate': []}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': []}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': []}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a']}
[2:tasks] Starting 2 tasks for step 2:
- b_1 -> {'aggregate': ['a']}
- c -> {'aggregate': ['a']}
[2:writes] Finished step 2 with writes to 1 channel:
- aggregate -> ['b_1'], ['c']
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b_1', 'c']}
[3:tasks] Starting 2 tasks for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
- d -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2'], ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:tasks] Starting 1 task for step 4:
- d -> {'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
[4:writes] Finished step 4 with writes to 1 channel:
- aggregate -> ['d']
[4:checkpoint] State at the end of step 4:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd', 'd']}
処理フロー
ここで注目してほしいのはstep 3
です。
[3:tasks] Starting 2 tasks for step 3:
- b_2 -> {'aggregate': ['a', 'b_1', 'c']}
- d -> {'aggregate': ['a', 'b_1', 'c']}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['b_2'], ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b_1', 'c', 'b_2', 'd']}
b_2
とd
が並列で実行されますが、両方のノードに渡されたstateは{'aggregate': ['a', 'b_1', 'c']}
であり、同一です。
ここで、直感に反する点が2つあります(これらは本質的に同じ原因です):
-
b_2
の視点:a
→b_1
→b_2
というルートを辿ったにもかかわらず、渡されたstateには並列ルートで実行されたc
の結果が含まれています。 -
d
の視点:a
→c
→d
というルートを辿ったにもかかわらず、渡されたstateには並列ルートで実行されたb_1
の結果が含まれています。
このような挙動になる原因は、LangGraphの仕組みにあります。1つのステップ内で複数のノードが実行されると、それらのノードからの更新結果はすべてマージされて1つの新しいstateが作られます。そして、次のステップで実行されるすべてのノードには、このマージ済みの同じstateが渡されるのです。
もし同じステップで複数のノードが意図せず同じstateの値を更新してしまうと、競合が発生します。次の例を見てください。
Step内でのstate更新競合
以下のサンプルでは、a
の次にb
とc
が並列で実行され、b
とc
の両方がstateの同じキーfield1
を更新しようとします。
#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
class State(TypedDict):
aggregate: Annotated[list, operator.add]
field1: Annotated[int, "何かしらの説明文字列"]
def a(state: State):
result = {"aggregate": ["a"]}
print(f'Adding {result} to {state}')
return result
def b(state: State):
result = {"aggregate": ["b"], "field1": 1}
print(f'Adding {result} to {state}')
return result
def c(state: State):
result = {"aggregate": ["c"], "field1": 2}
print(f'Adding {result} to {state}')
return result
def d(state: State):
result = {"aggregate": ["d"]}
print(f'Adding {result} to {state}')
return result
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[]), debug=True)
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
上記コードを実行すると以下のエラーが発生します。
<略>
InvalidUpdateError: At key 'field1': Can receive only one value per step. Use an Annotated key to handle multiple values.
For troubleshooting, visit: https://python.langchain.com/docs/troubleshooting/errors/INVALID_CONCURRENT_GRAPH_UPDATE
デフォルトの動作では、ノードから返されたstateの値は、元のstateを単純に上書きします。しかし、この例のように同じステップ内で実行された複数のノード(b
とc
)が同じキー(field1
)を更新しようとすると、LangGraphはどちらの値を採用すればよいか判断できず、更新が競合してエラーが発生します。
このstate更新の競合を解決するには、reducer
と呼ばれる関数を明示的に指定する必要があります。reducer
は、複数の更新値をどのように1つにまとめるか(マージするか)を定義するハンドラーです。例えば、後勝ち、先勝ち、加算など、任意のロジックを持つ関数を指定できます。
例えば、単純に後勝ち(最後にマージされる更新を優先する)にしたい場合は、以下のように修正します。
#%%
import operator
from typing import Annotated, Any
from IPython.display import Image, display
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
def override_previous(a, b):##ここを注目
return b
class State(TypedDict):
aggregate: Annotated[list, operator.add]
field1: Annotated[int, override_previous]##ここを注目
def a(state: State):
result = {"aggregate": ["a"]}
print(f'Adding {result} to {state}')
return result
def b(state: State):
result = {"aggregate": ["b"], "field1": 1}
print(f'Adding {result} to {state}')
return result
def c(state: State):
result = {"aggregate": ["c"], "field1": 2}
print(f'Adding {result} to {state}')
return result
def d(state: State):
result = {"aggregate": ["d"]}
print(f'Adding {result} to {state}')
return result
builder = StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)
builder.add_edge(START, "a")
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)
graph = builder.compile()
graph.invoke(State(aggregate=[]), debug=True)
display(Image(graph.get_graph().draw_mermaid_png()))
#%%
実際の出力結果は以下になります。
[-1:checkpoint] State at the end of step -1:
{'aggregate': [], 'field1': 0}
[0:tasks] Starting 1 task for step 0:
- __start__ -> {'aggregate': []}
[0:writes] Finished step 0 with writes to 1 channel:
- aggregate -> []
[0:checkpoint] State at the end of step 0:
{'aggregate': [], 'field1': 0}
[1:tasks] Starting 1 task for step 1:
- a -> {'aggregate': [], 'field1': 0}
Adding {'aggregate': ['a']} to {'aggregate': [], 'field1': 0}
[1:writes] Finished step 1 with writes to 1 channel:
- aggregate -> ['a']
[1:checkpoint] State at the end of step 1:
{'aggregate': ['a'], 'field1': 0}
[2:tasks] Starting 2 tasks for step 2:
- b -> {'aggregate': ['a'], 'field1': 0}
- c -> {'aggregate': ['a'], 'field1': 0}
Adding {'aggregate': ['b'], 'field1': 1} to {'aggregate': ['a'], 'field1': 0}
Adding {'aggregate': ['c'], 'field1': 2} to {'aggregate': ['a'], 'field1': 0}
[2:writes] Finished step 2 with writes to 2 channels:
- aggregate -> ['b'], ['c']
- field1 -> 1, 2
[2:checkpoint] State at the end of step 2:
{'aggregate': ['a', 'b', 'c'], 'field1': 2}
[3:tasks] Starting 1 task for step 3:
- d -> {'aggregate': ['a', 'b', 'c'], 'field1': 2}
Adding {'aggregate': ['d']} to {'aggregate': ['a', 'b', 'c'], 'field1': 2}
[3:writes] Finished step 3 with writes to 1 channel:
- aggregate -> ['d']
[3:checkpoint] State at the end of step 3:
{'aggregate': ['a', 'b', 'c', 'd'], 'field1': 2}
実行結果を見ると、field1
の値は2
になっています。これは、reducer
として後勝ちのロジックを指定したためです。では、同じステップで並列実行されるb
とc
のどちらが「後」になるのでしょうか。このマージの順序は、builder.add_node()
でノードをグラフに追加した順番によって決まります。この例ではc
がb
の後に追加されているため、c
の更新が優先され(勝ち)、field1
の値が2
となります。
builder.add_node(b)
builder.add_node(c) # cが後に追加されている
ちなみに、これまでの例でaggregate
フィールドがエラーにならなかったのは、State定義の時点でAnnotated[list, operator.add]
のように、reducer
としてoperator.add
が指定されていたからです。aggregate
はリストなので、operator.add
はリストの結合(list + list
)として機能し、更新の競合を適切に解決していました。
Discussion