🤖

OpenAI Go SDKを使ってReAct Agentを作る

に公開

1. はじめに

1-1. この記事のゴール

この記事を読み終わると、Go から OpenAI Go SDK を叩いて、

  • 会話履歴(Memory)を読む
  • ReAct ループで LLM にツール呼び出しをさせる
  • ツールを実行して結果を LLM にフィードバックする
  • ストリーミングでトークンをフロントに流しつつ、DB にメッセージを永続化する

という一連の「ReAct Agent」の骨格を自分で実装できるようになります。

1-2. なぜ自前 ReAct Agent を Go で書くのか

Python だと LangChain / LangGraph があるので「ReAct やりたい」はライブラリに寄せれば済みますが、

  • バックエンドが Go で統一されている
  • トレース用にメッセージやツール実行ログを全部 DB で持ちたい
  • フレームワークにロックインされずに、エージェントの挙動を細かく制御したい

といった事情があると、

「OpenAI Go SDK + 自前の ReAct ループ」

という構成が現実的になります。

この記事では、実際にプロダクションで動かしている ReAct Agent の実装を、なるべく汎用化した形で紹介します。

2. 理論の基礎:ReAct と Go からのツール呼び出し

2-1. ReAct のざっくり復習

ReAct (Reason + Act) の基本はシンプルです。

  1. LLM が「考える(Thought)」+「行動する(Action)」を出す
  2. Action に応じて環境側でツールを実行し、その結果(Observation)を LLM に渡す
  3. それを何回か繰り返したあと、最終的な回答を返す

ReAct(Reason and Action)の図

今回の実装では、テキストの Thought/Action をきれいにパースするのではなく、

  • OpenAI の Tool Calling(Function Calling)を Action として扱う
  • Tool の実行結果を role=tool メッセージとして渡し直す

ことで、ReAct の構造を ChatCompletion API 上で実現しています。

2-2. OpenAI Go SDK での Tool Calling / Streaming のポイント

OpenAI Go SDK v3 系(github.com/openai/openai-go/v3)だと、

  • client.Chat.Completions.New … 非ストリーミング
  • client.Chat.Completions.NewStreaming … ストリーミング

の 2 つを押さえておけば OK です。

Tool Calling を使うときは、

  • リクエストに Tools を定義
  • レスポンスの message.ToolCalls に「どのツールをどう呼ぶか」が入る
  • ストリーミング時は delta に ToolCalls の断片が乗ってくるので、自前で蓄積して復元

という流れになります。

今回の ReactAgent は、

  • GenerateStream() で OpenAI から StreamEvent を受け取る
  • その中に含まれる text delta と tool_calls delta を解釈し、
    「途中経過のテキスト表示」+「ツール実行」+「再プロンプト」を回す

という構造になっています。

3. 実装:Go で ReAct Agent の骨格を組む

全体構成は以下の 4 コンポーネントに分けています。

  • AgentConfig … エージェントの動作パラメータ
  • AgentState … ループ中の状態管理(イテレーション数、ツール履歴など)
  • ContinueCondition 群 … ループ停止条件(MaxIteration, Timeout など)
  • ReactAgent … 実際の ReAct ループ本体

3-1. AgentConfig:挙動を外から差し替えられるようにする

まずは「どのモデルで、どれくらい回すか」をまとめた設定構造体です。

pkg/ai/agents/agent_config.go
type AgentMode string

const (
    ModeAgent   AgentMode = "agent"
    ModeExplore AgentMode = "explore"
    ModeProcess AgentMode = "process"
)

type ThinkingLevel string

const (
    ThinkingInstant ThinkingLevel = "instant"
    ThinkingNormal  ThinkingLevel = "normal"
    ThinkingDeep    ThinkingLevel = "deep"
)

type AgentConfig struct {
    Mode           AgentMode              `json:"mode"`
    ThinkingDepth  ThinkingLevel          `json:"thinking_depth"`
    Model          string                 `json:"model"`
    Temperature    float32                `json:"temperature"`
    MaxTokens      int                    `json:"max_tokens"`
    TimeoutSeconds int                    `json:"timeout_seconds"`
    MaxIterations  int                    `json:"max_iterations"`
    EnabledTools   []string               `json:"enabled_tools,omitempty"`
    SystemPrompt   string                 `json:"system_prompt,omitempty"`
    Metadata       map[string]interface{} `json:"metadata,omitempty"`
}

func (c *AgentConfig) GetMaxIterations() int {
    if c.MaxIterations > 0 {
        return c.MaxIterations
    }
    switch c.ThinkingDepth {
    case ThinkingInstant:
        return 3
    case ThinkingDeep:
        return 10
    default:
        return 5
    }
}

モードごとに MaxIterationsEnabledTools を切り替えることで、「探索モード」「タスク実行モード」などを作りやすくしており、また、ReAct の「どれだけ考えさせるか」をThinkingDepth 経由で外から調整できます。

3-2. AgentState:反復状態とツール実行履歴

ReAct のループは「どこで止めるか」が重要なので、そのための状態を AgentState にまとめています。

pkg/ai/agents/agent_state.go
type AgentState struct {
    ThreadID  string    `json:"thread_id"`
    AgentID   string    `json:"agent_id"`
    StartTime time.Time `json:"start_time"`
    Iteration int       `json:"iteration"`

    Messages []ai.Message `json:"messages"`

    ToolCallHistory map[string]int `json:"tool_call_history"`
    ToolResults     []ToolResult   `json:"tool_results"`
    TotalToolCalls  int            `json:"total_tool_calls"`

    NoProgressCount int    `json:"no_progress_count"`
    LastObsHash     string `json:"last_obs_hash"`

    Config *AgentConfig `json:"config"`
}

type ToolResult struct {
    Name      string                 `json:"name"`
    ArgsHash  string                 `json:"args_hash"`
    Args      map[string]interface{} `json:"args,omitempty"`
    Result    interface{}            `json:"result,omitempty"`
    Summary   string                 `json:"summary"`
    LatencyMS int                    `json:"latency_ms"`
    Success   bool                   `json:"success"`
    Error     string                 `json:"error,omitempty"`
    Timestamp time.Time              `json:"timestamp"`
}

ここでポイントなのは、

  • ツール実行ごとに ToolResult を積んでおく
  • メッセージとツール結果から「観測ハッシュ」を計算して、進捗をチェックする

という仕掛けです。

func (s *AgentState) ComputeObservationHash() string {
    data := ""
    if len(s.ToolResults) > 0 {
        last := s.ToolResults[len(s.ToolResults)-1]
        data += last.Summary
    }
    if len(s.Messages) > 0 {
        lastMsg := s.Messages[len(s.Messages)-1]
        if text, ok := lastMsg.Content["text"].(string); ok {
            data += text
        }
    }
    hash := sha256.Sum256([]byte(data))
    return fmt.Sprintf("%x", hash[:8])
}

func (s *AgentState) UpdateProgress() {
    current := s.ComputeObservationHash()
    if current == s.LastObsHash {
        s.NoProgressCount++
    } else {
        s.NoProgressCount = 0
        s.LastObsHash = current
    }
}

これにより、

  • 同じ Thought/Observation をぐるぐる回るような無限ループを検知して止める
  • 「ツール結果」と「最終発話」の両方をもとに進捗を見る

といったコントロールができます。

3-3. ContinueCondition:ReAct ループの停止条件を組み合わせる

ReAct を実装するときのよくある落とし穴が、

LLM が延々とツールを呼び続けて止まらない

ことです。そこで「ループ継続条件」をインターフェースで切っています。

pkg/ai/agents/conditions.go
type ContinueCondition interface {
    ShouldContinue(state *AgentState) (bool, string)
    Name() string
}

type MaxIterationCondition struct {
    maxIterations int
}

func (c *MaxIterationCondition) ShouldContinue(s *AgentState) (bool, string) {
    if s.Iteration >= c.maxIterations {
        return false, fmt.Sprintf("最大イテレーション数(%d)に達しました", c.maxIterations)
    }
    return true, ""
}

他にも、

  • 進捗がない状態が続いたら止める NoProgressCondition
  • ツールの呼びすぎを止める ToolLimitCondition
  • 実行時間で切る TimeoutCondition

を用意して、CompositeCondition で AND 結合しています。

func NewCompositeCondition(conditions ...ContinueCondition) *CompositeCondition {
    return &CompositeCondition{conditions: conditions}
}

func (c *CompositeCondition) ShouldContinue(s *AgentState) (bool, string) {
    for _, cond := range c.conditions {
        if ok, reason := cond.ShouldContinue(s); !ok {
            return false, fmt.Sprintf("[%s] %s", cond.Name(), reason)
        }
    }
    return true, ""
}

これで ReAct ループを「安全に回して止める」ための仕組みが揃います。

3-4. ReactAgent.RunStream:ReAct ループ本体

いよいよ本体です。大まかな流れは次の通り。

  1. Memory から履歴をロードして、System Prompt を先頭に挿入

  2. ユーザーメッセージを履歴に追加し、DB に保存

  3. AgentState を初期化して、agent_start イベントを送る

  4. 以下を繰り返す:

    • 継続条件をチェック
    • OpenAI にストリーミングリクエストを投げる
    • text delta をフロントに流しつつ蓄積
    • tool_calls delta を蓄積
    • ツール呼び出しがあれば実行して次のイテレーションへ
    • ツール呼び出しがなく、テキストだけなら最終回答として終了

重要部分だけ抜粋します。

// RunStream は ReAct ループ本体
func (a *ReactAgent) RunStream(ctx context.Context, input string, w streaming.StreamWriter) error {
    // 1. メモリから履歴を取得
    messages, err := a.memory.LoadHistory(ctx)
    if err != nil {
        return fmt.Errorf("failed to load history: %w", err)
    }

    // System Prompt を動的挿入
    if a.config.SystemPrompt != "" {
        messages = append([]ai.Message{{
            Role: "system",
            Content: map[string]interface{}{"text": a.config.SystemPrompt},
        }}, messages...)
    }

    // 2. ユーザーメッセージを追加して保存
    userMsg := ai.Message{
        Role: "user",
        Content: map[string]interface{}{"text": input},
    }
    messages = append(messages, userMsg)

    userInfo, err := a.memory.Save(ctx, userMsg)
    if err != nil {
        logger.Error(ctx, err, "ユーザーメッセージの保存に失敗", nil)
        a.lastUserMessage = nil
    } else {
        a.lastUserMessage = userInfo
    }

    // 3. 状態初期化 & agent_start
    state := NewAgentState(a.config)
    _ = w.Write(ctx, streaming.StreamEvent{
        Type: streaming.EventTypeAgentStart,
        Data: map[string]interface{}{
            "thread_id": state.ThreadID,
            "mode":      a.config.Mode,
        },
    })

    // 4. ReAct ループ
    for {
        ok, reason := a.conditions.ShouldContinue(state)
        if !ok {
            _ = w.Write(ctx, streaming.StreamEvent{
                Type: streaming.EventTypeAgentStop,
                Data: map[string]interface{}{"reason": reason},
            })
            break
        }

        _ = w.Write(ctx, streaming.StreamEvent{
            Type: streaming.EventTypeIterationStart,
            Data: map[string]interface{}{"iteration": state.Iteration},
        })

        // モデルが StreamingModel かどうかで分岐
        streamingModel, ok := a.model.(models.StreamingModel)
        if !ok {
            // 非ストリーミング版(省略)
        }

        eventChan, err := streamingModel.GenerateStream(ctx, messages,
            ai.WithModelMaxTokens(a.config.MaxTokens),
            ai.WithTemperature(a.config.Temperature),
            ai.WithModel(a.config.Model),
        )
        if err != nil {
            return fmt.Errorf("failed to generate stream: %w", err)
        }

        var (
            toolCalls    []ai.ToolCall
            hasContent   bool
            collectedTxt string
        )

        // OpenAI からのストリームを処理
        for ev := range eventChan {
            if ev.Type == streaming.EventTypeError {
                return fmt.Errorf("stream error: %v", ev.Data["error"])
            }
            if ev.Type == streaming.EventTypeToolStart {
                // tool_calls を JSON 化→構造体に戻す
                if raw, ok := ev.Data["tool_calls"]; ok {
                    b, _ := json.Marshal(raw)
                    _ = json.Unmarshal(b, &toolCalls)
                }
            }
            if ev.Type == streaming.EventTypeTextDelta {
                hasContent = true
                if delta, ok := ev.Data["delta"].(string); ok {
                    collectedTxt += delta
                }
                // テキストはそのままフロントに流す
                if err := w.Write(ctx, ev); err != nil {
                    return fmt.Errorf("failed to write event: %w", err)
                }
            }
        }

        // 4-1. ツール呼び出しがある場合 → ツール実行して次ループ
        if len(toolCalls) > 0 {
            // assistant メッセージ(tool_calls)を保存
            assistantMsg := ai.Message{
                Role:      "assistant",
                Content:   map[string]interface{}{},
                ToolCalls: toolCalls,
            }
            messages = append(messages, assistantMsg)
            if _, err := a.memory.Save(ctx, assistantMsg); err != nil {
                logger.Error(ctx, err, "アシスタントメッセージの保存に失敗", nil)
            }

            for _, tc := range toolCalls {
                _ = a.executeToolCall(ctx, state, tc, &messages, w)
            }

            _ = w.Write(ctx, streaming.StreamEvent{
                Type: streaming.EventTypeIterationEnd,
                Data: map[string]interface{}{
                    "iteration":  state.Iteration,
                    "tool_calls": len(toolCalls),
                },
            })

            state.UpdateProgress()
            state.Iteration++
            continue
        }

        // 4-2. ツール呼び出しなし + テキストあり → 最終回答
        if hasContent {
            assistantMsg := ai.Message{
                Role: "assistant",
                Content: map[string]interface{}{"text": collectedTxt},
            }
            messages = append(messages, assistantMsg)

            info, err := a.memory.Save(ctx, assistantMsg)
            if err != nil {
                logger.Error(ctx, err, "最終回答メッセージの保存に失敗", nil)
                a.lastAssistantMessage = nil
            } else {
                a.lastAssistantMessage = info
            }

            _ = w.Write(ctx, streaming.StreamEvent{
                Type: streaming.EventTypeIterationEnd,
                Data: map[string]interface{}{
                    "iteration":    state.Iteration,
                    "final_answer": true,
                },
            })

            // agent_end イベント(メッセージID付き)
            _ = w.Write(ctx, streaming.NewAgentEndEvent(
                state.ThreadID,
                state.Iteration,
                state.TotalToolCalls,
                a.lastUserMessage,
                a.lastAssistantMessage,
            ))
            return nil
        }

        state.Iteration++
    }

    return nil
}

ここで、

  • 「ReAct ループ自体」は ReactAgent に閉じ込める
  • メッセージの保存やイベント出力は MemoryStreamWriter に委譲する

という分離をすると、テスト・差し替えがかなり楽になります。

3-5. executeToolCall:ツール呼び出しの去り際をデザインする

ツール側の実装も少し見ておきます。

func (a *ReactAgent) executeToolCall(
    ctx context.Context,
    state *AgentState,
    tc ai.ToolCall,
    messages *[]ai.Message,
    w streaming.StreamWriter,
) error {
    start := time.Now()
    argsHash := a.computeArgsHash(tc.Function.Name, tc.Function.Arguments)

    // 同じ引数の重複呼び出しを抑制
    if state.IsDuplicateToolCall(argsHash, 2) {
        _ = w.Write(ctx, streaming.NewToolDuplicateEvent(tc.Function.Name, tc.ID))
        *messages = append(*messages, ai.Message{
            Role: "tool",
            Content: map[string]interface{}{
                "tool_call_id": tc.ID,
                "result":       "このツールは既に実行済みです。前回の結果を参照してください。",
            },
        })
        return nil
    }

    // args をパースしてログに載せやすくする
    var args map[string]interface{}
    if err := json.Unmarshal([]byte(tc.Function.Arguments), &args); err != nil {
        args = map[string]interface{}{"raw": tc.Function.Arguments}
    }

    _ = w.Write(ctx, streaming.NewToolStartEvent(tc.Function.Name, tc.ID, args))

    // 実際のツール実行(外部APIやDB検索など)
    result, err := a.registry.Execute(ctx, tc.Function.Name, json.RawMessage(tc.Function.Arguments))
    latency := time.Since(start)

    toolResult := ToolResult{
        Name:      tc.Function.Name,
        ArgsHash:  argsHash,
        Args:      args,
        Result:    result,
        LatencyMS: int(latency.Milliseconds()),
        Success:   err == nil,
        Timestamp: time.Now(),
    }

    if err != nil {
        toolResult.Error = err.Error()
        toolResult.Summary = fmt.Sprintf("エラー: %s", err.Error())
        _ = w.Write(ctx, streaming.NewToolErrorEvent(tc.Function.Name, tc.ID, err.Error()))
    } else {
        toolResult.Summary = a.generateSummary(tc.Function.Name, result)
        _ = w.Write(ctx, streaming.NewToolResultEvent(
            tc.Function.Name,
            tc.ID,
            result,
            toolResult.Summary,
            latency.Milliseconds(),
        ))
    }

    // 状態更新
    state.AddToolResult(toolResult)

    // tool メッセージとして履歴に追加・永続化
    toolMsg := ai.Message{
        Role: "tool",
        Content: map[string]interface{}{
            "tool_call_id": tc.ID,
            "result":       result,
        },
    }
    *messages = append(*messages, toolMsg)
    if _, saveErr := a.memory.Save(ctx, toolMsg); saveErr != nil {
        logger.Error(ctx, saveErr, "ツールメッセージの保存に失敗", nil)
    }

    return err
}

ここでやっていることは、

  • ツール呼び出しの重複検知(同じ引数で何度も呼ばない)
  • ツールの latency, 成否を ToolResult にまとめてトレース可能にする
  • role=tool メッセージとして LLM に返す用の履歴を残す

という、ReAct の「Act → Observation」の部分の実装です。

4. 結果と考察:実際にどう動くのか

4-1. 典型的なイベントフロー

例えば「このプロジェクトのゴールと関連タスクを一覧にして」といった質問を投げると、
イベント的にはざっくり以下のような流れになります。

  1. agent_start
  2. iteration_start (iteration=0)
  3. text_delta … LLM がツールを使う前に少し説明し始める
  4. tool_start (search_goals, args={...})
  5. tool_result (search_goals, summary="10件のゴールを検索")
  6. iteration_end (iteration=0, tool_calls=1)
  7. iteration_start (iteration=1)
  8. text_delta … 検索結果を踏まえて回答を生成
  9. iteration_end (iteration=1, final_answer=true)
  10. agent_end(ユーザー/アシスタントメッセージの ID 付き)

フロント側から見ると、

  • ユーザー入力を送った瞬間に「pending の user メッセージ」を表示
  • text_delta を受け取るたびに assistant メッセージを伸ばしていく
  • tool_start / tool_result でサイドバーにツールログを表示する
  • 最後に agent_end で確定し、スレッド一覧に反映

という UI が組めます。

4-2. ReAct を自前実装してみて良かった点・難しかった点

良かった点:

  • Go から直接 ReAct の挙動をコントロールできるので、「ツール重複の抑制」「NoProgress 検知」「タイムアウト」などを細かく設計できた
  • すべてのメッセージ/ツール結果が自前の DB に乗るので、後続の分析(エラー原因分析、UX 改善、プロンプト改善)がやりやすくなる
  • OpenAI Go SDK が素直な API なので、streaming / tool_calls の処理に集中できた

難しかった点:

  • ストリーミング時の tool_calls は delta で流れてくるので、正しく復元する処理を書かないといけない
  • 「どこまで抽象化するか」の線引きが難しく、
    • 最初から作り込みすぎると実装コストが膨らむ
    • かといってベタ書きすると変更に弱くなる

あたりはトレードオフでした。

5. 振り返りと今後の展望

5-1. 結局何が分かったか

今回の実装を通して整理できたポイントはだいたい次の 3 つです。

  1. ReAct 自体はシンプルなループだが、
    「どこで止めるか」「どうトレースするか」を設計しないと本番運用は厳しい
  2. OpenAI Go SDK だけでも、
    • Tool Calling
    • Streaming
      を組み合わせれば、LangChain 的なエージェントは十分実装できる
  3. Model / Memory / Tool / Agent / Streaming を interface で切っておくと、
    モデル乗り換えやツール追加に強い基盤になる

5-2. 応用できそうな方向

この ReAct Agent を土台にして、次のような拡張が素直に載せられます。

  • RAG をツールとして追加(search_documents, get_document など)
  • 複数エージェントを組み合わせる Planner-Executor パターン
  • AgentState をそのままトレースに出して、Observability 基盤と連携
  • MCP などプロトコルベースの外部ツールを Tool 実装として統合

いずれも、

「エージェントのコアは ReactAgent に閉じ込め、周辺を差し替える」

という設計を守っておけば、段階的に導入しやすいはずです。

アドネス株式会社 開発部

Discussion