Koog応用編 - イベント駆動とサブグラフでAIエージェントの動作を制御する

に公開

はじめに

前回まで「Koog入門」「Koog実践編」と、KoogでAIエージェントを実装する基本的な方法を学んできました。今回は第3回として、Koogのイベント駆動機能とサブグラフ機能を実際に触りながら、より高度なエージェント制御について学んでいきます。

この記事で試すこと

Koogには、エージェントの動作を透明化し、複雑な処理を構造化するための2つの重要な機能があるようです。今回はこれらを実際に動かしてみます。

1. EventHandler - エージェント動作の可視化

エージェントの内部動作をリアルタイムで監視できる機能とのこと。デバッグに便利そうなので試してみます。

2. サブグラフ - 処理の構造化

Koogは本格的なSubgraph DSLを提供しているようですが、今回は入門者でも扱いやすいStringSubgraphResultを使って、複雑なタスクを段階的に処理する方法を試してみます。

なぜイベント駆動が必要なのか?

従来のAIエージェント実装では、処理が完了するまで内部で何が起きているか分からない「ブラックボックス」状態でした。これには以下の課題があります。

  • デバッグが困難(どこで時間がかかっているか不明)
  • エラー時の原因特定が難しい
  • 処理の進捗状況が分からない
  • パフォーマンス最適化のポイントが見えない

イベント駆動アプローチを使うことで、エージェントの動作を詳細に追跡し、これらの課題を解決できます。

EventHandlerでエージェントの動作を監視する

EventHandlerとは

EventHandlerは、Koogエージェントの実行ライフサイクル中に発生する様々なイベントにフックして、カスタムロジックを実行できる強力な機能です。これにより、エージェントの動作を詳細に監視・分析・制御することが可能になります。

利用可能なイベントハンドラ

KoogのEventHandlerでは、以下のカテゴリのイベントをフックできます。

1. エージェントライフサイクルイベント

handleEvents {
    // エージェント開始前
    onBeforeAgentStarted { eventContext ->
        println("エージェント開始: strategy=${eventContext.strategy.javaClass.simpleName}")
    }

    // エージェント完了時
    onAgentFinished { eventContext ->
        println("エージェント完了: result=${eventContext.result}")
    }

    // エージェント実行エラー時
    onAgentRunError { eventContext ->
        println("エラー発生: ${eventContext.error.message}")
    }
}

2. ノード実行イベント

handleEvents {
    // ノード実行前
    onBeforeNode { nodeContext ->
        println("ノード実行開始: ${nodeContext.nodeName}")
    }

    // ノード実行後
    onAfterNode { nodeContext, result ->
        println("ノード実行完了: ${nodeContext.nodeName} => $result")
    }
}

3. LLM呼び出しイベント

handleEvents {
    // LLM呼び出し前
    onBeforeLLMCall { eventContext ->
        println("LLM呼び出し: model=${eventContext.model}")
    }

    // LLM呼び出し後
    onAfterLLMCall { eventContext ->
        println("LLM応答: ${eventContext.response}")
    }
}

4. ツール呼び出しイベント

handleEvents {
    // ツール呼び出し時
    onToolCall { eventContext ->
        println("ツール呼び出し: ${eventContext.tool.name}")
        println("引数: ${eventContext.toolArgs}")
    }

    // ツール結果取得時
    onToolCallResult { eventContext ->
        println("ツール結果: ${eventContext.toolName} => ${eventContext.result}")
    }
}

実用的な活用例

メトリクス収集

var totalTokens = 0
var apiCallCount = 0

handleEvents {
    onAfterLLMCall { _, response ->
        // トークン使用量の追跡
        totalTokens += response.usage?.totalTokens ?: 0
        apiCallCount++
    }
}

デバッグとロギング

handleEvents {
    onToolCall { context ->
        logger.debug("Tool: ${context.tool.name}, Args: ${context.toolArgs}")
    }

    onAgentRunError { error ->
        logger.error("Agent error", error)
        // エラー通知の送信など
    }
}

パフォーマンス監視

val timings = mutableMapOf<String, Long>()

handleEvents {
    onBeforeNode { context ->
        timings["${context.nodeName}_start"] = System.currentTimeMillis()
    }

    onAfterNode { context, _ ->
        val duration = System.currentTimeMillis() - (timings["${context.nodeName}_start"] ?: 0)
        println("Node ${context.nodeName} took ${duration}ms")
    }
}

実装を試してみる

今回はPhase3として、2つの実装パターンを実際に動かしてみました。EventHandlerを使った基本的な実装と、サブグラフを活用した実装です。

1. SimpleEventAgent - 最小限のイベント追跡

最もシンプルなイベント処理の実装です。

@Component
class SimpleEventAgent(
    private val config: Phase3Config,
    @param:Value("\${api.google-api-key}")
    private val googleApiKey: String
) {
    suspend fun processWithEvents(userMessage: String): SimpleEventResponse {
        val startTime = System.currentTimeMillis()
        val events = mutableListOf<String>()

        try {
            events.add("${LocalDateTime.now()}: Processing started")

            val agent = AIAgent(
                llmModel = GoogleModels.Gemini2_0Flash001,
                executor = simpleGoogleAIExecutor(googleApiKey),
                systemPrompt = config.systemPrompt,
                temperature = config.temperature,
                toolRegistry = ToolRegistry {
                    tool(SayToUser)
                    tool(AskUser)
                },
                maxIterations = config.maxIterations
            ) {
                // イベントハンドラーの設定(コンストラクタブロック内)
                handleEvents {
                    // ツール呼び出し時のイベント
                    onToolCall { eventContext ->
                        val event = "${LocalDateTime.now()}: Tool called: ${eventContext.tool.name}"
                        events.add(event)
                    }

                    // エージェント完了時のイベント
                    onAgentFinished { eventContext ->
                        val event = "${LocalDateTime.now()}: Agent finished with result: ${eventContext.result}"
                        events.add(event)
                    }
                }
            }

            // エージェントの実行
            val result = agent.run(userMessage)
            val duration = System.currentTimeMillis() - startTime

            events.add("${LocalDateTime.now()}: Processing completed in ${duration}ms")

            return SimpleEventResponse(
                success = true,
                message = result,
                events = events,
                duration = duration
            )
        } catch (e: Exception) {
            return SimpleEventResponse(
                success = false,
                message = "Error: ${e.message}",
                events = events,
                duration = System.currentTimeMillis() - startTime
            )
        }
    }
}

ポイント

  • handleEventsブロックでイベントリスナーを設定
  • onToolCallでツールの呼び出しを検知
  • onAgentFinishedで処理完了を検知
  • タイムスタンプ付きでイベントをログに記録

使用例と実行結果

curl -X POST http://localhost:8080/api/phase3/simple-events \
  -H "Content-Type: application/json" \
  -d '{"message": "1+5は?"}'

レスポンス

{
    "success": true,
    "message": "6\n",
    "events": [
        "2025-09-13T13:25:34.003504170: Processing started",
        "2025-09-13T13:25:35.123256954: Tool called: say_to_user",
        "2025-09-13T13:25:35.906600183: Agent finished",
        "2025-09-13T13:25:35.907224122: Processing completed in 1904ms"
    ],
    "duration": 1904
}

SimpleEventAgentでは、シンプルなイベント追跡が可能で、各イベントのタイムスタンプと内容が記録されます。

サブグラフとは?

サブグラフ(Subgraph)は、Koogにおける処理フローを構造化するための概念です。AIエージェントのワークフロー内で論理的なセグメントを表現する構造化された実行単位として機能します。

Koogのサブグラフ機能の全体像

本格的なSubgraph DSL

Koogは本格的なSubgraph DSL(Domain Specific Language)を提供しており、以下のような高度な機能を持っています。

基本構造

  • 開始ノード(nodeStart)と終了ノード(nodeFinish) による明確な入出力
  • ノードベースの処理フロー - 各ノードが独立した処理単位
  • エッジによる接続 - ノード間の実行フローを定義
  • 独自のツールセット - サブグラフごとに利用可能なツールを制御

高度な機能

  • 並列実行サポート - 複数ノードの同時実行
  • 階層的構成 - サブグラフ内にサブグラフを含める
  • 独自のLLMモデル設定 - 各サブグラフで異なるモデルやパラメータ
// 本格的なSubgraph DSLの例(概念のみ)
val mySubgraph by subgraph<Input, Output>("subgraph-name") {
    // ノードの定義
    val processNode by node<String, String> { input ->
        // 処理ロジック
    }

    // エッジでノードを接続
    edge(nodeStart forwardTo processNode)
    edge(processNode forwardTo nodeFinish)
}

今回使用する簡易版(StringSubgraphResult)

本記事では、より実践的で扱いやすい簡易版のサブグラフ機能であるStringSubgraphResultを使用します。

StringSubgraphResultの特徴

StringSubgraphResultは、文字列ベースの結果を返すシンプルなサブグラフ実装です。

  • シンプルなAPI - 複雑なDSLを使わずに段階的処理を実現
  • 文字列ベース - 各段階の入出力を文字列で管理
  • 即座に使える - ノード定義なしで直接実行可能

従来の処理との違い

従来の単一エージェント処理

入力 → [エージェント(すべて処理)] → 出力

StringSubgraphResultを使った処理

入力 → [分析段階] → [処理段階] → [最終化段階] → 出力
         ↓             ↓             ↓
      タスク理解    具体的処理    結果整形

StringSubgraphResultを選ぶ理由

  1. 学習コストが低い - 複雑なDSLを学ばずに段階的処理を実装可能
  2. 実用的 - 多くのユースケースで十分な機能
  3. デバッグが簡単 - 各段階の出力を文字列として確認可能
  4. 段階的な制御 - 各段階で異なるプロンプトとパラメータを設定可能
// 基本的な使い方
val result = StringSubgraphResult(
    llmModel = GoogleModels.Gemini2_0Flash001,
    executor = simpleGoogleAIExecutor(apiKey),
    systemPrompt = "特定の役割を持つプロンプト",
    temperature = 0.5f
).run("入力テキスト")

StringSubgraphResultによる段階的処理を試す

次に、StringSubgraphResultを使って、複雑なタスクを複数の段階に分けて処理する実装を試してみます。

SubgraphAgent - 3段階パイプライン処理

@Component
class SubgraphAgent(
    private val config: Phase3Config,
    @param:Value("\${api.google-api-key}")
    private val googleApiKey: String
) {
    /**
     * サブグラフを使った段階的処理
     * 分析 → 処理 → 最終化の3段階で実行
     */
    suspend fun processWithSubgraph(
        userMessage: String
    ): SubgraphResponse {
        val sessionId = UUID.randomUUID().toString()
        val startTime = System.currentTimeMillis()

        // 各段階でのプロンプト定義
        val analysisPrompt = """
            あなたはタスク分析の専門家です。
            与えられたメッセージを分析し、以下を明確にしてください:
            1. 主要なトピックや要求事項
            2. 必要なアクション
            3. 期待される出力形式
        """.trimIndent()

        val processingPrompt = """
            あなたはタスク実行の専門家です。
            分析結果に基づいて、具体的な処理を実行してください。
            明確で構造化された出力を心がけてください。
        """.trimIndent()

        val finalizationPrompt = """
            あなたは出力整形の専門家です。
            処理結果を最終的なユーザー向けの形式に整形してください。
            読みやすく、分かりやすい形式を心がけてください。
        """.trimIndent()

        // StringSubgraphResultを使用した3段階パイプライン
        val analysisResult = StringSubgraphResult(
            llmModel = config.llmModel,
            executor = simpleGoogleAIExecutor(googleApiKey),
            systemPrompt = analysisPrompt,
            temperature = 0.3f  // 分析は低めの温度で正確性重視
        ).run(userMessage)

        val processingResult = StringSubgraphResult(
            llmModel = config.llmModel,
            executor = simpleGoogleAIExecutor(googleApiKey),
            systemPrompt = processingPrompt,
            temperature = 0.7f  // 処理は中程度の温度でバランス重視
        ).run("""
            【分析結果】
            $analysisResult

            【元のメッセージ】
            $userMessage
        """.trimIndent())

        val finalResult = StringSubgraphResult(
            llmModel = config.llmModel,
            executor = simpleGoogleAIExecutor(googleApiKey),
            systemPrompt = finalizationPrompt,
            temperature = 0.5f  // 最終化は適度な温度で可読性重視
        ).run("""
            【処理結果】
            $processingResult

            【元のメッセージ】
            $userMessage
        """.trimIndent())

        val duration = System.currentTimeMillis() - startTime

        return SubgraphResponse(
            success = true,
            sessionId = sessionId,
            analysisResult = analysisResult,
            processingResult = processingResult,
            finalResult = finalResult,
            duration = duration
        )
    }
}

ポイント

  • 3つの異なるプロンプトで段階的に処理
  • 各段階で異なるtemperature設定(分析は低め、処理は高め)
  • 前段階の結果を次段階の入力として使用
  • 各段階の結果を個別に返却可能

使用例と実行結果

curl -X POST http://localhost:8080/api/phase3/subgraph \
  -H "Content-Type: application/json" \
  -d '{"message": "RESTful APIの設計原則について説明してください"}'

レスポンス

{
    "success": true,
    "sessionId": "d8d9b92d-442a-4772-b39b-1e01862e86e0",
    "analysisResult": "分析結果:\n\n1.  主要なトピック:RESTful APIの設計原則\n2.  必要なアクション:RESTful APIの設計原則について説明する。\n3.  期待される出力:RESTful APIの設計原則に関する説明。原則の名前、原則の説明、原則の理由を含む。\n",
    "processingResult": "RESTful APIの設計原則について説明します。\n\nRESTful APIの設計原則は、以下のとおりです。\n\n1.  クライアント/サーバー:クライアントとサーバーは、互いに独立して進化できる必要があります。クライアントはサーバーに関する情報を保持すべきではなく、サーバーはクライアントに関する情報を保持すべきではありません。これは、クライアントとサーバーが互いに依存しないようにすることで、柔軟性とスケーラビリティを向上させるためです。\n2.  ステートレス:サーバーは、クライアントの状態を保持しません。クライアントからの各リクエストには、サーバーがリクエストを完了するために必要なすべての情報が含まれている必要があります。これは、サーバーの負荷を軽減し、スケーラビリティを向上させるためです。\n3.  キャッシュ可能:クライアントは、サーバーからの応答をキャッシュできる必要があります。これは、ネットワークトラフィックを削減し、クライアントの応答時間を短縮するためです。\n4.  統一インターフェース:クライアントとサーバー間のインターフェースは、統一されている必要があります。これは、クライアントとサーバーが互いに理解しやすくし、柔軟性を向上させるためです。\n5.  階層化システム:クライアントは、サーバーとの間に仲介者が存在することを知る必要はありません。これは、セキュリティを向上させ、スケーラビリティを向上させるためです。\n6.  コードオンデマンド(オプション):サーバーは、クライアントに実行可能なコードを提供できます。これは、クライアントの機能を拡張するために使用できます。\n\nこれらの原則に従うことで、スケーラブルで保守しやすいAPIを設計できます。",
    "finalResult": "RESTful APIの設計原則は以下の通りです。\n\n1.  **クライアント/サーバー:** クライアントとサーバーは独立して進化でき、互いの情報を保持しません。これにより、柔軟性とスケーラビリティが向上します。\n2.  **ステートレス:** サーバーはクライアントの状態を保持せず、各リクエストに必要な情報はリクエスト自体に含まれます。サーバーの負荷が軽減され、スケーラビリティが向上します。\n3.  **キャッシュ可能:** クライアントはサーバーからの応答をキャッシュできます。これにより、ネットワークトラフィックが削減され、応答時間が短縮されます。\n4.  **統一インターフェース:** クライアントとサーバー間のインターフェースは統一されている必要があります。これにより、相互理解が容易になり、柔軟性が向上します。\n5.  **階層化システム:** クライアントは、サーバーとの間に仲介者が存在することを意識する必要はありません。セキュリティとスケーラビリティが向上します。\n6.  **コードオンデマンド (オプション):** サーバーはクライアントに実行可能なコードを提供できます。クライアントの機能拡張に利用可能です。\n\nこれらの原則に従うことで、スケーラブルで保守しやすいAPIを設計できます。\n\n**実行可能なアクション:** 上記の原則をAPI設計に適用し、設計を見直すことで、より良いAPIを構築できます。\n",
    "duration": 6912
}

SubgraphAgentでは、分析→処理→最終化の3段階で処理が行われ、各段階の結果が確認できます。

まとめ

今回は、Koogのイベント駆動(EventHandler)とサブグラフ機能(StringSubgraphResult)を実際に試しながら学んでみました。

学んだこと

  • EventHandlerによるエージェント動作の可視化とリアルタイム監視
  • StringSubgraphResultを使った複雑なタスクの段階的処理
  • 各実装パターンの特徴と使い分け

これらの機能を活用することで、より透明性が高く、デバッグしやすく、制御可能なAIエージェントを構築できます。特に本番環境では、EventHandlerによるモニタリングとStringSubgraphResultによる処理の構造化が重要になるでしょう。

参考資料

Discussion