⛓️

LangGraphおけるNodeの並列実行について

2024/07/22に公開

この記事の概要

こんにちは。PharmaX でエンジニアをしている諸岡(@hakoten)です。

この記事では、大規模言語モデル(LLM)を活用したアプリケーションの開発を支援するフレームワークであるLangChain内にあるツールの一つ、LangGraphを使ってNodeを並列に実行した時の挙動や注意すべき点について、紹介しています。

LangGraphの基本的な使い方の記事をいくつか書いてますので良ければ、こちらもご覧ください。

https://zenn.dev/pharmax/articles/8796b892eed183
https://zenn.dev/pharmax/articles/148bc9497d68dd

環境

この記事執筆時点では、以下のバージョンで実施しています。
とくにLangChain周りは非常に開発速度が早いため、現在の最新バージョンを合わせてご確認ください

  • langgraph: 0.1.8
  • Python: 3.10.12

Nodeの並列実行

LangGraphでは、グラフの実行を最適化するために、分岐があるグラフは並列実行されます。

例えば次のようなグラフの場合、node_a、node_b、node_cは実行時に並列で動作します。

このグラフをLangGraphのコードにすると、次のようになります。
ここでは、意図的にnode_aの処理を遅らせるために3秒間のsleep処理を入れています。

from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
import time

class State(TypedDict):
    path: Annotated[list[str], add]

graph_builder = StateGraph(State)

def start_node(state: State) -> State:
    return { "path": ["start_node"]}

def node_a(state: State) -> State:
    # node_aの処理に3秒スリープを入れる
    time.sleep(3)
    print("log ----> a start")
    return { "path": ["node_a"]}

def node_b(state: State) -> State: 
    print("log ----> b start")
    return { "path": ["node_b"]}

def node_c(state: State) -> State: 
    print("log ----> c start")
    return { "path": ["node_c"]}

def end_node(state: State) -> State:
    return { "path": ["end_node"]}

graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)

graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("start_node", "node_b")
graph_builder.add_edge("start_node", "node_c")
graph_builder.add_edge(["node_a", "node_b", "node_c"], "end_node")
graph_builder.set_finish_point("end_node")

graph = graph_builder.compile()

graph.invoke({'path': []})

invokeメソッドの実行結果は以下の通りです。

(ログ)

log ----> b start
log ----> c start
log ----> a start

(実行結果)

{'path': ['start_node', 'node_a', 'node_b', 'node_c', 'end_node']}

ログから、node_aの処理を待たずにnode_bとnode_cが実行されていることがわかります。

また、実行結果を見ると、pathの最後に end_node が追加されていることから、node_a、node_b、node_cの結果をend_nodeで待ち合わせていることも確認できます。

このようにLangGraphでは、特に意識しなくてもNodeを並列で定義するだけで、並列実行が実現できます。

並列実行の順序とStateのマージの挙動

ここでは、並列のNodeがどのような順序で実行されるのか、並列実行後にStateがどのように扱われるのか、説明していきます。
例として、次のような3つの分岐があり、各分岐にはさらに直列にNodeが存在するグラフの挙動を見てみます。

実際のコード
from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated

class State(TypedDict):
    path: Annotated[list[str], add]

graph_builder = StateGraph(State)

def start_node(state: State) -> State:
    print("log ----> start")
    return { "path": ["start_node"]}

def node_a1(state: State) -> State:
    print("log ----> a1 start")
    return { "path": ["node_a1"]}

def node_a2(state: State) -> State:
    print("log ----> a2 start")
    return { "path": ["node_a2"]}

def node_b1(state: State) -> State:
    print("log ----> b1 start")
    return { "path": ["node_b1"]}

def node_b2(state: State) -> State:
    print("log ----> b2 start")
    return { "path": ["node_b2"]}

def node_b3(state: State) -> State:
    print("log ----> b3 start")
    return { "path": ["node_b3"]}

def node_c(state: State) -> State:
    print("log ----> c start")
    return { "path": ["node_c"]}

def end_node(state: State) -> State:
    print("log ----> end")
    return { "path": ["end_node"]}

graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a1", node_a1)
graph_builder.add_node("node_a2", node_a2)
graph_builder.add_node("node_b1", node_b1)
graph_builder.add_node("node_b2", node_b2)
graph_builder.add_node("node_b3", node_b3)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)

graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a1")
graph_builder.add_edge("node_a1", "node_a2")
graph_builder.add_edge("start_node", "node_b1")
graph_builder.add_edge("node_b1", "node_b2")
graph_builder.add_edge("node_b2", "node_b3")
graph_builder.add_edge("start_node", "node_c")
graph_builder.add_edge(["node_a2", "node_b3", "node_c"], "end_node")
graph_builder.set_finish_point("end_node")

graph = graph_builder.compile()

graph.invoke({'path': []}, debug=True)

このグラフをdebugフラグを付けて実行した結果は以下のとおりです。

(グラフの実行)

graph.invoke({'path': []}, debug=True)

(実行結果)

[0:tasks] Starting step 0 with 1 task:
- __start__ -> {'path': []}
[0:writes] Finished step 0 with writes to 1 channel:
- path -> []
[1:tasks] Starting step 1 with 1 task:
- start_node -> {'path': []}
log ----> start
[1:writes] Finished step 1 with writes to 1 channel:
- path -> ['start_node']
[2:tasks] Starting step 2 with 3 tasks:
- node_a1 -> {'path': ['start_node']}
- node_b1 -> {'path': ['start_node']}
- node_c -> {'path': ['start_node']}
log ----> a1 start
log ----> b1 start
log ----> c start
[2:writes] Finished step 2 with writes to 1 channel:
- path -> ['node_a1'], ['node_b1'], ['node_c']
[3:tasks] Starting step 3 with 2 tasks:
- node_a2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
- node_b2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
log ----> a2 start
log ----> b2 start
[3:writes] Finished step 3 with writes to 1 channel:
- path -> ['node_a2'], ['node_b2']
[4:tasks] Starting step 4 with 1 task:
- node_b3 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c', 'node_a2', 'node_b2']}
log ----> b3 start
[4:writes] Finished step 4 with writes to 1 channel:
- path -> ['node_b3']
[5:tasks] Starting step 5 with 1 task:
- end_node -> {'path': ['start_node',
          'node_a1',
          'node_b1',
          'node_c',
          'node_a2',
          'node_b2',
          'node_b3']}
log ----> start
[5:writes] Finished step 5 with writes to 1 channel:
- path -> ['end_node']
{'path': ['start_node',
  'node_a1',
  'node_b1',
  'node_c',
  'node_a2',
  'node_b2',
  'node_b3',
  'end_node']}

この結果をもとに挙動を並列実行がどのように行われているか見ていきましょう。

並列実行は、1ステップずつ処理される

出力結果の実行ログは、次のようになりました。

log ----> start
log ----> a1 start
log ----> b1 start
log ----> c start
log ----> a2 start
log ----> b2 start
log ----> b3 start
log ----> end

ここでわかるのは、(a1, b1, c)のNodeと(a2, b2)のNode、(b3)のNodeで実行制御が別ステップになっている点です。
※ デバッグログでもステップが分かれていることがわかります。

つまり、Nodeの並列実行の単位は(a1, b1, c)で区切られており、この中で順番が変わることがありますが、a1よりもb2が早く実行されることはないということです。

ここから、並列実行は分岐ごと行われるわけではなく、各分岐の1ステップごとに実行されるということがわかります。

Stateのマージ順は変わらない

次に、マージされたStateの挙動を確認します。

(a1, b1, cのNodeが完了した直後のログ)

[3:tasks] Starting step 3 with 2 tasks:
- node_a2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}
- node_b2 -> {'path': ['start_node', 'node_a1', 'node_b1', 'node_c']}

このように、Node a2とNode b2の実行では、a1, b1, cのNodeが全て完了後、実行結果がマージされたStateが渡されます。

ポイントとしては、a1, b1, cのNodeの実行順序が変更されても、マージされたpathのStateは必ず ['start_node', 'node_a1', 'node_b1', 'node_c'] の順番になる点です。何度か試行してみて、「a -> c -> b」、「c -> a -> b」とNodeの実行順序は変わったとしても、出力されるpathの順序は変わりませんでした。

どうやらこれは、add_nodeメソッドの定義順に依存しているらしく、add_nodeの順番を変えるとpathの順序も変わるようです。(今後余裕があれば、実装コードも確認してみたいです。)

並列実行時のStateにはReducerが必要

最後に、「並列したNodeの定義では、Reducerを指定する必要がある」という注意点について説明します。

from langgraph.graph import StateGraph
from operator import add
...

class State(TypedDict):
    path: Annotated[list[str], add]

これまでのサンプルコードでは、list型のpathプロパティのReducerとしてadd関数を指定していました。こうすることで、stateの更新はadd関数経由で行われ、pathの更新は単純な値の置き換えではなく、「listへの値の追記」になります。

一方、Reducerを指定しない場合、Stateの値は単純な置き換えになります。試しにpathを次のようなstr型に変更してReducerを指定せずに挙動を確認してみましょう。

class State(TypedDict):
    path: str
実際のコード
from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
import time

class State(TypedDict):
    path: str

graph_builder = StateGraph(State)

def start_node(state: State) -> State:
    return { "path": "start_node"}

def node_a(state: State) -> State:
    print("log ----> a start")
    return { "path": "node_a"}

def node_b(state: State) -> State:
    print("log ----> b start")
    return { "path": "node_b"}

def node_c(state: State) -> State:
    print("log ----> c start")
    return { "path": "node_c"}

def end_node(state: State) -> State:
    return { "path": "end_node"}

graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)

graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("start_node", "node_b")
graph_builder.add_edge("start_node", "node_c")
graph_builder.add_edge(["node_a", "node_b", "node_c"], "end_node")
graph_builder.set_finish_point("end_node")

graph = graph_builder.compile()

このコードは、graph_builder.add_edge("start_node", "node_b")の箇所でコンパイルエラーになります。エラー内容は次のとおりです。

ValueError                                Traceback (most recent call last)
<ipython-input-9-8db999c63a01> in <cell line: 38>()
     36 graph_builder.set_entry_point("start_node")
     37 graph_builder.add_edge("start_node", "node_a")
---> 38 graph_builder.add_edge("start_node", "node_b")
     39 graph_builder.add_edge("start_node", "node_c")
     40 graph_builder.add_edge(["node_a", "node_b", "node_c"], "end_node")

1 frames
/usr/local/lib/python3.10/dist-packages/langgraph/graph/graph.py in add_edge(self, start_key, end_key)
    185             start for start, _ in self.edges
    186         ):
--> 187             raise ValueError(
    188                 f"Already found path for node '{start_key}'.\n"
    189                 "For multiple edges, use StateGraph with an annotated state key."

ValueError: Already found path for node 'start_node'.
For multiple edges, use StateGraph with an annotated state key.

このエラーは、Nodeを並列に定義した場合、更新の順序が不定となるため「Reducerを使用してロジックを保証する必要がある」ということを示しています。

こちらのGithub issueでも説明されています。

次のようにNodeの分岐をなくし、直列で実行する場合にはエラーは発生しません。

(edgeの定義)

...
graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("node_a", "node_b")
graph_builder.add_edge("node_b", "node_c")
graph_builder.add_edge( "node_c", "end_node")
graph_builder.set_finish_point("end_node")
...
実際のコード

from langgraph.graph import StateGraph
from operator import add
from typing_extensions import TypedDict
from typing import Annotated
import time

class State(TypedDict):
path: str

graph_builder = StateGraph(State)

def start_node(state: State) -> State:
return { "path": "start_node"}

def node_a(state: State) -> State:
print("log ----> a start")
return { "path": "node_a"}

def node_b(state: State) -> State:
print("log ----> b start")
return { "path": "node_b"}

def node_c(state: State) -> State:
print("log ----> c start")
return { "path": "node_c"}

def end_node(state: State) -> State:
return { "path": "end_node"}

graph_builder.add_node("start_node", start_node)
graph_builder.add_node("node_a", node_a)
graph_builder.add_node("node_b", node_b)
graph_builder.add_node("node_c", node_c)
graph_builder.add_node("end_node", end_node)

graph_builder.set_entry_point("start_node")
graph_builder.add_edge("start_node", "node_a")
graph_builder.add_edge("node_a", "node_b")
graph_builder.add_edge("node_b", "node_c")
graph_builder.add_edge( "node_c", "end_node")
graph_builder.set_finish_point("end_node")

graph = graph_builder.compile()

これは、分岐が発生しない場合、Reducerがなくても更新順序が保証されているためです。

実行ログ
graph.invoke({'path': []}, debug=True)

[0:tasks] Starting step 0 with 1 task:
- __start__ -> {'path': []}
[0:writes] Finished step 0 with writes to 1 channel:
- path -> []
[1:tasks] Starting step 1 with 1 task:
- start_node -> {'path': []}
[1:writes] Finished step 1 with writes to 1 channel:
- path -> 'start_node'
[2:tasks] Starting step 2 with 1 task:
- node_a -> {'path': 'start_node'}
log ----> a start
[2:writes] Finished step 2 with writes to 1 channel:
- path -> 'node_a'
[3:tasks] Starting step 3 with 1 task:
- node_b -> {'path': 'node_a'}
log ----> b start
[3:writes] Finished step 3 with writes to 1 channel:
- path -> 'node_b'
[4:tasks] Starting step 4 with 1 task:
- node_c -> {'path': 'node_b'}
log ----> c start
[4:writes] Finished step 4 with writes to 1 channel:
- path -> 'node_c'
[5:tasks] Starting step 5 with 1 task:
- end_node -> {'path': 'node_c'}
[5:writes] Finished step 5 with writes to 1 channel:
- path -> 'end_node'
{'path': 'end_node'}

このように並列に定義したNodeでは、現時点では必ずReducerの指定が必要なため注意が必要です。

まとめ

  • LangGraphでは、分岐のあるNodeを定義するだけで分岐は並列実行される
  • 並列実行は、分岐全体に対して行われるのではなく、各分岐の1ステップごとに実行される
  • 並列実行されるNodeが存在する場合、StateのプロパティにReducerをつける必要がある

終わりに

以上、LangGraphの並列実行について挙動を踏まえて紹介しました。今後も引き続き、LangGraphの様々な使い方を紹介していきたいと思います。

PharmaXでは、様々なバックグラウンドを持つエンジニアの採用をお待ちしております。弊社はAI活用にも力を入れていますので、LLM関連の開発に興味がある方もぜひ気軽にお声がけください。

興味をお持ちの場合は、私のXアカウント(@hakoten)や記事のコメントにお気軽にメッセージをいただければと思います。まずはカジュアルにお話できれば嬉しいです!

PharmaXテックブログ

Discussion