LangGraphおけるNodeの並列実行について
この記事の概要
こんにちは。PharmaX でエンジニアをしている諸岡(@hakoten)です。
この記事では、大規模言語モデル(LLM)を活用したアプリケーションの開発を支援するフレームワークであるLangChain内にあるツールの一つ、LangGraphを使ってNodeを並列に実行した時の挙動や注意すべき点について、紹介しています。
LangGraphの基本的な使い方の記事をいくつか書いてますので良ければ、こちらもご覧ください。
環境
この記事執筆時点では、以下のバージョンで実施しています。
とくにLangChain周りは非常に開発速度が早いため、現在の最新バージョンを合わせてご確認ください
- langgraph: 0.1.8
- Python: 3.10.12
Nodeの並列実行
LangGraphでは、グラフの実行を最適化するために、分岐があるグラフは並列実行されます。
例えば次のようなグラフの場合、node_a、node_b、node_cは実行時に並列で動作します。
このグラフをLangGraphのコードにすると、次のようになります。
ここでは、意図的にnode_aの処理を遅らせるために3秒間のsleep処理を入れています。
from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
import time
class State(TypedDict):
path: Annotated[list[str], add]
graph_builder = StateGraph(State)
def start_node(state: State) -> State:
return { "path": ["start_node"]}
def node_a(state: State) -> State:
# node_aの処理に3秒スリープを入れる
time.sleep(3)
print("log ----> a start")
return { "path": ["node_a"]}
def node_b(state: State) -> State:
print("log ----> b start")
return { "path": ["node_b"]}
def node_c(state: State) -> State:
print("log ----> c start")
return { "path": ["node_c"]}
def end_node(state: State) -> State:
return { "path": ["end_node"]}
graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)
graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("start_node", "node_b")
graph_builder.add_edge("start_node", "node_c")
graph_builder.add_edge(["node_a", "node_b", "node_c"], "end_node")
graph_builder.set_finish_point("end_node")
graph = graph_builder.compile()
graph.invoke({'path': []})
invokeメソッドの実行結果は以下の通りです。
(ログ)
log ----> b start
log ----> c start
log ----> a start
(実行結果)
{'path': ['start_node', 'node_a', 'node_b', 'node_c', 'end_node']}
ログから、node_aの処理を待たずにnode_bとnode_cが実行されていることがわかります。
また、実行結果を見ると、pathの最後に end_node
が追加されていることから、node_a、node_b、node_cの結果をend_nodeで待ち合わせていることも確認できます。
このようにLangGraphでは、特に意識しなくてもNodeを並列で定義するだけで、並列実行が実現できます。
並列実行の順序とStateのマージの挙動
ここでは、並列のNodeがどのような順序で実行されるのか、並列実行後にStateがどのように扱われるのか、説明していきます。
例として、次のような3つの分岐があり、各分岐にはさらに直列にNodeが存在するグラフの挙動を見てみます。
実際のコード
from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
class State(TypedDict):
path: Annotated[list[str], add]
graph_builder = StateGraph(State)
def start_node(state: State) -> State:
print("log ----> start")
return { "path": ["start_node"]}
def node_a1(state: State) -> State:
print("log ----> a1 start")
return { "path": ["node_a1"]}
def node_a2(state: State) -> State:
print("log ----> a2 start")
return { "path": ["node_a2"]}
def node_b1(state: State) -> State:
print("log ----> b1 start")
return { "path": ["node_b1"]}
def node_b2(state: State) -> State:
print("log ----> b2 start")
return { "path": ["node_b2"]}
def node_b3(state: State) -> State:
print("log ----> b3 start")
return { "path": ["node_b3"]}
def node_c(state: State) -> State:
print("log ----> c start")
return { "path": ["node_c"]}
def end_node(state: State) -> State:
print("log ----> end")
return { "path": ["end_node"]}
graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a1", node_a1)
graph_builder.add_node("node_a2", node_a2)
graph_builder.add_node("node_b1", node_b1)
graph_builder.add_node("node_b2", node_b2)
graph_builder.add_node("node_b3", node_b3)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)
graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a1")
graph_builder.add_edge("node_a1", "node_a2")
graph_builder.add_edge("start_node", "node_b1")
graph_builder.add_edge("node_b1", "node_b2")
graph_builder.add_edge("node_b2", "node_b3")
graph_builder.add_edge("start_node", "node_c")
graph_builder.add_edge(["node_a2", "node_b3", "node_c"], "end_node")
graph_builder.set_finish_point("end_node")
graph = graph_builder.compile()
graph.invoke({'path': []}, debug=True)
このグラフをdebugフラグを付けて実行した結果は以下のとおりです。
(グラフの実行)
graph.invoke({'path': []}, debug=True)
(実行結果)
[0:tasks] Starting step 0 with 1 task:
- __start__ -> {'path': []}
[0:writes] Finished step 0 with writes to 1 channel:
- path -> []
[1:tasks] Starting step 1 with 1 task:
- start_node -> {'path': []}
log ----> start
[1:writes] Finished step 1 with writes to 1 channel:
- path -> ['start_node']
[2:tasks] Starting step 2 with 3 tasks:
- node_a1 -> {'path': ['start_node']}
- node_b1 -> {'path': ['start_node']}
- node_c -> {'path': ['start_node']}
log ----> a1 start
log ----> b1 start
log ----> c start
[2:writes] Finished step 2 with writes to 1 channel:
- path -> ['node_a1'], ['node_b1'], ['node_c']
[3:tasks] Starting step 3 with 2 tasks:
- node_a2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
- node_b2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
log ----> a2 start
log ----> b2 start
[3:writes] Finished step 3 with writes to 1 channel:
- path -> ['node_a2'], ['node_b2']
[4:tasks] Starting step 4 with 1 task:
- node_b3 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c', 'node_a2', 'node_b2']}
log ----> b3 start
[4:writes] Finished step 4 with writes to 1 channel:
- path -> ['node_b3']
[5:tasks] Starting step 5 with 1 task:
- end_node -> {'path': ['start_node',
'node_a1',
'node_b1',
'node_c',
'node_a2',
'node_b2',
'node_b3']}
log ----> start
[5:writes] Finished step 5 with writes to 1 channel:
- path -> ['end_node']
{'path': ['start_node',
'node_a1',
'node_b1',
'node_c',
'node_a2',
'node_b2',
'node_b3',
'end_node']}
この結果をもとに挙動を並列実行がどのように行われているか見ていきましょう。
並列実行は、1ステップずつ処理される
出力結果の実行ログは、次のようになりました。
log ----> start
log ----> a1 start
log ----> b1 start
log ----> c start
log ----> a2 start
log ----> b2 start
log ----> b3 start
log ----> end
ここでわかるのは、(a1, b1, c)
のNodeと(a2, b2)
のNode、(b3)
のNodeで実行制御が別ステップになっている点です。
※ デバッグログでもステップが分かれていることがわかります。
つまり、Nodeの並列実行の単位は(a1, b1, c)
で区切られており、この中で順番が変わることがありますが、a1よりもb2が早く実行されることはないということです。
ここから、並列実行は分岐ごと行われるわけではなく、各分岐の1ステップごとに実行されるということがわかります。
Stateのマージ順は変わらない
次に、マージされたStateの挙動を確認します。
(a1, b1, c
のNodeが完了した直後のログ)
[3:tasks] Starting step 3 with 2 tasks:
- node_a2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
- node_b2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
このように、Node a2とNode b2の実行では、a1, b1, c
のNodeが全て完了後、実行結果がマージされたStateが渡されます。
ポイントとしては、a1, b1, c
のNodeの実行順序が変更されても、マージされたpathのStateは必ず ['start_node', 'node_a1', 'node_b1', 'node_c']
の順番になる点です。何度か試行してみて、「a -> c -> b」、「c -> a -> b」とNodeの実行順序は変わったとしても、出力されるpathの順序は変わりませんでした。
どうやらこれは、add_node
メソッドの定義順に依存しているらしく、add_nodeの順番を変えるとpathの順序も変わるようです。(今後余裕があれば、実装コードも確認してみたいです。)
並列実行時のStateにはReducerが必要
最後に、「並列したNodeの定義では、Reducerを指定する必要がある」という注意点について説明します。
from langgraph.graph import StateGraph
from operator import add
...
class State(TypedDict):
path: Annotated[list[str], add]
これまでのサンプルコードでは、list
型のpathプロパティのReducerとしてadd関数を指定していました。こうすることで、stateの更新はadd関数経由で行われ、pathの更新は単純な値の置き換えではなく、「listへの値の追記」になります。
一方、Reducerを指定しない場合、Stateの値は単純な置き換えになります。試しにpathを次のようなstr型に変更してReducerを指定せずに挙動を確認してみましょう。
class State(TypedDict):
path: str
実際のコード
from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
import time
class State(TypedDict):
path: str
graph_builder = StateGraph(State)
def start_node(state: State) -> State:
return { "path": "start_node"}
def node_a(state: State) -> State:
print("log ----> a start")
return { "path": "node_a"}
def node_b(state: State) -> State:
print("log ----> b start")
return { "path": "node_b"}
def node_c(state: State) -> State:
print("log ----> c start")
return { "path": "node_c"}
def end_node(state: State) -> State:
return { "path": "end_node"}
graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)
graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("start_node", "node_b")
graph_builder.add_edge("start_node", "node_c")
graph_builder.add_edge(["node_a", "node_b", "node_c"], "end_node")
graph_builder.set_finish_point("end_node")
graph = graph_builder.compile()
このコードは、graph_builder.add_edge("start_node", "node_b")
の箇所でコンパイルエラーになります。エラー内容は次のとおりです。
ValueError Traceback (most recent call last)
<ipython-input-9-8db999c63a01> in <cell line: 38>()
36 graph_builder.set_entry_point("start_node")
37 graph_builder.add_edge("start_node", "node_a")
---> 38 graph_builder.add_edge("start_node", "node_b")
39 graph_builder.add_edge("start_node", "node_c")
40 graph_builder.add_edge(["node_a", "node_b", "node_c"], "end_node")
1 frames
/usr/local/lib/python3.10/dist-packages/langgraph/graph/graph.py in add_edge(self, start_key, end_key)
185 start for start, _ in self.edges
186 ):
--> 187 raise ValueError(
188 f"Already found path for node '{start_key}'.\n"
189 "For multiple edges, use StateGraph with an annotated state key."
ValueError: Already found path for node 'start_node'.
For multiple edges, use StateGraph with an annotated state key.
このエラーは、Nodeを並列に定義した場合、更新の順序が不定となるため「Reducerを使用してロジックを保証する必要がある」ということを示しています。
こちらのGithub issueでも説明されています。
次のようにNodeの分岐をなくし、直列で実行する場合にはエラーは発生しません。
(edgeの定義)
...
graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("node_a", "node_b")
graph_builder.add_edge("node_b", "node_c")
graph_builder.add_edge( "node_c", "end_node")
graph_builder.set_finish_point("end_node")
...
実際のコード
from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
import time
class State(TypedDict):
path: str
graph_builder = StateGraph(State)
def start_node(state: State) -> State:
return { "path": "start_node"}
def node_a(state: State) -> State:
print("log ----> a start")
return { "path": "node_a"}
def node_b(state: State) -> State:
print("log ----> b start")
return { "path": "node_b"}
def node_c(state: State) -> State:
print("log ----> c start")
return { "path": "node_c"}
def end_node(state: State) -> State:
return { "path": "end_node"}
graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)
graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("node_a", "node_b")
graph_builder.add_edge("node_b", "node_c")
graph_builder.add_edge( "node_c", "end_node")
graph_builder.set_finish_point("end_node")
graph = graph_builder.compile()
これは、分岐が発生しない場合、Reducerがなくても更新順序が保証されているためです。
実行ログ
graph.invoke({'path': []}, debug=True)
[0:tasks] Starting step 0 with 1 task:
- __start__ -> {'path': []}
[0:writes] Finished step 0 with writes to 1 channel:
- path -> []
[1:tasks] Starting step 1 with 1 task:
- start_node -> {'path': []}
[1:writes] Finished step 1 with writes to 1 channel:
- path -> 'start_node'
[2:tasks] Starting step 2 with 1 task:
- node_a -> {'path': 'start_node'}
log ----> a start
[2:writes] Finished step 2 with writes to 1 channel:
- path -> 'node_a'
[3:tasks] Starting step 3 with 1 task:
- node_b -> {'path': 'node_a'}
log ----> b start
[3:writes] Finished step 3 with writes to 1 channel:
- path -> 'node_b'
[4:tasks] Starting step 4 with 1 task:
- node_c -> {'path': 'node_b'}
log ----> c start
[4:writes] Finished step 4 with writes to 1 channel:
- path -> 'node_c'
[5:tasks] Starting step 5 with 1 task:
- end_node -> {'path': 'node_c'}
[5:writes] Finished step 5 with writes to 1 channel:
- path -> 'end_node'
{'path': 'end_node'}
このように並列に定義したNodeでは、現時点では必ずReducerの指定が必要なため注意が必要です。
まとめ
- LangGraphでは、分岐のあるNodeを定義するだけで分岐は並列実行される
- 並列実行は、分岐全体に対して行われるのではなく、各分岐の1ステップごとに実行される
- 並列実行されるNodeが存在する場合、StateのプロパティにReducerをつける必要がある
終わりに
以上、LangGraphの並列実行について挙動を踏まえて紹介しました。今後も引き続き、LangGraphの様々な使い方を紹介していきたいと思います。
PharmaXでは、様々なバックグラウンドを持つエンジニアの採用をお待ちしております。弊社はAI活用にも力を入れていますので、LLM関連の開発に興味がある方もぜひ気軽にお声がけください。
興味をお持ちの場合は、私のXアカウント(@hakoten)や記事のコメントにお気軽にメッセージをいただければと思います。まずはカジュアルにお話できれば嬉しいです!
PharmaXエンジニアチームのテックブログです。エンジニアメンバーが、PharmaXの事業を通じて得た技術的な知見や、チームマネジメントについての知見を共有します。 PharmaXエンジニアチームやメンバーの雰囲気が分かるような記事は、note(note.com/pharmax)もご覧ください。
Discussion