🐕

セマンティックカーネルからWorkflowを繋げる

に公開

はじめに

よく分かっていないです。
Semantic KernelからWorkflowを呼んでみました。

https://zenn.dev/headwaters/articles/2d8222bf2214ad

Microsoft Agent Framework = Semantic Kernel + AutoGen

とのことです。

私が実装した内容はSemantic KernelからプラグインとしてWorkflowを繋げていて、Semantic Kernel自身が必要に応じてWorkflowの関数を呼んでいます。
これがMicrosoft Agent Framework??
誰か詳しい人教えてください。

https://zenn.dev/headwaters/articles/2d8222bf2214ad#ワークフローオーケストレーション

ちなみに、こちらの記事のワークフローオーケストレーションのコードを一部変えて使っています。

実装

https://zenn.dev/headwaters/articles/ef5f4f0bebd14d

この記事で実装したSemantic Kernelの続きからやっていきます。
実装しているディレクトリ構成のイメージとしては次です。

backend
   |-- main.py
   |-- plugins/utils.py
   |-- workflows/calculate.py <- ここにワークフローのコードを追加

@kernel_functionにしてワークフローを呼ぶので、処理フローはmain.py -> utils.py -> calculate.pyになります。

workflows/calculate.py
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler, WorkflowViz
from typing_extensions import Never

class Dispatcher(Executor):
    """
    このエグゼキューターの唯一の目的は、ワークフローの入力を
    他のエグゼキューターに分配することです。
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("入力は有効な整数のリストである必要があります。")

        await ctx.send_message(numbers)

class Average(Executor):
    """整数のリストの平均を計算します。"""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """整数のリストの合計を計算します。"""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

class Aggregator(Executor):
    """異なるタスクからの結果を集約し、最終的な出力を生成します。"""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """ソースエグゼキューターからの結果を受信します。

        フレームワークは自動的にソースエグゼキューターからのメッセージを収集し、
        リストとして配信します。

        Args:
            results (list[int | float]): 上流のエグゼキューターからの実行結果。
                型注釈は、上流のエグゼキューターが生成するユニオン型のリストである必要があります。
            ctx (WorkflowContext[Never, list[int | float]]): 最終的な出力を生成できるワークフローコンテキスト。
        """
        await ctx.yield_output(results)

async def calc(numbers: list[int] | None = None) -> dict:
    """ワークフローを実行し、入力リストの合計と平均を返します。

    Args:
        numbers: 計算対象の整数リスト。None の場合は 10 個のランダム値を生成します。

    Returns:
        dict: {
            "input_numbers": list[int],
            "workflow_results": list[int | float],
            "sum": int,
            "average": float,
        }
    """
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

    viz = WorkflowViz(workflow)
    try:
        viz.save_svg("workflow_architecture.svg")
    except Exception:
        print("Failed to save workflow architecture visualization.")
        pass

    input_numbers: list[int] = numbers or [random.randint(1, 100) for _ in range(10)]

    output: list[int | float] | None = None
    async for event in workflow.run_stream(input_numbers):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    return {
        "input_numbers": input_numbers,
        "workflow_results": output or [],
        "sum": output[0],
        "average": output[1],
    }
plugins/utils.py
class CalcPlugin:
    """合計と平均の計算をエージェントにツールとして提供します。

    内部で `workflows.calculate.calc` を利用して、必要に応じてランダム生成も可能です。
    """

    @kernel_function(
        name="calc_numbers",
        description=(
            "整数配列の合計と平均を計算します。入力が省略された場合は 10 個のランダムな整数を生成します。"
        ),
    )
    async def calc_numbers(self, numbers: list[int] | None = None) -> dict:
        print("CalcPlugin::合計と平均の計算をエージェントにツールとして提供します。")
        try:
            from workflows.calculate import calc as _calc
        except Exception as e:
            return {"error": f"Failed to import workflow: {e}"}

        try:
            result = await _calc(numbers)
            return result
        except Exception as e:
            return {"error": f"Calculation failed: {e}"}

utils.pyの内容をインポートしてmain.pyに追記する。

kernel.add_plugin(CalcPlugin(), plugin_name="Calc")

実行結果

実装前


実装前はそもそもの情報量が少ないので、10個の数字を聞き返します。

実装後


Workflowを使って計算結果を得てPluginから実装した内容を引っ張ってきてくれていそうです!
実装後はプラグインとしてのツールを持っていますので、どんな情報をプラグインに渡せば良いか理解して、雑な指示でも計算結果を返してくれています。

Microsoft Agent Frameworkについて理解できていませんが、少しずつ勉強していきたいと思いました!

ヘッドウォータース

Discussion