AWSのエージェントフレームワーク「Strands Agents」 のコンセプト ③Streaming
続き。
Strands Agentsのコンセプトを見ていく、ということで、次は「ストリーミング」。
Quickstartでもあったように、Strands Agentsのストリーミングの方法は2つ。
- 非同期イテレータ
- コールバックハンドラ
それぞれを見ていく
ストリーミング用非同期イテレータ
stream_async
メソッドを使う。これは、 FastAPI ・aiohttp・ Django Channels のような非同期フレームワーク向けに向いている。
import asyncio
from strands import Agent
from strands_tools import calculator
# コールバックハンドラなしでエージェントを初期化
agent = Agent(
tools=[calculator],
callback_handler=None
)
# ストリーミングされたエージェントイベントを反復処理する非同期関数
async def process_streaming_response():
agent_stream = agent.stream_async("2+2 を計算して。")
async for event in agent_stream:
if "data" in event:
# 生成されたテキストチャンクを出力
print(event["data"], end="\n", flush=True) # 実際は `end=""` だが説明のため改行
elif "current_tool_use" in event and event["current_tool_use"].get("name"):
# ツール使用情報の出力
print(f"\n[部分的なツール使用情報: {event['current_tool_use']['name']}]")
# エージェントを実行
asyncio.run(process_streaming_response())
2+2を計算します。
[部分的なツール使用情報: calculator]
[部分的なツール使用情報: calculator]
[部分的なツール使用情報: calculator]
[部分的なツール使用情報: calculator]
[部分的なツール使用情報: calculator]
2
+2の計算結果は
**
4** です。
余談だが、イテレータではないただの非同期には invoke_async
が使える。
import asyncio
from strands import Agent
from strands_tools import calculator
agent = Agent(
tools=[calculator],
)
# 非同期でエージェントを実行する
async def async_response():
await agent.invoke_async("2+2 を計算して。")
# エージェントを実行
asyncio.run(async_response())
Tool #1: calculator
2+2の計算結果は **4** です。
イテレータが返すストリームイベントは、コールバックハンドラと同じで、以下のイベントがある。
カテゴリ | イベントタイプ | 説明 |
---|---|---|
テキスト生成 | data |
モデル出力のテキストチャンク |
delta |
モデルからの生の差分データ | |
ツール | current_tool_use |
現在使われているツールの情報toolUseId : このツール呼び出しの一意なIDname : 使用されているツールの名前input : ツールに渡される入力パラメータ(ストリーミングで蓄積) |
ライフサイクル | init_event_loop |
イベントループの初期化時にTrue |
start_event_loop |
イベントループ開始時にTrue | |
start |
新しいサイクル開始時にTrue | |
message |
新しいメッセージが生成されたときに含まれる | |
event |
モデルストリームからの生データイベント | |
force_stop |
イベントループが強制停止された場合にTrue | |
force_stop_reason |
強制停止の理由 | |
result |
最終的な AgentResult が格納される | |
推論 | reasoning |
推論イベント時にTrue |
reasoningText |
推論プロセス中のテキスト | |
reasoning_signature |
推論プロセスのシグネチャ |
FastAPIを使った非同期ストリーミングAPIエンドポイントの例
FastAPIとuvicornをインストール
pip install fastapi "uvicorn[standard]"
サーバ側。サンプル通り。
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from strands import Agent
from strands_tools import calculator, http_request
app = FastAPI()
class PromptRequest(BaseModel):
prompt: str
@app.post("/stream")
async def stream_response(request: PromptRequest):
async def generate():
agent = Agent(
tools=[calculator, http_request],
callback_handler=None
)
try:
async for event in agent.stream_async(request.prompt):
if "data" in event:
# クライアントにテキストチャンクのみをストリーミング
yield event["data"]
except Exception as e:
yield f"Error: {str(e)}"
return StreamingResponse(
generate(),
media_type="text/plain"
)
サーバ起動
uvicorn server:app
クライアント側
import httpx
import asyncio
async def main():
async with httpx.AsyncClient() as client:
# POSTリクエストでストリーミング開始
async with client.stream(
"POST",
"http://localhost:8000/stream",
json={"prompt": "こんにちは。25 * 48 は?計算過程も説明して"}
) as response:
async for line in response.aiter_lines():
if line:
print(line)
if __name__ == "__main__":
asyncio.run(main())
実行
python client.py
こんにちは!25 × 48 を計算してみますね。**答え: 1200**
**計算過程の説明:**
25 × 48 の計算にはいくつかの方法があります:
**方法1: 筆算**
```
48
× 25
----
240 (48 × 5)
960 (48 × 20)
----
1200
```
**方法2: 因数分解を使った計算**
- 25 = 5²
- 48 = 16 × 3 = 2⁴ × 3
25 × 48 = 25 × (50 - 2) = 25 × 50 - 25 × 2 = 1250 - 50 = 1200
**方法3: 簡単な分解**
25 × 48 = 25 × (40 + 8) = (25 × 40) + (25 × 8) = 1000 + 200 = 1200
どの方法でも答えは **1200** になります。
コールバックハンドラ
コールバックハンドラは、エージェントの実行中に発生するイベントをインターセプトして処理することができる。 これにより、リアルタイムモニタリング、カスタムな出力フォーマットの変更、外部システムとの統合が可能になる。以下のようなイベントを受け取る。
- モデルからのテキスト生成
- ツールの選択と実行
- 推論プロセス
- エラーと完了
コールバックハンドラはエージェントにコールバック関数を渡すことで使用できる。
from strands import Agent
from strands_tools import calculator
def custom_callback_handler(**kwargs):
# ストリーミングデータを処理
if "data" in kwargs:
print(f"モデル出力: {kwargs['data']}")
elif "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
print(f"\nツール使用: {kwargs['current_tool_use']['name']}")
# カスタムコールバックハンドラを使用してエージェントを作成
agent = Agent(
tools=[calculator],
callback_handler=custom_callback_handler
)
agent("2 + 2 を計算して。")
ツール使用: calculator
ツール使用: calculator
ツール使用: calculator
ツール使用: calculator
ツール使用: calculator
ツール使用: calculator
モデル出力: 2
モデル出力: + 2 の計算
モデル出力: 結果は **
モデル出力: 4** です。
コールバックハンドラのイベントは非同期イテレータと同じなので、1つ前を参照。これをキーワード引数として受け取る。
Strands Agentsではコンソールの出力をフォーマットするデフォルトのコールバックハンドラが提供されている。
from strands import Agent
from strands_tools import calculator
from strands.handlers.callback_handler import PrintingCallbackHandler
# デフォルトのコールバックハンドラは、コンソールに、テキストおよびツール使用を表示
agent = Agent(
callback_handler=PrintingCallbackHandler(),
tools=[calculator]
)
agent("2 + 2 を計算して。")
2 + 2 を計算します。
Tool #1: calculator
2 + 2 = 4 です。
これを無効化するにはNone
を渡す。
from strands import Agent
from strands_tools import calculator
agent = Agent(
callback_handler=None,
tools=[calculator]
)
response = agent("2 + 2 を計算して。")
print(response)
2 + 2 の計算結果は **4** です。
あぁ、なるほど。レスポンスを変数で受け取って出力しても、コンソール出力が重複するのはこのせいか。
ここだ。
最初の例にもあったように、カスタムなコールバックハンドラを使うことで、エージェントからのストリームを細かく制御できる。ここではいくつかの例が紹介されている。
例1: ストリームシーケンス内の全てのイベントを表示
一連のイベントを全て表示するカスタムなコールバックハンドラを指定すると、デバッグに便利
from strands import Agent
from strands_tools import calculator
def debugger_callback_handler(**kwargs):
# kwargsの値をすべて表示することで、すべての情報を確認できる
print(kwargs)
agent = Agent(
tools=[calculator],
callback_handler=debugger_callback_handler
)
agent("922 + 5321 は?")
ここは出力量が多いので割愛
例2: メッセージごとに出力をバッファリング
ストリーミングだとチャンクごとメッセージの断片が飛んでくるが、このコールバックハンドラでは完全なメッセージが作成されたときにトリガーされる message
イベントを利用して、最終的なメッセージだけを表示している。
import json
from strands import Agent
from strands_tools import calculator
def message_buffer_handler(**kwargs):
# エージェントから新しいメッセージが作成されたとき、その内容を表示
if "message" in kwargs and kwargs["message"].get("role") == "assistant":
print(json.dumps(kwargs["message"], indent=2, ensure_ascii=False))
# エージェントとともに使用
agent = Agent(
tools=[calculator],
callback_handler=message_buffer_handler
)
agent("2 + 2 を計算して。あと、AWS Lambda についても教えて")
{
"role": "assistant",
"content": [
{
"text": "\u307e\u305a\u30012 + 2 \u306e\u8a08\u7b97\u3092\u884c\u3044\u307e\u3059\u306d\u3002"
},
{
"toolUse": {
"toolUseId": "tooluse_PYcw2Z3GTe-2zMw4dvDsEw",
"name": "calculator",
"input": {
"expression": "2 + 2"
}
}
}
]
}
{
"role": "assistant",
"content": [
{
"text": "**\u8a08\u7b97\u7d50\u679c**: 2 + 2 = 4\n\n\u6b21\u306b\u3001AWS Lambda \u306b\u3064\u3044\u3066\u3054\u8aac\u660e\u3057\u307e\u3059\u3002\n\n## AWS Lambda \u3068\u306f\n\nAWS Lambda \u306f\u3001Amazon Web Services (AWS) \u304c\u63d0\u4f9b\u3059\u308b\u30b5\u30fc\u30d0\u30fc\u30ec\u30b9\u30b3\u30f3\u30d4\u30e5\u30fc\u30c6\u30a3\u30f3\u30b0\u30b5\u30fc\u30d3\u30b9\u3067\u3059\u3002\n\n### \u4e3b\u306a\u7279\u5fb4\n\n**\ud83d\udd27 \u30b5\u30fc\u30d0\u30fc\u30ec\u30b9**\n- \u30b5\u30fc\u30d0\u30fc\u306e\u7ba1\u7406\u304c\u4e0d\u8981\n- \u30a4\u30f3\u30d5\u30e9\u306e\u8a2d\u5b9a\u3084\u4fdd\u5b88\u3092 AWS \u304c\u81ea\u52d5\u3067\u884c\u3046\n\n**\ud83d\udcb0 \u5f93\u91cf\u8ab2\u91d1\u5236**\n- \u5b9f\u884c\u6642\u9593\u3068\u30ea\u30af\u30a8\u30b9\u30c8\u6570\u306b\u57fa\u3065\u304f\u8ab2\u91d1\n- \u4f7f\u7528\u3057\u306a\u3044\u6642\u306f\u6599\u91d1\u304c\u767a\u751f\u3057\u306a\u3044\n\n**\u26a1 \u81ea\u52d5\u30b9\u30b1\u30fc\u30ea\u30f3\u30b0**\n- \u30c8\u30e9\u30d5\u30a3\u30c3\u30af\u306b\u5fdc\u3058\u3066\u81ea\u52d5\u3067\u30b9\u30b1\u30fc\u30eb\u30a2\u30c3\u30d7\u30fb\u30c0\u30a6\u30f3\n- \u540c\u6642\u5b9f\u884c\u6570\u306e\u5236\u5fa1\u3082\u53ef\u80fd\n\n### \u4e3b\u306a\u7528\u9014\n\n1. **API \u306e\u30d0\u30c3\u30af\u30a8\u30f3\u30c9\u51e6\u7406**\n - REST API \u3084 GraphQL \u306e\u30a8\u30f3\u30c9\u30dd\u30a4\u30f3\u30c8\n\n2. **\u30a4\u30d9\u30f3\u30c8\u99c6\u52d5\u51e6\u7406**\n - S3 \u30d5\u30a1\u30a4\u30eb\u30a2\u30c3\u30d7\u30ed\u30fc\u30c9\u6642\u306e\u51e6\u7406\n - DynamoDB \u306e\u5909\u66f4\u30c8\u30ea\u30ac\u30fc\n\n3. **\u30b9\u30b1\u30b8\u30e5\u30fc\u30eb\u5b9f\u884c**\n - \u5b9a\u671f\u7684\u306a\u30d0\u30c3\u30c1\u51e6\u7406\n - \u30c7\u30fc\u30bf\u30d9\u30fc\u30b9\u306e\u30d0\u30c3\u30af\u30a2\u30c3\u30d7\n\n4. **\u30ea\u30a2\u30eb\u30bf\u30a4\u30e0\u51e6\u7406**\n - \u30ed\u30b0\u5206\u6790\n - IoT \u30c7\u30fc\u30bf\u306e\u51e6\u7406\n\n### \u5bfe\u5fdc\u8a00\u8a9e\n- Python\n- Node.js\n- Java\n- C#\n- Go\n- Ruby\n- \u306a\u3069\n\n### \u30e1\u30ea\u30c3\u30c8\u30fb\u30c7\u30e1\u30ea\u30c3\u30c8\n\n**\u30e1\u30ea\u30c3\u30c8:**\n- \u904b\u7528\u30b3\u30b9\u30c8\u304c\u4f4e\u3044\n- \u9ad8\u53ef\u7528\u6027\n- \u81ea\u52d5\u30b9\u30b1\u30fc\u30ea\u30f3\u30b0\n\n**\u30c7\u30e1\u30ea\u30c3\u30c8:**\n- \u30b3\u30fc\u30eb\u30c9\u30b9\u30bf\u30fc\u30c8\u306b\u3088\u308b\u9045\u5ef6\n- \u5b9f\u884c\u6642\u9593\u306e\u5236\u9650\uff08\u6700\u592715\u5206\uff09\n- \u30d9\u30f3\u30c0\u30fc\u30ed\u30c3\u30af\u30a4\u30f3\n\n\u4f55\u304b\u7279\u5b9a\u306e AWS Lambda \u306e\u6a5f\u80fd\u306b\u3064\u3044\u3066\u8a73\u3057\u304f\u77e5\u308a\u305f\u3044\u3053\u3068\u304c\u3042\u308c\u3070\u3001\u304a\u805e\u304b\u305b\u304f\u3060\u3055\u3044\uff01"
}
]
}
例3: イベントループのライフサイクルの追跡
イベントループのライフサイクルイベントがどのように関連しているかを出力する。
from strands import Agent
from strands_tools import calculator
def event_loop_tracker(**kwargs):
# イベントループライフサイクルを追跡
if kwargs.get("init_event_loop", False):
print("🔄 イベントループ初期化")
elif kwargs.get("start_event_loop", False):
print("▶️ イベントループサイクル開始")
elif kwargs.get("start", False):
print("📝 新しいサイクル開始")
elif "message" in kwargs:
print(f"📬 新しいメッセージ作成: {kwargs['message']['role']}")
elif kwargs.get("complete", False):
print("✅ サイクル完了")
elif kwargs.get("force_stop", False):
print(f"🛑 イベントループ強制停止: {kwargs.get('force_stop_reason', '不明な理由')}")
# ツール使用を追跡
if "current_tool_use" in kwargs and kwargs["current_tool_use"].get("name"):
tool_name = kwargs["current_tool_use"]["name"]
print(f"🔧 ツール使用: {tool_name}")
# 出力をクリーンに保つため、テキストの一部のみを表示
if "data" in kwargs:
# デモのため、各チャンクの最初の20文字のみを表示
data_snippet = kwargs["data"][:20] + ("..." if len(kwargs["data"]) > 20 else "")
print(f"📟 テキスト: {data_snippet}")
# イベントループトラッカーを使用してエージェントを作成
agent = Agent(
tools=[calculator],
callback_handler=event_loop_tracker
)
# これは、コンソールに完全なイベントライフサイクルを表示する
agent("フランスの首都は?あと、42+7 は?")
🔄 イベントループ初期化
📝 新しいサイクル開始
▶️ イベントループサイクル開始
📟 テキスト: フランスの首都は
📟 テキスト: パリ
📟 テキスト: (Paris)です。
42
📟 テキスト: + 7 の
📟 テキスト: 計算を
📟 テキスト: します
📟 テキスト: ね。
🔧 ツール使用: calculator
🔧 ツール使用: calculator
🔧 ツール使用: calculator
🔧 ツール使用: calculator
🔧 ツール使用: calculator
📬 新しいメッセージ作成: assistant
📬 新しいメッセージ作成: user
📝 新しいサイクル開始
📝 新しいサイクル開始
▶️ イベントループサイクル開始
📟 テキスト: 42
📟 テキスト: + 7 = 49
📟 テキスト:
📟 テキスト: です
📟 テキスト: 。
📬 新しいメッセージ作成: assistant
イベントループのライフサイクルイベントの流れがわかる。
- まず、イベントループが初期化される(init_event_loop)
- それからサイクルが始まる (start_event_loop)
- 新しいサイクルは実行中に複数回開始することができる (start)
- テキスト生成とツール使用イベントがサイクル中に発生する。
- 最後に、サイクルが完了(完了)するか、強制的に停止される。
コールバックハンドラのベストプラクティスは以下
-
速く処理すること
- コールバックハンドラはエージェントの実行中のクリティカルなタイミングで呼ばれる。
- 処理が遅いと全体のパフォーマンスが下がってしまう。
-
あらゆるイベントタイプに対応すること
- どんなイベントが来てもちゃんと処理できるようにしておく。
-
エラーはgracefulに処理すること
- ハンドラーの中で例外を処理すること
- エラーが起きても、全体が止まらないように
-
状態管理は
request_state
でやること- 途中でデータを貯めたり、何か記録したいときは
request_state
を使って管理すること
- 途中でデータを貯めたり、何か記録したいときは
まとめ
とりあえず一通り気になるところは抑えたかな。最近フルスタックなエージェントフレームワークを色々見ていたせいか、それらに比較するとやはり薄いフレームワークという印象。その分、学習ハードルは低めだと思うし、細いところも把握しやすそう。反面、フルスタックであるようなRAGのインテグレーションとか複雑なメモリとかはないので、そこは自分で実装するか、もしくはBedrock AgentCoreとかを見る、ってことになるのかな?
とりあえずBedrock AgentCoreに向けた素振りは一旦完了。