LLMフレームワーク「Burr」を試す
このスレで知った
公式ブログ
AIアプリケーションの状態を管理するのは難しい。特に、ユーザーに影響を与えるような意思決定に使われる場合はなおさらだ。私たちはこれを簡単にするためにBurrを開発した。ステートマシンとしてアプリケーションを構築するための強力なAPI、統合のホスト、そして本番環境でのデバッグやトラッキングのための遠隔測定UIが付属している。その動機と理由を説明し、簡単なチャットボットを実装する。ただ始めたい方は、AIのためのステートマシン(そしてもっと!)のセクションへどうぞ。
レポジトリ
Burrは、シンプルなPythonのビルディング・ブロックから、状態に基づいて意思決定を行うアプリケーション(チャットボット、エージェント、シミュレーションなど)を簡単に開発できる。Burrには、これらの決定をリアルタイムで追跡/監視できるUIが含まれている。
ドキュメント
ということで、Getting Startedに従って進める。今回はローカルのJupyterLabで行う。予め仮想環境を用意しておくこと。
Why Burr?
なぜアプリケーションにステートマシンが必要なのか?通常のプログラミング構成で十分ではないか?
そう、ある時点までは。本番レベルのLLMアプリケーションを構築するために必要なものを見てみよう:
- トレース/テレメトリー
- LLMは混沌としていることがあり、どのような決定がなされ、それにどれくらいの時間がかかったかを可視化する必要がある。
- 状態の永続性
- アプリケーションのセーブ/ロード方法を考えることは、あなたが心配する必要がある、まったく別のレベルのインフラストラクチャです。
- ビジュアライゼーション/デバッグ
- 開発時には、何をしているのか/何をしたのかを表示し、いつでもデータをロードできるようにしたい。
- ユーザー/LLM間のインタラクションを管理する
- 特定の条件下で入力のために一時停止する
- 評価のためのデータ収集+テスト生成
- 本番で実行されたデータを保存し、後の分析/微調整に使用する。
様々なフレームワークをつなぎ合わせたり、すべてを自分で構築することはいつでもできるが、その時点で、ソフトウェアの中核的な価値提案とは関係のない作業に多くの時間を費やすことになる。
Burrは、これらすべてを容易にするために作られた。
アプリケーションをシンプルなPythonの構成要素からなるステートマシンとしてモデル化することで、両方の世界のベストを手に入れることができる。 好きなインフラやツールを持ち込んで、上記のすべてを手に入れることができる。Burrは、LLM(とその他多くの)アプリケーションを簡単に構築するための非常に軽量なツールとしてスタートすることを意図している。Burrが提供するエコシステム、プラグイン、追加機能を活用すればするほど、その価値は高まっていく。
ステートマシンということが全面にアピールされている。この点からはLangGraphあたりが比較対象になりそう。
Installing
burrにはいくつかのエクストラが用意されているが、初めて触る場合に便利な依存パッケージをstart
で指定するのがオススメされている。start
を指定すると可視化・トラッキング・GUIなどが有効になる様子。
!pip install "burr[start]"
あとpython-dotenvを使ってOpenAI APIキーを読み出す。APIキーをセットした.envを作成しておくこと(python-dotenvとopenaiパッケージはburrの依存関係で既にインストールされていると思う)
from dotenv import load_dotenv
load_dotenv(verbose=True)
Simple Example
Burrでシンプルなチャットボットを作成する。
from typing import Tuple
from openai import OpenAI
from burr.core import action, State, ApplicationBuilder, when, persistence
client = OpenAI()
@action(reads=[], writes=["prompt", "chat_history"])
def human_input(state: State, prompt: str) -> Tuple[dict, State]:
chat_item = {
"content": prompt,
"role": "user"
}
return (
{"prompt": prompt},
state.update(prompt=prompt).append(chat_history=chat_item)
)
@action(reads=["chat_history"], writes=["response", "chat_history"])
def ai_response(state: State) -> Tuple[dict, State]:
content = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
messages=state["chat_history"],
).choices[0].message.content
chat_item = {
"content": content,
"role": "assistant"
}
return (
{"response": content},
state.update(response=content).append(chat_history=chat_item)
)
Burrでは、@action
デコレータを使って、何を読み取って、何を書くのかを定義し、そしてそれらを実際にどうやって読み書きするのか、を関数で定義する。
ここでは、ユーザからの入力を処理するhuman_input
と、LLMからの出力を処理するai_response
が定義されている。
-
State
が入出力間での状態を保持する。ここに対してactionで定義されたフィールドに対して入出力を行う。 -
State
オブジェクトは基本的にimmutableで、.append()
や.update()
を通じてのみ更新できる。 - よって重要なのは
State
になるため、関数内ではどんなフレームワークを使っても良い。
そしてApplicationBuilder
を使ってこれらを組み合わせる。
app = (
ApplicationBuilder()
.with_actions(human_input, ai_response)
.with_transitions(
("human_input", "ai_response"),
("ai_response", "human_input")
).with_state(chat_history=[])
.with_entrypoint("human_input")
.build()
)
これを可視化する。
app.visualize("./graph", format="png")
- まず、フローとしては無限ループになっていて、新しいプロンプト入力を待つ際に一時停止する
-
.with_actions
で上記で作成した関数を指定する。 -
.with_state
で保持したい状態を指定する。ここではchat_history
を追加して、初期状態は殻になっている。 -
with_transitions
で処理の遷移を指定する。今回の場合だと以下となる- ユーザからの入力(human_input)が行われたら、次はLLMからの回答(ai_response)が行われる
- LLMからの回答(ai_response)が行われたら、次はユーザからの入力(human_input)が行われる
- つまり、各処理がノードで、ここでエッジでつなげて、グラフを作成しているということだと思う。
- どこかで終端のエッジ用意しないとほんとに無限ループになりそう
- 参考
-
with_entrypoint
でフローのエントリーポイントを指定する。今回はユーザの入力になる。
なお、Builderパターンを採用しているらしい。自分はデザインパターンとか詳しくないので、わかってないけども。
では実行してみる。
*_, state = app.run(halt_after=["ai_response"], inputs={"prompt": "日本の総理大臣は誰?"})
print("answer:", app.state["response"])
print(len(state["chat_history"]), "items in chat")
for i in state["chat_history"]:
print(i)
answer: 現在の日本の総理大臣は菅義偉(すが よしひで)です。
2 items in chat
{'content': '日本の総理大臣は誰?', 'role': 'user'}
{'content': '現在の日本の総理大臣は菅義偉(すが よしひで)です。', 'role': 'assistant'}
Click to add a cell.
なるほど、常にState経由で呼び出すということになるのね。自分の理解としてもう少し噛み砕いて図にしてみた。多分こんな感じだと思う。
次にUIを立ち上げてみる。
!burr
2024-05-18 21:29:51.812 | INFO | burr.cli.__main__:_run_server:107 - Starting server on port 7241
2024-05-18 21:29:51.812 | INFO | burr.cli.__main__:_run_server:113 - Copying demo data over to /home/kun432/.burr...
2024-05-18 21:29:51.812 | INFO | burr.cli.__main__:_run_server:123 - Copying demo_counter over...
2024-05-18 21:29:51.813 | INFO | burr.cli.__main__:_run_server:123 - Copying demo_chatbot over...
2024-05-18 21:29:51.814 | INFO | burr.cli.__main__:_run_server:123 - Copying demo_conversational-rag over...
2024-05-18 21:29:51.815 | INFO | burr.cli.__main__:_run_server:123 - Copying demo_tracing over...
2024-05-18 21:29:51.816 | INFO | burr.cli.__main__:_command:36 - Running command: uvicorn burr.tracking.server.run:app --port 7241
INFO: Started server process [1510302]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:7241 (Press CTRL+C to quit)
INFO: 127.0.0.1:58826 - "GET //ready HTTP/1.1" 200 OK
おっと、localhostだけでオープンしてる。うちの場合はリモートのサーバで動かしているので、これだとアクセスできない。
一旦止めて、ヘルプを見てみる。
!burr help
Usage: burr [OPTIONS]
Options:
--port INTEGER Port to run the server on
--dev-mode Run the server in development mode
--no-open Run the server without opening it
--no-copy-demo_data Don't copy demo data over.
--help
それっぽいオプションはないなー。ドキュメントにもそのあたりの記載はないので、一旦SSHでポートフォワーディングしてアクセスすることにする。
$ ssh -L 7241:127.0.0.1:7241 XXX.XXX.XXX.XXX
ブラウザで開くとこんな感じ。
上記は自分で作ったコードのものではないのだけど、ちゃんとプロジェクト?を作成してやればこういう風にトラッキングができるということなのかなー。
ここまでの所感
自分の現時点での認識は以下。
- burrがフレームワークとして提供してくれるのは、フローの制御とStateの管理。
- LangChainやLlamaIndexのような、LLMアプリで必要な機能をフルパッケージで抽象化してくれるフレームワークではない。
- 自分のイメージはこんな感じ
- LangChainでいうところの、LangGraphの部分だけを切り出して、LangChainとの結合度をなくしたような感じ
- なんとなくワークフローエンジンあたりの発想に近しい位置付けじゃないかな、ただ自分が見たワークフローエンジンはどれもDAGになってるものがほとんどで、ステートマシンになっているものはほとんどないように思う
自分も実際にLLMアプリを開発してみて、いろいろな要件ごとにあわせたり、様々な新しいモデル・テクニックなどに対応するためには、柔軟な設計が必要と感じている。
ただそれを実際に実装しようと思うと大変で、それが実現できそうなのは現時点ではワークフロー的に定義できるようなものを使うのが良いと考えて、色々過去に調べていた。
LlamaIndexのQuery Pipeline
LangChainのLCEL
LLM向けは他にも色々ある
LLMに限らないワークフローエンジンも
一通り触ってみて、自分の中ではもうLCELでよいのではないか?という気はしていた。LangChainを採用することの是非はずっと言われ続けているけど、去年言われていたころよりは全然マシになっているだろうと思っているし、LCELを使うことで得られる柔軟性等の恩恵はとても大きいと感じている。
とはいえ、チームでの開発になると学習度や理解度のグラデーションは出てくるし、今後の運用みたいなところに全く不安がないというわけではない。このあたりの技術選定の難しさはケースバイケースであると思っていて(何が「正しい」のかは個々の置かれた状況によって変わる)、LangChainやLlamaIndexなどの巨大なフレームワークを使いたくない、というケースでも、フロー制御とState管理だけはburrを活用して、あとはネイティブで頑張る、みたいな使い方で十分活用できるのではないか?という気がする。
個人的にはシンプルでわかりやすい点も含めて、好印象を持った。
LCLEやQuery Pipeline、LangGraphなどでやっているような複雑な制御と同じことができるか、はもう少し使ってみての判断かな。
ざっくり他にできることを見てみる。
グラフの遷移で条件分岐は可能
Stateの永続化。今のところはSQLite,PostgreSQL、Redisに対応している様子
トラッキングはApplicationBuilder
でState初期化時にトラッキングオブジェクトを渡しておけばよいみたい。
前回の実行結果からテストデータを作成してテストを行うことができる。
デモがいろいろ用意されている。
Actionを並列実行するのは今のところできなさそう。
コンセプトもそれほどボリューム量はないと思うのでサラッと見てみる。
Actions
Actionがやること
-
run
- 計算、というか具体的な処理。この部分が自分の実装のメインになる。
- 処理結果を辞書で返す
-
update
- Stateの更新
- 更新された状態を返す
よって、Stateの何を読み書きするか?を事前に宣言する必要がある。
Actionの書き方は2種類
- 関数ベース
- クラスベース
関数ベース
関数ベースの場合は@action
デコレータを使って書く。シンプルなカウンターだとこんな感じになる。
@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + 1}
print(f"counted to {result['counter']}")
return result, state.update(**result)
Stateからcounterを読み出して、+1した結果を辞書で返しつつ、Stateを更新する。
全部書くとこうなる。なお、この例だと、一つのアクションを延々と続ける感じの無限ループになるので、sleepを入れつつ、止めるときは手動で。
from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time
@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + 1}
print(f"counted to {result['counter']}")
time.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter)
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
可視化すると無限ループになっているのがわかる。
app.visualize("./graph", format="png")
実行してみる。
action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
counted to 6
counted to 7
counted to 8
counted to 9
(snip)
Actionにパラメータを渡す場合は.bind()
が使える。アプリケーション実行に渡す入力とはちょっと使い方が違うので、こちらは初期化パラメータの変更とかで使えそう。
from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time
@action(reads=["counter"], writes=["counter"])
def counter(state: State, increment_by: int = 1) -> Tuple[dict, State]: # increment_byをオプション引数で追加
result = {"counter": state["counter"] + increment_by}
print(f"counted to {result['counter']}")
time.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter.bind(increment_by=2)) # Actionにbindして渡す
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 2
counted to 4
counted to 6
counted to 8
counted to 10
counted to 12
counted to 14
counted to 16
(snip)
.bindを使わない場合はrunの実行時に指定する。
action, result, state = app.run(inputs={"increment_by":2})
クラスベース
クラスベースの場合はActionクラスを継承してクラスを定義する。上と同じCounterクラスを定義した。
from burr.core import Action, State, ApplicationBuilder
class Counter(Action):
@property
def reads(self) -> list[str]:
return ["counter"]
def run(self, state: State) -> dict:
new_counter = state["counter"] + 1
print(f"counted to {new_counter}")
time.sleep(1)
return {"counter": new_counter}
@property
def writes(self) -> list[str]:
return ["counter"]
def update(self, result: dict, state: State) -> State:
return state.update(**result)
呼び出すときはこんな感じで名前付きで定義する。
app = (
ApplicationBuilder()
.with_actions(counter=Counter())
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
counted to 6
(snip)
クラスベースでパラメータを渡す場合は、inputsプロパティを追加する。
class Counter(Action):
@property
def reads(self) -> list[str]:
return ["counter"]
# runで引数を受けれるようにする。
def run(self, state: State, increment_by: int) -> dict:
new_counter = state["counter"] + increment_by
print(f"counted to {new_counter}")
time.sleep(1)
return {"counter": new_counter}
@property
def writes(self) -> list[str]:
return ["counter"]
def update(self, result: dict, state: State) -> State:
return state.update(**result)
# 以下を追加
@property
def inputs(self) -> list[str]:
return ["increment_by"]
app = (
ApplicationBuilder()
.with_actions(counter=Counter())
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
で実行時にinputsを渡す。
action, result, state = app.run(inputs={"increment_by":2})
ただしinputsプロパティで指定した場合は、指定が必須になってしまう。inputsはタプルで (必須の引数, オプションの引数)
を返すので、以下のように指定すれば良さそう。
class Counter(Action):
(snip)
def run(self, state: State, increment_by: int = 1) -> dict:
new_counter = state["counter"] + increment_by
print(f"counted to {new_counter}")
time.sleep(1)
return {"counter": new_counter}
(snip)
@property
def inputs(self) -> Tuple[list[str], list[str]]:
return [], ["increment_by"]
action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
(snip)
action, result, state = app.run(inputs={"increment_by":2})
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 6
counted to 8
counted to 10
counted to 12
counted to 14
counted to 16
ただ、クラスベースの場合はbind()
相当のものがない。まあそらそうか。となると、インスタンス作成するタイミングで渡す感じにすればいいのかな?
class Counter(Action):
def __init__(self, increment_by: int = 1):
super().__init__()
self.increment_by = increment_by
@property
def reads(self) -> list[str]:
return ["counter"]
def run(self, state: State, increment_by: int = None) -> dict:
if not increment_by:
increment_by = self.increment_by
new_counter = state["counter"] + increment_by
print(f"counted to {new_counter}")
time.sleep(1)
return {"counter": new_counter}
@property
def writes(self) -> list[str]:
return ["counter"]
def update(self, result: dict, state: State) -> State:
return state.update(**result)
@property
def inputs(self) -> Tuple[list[str], list[str]]:
return [], ["increment_by"]
Action指定時に「必要ならば」パラメータを渡してやる
app = (
ApplicationBuilder()
.with_actions(counter=Counter(increment_by=2))
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
一応どの場合でもこれならいけそう。
action, result, state = app.run()
action, result, state = app.run(inputs={"increment_by":5})
Inputs
/Results
Inputs
/Results
を使うと、入力をそのままStateに渡すノード、Stateから値を取り出すだけのノードが作成できる。あんまりいい例じゃないけどこんな感じ?
from typing import Tuple
from burr.core import action, State, ApplicationBuilder, Result
from burr.core.action import Input
@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + state["increment_by"]}
time.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter, get_input=Input("increment_by"), get_result=Result("counter"))
.with_transitions(
("get_input", "counter"),
("counter", "get_result")
)
.with_state(counter=0)
.with_entrypoint("get_input")
.build()
)
action, result, state = app.run(halt_after=["get_result"], inputs={"increment_by":10})
print(state["counter"])
10
実行時の入力
上でも少し書いたけど、入力の受け渡しは、2種類。
-
.bind()
を使う -
run()
時にinputs
で渡す
from typing import Tuple
from openai import OpenAI
from burr.core import action, State, ApplicationBuilder, when, persistence
openai_client = OpenAI()
@action(reads=[], writes=["prompt", "chat_history"])
def human_input(state: State, prompt: str) -> Tuple[dict, State]:
chat_item = {
"content": prompt,
"role": "user"
}
return (
{"prompt": prompt},
state.update(prompt=prompt).append(chat_history=chat_item)
)
@action(reads=["chat_history"], writes=["response", "chat_history"])
def ai_response(state: State, client: OpenAI) -> Tuple[dict, State]:
content = client.chat.completions.create(
model="gpt-3.5-turbo-0125",
messages=state["chat_history"],
).choices[0].message.content
chat_item = {
"content": content,
"role": "assistant"
}
return (
{"response": content},
state.update(response=content).append(chat_history=chat_item)
)
app = (
ApplicationBuilder()
.with_actions(human_input, ai_response.bind(client=openai_client))
.with_transitions(
("human_input", "ai_response"),
("ai_response", "human_input")
).with_state(chat_history=[])
.with_entrypoint("human_input")
.build()
)
*_, state = app.run(halt_after=["ai_response"], inputs={"prompt": "日本の総理大臣は誰?"})
.bind()
で渡すものは初回だけ必要なものとかStateで管理が不要なもの、常に変わるものとかStateで管理が必要なものはrun()
でinputs
で渡す、みたいな使い分けが良さそう。
State
Stateクラスは状態を保持し管理するためのインタフェースを提供する。Stateはimmutableでその馬で変更はできない=新しい状態として作成(して上書き)することで、変更する。
from typing import Tuple
from burr.core import State, action
# State初期化
some_state = State({"counter": 0, "foo": "foo", "bar": ["bar"]})
# 全部取り出し
print(some_state.get_all()) # --> {'counter': 0, 'foo': 'bar', 'buz': [1, 2, 3]}
# 特定のフィールドのみ取り出し(ドキュメント通りに配列で指定するとエラーになる、文字列で指定)
print(some_state.subset("counter","foo")) # --> {'counter': 0, 'foo': 'foo'}
# 特定のフィールドの更新、存在しない場合はフィールドが追加される
some_state = some_state.update(foo="qux")
print(some_state.get_all()) # --> {'counter': 0, 'foo': 'qux', 'bar': ['bar']}
# 特定のリストフィールドへの追加、存在しない場合はフィールドが追加されリストに値が入る
some_state = some_state.append(bar="quux")
print(some_state.get_all()) # --> {'counter': 0, 'foo': 'qux', 'bar': ['bar', 'quux']}
# フィールドが整数値の場合はインクリメント
some_state = some_state.increment(counter=1)
print(some_state.get_all()) # --> {'counter': 1, 'foo': 'qux', 'bar': ['bar', 'quux']}
# 該当のフィールド以外を削除
some_state = some_state.wipe(keep=["counter", "foo"])
print(some_state.get_all()) # --> {'counter': 1, 'foo': 'qux'}
# 該当のフィールドを削除
some_state = some_state.wipe(delete=["foo"])
print(some_state.get_all()) # --> {'counter': 1}
更新時にはgitのcommit/checkout/mergeっぽい動きで更新されるらしい。擬似コードが紹介されていて、実際になんかそれっぽく再現できないかなと思ったけど、手間なので諦めた。
# Stateの初期化
current_state = ...
# Actionが読み取るStateのフィールドを取得
read_state = current_state.subset(action.reads)
# Actionを任意のStateで実行し、結果を取得
result = action.run(new_state)
# Actionが書き込むStateのフィールドを取得
write_state = current_state.subset(action.writes)
# Actionの実行結果から更新後のStateを作成
new_state = action.update(result, new_state)
# 更新後のStateを現在のStateにマージ
current_state = current_state.merge(new_state)
Applications
ApplicationBuilder
をつかって、ActionとStateをつなぐことで、ステートマシンとなる。最低限定義が必要なのは以下とある。
-
with_action()
に、Actionを**kwargsで渡す - Transitions(条件付きの場合も)
- 最初に実行されるエントリーポイント
といいつつ、Stateがなければ使う意味はほとんどないと思うので、実質的には最低限でもこうなると思う。
app = (
ApplicationBuilder()
.with_actions(human_input, ai_response)
.with_transitions(
("human_input", "ai_response"),
("ai_response", "human_input")
).with_state(chat_history=[])
.with_entrypoint("human_input")
.build()
)
アプリケーションの実行
アプリケーションの実行方法には3つのAPIが使える。
step
/astep
step
は、Action、Actionの結果、実行後の更新されたStateを返し、ステップを1回だけ実行するっぽい。
例えば以下のカウンターのサンプル、そのままrun
で実行すると無限ループになるが、
from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time
@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + 1}
#print(f"counted to {result['counter']}")
time.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter)
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
step
を使えば、1回だけ実行して終わる。
action, result, state = app.step()
print(action)
print(result)
print(state)
counter: counter -> counter
{'counter': 1}
{'counter': 1, '__SEQUENCE_ID': 0, '__PRIOR_STEP': 'counter'}
再度実行するとこうなる。
counter: counter -> counter
{'counter': 2}
{'counter': 2, '__SEQUENCE_ID': 1, '__PRIOR_STEP': 'counter'}
arun
はActionが非同期の場合に使う。上のやつをasync/awaitを使って書いてみるとこんな感じかな?全然意味がないけれども。
from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import asyncio
import nest_asyncio
nest_asyncio.apply()
@action(reads=["counter"], writes=["counter"])
async def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + 1}
await asyncio.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter)
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
action, result, state = await app.astep()
iterate
/aiterate
iterate
/aiterate
はstep
/astep
と同じだけど、ジェネレータっぽく実行させることができる。
同じくカウンターのサンプル。
from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time
@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + 1}
#print(f"counted to {result['counter']}")
time.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter)
.with_transitions(
("counter", "counter")
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
こんな感じで実行。
for action, result, state in app.iterate():
print(action)
print(result)
print(state)
print("----")
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counter: counter -> counter
{'counter': 1}
{'counter': 1, '__SEQUENCE_ID': 0, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 2}
{'counter': 2, '__SEQUENCE_ID': 1, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 3}
{'counter': 3, '__SEQUENCE_ID': 2, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 4}
{'counter': 4, '__SEQUENCE_ID': 3, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 5}
{'counter': 5, '__SEQUENCE_ID': 4, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 6}
{'counter': 6, '__SEQUENCE_ID': 5, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 7}
{'counter': 7, '__SEQUENCE_ID': 6, '__PRIOR_STEP': 'counter'}
ここで返ってくるactionは、halt_after
もしくはhalt_before
で指定したものになるらしい。
というかhalt_after
/halt_before
についての説明がGetting Startedでもされていなかったのだけど、実行時にhalt_after
/halt_before
でActionを指定すると、TransitionでそのActionが実行される場合に無限ループの終端になる、というものだと認識している。
halt_after
/halt_before
の違いは、そのActionが実行されてから終わるのか、そのActionが実行される直前で終わるのか、ということみたい。
当然resultの結果も変わる。halt_after
の場合はその結果になるし、halt_before
の場合はNoneになる。aiterate
の場合は返ってこないらしい。
上の例では指定していないので無限ループになっているけれど、halt_after=[], halt_before=[]
と同じことっぽい。
aitereate
はその非同期版。
run
/arun
でrun
/arun
は結局のところiterate
/aiterate
を実行しているということと同じになるみたい。実際上の例も同じ動きになってる。
ちょっと非同期とかジェネレータみたいなのが絡むと一気に難しくなるよなぁ、サンプル作って理解したかったけど、スキルが足りない。。。
Transitions
TransitionはActionのつながりを指定する、つまりフローを定義する重要な部分。グラフで言うとActionがノードを定義して、Transitionsでノード間のエッジを設定することになる。
Transitionsは3つの要素がある
- from
- to
- condtion
Getting Startedのサンプルだと、"human_input"から"ai_response"、"ai_response"から"human_input"というfrom/toだけを使っていた。
app = (
ApplicationBuilder()
.with_actions(human_input, ai_response)
.with_transitions(
("human_input", "ai_response"),
("ai_response", "human_input")
).with_state(chat_history=[])
.with_entrypoint("human_input")
.build()
)
条件を指定することで、一本道ではないフローを作ることができる。
条件の指定例はこんな感じ。
with_transitions(
("from", "to", when(foo="bar"), # State"foo"の値が"bar"になったら評価される
)
with_transitions(
("from", "to", expr('epochs>100')) # epocsが100以上になったら評価される
)
with_transitions(
("from", "to", default) # 常に評価される
)
with_transitions(
("from", "to") # 指定しない場合はdefaultと同じ
)
否定の場合は~
を使う
with_transitions(
~("from", "to", when(foo="bar"), # State"foo"の値が"bar"以外なら評価される
)
指定するキーは必ずStateに存在する必要がある。存在しないStateのキーを指定した場合はエラーになる。
カウンターが10になるまでインクリメントする例。
from typing import Tuple
from burr.core import action, State, ApplicationBuilder
from burr.core import when, expr, default
import time
@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
result = {"counter": state["counter"] + 1}
print(f"counted to {result['counter']}")
time.sleep(1)
return result, state.update(**result)
app = (
ApplicationBuilder()
.with_actions(counter, Result)
.with_transitions(
("counter", "counter", expr('counter<10'))
)
.with_state(counter=0)
.with_entrypoint("counter")
.build()
)
action, result, state = app.run()
print(action)
print(result)
print(state)
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
counted to 6
counted to 7
counted to 8
counted to 9
counted to 10
WARNING:burr.core.application:This is trying to return without having computed a single action -- we'll end up just returning some Nones. This means that nothing was executed (E.G. that the state machine had nowhere to go). Either fix the state machine orthe halt conditions, or both... Halt conditions are: halt_before=[], halt_after=[].Note that this is considered undefined behavior -- if you get here, you should fix!
counter: counter -> counter
{'counter': 10}
{'counter': 10, '__SEQUENCE_ID': 9, '__PRIOR_STEP': 'counter'}
とりあえずこのあたりまでである程度ステートマシンの制御に必要な最低限は概ねカバーしたのではないだろうか。
全体的に良さげな印象は持ってる。あとはLangGraphあたりと使い勝手を比べてみたり、実際に何かしら作ってみて、というところかな。