🐕
セマンティックカーネルからWorkflowを繋げる
はじめに
よく分かっていないです。
Semantic KernelからWorkflowを呼んでみました。
Microsoft Agent Framework = Semantic Kernel + AutoGen
とのことです。
私が実装した内容はSemantic KernelからプラグインとしてWorkflowを繋げていて、Semantic Kernel自身が必要に応じてWorkflowの関数を呼んでいます。
これがMicrosoft Agent Framework??
誰か詳しい人教えてください。
ちなみに、こちらの記事のワークフローオーケストレーションのコードを一部変えて使っています。
実装
この記事で実装したSemantic Kernelの続きからやっていきます。
実装しているディレクトリ構成のイメージとしては次です。
backend
|-- main.py
|-- plugins/utils.py
|-- workflows/calculate.py <- ここにワークフローのコードを追加
@kernel_functionにしてワークフローを呼ぶので、処理フローはmain.py -> utils.py -> calculate.pyになります。
workflows/calculate.py
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler, WorkflowViz
from typing_extensions import Never
class Dispatcher(Executor):
"""
このエグゼキューターの唯一の目的は、ワークフローの入力を
他のエグゼキューターに分配することです。
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("入力は有効な整数のリストである必要があります。")
await ctx.send_message(numbers)
class Average(Executor):
"""整数のリストの平均を計算します。"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""整数のリストの合計を計算します。"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
class Aggregator(Executor):
"""異なるタスクからの結果を集約し、最終的な出力を生成します。"""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""ソースエグゼキューターからの結果を受信します。
フレームワークは自動的にソースエグゼキューターからのメッセージを収集し、
リストとして配信します。
Args:
results (list[int | float]): 上流のエグゼキューターからの実行結果。
型注釈は、上流のエグゼキューターが生成するユニオン型のリストである必要があります。
ctx (WorkflowContext[Never, list[int | float]]): 最終的な出力を生成できるワークフローコンテキスト。
"""
await ctx.yield_output(results)
async def calc(numbers: list[int] | None = None) -> dict:
"""ワークフローを実行し、入力リストの合計と平均を返します。
Args:
numbers: 計算対象の整数リスト。None の場合は 10 個のランダム値を生成します。
Returns:
dict: {
"input_numbers": list[int],
"workflow_results": list[int | float],
"sum": int,
"average": float,
}
"""
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
viz = WorkflowViz(workflow)
try:
viz.save_svg("workflow_architecture.svg")
except Exception:
print("Failed to save workflow architecture visualization.")
pass
input_numbers: list[int] = numbers or [random.randint(1, 100) for _ in range(10)]
output: list[int | float] | None = None
async for event in workflow.run_stream(input_numbers):
if isinstance(event, WorkflowOutputEvent):
output = event.data
return {
"input_numbers": input_numbers,
"workflow_results": output or [],
"sum": output[0],
"average": output[1],
}
plugins/utils.py
class CalcPlugin:
"""合計と平均の計算をエージェントにツールとして提供します。
内部で `workflows.calculate.calc` を利用して、必要に応じてランダム生成も可能です。
"""
@kernel_function(
name="calc_numbers",
description=(
"整数配列の合計と平均を計算します。入力が省略された場合は 10 個のランダムな整数を生成します。"
),
)
async def calc_numbers(self, numbers: list[int] | None = None) -> dict:
print("CalcPlugin::合計と平均の計算をエージェントにツールとして提供します。")
try:
from workflows.calculate import calc as _calc
except Exception as e:
return {"error": f"Failed to import workflow: {e}"}
try:
result = await _calc(numbers)
return result
except Exception as e:
return {"error": f"Calculation failed: {e}"}
utils.pyの内容をインポートしてmain.pyに追記する。
kernel.add_plugin(CalcPlugin(), plugin_name="Calc")
実行結果
実装前
実装前はそもそもの情報量が少ないので、10個の数字を聞き返します。
実装後
Workflowを使って計算結果を得てPluginから実装した内容を引っ張ってきてくれていそうです!
実装後はプラグインとしてのツールを持っていますので、どんな情報をプラグインに渡せば良いか理解して、雑な指示でも計算結果を返してくれています。
Microsoft Agent Frameworkについて理解できていませんが、少しずつ勉強していきたいと思いました!
Discussion