Closed4

AWSのエージェントフレームワーク「Strands Agents」 のコンセプト ③Streaming

kun432kun432

続き。

https://zenn.dev/kun432/scraps/e314df9c6c7bb5

https://zenn.dev/kun432/scraps/69b84f892dda9f

https://zenn.dev/kun432/scraps/c5205ddc8f5182

Strands Agentsのコンセプトを見ていく、ということで、次は「ストリーミング」。

https://strandsagents.com/latest/documentation/docs/user-guide/concepts/streaming/async-iterators/

Quickstartでもあったように、Strands Agentsのストリーミングの方法は2つ。

  1. 非同期イテレータ
  2. コールバックハンドラ

それぞれを見ていく

kun432kun432

ストリーミング用非同期イテレータ

https://strandsagents.com/latest/documentation/docs/user-guide/concepts/streaming/async-iterators/

stream_asyncメソッドを使う。これは、 FastAPI ・aiohttp・ Django Channels のような非同期フレームワーク向けに向いている。

import asyncio
from strands import Agent
from strands_tools import calculator

# コールバックハンドラなしでエージェントを初期化
agent = Agent(
    tools=[calculator],
    callback_handler=None
)

# ストリーミングされたエージェントイベントを反復処理する非同期関数
async def process_streaming_response():
    agent_stream = agent.stream_async("2+2 を計算して。")
    async for event in agent_stream:
        if "data" in event:
            # 生成されたテキストチャンクを出力
            print(event["data"], end="\n", flush=True)  # 実際は `end=""` だが説明のため改行
        elif "current_tool_use" in event and event["current_tool_use"].get("name"):
            # ツール使用情報の出力
            print(f"\n[部分的なツール使用情報: {event['current_tool_use']['name']}]")

# エージェントを実行
asyncio.run(process_streaming_response())
出力
2+2を計算します。

[部分的なツール使用情報: calculator]

[部分的なツール使用情報: calculator]

[部分的なツール使用情報: calculator]

[部分的なツール使用情報: calculator]

[部分的なツール使用情報: calculator]
2
+2の計算結果は
 **
4** です。

余談だが、イテレータではないただの非同期には invoke_async が使える。

import asyncio
from strands import Agent
from strands_tools import calculator

agent = Agent(
    tools=[calculator],
)

# 非同期でエージェントを実行する
async def async_response():
    await agent.invoke_async("2+2 を計算して。")

# エージェントを実行
asyncio.run(async_response())
出力
Tool #1: calculator
2+2の計算結果は **4** です。

イテレータが返すストリームイベントは、コールバックハンドラと同じで、以下のイベントがある。

カテゴリ イベントタイプ 説明
テキスト生成 data モデル出力のテキストチャンク
delta モデルからの生の差分データ
ツール current_tool_use 現在使われているツールの情報
toolUseId: このツール呼び出しの一意なID
name: 使用されているツールの名前
input: ツールに渡される入力パラメータ(ストリーミングで蓄積)
ライフサイクル init_event_loop イベントループの初期化時にTrue
start_event_loop イベントループ開始時にTrue
start 新しいサイクル開始時にTrue
message 新しいメッセージが生成されたときに含まれる
event モデルストリームからの生データイベント
force_stop イベントループが強制停止された場合にTrue
force_stop_reason 強制停止の理由
result 最終的な AgentResult が格納される
推論 reasoning 推論イベント時にTrue
reasoningText 推論プロセス中のテキスト
reasoning_signature 推論プロセスのシグネチャ

FastAPIを使った非同期ストリーミングAPIエンドポイントの例

FastAPIとuvicornをインストール

pip install fastapi "uvicorn[standard]"

サーバ側。サンプル通り。

server.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from strands import Agent
from strands_tools import calculator, http_request

app = FastAPI()

class PromptRequest(BaseModel):
    prompt: str

@app.post("/stream")
async def stream_response(request: PromptRequest):
    async def generate():
        agent = Agent(
            tools=[calculator, http_request],
            callback_handler=None
        )

        try:
            async for event in agent.stream_async(request.prompt):
                if "data" in event:
                    # クライアントにテキストチャンクのみをストリーミング
                    yield event["data"]
        except Exception as e:
            yield f"Error: {str(e)}"

    return StreamingResponse(
        generate(),
        media_type="text/plain"
    )

サーバ起動

uvicorn server:app

クライアント側

import httpx
import asyncio

async def main():
    async with httpx.AsyncClient() as client:
        # POSTリクエストでストリーミング開始
        async with client.stream(
            "POST",
            "http://localhost:8000/stream",
            json={"prompt": "こんにちは。25 * 48 は?計算過程も説明して"}
        ) as response:
            async for line in response.aiter_lines():
                if line:
                    print(line)

if __name__ == "__main__":
    asyncio.run(main())

実行

python client.py
出力
こんにちは!25 × 48 を計算してみますね。**答え: 1200**
**計算過程の説明:**
25 × 48 の計算にはいくつかの方法があります:
**方法1: 筆算**
```
    48
  × 25
  ----
   240  (48 × 5)
  960   (48 × 20)
  ----
  1200
```
**方法2: 因数分解を使った計算**
- 25 = 5²
- 48 = 16 × 3 = 2⁴ × 3
25 × 48 = 25 × (50 - 2) = 25 × 50 - 25 × 2 = 1250 - 50 = 1200
**方法3: 簡単な分解**
25 × 48 = 25 × (40 + 8) = (25 × 40) + (25 × 8) = 1000 + 200 = 1200
どの方法でも答えは **1200** になります。
kun432kun432

コールバックハンドラ

https://strandsagents.com/latest/documentation/docs/user-guide/concepts/streaming/callback-handlers/

コールバックハンドラは、エージェントの実行中に発生するイベントをインターセプトして処理することができる。 これにより、リアルタイムモニタリング、カスタムな出力フォーマットの変更、外部システムとの統合が可能になる。以下のようなイベントを受け取る。

  • モデルからのテキスト生成
  • ツールの選択と実行
  • 推論プロセス
  • エラーと完了

コールバックハンドラはエージェントにコールバック関数を渡すことで使用できる。

from strands import Agent
from strands_tools import calculator

def custom_callback_handler(**kwargs):
    # ストリーミングデータを処理
    if "data" in kwargs:
        print(f"モデル出力: {kwargs['data']}")
    elif "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
        print(f"\nツール使用: {kwargs['current_tool_use']['name']}")

# カスタムコールバックハンドラを使用してエージェントを作成
agent = Agent(
    tools=[calculator],
    callback_handler=custom_callback_handler
)

agent("2 + 2 を計算して。")
出力

ツール使用: calculator

ツール使用: calculator

ツール使用: calculator

ツール使用: calculator

ツール使用: calculator

ツール使用: calculator
モデル出力: 2
モデル出力:  + 2 の計算
モデル出力: 結果は **
モデル出力: 4** です。

コールバックハンドラのイベントは非同期イテレータと同じなので、1つ前を参照。これをキーワード引数として受け取る。

Strands Agentsではコンソールの出力をフォーマットするデフォルトのコールバックハンドラが提供されている。

from strands import Agent
from strands_tools import calculator
from strands.handlers.callback_handler import PrintingCallbackHandler

# デフォルトのコールバックハンドラは、コンソールに、テキストおよびツール使用を表示
agent = Agent(
    callback_handler=PrintingCallbackHandler(),
    tools=[calculator]
)

agent("2 + 2 を計算して。")
出力
2 + 2 を計算します。
Tool #1: calculator
2 + 2 = 4 です。

これを無効化するにはNoneを渡す。

from strands import Agent
from strands_tools import calculator

agent = Agent(
    callback_handler=None,
    tools=[calculator]
)

response = agent("2 + 2 を計算して。")
print(response)
出力
2 + 2 の計算結果は **4** です。

あぁ、なるほど。レスポンスを変数で受け取って出力しても、コンソール出力が重複するのはこのせいか。

ここだ。

https://github.com/strands-agents/sdk-python/blob/b30e7e6e41e7a2dce70d74e8c1753503959f3619/src/strands/agent/agent.py#L190-L232

最初の例にもあったように、カスタムなコールバックハンドラを使うことで、エージェントからのストリームを細かく制御できる。ここではいくつかの例が紹介されている。

例1: ストリームシーケンス内の全てのイベントを表示

一連のイベントを全て表示するカスタムなコールバックハンドラを指定すると、デバッグに便利

from strands import Agent
from strands_tools import calculator

def debugger_callback_handler(**kwargs):
    # kwargsの値をすべて表示することで、すべての情報を確認できる
    print(kwargs)

agent = Agent(
    tools=[calculator],
    callback_handler=debugger_callback_handler
)

agent("922 + 5321 は?")

ここは出力量が多いので割愛

例2: メッセージごとに出力をバッファリング

ストリーミングだとチャンクごとメッセージの断片が飛んでくるが、このコールバックハンドラでは完全なメッセージが作成されたときにトリガーされる message イベントを利用して、最終的なメッセージだけを表示している。

import json
from strands import Agent
from strands_tools import calculator

def message_buffer_handler(**kwargs):
    # エージェントから新しいメッセージが作成されたとき、その内容を表示
    if "message" in kwargs and kwargs["message"].get("role") == "assistant":
        print(json.dumps(kwargs["message"], indent=2, ensure_ascii=False))

# エージェントとともに使用
agent = Agent(
    tools=[calculator],
    callback_handler=message_buffer_handler
)

agent("2 + 2 を計算して。あと、AWS Lambda についても教えて")
{
  "role": "assistant",
  "content": [
    {
      "text": "\u307e\u305a\u30012 + 2 \u306e\u8a08\u7b97\u3092\u884c\u3044\u307e\u3059\u306d\u3002"
    },
    {
      "toolUse": {
        "toolUseId": "tooluse_PYcw2Z3GTe-2zMw4dvDsEw",
        "name": "calculator",
        "input": {
          "expression": "2 + 2"
        }
      }
    }
  ]
}
{
  "role": "assistant",
  "content": [
    {
      "text": "**\u8a08\u7b97\u7d50\u679c**: 2 + 2 = 4\n\n\u6b21\u306b\u3001AWS Lambda \u306b\u3064\u3044\u3066\u3054\u8aac\u660e\u3057\u307e\u3059\u3002\n\n## AWS Lambda \u3068\u306f\n\nAWS Lambda \u306f\u3001Amazon Web Services (AWS) \u304c\u63d0\u4f9b\u3059\u308b\u30b5\u30fc\u30d0\u30fc\u30ec\u30b9\u30b3\u30f3\u30d4\u30e5\u30fc\u30c6\u30a3\u30f3\u30b0\u30b5\u30fc\u30d3\u30b9\u3067\u3059\u3002\n\n### \u4e3b\u306a\u7279\u5fb4\n\n**\ud83d\udd27 \u30b5\u30fc\u30d0\u30fc\u30ec\u30b9**\n- \u30b5\u30fc\u30d0\u30fc\u306e\u7ba1\u7406\u304c\u4e0d\u8981\n- \u30a4\u30f3\u30d5\u30e9\u306e\u8a2d\u5b9a\u3084\u4fdd\u5b88\u3092 AWS \u304c\u81ea\u52d5\u3067\u884c\u3046\n\n**\ud83d\udcb0 \u5f93\u91cf\u8ab2\u91d1\u5236**\n- \u5b9f\u884c\u6642\u9593\u3068\u30ea\u30af\u30a8\u30b9\u30c8\u6570\u306b\u57fa\u3065\u304f\u8ab2\u91d1\n- \u4f7f\u7528\u3057\u306a\u3044\u6642\u306f\u6599\u91d1\u304c\u767a\u751f\u3057\u306a\u3044\n\n**\u26a1 \u81ea\u52d5\u30b9\u30b1\u30fc\u30ea\u30f3\u30b0**\n- \u30c8\u30e9\u30d5\u30a3\u30c3\u30af\u306b\u5fdc\u3058\u3066\u81ea\u52d5\u3067\u30b9\u30b1\u30fc\u30eb\u30a2\u30c3\u30d7\u30fb\u30c0\u30a6\u30f3\n- \u540c\u6642\u5b9f\u884c\u6570\u306e\u5236\u5fa1\u3082\u53ef\u80fd\n\n### \u4e3b\u306a\u7528\u9014\n\n1. **API \u306e\u30d0\u30c3\u30af\u30a8\u30f3\u30c9\u51e6\u7406**\n   - REST API \u3084 GraphQL \u306e\u30a8\u30f3\u30c9\u30dd\u30a4\u30f3\u30c8\n\n2. **\u30a4\u30d9\u30f3\u30c8\u99c6\u52d5\u51e6\u7406**\n   - S3 \u30d5\u30a1\u30a4\u30eb\u30a2\u30c3\u30d7\u30ed\u30fc\u30c9\u6642\u306e\u51e6\u7406\n   - DynamoDB \u306e\u5909\u66f4\u30c8\u30ea\u30ac\u30fc\n\n3. **\u30b9\u30b1\u30b8\u30e5\u30fc\u30eb\u5b9f\u884c**\n   - \u5b9a\u671f\u7684\u306a\u30d0\u30c3\u30c1\u51e6\u7406\n   - \u30c7\u30fc\u30bf\u30d9\u30fc\u30b9\u306e\u30d0\u30c3\u30af\u30a2\u30c3\u30d7\n\n4. **\u30ea\u30a2\u30eb\u30bf\u30a4\u30e0\u51e6\u7406**\n   - \u30ed\u30b0\u5206\u6790\n   - IoT \u30c7\u30fc\u30bf\u306e\u51e6\u7406\n\n### \u5bfe\u5fdc\u8a00\u8a9e\n- Python\n- Node.js\n- Java\n- C#\n- Go\n- Ruby\n- \u306a\u3069\n\n### \u30e1\u30ea\u30c3\u30c8\u30fb\u30c7\u30e1\u30ea\u30c3\u30c8\n\n**\u30e1\u30ea\u30c3\u30c8:**\n- \u904b\u7528\u30b3\u30b9\u30c8\u304c\u4f4e\u3044\n- \u9ad8\u53ef\u7528\u6027\n- \u81ea\u52d5\u30b9\u30b1\u30fc\u30ea\u30f3\u30b0\n\n**\u30c7\u30e1\u30ea\u30c3\u30c8:**\n- \u30b3\u30fc\u30eb\u30c9\u30b9\u30bf\u30fc\u30c8\u306b\u3088\u308b\u9045\u5ef6\n- \u5b9f\u884c\u6642\u9593\u306e\u5236\u9650\uff08\u6700\u592715\u5206\uff09\n- \u30d9\u30f3\u30c0\u30fc\u30ed\u30c3\u30af\u30a4\u30f3\n\n\u4f55\u304b\u7279\u5b9a\u306e AWS Lambda \u306e\u6a5f\u80fd\u306b\u3064\u3044\u3066\u8a73\u3057\u304f\u77e5\u308a\u305f\u3044\u3053\u3068\u304c\u3042\u308c\u3070\u3001\u304a\u805e\u304b\u305b\u304f\u3060\u3055\u3044\uff01"
    }
  ]
}

例3: イベントループのライフサイクルの追跡

イベントループのライフサイクルイベントがどのように関連しているかを出力する。

from strands import Agent
from strands_tools import calculator

def event_loop_tracker(**kwargs):
    # イベントループライフサイクルを追跡
    if kwargs.get("init_event_loop", False):
        print("🔄 イベントループ初期化")
    elif kwargs.get("start_event_loop", False):
        print("▶️ イベントループサイクル開始")
    elif kwargs.get("start", False):
        print("📝 新しいサイクル開始")
    elif "message" in kwargs:
        print(f"📬 新しいメッセージ作成: {kwargs['message']['role']}")
    elif kwargs.get("complete", False):
        print("✅ サイクル完了")
    elif kwargs.get("force_stop", False):
        print(f"🛑 イベントループ強制停止: {kwargs.get('force_stop_reason', '不明な理由')}")

    # ツール使用を追跡
    if "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
        tool_name = kwargs["current_tool_use"]["name"]
        print(f"🔧 ツール使用: {tool_name}")

    # 出力をクリーンに保つため、テキストの一部のみを表示
    if "data" in kwargs:
        # デモのため、各チャンクの最初の20文字のみを表示
        data_snippet = kwargs["data"][:20] + ("..." if len(kwargs["data"]) > 20 else "")
        print(f"📟 テキスト: {data_snippet}")

# イベントループトラッカーを使用してエージェントを作成
agent = Agent(
    tools=[calculator],
    callback_handler=event_loop_tracker
)

# これは、コンソールに完全なイベントライフサイクルを表示する
agent("フランスの首都は?あと、42+7 は?")
出力
🔄 イベントループ初期化
📝 新しいサイクル開始
▶️ イベントループサイクル開始
📟 テキスト: フランスの首都は
📟 テキスト: パリ
📟 テキスト: (Paris)です。

42
📟 テキスト:  + 7 の
📟 テキスト: 計算を
📟 テキスト: します
📟 テキスト: ね。
🔧 ツール使用: calculator
🔧 ツール使用: calculator
🔧 ツール使用: calculator
🔧 ツール使用: calculator
🔧 ツール使用: calculator
📬 新しいメッセージ作成: assistant
📬 新しいメッセージ作成: user
📝 新しいサイクル開始
📝 新しいサイクル開始
▶️ イベントループサイクル開始
📟 テキスト: 42
📟 テキスト:  + 7 = 49
📟 テキスト:  
📟 テキスト: です
📟 テキスト: 。
📬 新しいメッセージ作成: assistant

イベントループのライフサイクルイベントの流れがわかる。

  • まず、イベントループが初期化される(init_event_loop)
  • それからサイクルが始まる (start_event_loop)
  • 新しいサイクルは実行中に複数回開始することができる (start)
  • テキスト生成とツール使用イベントがサイクル中に発生する。
  • 最後に、サイクルが完了(完了)するか、強制的に停止される。

コールバックハンドラのベストプラクティスは以下

  1. 速く処理すること
    • コールバックハンドラはエージェントの実行中のクリティカルなタイミングで呼ばれる。
    • 処理が遅いと全体のパフォーマンスが下がってしまう。
  2. あらゆるイベントタイプに対応すること
    • どんなイベントが来てもちゃんと処理できるようにしておく。
  3. エラーはgracefulに処理すること
    • ハンドラーの中で例外を処理すること
    • エラーが起きても、全体が止まらないように
  4. 状態管理は‎request_stateでやること
    • 途中でデータを貯めたり、何か記録したいときは‎request_stateを使って管理すること
kun432kun432

まとめ

とりあえず一通り気になるところは抑えたかな。最近フルスタックなエージェントフレームワークを色々見ていたせいか、それらに比較するとやはり薄いフレームワークという印象。その分、学習ハードルは低めだと思うし、細いところも把握しやすそう。反面、フルスタックであるようなRAGのインテグレーションとか複雑なメモリとかはないので、そこは自分で実装するか、もしくはBedrock AgentCoreとかを見る、ってことになるのかな?

とりあえずBedrock AgentCoreに向けた素振りは一旦完了。

このスクラップは2ヶ月前にクローズされました