🏭

Microsoft Agent Frameworkのワークフローを見てみる

に公開

Agent Frameworkとは

Microsoft Agent Frameworkは、AIエージェントとマルチエージェント・ワークフローを構築するためのオープンソースSDK/ランタイムです。
もともと、Microsoft製のAIエージェントフレームワークにはSemantic KernelとAutoGenの2つが存在していましたが、これらの長所を統合する新たなフレームワークとしてリリースされました。

今回は、業務プロセスにAIエージェントを組み込んで自動化した処理を定義することのできるWorkflowsという機能を中心にみていきます。

https://zenn.dev/headwaters/articles/2d8222bf2214ad

Workflowsを見てみる

主要なコンポーネントや機能の実装例を紹介します。(Python)

ワークフローの基本概念

有向グラフ構造アーキテクチャにより、柔軟なワークフローを直感的に定義できます。

ワークフローの構築には以下のコンポーネントを組み合わせていきます。

  • Executor:ワークフロー内の個々の処理単位を表します。 AI エージェントまたはカスタム ロジック コンポーネントを指定できます。 入力メッセージを受信し、特定のタスクを実行し、出力メッセージを生成します。
  • Edge:Executor 間の接続を定義し、メッセージのフローを決定します。
  • Workflows:Executor とEdgeで構成される有向グラフです。 これらは、最初の Executor から始まり、エッジで定義された条件とロジックに基づいてさまざまなパスを進むプロセス全体を定義する

Executor

グラフ構造のノード(四角い部分)にあたります。処理を実行する部分です。

Executorは、Executor基底クラスを継承して作成します。@handlerデコレーターで修飾されたメソッドがExecutorで実行されます。

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class UpperCase(Executor):

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """入力された文字列を大文字に変換し、次のノードに転送します。"""
        # ここにExecutorで実行する任意の処理を記述できます。
        await ctx.send_message(text.upper())
  • ワークフローは、Messageというオブジェクトでノード間のデータの受け渡しやルーティングの管理をします。

  • ハンドラーは、ctx.send_message()を実行することで、そのExecutorの出力としてMessageを送信し、次のExecutorに連携します。(WorkflowContextは、ハンドラーがワークフローと対話するためのメソッドを提供する役割を持ちます。)

  • ハンドラーの第一引数では、前Executorから送信されるMessageデータを受け取ります。(型を一致させる必要があります)

  • 第二引数のWorkflowContext は、このハンドラーが出力する型でパラメータ化されています。

  • ここでは WorkflowContext[str] となっているため、下流のノードは str 型を受け取ることを想定しています。


また、@executorデコレーターを使用して、関数からExecutorを作成することもできます。

@executor(id="upper_case_executor")
async def upper_case(text: str, ctx: WorkflowContext[str]) -> None:
    await ctx.send_message(text.upper())

Edge

グラフ構造の接続を表すエッジ(線)にあたります。
Executor間のメッセージフローを定義し、データフロー パスを決定します。

エッジはワークフロー定義時に設定するため、ワークフローの構築方法も一緒に見ていきましょう。

ワークフローの構築

ワークフローの構築にはWorkflowBuilderクラスを使用します。

from agent_framework import WorkflowBuilder

# Executor
processor = DataProcessor()
validator = Validator()
formatter = Formatter()

# ワークフローの構築
builder = WorkflowBuilder()
builder.set_start_executor(processor)  # はじめのExecutorをセット
builder.add_edge(processor, validator) # processorからvalidatorへのEdgeを定義
builder.add_edge(validator, formatter)
workflow = builder.build()

この例では、processor → validator → formatter というダイレクトエッジ(単純な1対1の接続)からなるワークフローを構築しています。

Edgeの種類

その他のエッジパターンは以下の通りです。

  • 条件付きエッジ:
特定の条件が満たされた場合にのみアクティブにする
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(spam_detector, email_processor, condition=lambda result: isinstance(result, SpamResult) and not result.is_spam)
builder.add_edge(spam_detector, spam_handler, condition=lambda result: isinstance(result, SpamResult) and result.is_spam)
builder.set_start_executor(spam_detector)
workflow = builder.build()
  • スイッチケース:
条件に基づいて異なる Executor にメッセージをルーティングする
from agent_framework import (
   Case,
   Default,
   WorkflowBuilder,
)

builder = WorkflowBuilder()
builder.set_start_executor(router_executor)
builder.add_switch_case_edge_group(
   router_executor,
   [
       Case(
           condition=lambda message: message.priority < Priority.NORMAL,
           target=executor_a,
       ),
       Case(
           condition=lambda message: message.priority < Priority.HIGH,
           target=executor_b,
       ),
       Default(target=executor_c)
   ],
)
workflow = builder.build()
  • ファンアウト エッジ:
1 つの Executor から複数のターゲットにメッセージを配布する
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(splitter_executor, [worker1, worker2, worker3])
workflow = builder.build()

# Send to specific targets based on partitioner function
builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(
   splitter_executor,
   [worker1, worker2, worker3],
   selection_func=lambda message, target_ids: (
       [0] if message.priority == Priority.HIGH else
       [1, 2] if message.priority == Priority.NORMAL else
       list(range(target_count))
   )
)
workflow = builder.build()
  • ファンイン エッジ
複数のソースから 1 つのターゲットにメッセージを収集する
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)

ワークフローの実行

ワークフロー実行イベント(後述)を非同期で受け取るストリーミング実行と、実行が完了してからまとめて受け取る非ストリーミング実行の両方がサポートされています。

from agent_framework import WorkflowCompletedEvent

# ストリーミング実行:イベントを逐次取得する
async for event in workflow.run_stream(input_message):
    if isinstance(event, WorkflowCompletedEvent):
        print(f"Workflow completed: {event.data}")

# 非ストリーミング実行:完了まで待機
events = await workflow.run(input_message)
print(f"Final result: {events.get_completed_event()}")

Event

イベントは、ワークフローの実行のレスポンスとして出力され、実行中の状態や実行結果を監視できます。

組み込みイベント

# ワークフローのライフサイクル イベント
WorkflowStartedEvent    # ワークフローの実行開始
WorkflowOutputEvent     # ワークフローの出力
WorkflowErrorEvent      # エラー

# Executor イベント
ExecutorInvokeEvent     # Executorの処理開始
ExecutorCompleteEvent   # Executor処理終了

# Request イベント
RequestInfoEvent        # リクエストの発行

カスタムイベント

カスタムイベントを定義して出力することで、監視を強化できます。

# カスタムイベントの定義
@dataclass
class ExecutorWarningEvent(WorkflowEvent):
    """カスタムイベント:警告を通知"""
    executor_id: str
    message: str
    severity: str  # "info", "warning", "error"

@executor(id="validator")
async def validate_input(data: dict, ctx: WorkflowContext[dict]) -> None:
    if not data.get("required_field"):
        # 警告イベントを発行
        await ctx.add_event(ExecutorWarningEvent(
            executor_id="validator",
            message="必須フィールドが不足しています",
            severity="warning"
        ))
        # デフォルト値で続行
        data["required_field"] = "default"
    
    await ctx.send_message(data)

ハンドラーでctx.add_event()を実行することでイベントを出力できます。


ExecutorにAIエージェントを使用する

Microsoft Agent Frameworkでは、ワークフローの実行にAIエージェントを統合するための複数のオプションが用意されています。

AgentExecutor

AgentExecutorオブジェクトを使用して、エージェントの実行をラップしたExecutorを作成できます。

from agent_framework.azure import AzureOpenAIChatClient
from agent_framework import AgentExecutor

# エージェント作成
agent = AzureOpenAIChatClient(
    endpoint=AZURE_OPENAI_ENDPOINT,
    deployment_name="gpt-5",
    api_key=API_KEY,
).create_agent(
    instructions="ユーザーの質問に回答してください。",
    name="Your Assistant",
)

agent_executor = AgentExecutor(agent)

この場合、Executorが受信/送信するMessageがエージェントの入力/出力となります。

また、単にエッジにエージェントを指定するだけもAgentExecutorとしてワークフローを構築できます。便利。

writer_agent = AzureOpenAIChatClient(
・・・
)
reviewer_agent = AzureOpenAIChatClient(
・・・
)

# エージェントでワークフローを構築
builder = WorkflowBuilder()
builder.set_start_executor(writer_agent) # Executorとしてエージェントを指定
builder.add_edge(writer_agent, reviewer_agent)
workflow = builder.build()

カスタム Executorの作成

通常のExecutor定義にAIエージェントの実行を組み込むことで、エージェントの実行をカスタマイズしたり、その他の処理と統合したりなどの細かい制御ができます。

from agent_framework import (
    ChatAgent,
    ChatMessage,
    Executor,
    WorkflowContext,
    handler
)

class Writer(Executor):

    agent: ChatAgent

    def __init__(self, chat_client: AzureChatClient, id: str = "writer"):
        # 構成済みの AzureChatClient を使用して、ドメイン固有のエージェントを作成します。
        agent = chat_client.create_agent(
            instructions=(
                "あなたは優れたコンテンツライターです。新しいコンテンツを作成し、フィードバックに基づいて編集を行います。"
            ),
        )
        # この executor ノードにエージェントを関連付けます。基底の Executor クラスが self.agent に保存します。
        super().__init__(agent=agent, id=id)

    @handler
    async def handle(self, message: ChatMessage, ctx: WorkflowContext[list[ChatMessage]]) -> None:
        """1つのチャットメッセージを処理し、蓄積されたメッセージをワークフロー内の次の executor に転送します。"""
        # 受信したメッセージを使ってエージェントを呼び出し、応答を取得します。
        messages: list[ChatMessage] = [message]
        response = await self.agent.run(messages)
        # メッセージを蓄積し、ワークフロー内の次の executor に送信します。
        messages.extend(response.messages)
        await ctx.send_message(messages)


Shared State

Shared Stateは、ワークフロー全体で共有するKey-Valueストレージです。
tx.send_message()では隣接したExecutor同士でのデータ連携しかできないのに対して、SharedStateではすべての WorkflowContext が同じ SharedState を参照しており、ctx.set_shared_state() / ctx.get_shared_state() でワークフロー横断の情報を保管・取得できます。これにより、特定の Executor が計算した結果やフラグを別の Executor が参照する、といった連携が可能です。
複数のエージェントでコンテキストを共有したい場合にも有効だと思いました。

Shared Stateへの書込み

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        """ファイルパスを受け取り、ファイルの内容を読み込んで、次の executor にファイル ID を転送します。"""
        # 埋め込まれたリソースからファイルの内容を読み込みます
        with open(file_path, 'r') as file:
            file_content = file.read()
        # 他の executor がアクセスできるように、共有状態にファイルの内容を保存します
        file_id = str(uuid.uuid4())
        await ctx.set_shared_state(file_id, file_content)

        # ファイル ID を次の executor に送信します
        await ctx.send_message(file_id)

Shared Stateへのアクセス

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # shared stateからファイル内容を取得
        file_content = await ctx.get_shared_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

まとめ

今回はMicrosoft Agent FrameworkのWorkflowsについて、ワークフローを構築するためのコンポーネントたちを見ていきました。
他にもおもしろそうな機能があるので、次回はこれらを組み合わせたワークフローを実際に作ってみたいと思います。

ヘッドウォータース

Discussion