Closed11

LLMフレームワーク「Burr」を試す

kun432kun432

このスレで知った

https://www.reddit.com/r/LangChain/comments/1cstpmx/llm_orchestration_framework_or_own_python_code/

公式ブログ

https://blog.dagworks.io/p/burr-develop-stateful-ai-applications?r=2cg5z1&utm_campaign=post&utm_medium=web

AIアプリケーションの状態を管理するのは難しい。特に、ユーザーに影響を与えるような意思決定に使われる場合はなおさらだ。私たちはこれを簡単にするためにBurrを開発した。ステートマシンとしてアプリケーションを構築するための強力なAPI、統合のホスト、そして本番環境でのデバッグやトラッキングのための遠隔測定UIが付属している。その動機と理由を説明し、簡単なチャットボットを実装する。ただ始めたい方は、AIのためのステートマシン(そしてもっと!)のセクションへどうぞ。

レポジトリ
https://github.com/DAGWorks-Inc/burr

Burrは、シンプルなPythonのビルディング・ブロックから、状態に基づいて意思決定を行うアプリケーション(チャットボット、エージェント、シミュレーションなど)を簡単に開発できる。Burrには、これらの決定をリアルタイムで追跡/監視できるUIが含まれている。

ドキュメント
https://burr.dagworks.io/

kun432kun432

ということで、Getting Startedに従って進める。今回はローカルのJupyterLabで行う。予め仮想環境を用意しておくこと。

https://burr.dagworks.io/getting_started/

Why Burr?

なぜアプリケーションにステートマシンが必要なのか?通常のプログラミング構成で十分ではないか?

そう、ある時点までは。本番レベルのLLMアプリケーションを構築するために必要なものを見てみよう:

  1. トレース/テレメトリー
    • LLMは混沌としていることがあり、どのような決定がなされ、それにどれくらいの時間がかかったかを可視化する必要がある。
  2. 状態の永続性
    • アプリケーションのセーブ/ロード方法を考えることは、あなたが心配する必要がある、まったく別のレベルのインフラストラクチャです。
  3. ビジュアライゼーション/デバッグ
    • 開発時には、何をしているのか/何をしたのかを表示し、いつでもデータをロードできるようにしたい。
  4. ユーザー/LLM間のインタラクションを管理する
    • 特定の条件下で入力のために一時停止する
  5. 評価のためのデータ収集+テスト生成
    • 本番で実行されたデータを保存し、後の分析/微調整に使用する。

様々なフレームワークをつなぎ合わせたり、すべてを自分で構築することはいつでもできるが、その時点で、ソフトウェアの中核的な価値提案とは関係のない作業に多くの時間を費やすことになる。

Burrは、これらすべてを容易にするために作られた。

アプリケーションをシンプルなPythonの構成要素からなるステートマシンとしてモデル化することで、両方の世界のベストを手に入れることができる。 好きなインフラやツールを持ち込んで、上記のすべてを手に入れることができる。Burrは、LLM(とその他多くの)アプリケーションを簡単に構築するための非常に軽量なツールとしてスタートすることを意図している。Burrが提供するエコシステム、プラグイン、追加機能を活用すればするほど、その価値は高まっていく。

ステートマシンということが全面にアピールされている。この点からはLangGraphあたりが比較対象になりそう。

Installing

burrにはいくつかのエクストラが用意されているが、初めて触る場合に便利な依存パッケージをstartで指定するのがオススメされている。startを指定すると可視化・トラッキング・GUIなどが有効になる様子。

!pip install "burr[start]"

あとpython-dotenvを使ってOpenAI APIキーを読み出す。APIキーをセットした.envを作成しておくこと(python-dotenvとopenaiパッケージはburrの依存関係で既にインストールされていると思う)

from dotenv import load_dotenv

load_dotenv(verbose=True)

Simple Example

Burrでシンプルなチャットボットを作成する。

from typing import Tuple
from openai import OpenAI
from burr.core import action, State, ApplicationBuilder, when, persistence

client = OpenAI()

@action(reads=[], writes=["prompt", "chat_history"])
def human_input(state: State, prompt: str) -> Tuple[dict, State]:
    chat_item = {
        "content": prompt,
        "role": "user"
    }
    return (
        {"prompt": prompt},
        state.update(prompt=prompt).append(chat_history=chat_item)
    )

@action(reads=["chat_history"], writes=["response", "chat_history"])
def ai_response(state: State) -> Tuple[dict, State]:
    content = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=state["chat_history"],
    ).choices[0].message.content
    chat_item = {
        "content": content,
        "role": "assistant"
    }
    return (
        {"response": content},
        state.update(response=content).append(chat_history=chat_item)
    )

Burrでは、@actionデコレータを使って、何を読み取って、何を書くのかを定義し、そしてそれらを実際にどうやって読み書きするのか、を関数で定義する。

ここでは、ユーザからの入力を処理するhuman_inputと、LLMからの出力を処理するai_responseが定義されている。

  • Stateが入出力間での状態を保持する。ここに対してactionで定義されたフィールドに対して入出力を行う。
  • Stateオブジェクトは基本的にimmutableで、.append().update()を通じてのみ更新できる。
  • よって重要なのはStateになるため、関数内ではどんなフレームワークを使っても良い。

そしてApplicationBuilderを使ってこれらを組み合わせる。

app = (
    ApplicationBuilder()
    .with_actions(human_input, ai_response)
    .with_transitions(
        ("human_input", "ai_response"),
        ("ai_response", "human_input")
    ).with_state(chat_history=[])
    .with_entrypoint("human_input")
    .build()
)

これを可視化する。

app.visualize("./graph", format="png")

  • まず、フローとしては無限ループになっていて、新しいプロンプト入力を待つ際に一時停止する
  • .with_actionsで上記で作成した関数を指定する。
  • .with_stateで保持したい状態を指定する。ここではchat_historyを追加して、初期状態は殻になっている。
  • with_transitionsで処理の遷移を指定する。今回の場合だと以下となる
    • ユーザからの入力(human_input)が行われたら、次はLLMからの回答(ai_response)が行われる
    • LLMからの回答(ai_response)が行われたら、次はユーザからの入力(human_input)が行われる
    • つまり、各処理がノードで、ここでエッジでつなげて、グラフを作成しているということだと思う。
  • with_entrypointでフローのエントリーポイントを指定する。今回はユーザの入力になる。

なお、Builderパターンを採用しているらしい。自分はデザインパターンとか詳しくないので、わかってないけども。

https://ja.wikipedia.org/wiki/Builder_パターン

では実行してみる。

*_, state = app.run(halt_after=["ai_response"], inputs={"prompt": "日本の総理大臣は誰?"})
print("answer:", app.state["response"])
print(len(state["chat_history"]), "items in chat")
for i in state["chat_history"]:
    print(i)
answer: 現在の日本の総理大臣は菅義偉(すが よしひで)です。
2 items in chat
{'content': '日本の総理大臣は誰?', 'role': 'user'}
{'content': '現在の日本の総理大臣は菅義偉(すが よしひで)です。', 'role': 'assistant'}
Click to add a cell.

なるほど、常にState経由で呼び出すということになるのね。自分の理解としてもう少し噛み砕いて図にしてみた。多分こんな感じだと思う。

次にUIを立ち上げてみる。

!burr
2024-05-18 21:29:51.812 | INFO     | burr.cli.__main__:_run_server:107 - Starting server on port 7241
2024-05-18 21:29:51.812 | INFO     | burr.cli.__main__:_run_server:113 - Copying demo data over to /home/kun432/.burr...
2024-05-18 21:29:51.812 | INFO     | burr.cli.__main__:_run_server:123 - Copying demo_counter over...
2024-05-18 21:29:51.813 | INFO     | burr.cli.__main__:_run_server:123 - Copying demo_chatbot over...
2024-05-18 21:29:51.814 | INFO     | burr.cli.__main__:_run_server:123 - Copying demo_conversational-rag over...
2024-05-18 21:29:51.815 | INFO     | burr.cli.__main__:_run_server:123 - Copying demo_tracing over...
2024-05-18 21:29:51.816 | INFO     | burr.cli.__main__:_command:36 - Running command: uvicorn burr.tracking.server.run:app --port 7241
INFO:     Started server process [1510302]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:7241 (Press CTRL+C to quit)
INFO:     127.0.0.1:58826 - "GET //ready HTTP/1.1" 200 OK

おっと、localhostだけでオープンしてる。うちの場合はリモートのサーバで動かしているので、これだとアクセスできない。

一旦止めて、ヘルプを見てみる。

!burr help
Usage: burr [OPTIONS]

Options:
  --port INTEGER       Port to run the server on
  --dev-mode           Run the server in development mode
  --no-open            Run the server without opening it
  --no-copy-demo_data  Don't copy demo data over.
  --help

それっぽいオプションはないなー。ドキュメントにもそのあたりの記載はないので、一旦SSHでポートフォワーディングしてアクセスすることにする。

$ ssh -L 7241:127.0.0.1:7241 XXX.XXX.XXX.XXX

ブラウザで開くとこんな感じ。

上記は自分で作ったコードのものではないのだけど、ちゃんとプロジェクト?を作成してやればこういう風にトラッキングができるということなのかなー。

kun432kun432

ここまでの所感

自分の現時点での認識は以下。

  • burrがフレームワークとして提供してくれるのは、フローの制御とStateの管理。
  • LangChainやLlamaIndexのような、LLMアプリで必要な機能をフルパッケージで抽象化してくれるフレームワークではない。
  • 自分のイメージはこんな感じ
    • LangChainでいうところの、LangGraphの部分だけを切り出して、LangChainとの結合度をなくしたような感じ
    • なんとなくワークフローエンジンあたりの発想に近しい位置付けじゃないかな、ただ自分が見たワークフローエンジンはどれもDAGになってるものがほとんどで、ステートマシンになっているものはほとんどないように思う

自分も実際にLLMアプリを開発してみて、いろいろな要件ごとにあわせたり、様々な新しいモデル・テクニックなどに対応するためには、柔軟な設計が必要と感じている。

ただそれを実際に実装しようと思うと大変で、それが実現できそうなのは現時点ではワークフロー的に定義できるようなものを使うのが良いと考えて、色々過去に調べていた。

LlamaIndexのQuery Pipeline

https://zenn.dev/kun432/scraps/17b82ca0f7ad2e

LangChainのLCEL

https://zenn.dev/kun432/scraps/854a976153b481
https://zenn.dev/kun432/scraps/db821198c4207d

LLM向けは他にも色々ある

https://zenn.dev/kun432/scraps/c7173a6d6527e8

LLMに限らないワークフローエンジンも

https://zenn.dev/kun432/scraps/98ebb0c95cefb1

https://zenn.dev/kun432/scraps/fa6023c11aebf0

一通り触ってみて、自分の中ではもうLCELでよいのではないか?という気はしていた。LangChainを採用することの是非はずっと言われ続けているけど、去年言われていたころよりは全然マシになっているだろうと思っているし、LCELを使うことで得られる柔軟性等の恩恵はとても大きいと感じている。

とはいえ、チームでの開発になると学習度や理解度のグラデーションは出てくるし、今後の運用みたいなところに全く不安がないというわけではない。このあたりの技術選定の難しさはケースバイケースであると思っていて(何が「正しい」のかは個々の置かれた状況によって変わる)、LangChainやLlamaIndexなどの巨大なフレームワークを使いたくない、というケースでも、フロー制御とState管理だけはburrを活用して、あとはネイティブで頑張る、みたいな使い方で十分活用できるのではないか?という気がする。

個人的にはシンプルでわかりやすい点も含めて、好印象を持った。

kun432kun432

LCLEやQuery Pipeline、LangGraphなどでやっているような複雑な制御と同じことができるか、はもう少し使ってみての判断かな。

kun432kun432

ざっくり他にできることを見てみる。

グラフの遷移で条件分岐は可能

https://burr.dagworks.io/concepts/transitions/#id1

Stateの永続化。今のところはSQLite,PostgreSQL、Redisに対応している様子

https://burr.dagworks.io/concepts/state-persistence/#id1

https://burr.dagworks.io/reference/persister/#persistersref

トラッキングはApplicationBuilderでState初期化時にトラッキングオブジェクトを渡しておけばよいみたい。

https://burr.dagworks.io/concepts/tracking/#tracking

前回の実行結果からテストデータを作成してテストを行うことができる。

https://github.com/DAGWorks-Inc/burr/tree/main/examples/test-case-creation

デモがいろいろ用意されている。

https://burr.dagworks.io/getting_started/up-next/

Actionを並列実行するのは今のところできなさそう。
https://github.com/DAGWorks-Inc/burr/issues/52

kun432kun432

Actions

https://burr.dagworks.io/concepts/actions/

Actionがやること

  • run
    • 計算、というか具体的な処理。この部分が自分の実装のメインになる。
    • 処理結果を辞書で返す
  • update
    • Stateの更新
    • 更新された状態を返す

よって、Stateの何を読み書きするか?を事前に宣言する必要がある。

Actionの書き方は2種類

  • 関数ベース
  • クラスベース

関数ベース

関数ベースの場合は@actionデコレータを使って書く。シンプルなカウンターだとこんな感じになる。

@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + 1}
    print(f"counted to {result['counter']}")
    return result, state.update(**result)

Stateからcounterを読み出して、+1した結果を辞書で返しつつ、Stateを更新する。

全部書くとこうなる。なお、この例だと、一つのアクションを延々と続ける感じの無限ループになるので、sleepを入れつつ、止めるときは手動で。

from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time

@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + 1}
    print(f"counted to {result['counter']}")
    time.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter)
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

可視化すると無限ループになっているのがわかる。

app.visualize("./graph", format="png")

実行してみる。

action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
counted to 6
counted to 7
counted to 8
counted to 9
(snip)

Actionにパラメータを渡す場合は.bind()が使える。アプリケーション実行に渡す入力とはちょっと使い方が違うので、こちらは初期化パラメータの変更とかで使えそう。

from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time

@action(reads=["counter"], writes=["counter"])
def counter(state: State, increment_by: int = 1) -> Tuple[dict, State]:     # increment_byをオプション引数で追加
    result = {"counter": state["counter"] + increment_by}
    print(f"counted to {result['counter']}")
    time.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter.bind(increment_by=2))     # Actionにbindして渡す
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 2
counted to 4
counted to 6
counted to 8
counted to 10
counted to 12
counted to 14
counted to 16
(snip)

.bindを使わない場合はrunの実行時に指定する。

action, result, state = app.run(inputs={"increment_by":2})

クラスベース

クラスベースの場合はActionクラスを継承してクラスを定義する。上と同じCounterクラスを定義した。

from burr.core import Action, State, ApplicationBuilder

class Counter(Action):
   @property
   def reads(self) -> list[str]:
       return ["counter"]

   def run(self, state: State) -> dict:
       new_counter = state["counter"] + 1
       print(f"counted to {new_counter}")
       time.sleep(1)
       return {"counter": new_counter}

   @property
   def writes(self) -> list[str]:
       return ["counter"]

   def update(self, result: dict, state: State) -> State:
       return state.update(**result)

呼び出すときはこんな感じで名前付きで定義する。

app = (
    ApplicationBuilder()
    .with_actions(counter=Counter())
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
counted to 6
(snip)

クラスベースでパラメータを渡す場合は、inputsプロパティを追加する。

class Counter(Action):
    @property
    def reads(self) -> list[str]:
        return ["counter"]

    # runで引数を受けれるようにする。
    def run(self, state: State, increment_by: int) -> dict:
        new_counter = state["counter"] + increment_by
        print(f"counted to {new_counter}")
        time.sleep(1)
        return {"counter": new_counter}

    @property
    def writes(self) -> list[str]:
        return ["counter"]

    def update(self, result: dict, state: State) -> State:
        return state.update(**result)

    # 以下を追加
    @property
    def inputs(self) -> list[str]:
        return ["increment_by"]

app = (
    ApplicationBuilder()
    .with_actions(counter=Counter())
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

で実行時にinputsを渡す。

action, result, state = app.run(inputs={"increment_by":2})

ただしinputsプロパティで指定した場合は、指定が必須になってしまう。inputsはタプルで (必須の引数, オプションの引数)を返すので、以下のように指定すれば良さそう。

class Counter(Action):
    (snip)

    def run(self, state: State, increment_by: int = 1) -> dict:
        new_counter = state["counter"] + increment_by
        print(f"counted to {new_counter}")
        time.sleep(1)
        return {"counter": new_counter}

    (snip)

    @property
    def inputs(self) -> Tuple[list[str], list[str]]:
        return [], ["increment_by"]
action, result, state = app.run()
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
(snip)
action, result, state = app.run(inputs={"increment_by":2})
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 6
counted to 8
counted to 10
counted to 12
counted to 14
counted to 16

ただ、クラスベースの場合はbind()相当のものがない。まあそらそうか。となると、インスタンス作成するタイミングで渡す感じにすればいいのかな?

class Counter(Action):
    def __init__(self, increment_by: int = 1):
        super().__init__()
        self.increment_by = increment_by

    @property
    def reads(self) -> list[str]:
        return ["counter"]

    def run(self, state: State, increment_by: int = None) -> dict:
        if not increment_by:
            increment_by = self.increment_by
        new_counter = state["counter"] + increment_by
        print(f"counted to {new_counter}")
        time.sleep(1)
        return {"counter": new_counter}

    @property
    def writes(self) -> list[str]:
        return ["counter"]

    def update(self, result: dict, state: State) -> State:
        return state.update(**result)

    @property
    def inputs(self) -> Tuple[list[str], list[str]]:
        return [], ["increment_by"]

Action指定時に「必要ならば」パラメータを渡してやる

app = (
    ApplicationBuilder()
    .with_actions(counter=Counter(increment_by=2))
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

一応どの場合でもこれならいけそう。

action, result, state = app.run()
action, result, state = app.run(inputs={"increment_by":5})

Inputs/Results

Inputs/Resultsを使うと、入力をそのままStateに渡すノード、Stateから値を取り出すだけのノードが作成できる。あんまりいい例じゃないけどこんな感じ?

from typing import Tuple
from burr.core import action, State, ApplicationBuilder, Result
from burr.core.action import Input


@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + state["increment_by"]}
    time.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter, get_input=Input("increment_by"), get_result=Result("counter"))
    .with_transitions(
        ("get_input", "counter"),
        ("counter", "get_result")
    )
    .with_state(counter=0)
    .with_entrypoint("get_input")
    .build()
)
action, result, state = app.run(halt_after=["get_result"], inputs={"increment_by":10})
print(state["counter"])
10

実行時の入力

上でも少し書いたけど、入力の受け渡しは、2種類。

  • .bind()を使う
  • run()時にinputsで渡す
from typing import Tuple
from openai import OpenAI
from burr.core import action, State, ApplicationBuilder, when, persistence

openai_client = OpenAI()

@action(reads=[], writes=["prompt", "chat_history"])
def human_input(state: State, prompt: str) -> Tuple[dict, State]:
    chat_item = {
        "content": prompt,
        "role": "user"
    }
    return (
        {"prompt": prompt},
        state.update(prompt=prompt).append(chat_history=chat_item)
    )

@action(reads=["chat_history"], writes=["response", "chat_history"])
def ai_response(state: State, client: OpenAI) -> Tuple[dict, State]:
    content = client.chat.completions.create(
        model="gpt-3.5-turbo-0125",
        messages=state["chat_history"],
    ).choices[0].message.content
    chat_item = {
        "content": content,
        "role": "assistant"
    }
    return (
        {"response": content},
        state.update(response=content).append(chat_history=chat_item)
    )

app = (
    ApplicationBuilder()
    .with_actions(human_input, ai_response.bind(client=openai_client))
    .with_transitions(
        ("human_input", "ai_response"),
        ("ai_response", "human_input")
    ).with_state(chat_history=[])
    .with_entrypoint("human_input")
    .build()
)

*_, state = app.run(halt_after=["ai_response"], inputs={"prompt": "日本の総理大臣は誰?"})

.bind()で渡すものは初回だけ必要なものとかStateで管理が不要なもの、常に変わるものとかStateで管理が必要なものはrun()inputsで渡す、みたいな使い分けが良さそう。

kun432kun432

State

Stateクラスは状態を保持し管理するためのインタフェースを提供する。Stateはimmutableでその馬で変更はできない=新しい状態として作成(して上書き)することで、変更する。

from typing import Tuple
from burr.core import State, action

# State初期化
some_state = State({"counter": 0, "foo": "foo", "bar": ["bar"]})

# 全部取り出し
print(some_state.get_all())     # --> {'counter': 0, 'foo': 'bar', 'buz': [1, 2, 3]}

# 特定のフィールドのみ取り出し(ドキュメント通りに配列で指定するとエラーになる、文字列で指定)
print(some_state.subset("counter","foo"))     # --> {'counter': 0, 'foo': 'foo'}

# 特定のフィールドの更新、存在しない場合はフィールドが追加される
some_state = some_state.update(foo="qux")
print(some_state.get_all())     # --> {'counter': 0, 'foo': 'qux', 'bar': ['bar']}

# 特定のリストフィールドへの追加、存在しない場合はフィールドが追加されリストに値が入る
some_state = some_state.append(bar="quux")
print(some_state.get_all())     # --> {'counter': 0, 'foo': 'qux', 'bar': ['bar', 'quux']}

# フィールドが整数値の場合はインクリメント
some_state = some_state.increment(counter=1)
print(some_state.get_all())     # --> {'counter': 1, 'foo': 'qux', 'bar': ['bar', 'quux']}

# 該当のフィールド以外を削除
some_state = some_state.wipe(keep=["counter", "foo"])
print(some_state.get_all())     # --> {'counter': 1, 'foo': 'qux'}

# 該当のフィールドを削除
some_state = some_state.wipe(delete=["foo"])
print(some_state.get_all())     # --> {'counter': 1}

更新時にはgitのcommit/checkout/mergeっぽい動きで更新されるらしい。擬似コードが紹介されていて、実際になんかそれっぽく再現できないかなと思ったけど、手間なので諦めた。

# Stateの初期化
current_state = ...

# Actionが読み取るStateのフィールドを取得
read_state = current_state.subset(action.reads)

# Actionを任意のStateで実行し、結果を取得
result = action.run(new_state)

# Actionが書き込むStateのフィールドを取得
write_state = current_state.subset(action.writes)

# Actionの実行結果から更新後のStateを作成
new_state = action.update(result, new_state)

# 更新後のStateを現在のStateにマージ
current_state = current_state.merge(new_state)
kun432kun432

Applications

https://burr.dagworks.io/concepts/state-machine/

ApplicationBuilderをつかって、ActionとStateをつなぐことで、ステートマシンとなる。最低限定義が必要なのは以下とある。

  1. with_action()に、Actionを**kwargsで渡す
  2. Transitions(条件付きの場合も)
  3. 最初に実行されるエントリーポイント

といいつつ、Stateがなければ使う意味はほとんどないと思うので、実質的には最低限でもこうなると思う。

app = (
    ApplicationBuilder()
    .with_actions(human_input, ai_response)
    .with_transitions(
        ("human_input", "ai_response"),
        ("ai_response", "human_input")
    ).with_state(chat_history=[])
    .with_entrypoint("human_input")
    .build()
)

アプリケーションの実行

アプリケーションの実行方法には3つのAPIが使える。

step/astep

stepは、Action、Actionの結果、実行後の更新されたStateを返し、ステップを1回だけ実行するっぽい。

例えば以下のカウンターのサンプル、そのままrunで実行すると無限ループになるが、

from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time

@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + 1}
    #print(f"counted to {result['counter']}")
    time.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter)
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

stepを使えば、1回だけ実行して終わる。

action, result, state = app.step()
print(action)
print(result)
print(state)
counter: counter -> counter
{'counter': 1}
{'counter': 1, '__SEQUENCE_ID': 0, '__PRIOR_STEP': 'counter'}

再度実行するとこうなる。

counter: counter -> counter
{'counter': 2}
{'counter': 2, '__SEQUENCE_ID': 1, '__PRIOR_STEP': 'counter'}

arunはActionが非同期の場合に使う。上のやつをasync/awaitを使って書いてみるとこんな感じかな?全然意味がないけれども。

from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import asyncio
import nest_asyncio

nest_asyncio.apply()

@action(reads=["counter"], writes=["counter"])
async def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + 1}
    await asyncio.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter)
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

action, result, state = await app.astep()

iterate/aiterate

iterate/aiteratestep/astepと同じだけど、ジェネレータっぽく実行させることができる。

同じくカウンターのサンプル。

from typing import Tuple
from burr.core import action, State, ApplicationBuilder
import time

@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + 1}
    #print(f"counted to {result['counter']}")
    time.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter)
    .with_transitions(
        ("counter", "counter")
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

こんな感じで実行。

for action, result, state in app.iterate():
    print(action)
    print(result)
    print(state)
    print("----")
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counter: counter -> counter
{'counter': 1}
{'counter': 1, '__SEQUENCE_ID': 0, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 2}
{'counter': 2, '__SEQUENCE_ID': 1, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 3}
{'counter': 3, '__SEQUENCE_ID': 2, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 4}
{'counter': 4, '__SEQUENCE_ID': 3, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 5}
{'counter': 5, '__SEQUENCE_ID': 4, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 6}
{'counter': 6, '__SEQUENCE_ID': 5, '__PRIOR_STEP': 'counter'}
----
counter: counter -> counter
{'counter': 7}
{'counter': 7, '__SEQUENCE_ID': 6, '__PRIOR_STEP': 'counter'}

ここで返ってくるactionは、halt_afterもしくはhalt_beforeで指定したものになるらしい。

というかhalt_after/halt_beforeについての説明がGetting Startedでもされていなかったのだけど、実行時にhalt_after/halt_beforeでActionを指定すると、TransitionでそのActionが実行される場合に無限ループの終端になる、というものだと認識している。

halt_after/halt_beforeの違いは、そのActionが実行されてから終わるのか、そのActionが実行される直前で終わるのか、ということみたい。

当然resultの結果も変わる。halt_afterの場合はその結果になるし、halt_beforeの場合はNoneになる。aiterateの場合は返ってこないらしい。

上の例では指定していないので無限ループになっているけれど、halt_after=[], halt_before=[]と同じことっぽい。

aitereateはその非同期版。

run/arun

run/arunは結局のところiterate/aiterateを実行しているということと同じになるみたい。実際上の例も同じ動きになってる。


ちょっと非同期とかジェネレータみたいなのが絡むと一気に難しくなるよなぁ、サンプル作って理解したかったけど、スキルが足りない。。。

kun432kun432

Transitions

https://burr.dagworks.io/concepts/transitions/

TransitionはActionのつながりを指定する、つまりフローを定義する重要な部分。グラフで言うとActionがノードを定義して、Transitionsでノード間のエッジを設定することになる。

Transitionsは3つの要素がある

  • from
  • to
  • condtion

Getting Startedのサンプルだと、"human_input"から"ai_response"、"ai_response"から"human_input"というfrom/toだけを使っていた。

app = (
    ApplicationBuilder()
    .with_actions(human_input, ai_response)
    .with_transitions(
        ("human_input", "ai_response"),
        ("ai_response", "human_input")
    ).with_state(chat_history=[])
    .with_entrypoint("human_input")
    .build()
)

条件を指定することで、一本道ではないフローを作ることができる。

条件の指定例はこんな感じ。

with_transitions(
    ("from", "to", when(foo="bar"),  # State"foo"の値が"bar"になったら評価される
)
with_transitions(
    ("from", "to", expr('epochs>100')) # epocsが100以上になったら評価される
)
with_transitions(
    ("from", "to", default)  # 常に評価される
)
with_transitions(
    ("from", "to") # 指定しない場合はdefaultと同じ
)

否定の場合は~を使う

with_transitions(
    ~("from", "to", when(foo="bar"),  # State"foo"の値が"bar"以外なら評価される
)

指定するキーは必ずStateに存在する必要がある。存在しないStateのキーを指定した場合はエラーになる。

カウンターが10になるまでインクリメントする例。

from typing import Tuple
from burr.core import action, State, ApplicationBuilder
from burr.core import when, expr, default
import time

@action(reads=["counter"], writes=["counter"])
def counter(state: State) -> Tuple[dict, State]:
    result = {"counter": state["counter"] + 1}
    print(f"counted to {result['counter']}")
    time.sleep(1)
    return result, state.update(**result)

app = (
    ApplicationBuilder()
    .with_actions(counter, Result)
    .with_transitions(
        ("counter", "counter", expr('counter<10'))
    )
    .with_state(counter=0)
    .with_entrypoint("counter")
    .build()
)

action, result, state = app.run()
print(action)
print(result)
print(state)
WARNING:burr.core.application:No halt termination specified -- this has the possibility of running forever!
counted to 1
counted to 2
counted to 3
counted to 4
counted to 5
counted to 6
counted to 7
counted to 8
counted to 9
counted to 10
WARNING:burr.core.application:This is trying to return without having computed a single action -- we'll end up just returning some Nones. This means that nothing was executed (E.G. that the state machine had nowhere to go). Either fix the state machine orthe halt conditions, or both... Halt conditions are: halt_before=[], halt_after=[].Note that this is considered undefined behavior -- if you get here, you should fix!
counter: counter -> counter
{'counter': 10}
{'counter': 10, '__SEQUENCE_ID': 9, '__PRIOR_STEP': 'counter'}
kun432kun432

とりあえずこのあたりまでである程度ステートマシンの制御に必要な最低限は概ねカバーしたのではないだろうか。

全体的に良さげな印象は持ってる。あとはLangGraphあたりと使い勝手を比べてみたり、実際に何かしら作ってみて、というところかな。

このスクラップは2024/05/19にクローズされました