OpenAI Go SDKを使ってReAct Agentを作る
1. はじめに
1-1. この記事のゴール
この記事を読み終わると、Go から OpenAI Go SDK を叩いて、
- 会話履歴(Memory)を読む
- ReAct ループで LLM にツール呼び出しをさせる
- ツールを実行して結果を LLM にフィードバックする
- ストリーミングでトークンをフロントに流しつつ、DB にメッセージを永続化する
という一連の「ReAct Agent」の骨格を自分で実装できるようになります。
1-2. なぜ自前 ReAct Agent を Go で書くのか
Python だと LangChain / LangGraph があるので「ReAct やりたい」はライブラリに寄せれば済みますが、
- バックエンドが Go で統一されている
- トレース用にメッセージやツール実行ログを全部 DB で持ちたい
- フレームワークにロックインされずに、エージェントの挙動を細かく制御したい
といった事情があると、
「OpenAI Go SDK + 自前の ReAct ループ」
という構成が現実的になります。
この記事では、実際にプロダクションで動かしている ReAct Agent の実装を、なるべく汎用化した形で紹介します。
2. 理論の基礎:ReAct と Go からのツール呼び出し
2-1. ReAct のざっくり復習
ReAct (Reason + Act) の基本はシンプルです。
- LLM が「考える(Thought)」+「行動する(Action)」を出す
- Action に応じて環境側でツールを実行し、その結果(Observation)を LLM に渡す
- それを何回か繰り返したあと、最終的な回答を返す

今回の実装では、テキストの Thought/Action をきれいにパースするのではなく、
- OpenAI の Tool Calling(Function Calling)を Action として扱う
- Tool の実行結果を
role=toolメッセージとして渡し直す
ことで、ReAct の構造を ChatCompletion API 上で実現しています。
2-2. OpenAI Go SDK での Tool Calling / Streaming のポイント
OpenAI Go SDK v3 系(github.com/openai/openai-go/v3)だと、
-
client.Chat.Completions.New… 非ストリーミング -
client.Chat.Completions.NewStreaming… ストリーミング
の 2 つを押さえておけば OK です。
Tool Calling を使うときは、
- リクエストに
Toolsを定義 - レスポンスの
message.ToolCallsに「どのツールをどう呼ぶか」が入る - ストリーミング時は delta に
ToolCallsの断片が乗ってくるので、自前で蓄積して復元
という流れになります。
今回の ReactAgent は、
-
GenerateStream()で OpenAI からStreamEventを受け取る - その中に含まれる text delta と tool_calls delta を解釈し、
「途中経過のテキスト表示」+「ツール実行」+「再プロンプト」を回す
という構造になっています。
3. 実装:Go で ReAct Agent の骨格を組む
全体構成は以下の 4 コンポーネントに分けています。
-
AgentConfig… エージェントの動作パラメータ -
AgentState… ループ中の状態管理(イテレーション数、ツール履歴など) -
ContinueCondition群 … ループ停止条件(MaxIteration, Timeout など) -
ReactAgent… 実際の ReAct ループ本体
3-1. AgentConfig:挙動を外から差し替えられるようにする
まずは「どのモデルで、どれくらい回すか」をまとめた設定構造体です。
type AgentMode string
const (
ModeAgent AgentMode = "agent"
ModeExplore AgentMode = "explore"
ModeProcess AgentMode = "process"
)
type ThinkingLevel string
const (
ThinkingInstant ThinkingLevel = "instant"
ThinkingNormal ThinkingLevel = "normal"
ThinkingDeep ThinkingLevel = "deep"
)
type AgentConfig struct {
Mode AgentMode `json:"mode"`
ThinkingDepth ThinkingLevel `json:"thinking_depth"`
Model string `json:"model"`
Temperature float32 `json:"temperature"`
MaxTokens int `json:"max_tokens"`
TimeoutSeconds int `json:"timeout_seconds"`
MaxIterations int `json:"max_iterations"`
EnabledTools []string `json:"enabled_tools,omitempty"`
SystemPrompt string `json:"system_prompt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
func (c *AgentConfig) GetMaxIterations() int {
if c.MaxIterations > 0 {
return c.MaxIterations
}
switch c.ThinkingDepth {
case ThinkingInstant:
return 3
case ThinkingDeep:
return 10
default:
return 5
}
}
モードごとに MaxIterations や EnabledTools を切り替えることで、「探索モード」「タスク実行モード」などを作りやすくしており、また、ReAct の「どれだけ考えさせるか」をThinkingDepth 経由で外から調整できます。
3-2. AgentState:反復状態とツール実行履歴
ReAct のループは「どこで止めるか」が重要なので、そのための状態を AgentState にまとめています。
type AgentState struct {
ThreadID string `json:"thread_id"`
AgentID string `json:"agent_id"`
StartTime time.Time `json:"start_time"`
Iteration int `json:"iteration"`
Messages []ai.Message `json:"messages"`
ToolCallHistory map[string]int `json:"tool_call_history"`
ToolResults []ToolResult `json:"tool_results"`
TotalToolCalls int `json:"total_tool_calls"`
NoProgressCount int `json:"no_progress_count"`
LastObsHash string `json:"last_obs_hash"`
Config *AgentConfig `json:"config"`
}
type ToolResult struct {
Name string `json:"name"`
ArgsHash string `json:"args_hash"`
Args map[string]interface{} `json:"args,omitempty"`
Result interface{} `json:"result,omitempty"`
Summary string `json:"summary"`
LatencyMS int `json:"latency_ms"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
ここでポイントなのは、
- ツール実行ごとに
ToolResultを積んでおく - メッセージとツール結果から「観測ハッシュ」を計算して、進捗をチェックする
という仕掛けです。
func (s *AgentState) ComputeObservationHash() string {
data := ""
if len(s.ToolResults) > 0 {
last := s.ToolResults[len(s.ToolResults)-1]
data += last.Summary
}
if len(s.Messages) > 0 {
lastMsg := s.Messages[len(s.Messages)-1]
if text, ok := lastMsg.Content["text"].(string); ok {
data += text
}
}
hash := sha256.Sum256([]byte(data))
return fmt.Sprintf("%x", hash[:8])
}
func (s *AgentState) UpdateProgress() {
current := s.ComputeObservationHash()
if current == s.LastObsHash {
s.NoProgressCount++
} else {
s.NoProgressCount = 0
s.LastObsHash = current
}
}
これにより、
- 同じ Thought/Observation をぐるぐる回るような無限ループを検知して止める
- 「ツール結果」と「最終発話」の両方をもとに進捗を見る
といったコントロールができます。
3-3. ContinueCondition:ReAct ループの停止条件を組み合わせる
ReAct を実装するときのよくある落とし穴が、
LLM が延々とツールを呼び続けて止まらない
ことです。そこで「ループ継続条件」をインターフェースで切っています。
type ContinueCondition interface {
ShouldContinue(state *AgentState) (bool, string)
Name() string
}
type MaxIterationCondition struct {
maxIterations int
}
func (c *MaxIterationCondition) ShouldContinue(s *AgentState) (bool, string) {
if s.Iteration >= c.maxIterations {
return false, fmt.Sprintf("最大イテレーション数(%d)に達しました", c.maxIterations)
}
return true, ""
}
他にも、
- 進捗がない状態が続いたら止める
NoProgressCondition - ツールの呼びすぎを止める
ToolLimitCondition - 実行時間で切る
TimeoutCondition
を用意して、CompositeCondition で AND 結合しています。
func NewCompositeCondition(conditions ...ContinueCondition) *CompositeCondition {
return &CompositeCondition{conditions: conditions}
}
func (c *CompositeCondition) ShouldContinue(s *AgentState) (bool, string) {
for _, cond := range c.conditions {
if ok, reason := cond.ShouldContinue(s); !ok {
return false, fmt.Sprintf("[%s] %s", cond.Name(), reason)
}
}
return true, ""
}
これで ReAct ループを「安全に回して止める」ための仕組みが揃います。
3-4. ReactAgent.RunStream:ReAct ループ本体
いよいよ本体です。大まかな流れは次の通り。
-
Memory から履歴をロードして、System Prompt を先頭に挿入
-
ユーザーメッセージを履歴に追加し、DB に保存
-
AgentState を初期化して、
agent_startイベントを送る -
以下を繰り返す:
- 継続条件をチェック
- OpenAI にストリーミングリクエストを投げる
- text delta をフロントに流しつつ蓄積
- tool_calls delta を蓄積
- ツール呼び出しがあれば実行して次のイテレーションへ
- ツール呼び出しがなく、テキストだけなら最終回答として終了
重要部分だけ抜粋します。
// RunStream は ReAct ループ本体
func (a *ReactAgent) RunStream(ctx context.Context, input string, w streaming.StreamWriter) error {
// 1. メモリから履歴を取得
messages, err := a.memory.LoadHistory(ctx)
if err != nil {
return fmt.Errorf("failed to load history: %w", err)
}
// System Prompt を動的挿入
if a.config.SystemPrompt != "" {
messages = append([]ai.Message{{
Role: "system",
Content: map[string]interface{}{"text": a.config.SystemPrompt},
}}, messages...)
}
// 2. ユーザーメッセージを追加して保存
userMsg := ai.Message{
Role: "user",
Content: map[string]interface{}{"text": input},
}
messages = append(messages, userMsg)
userInfo, err := a.memory.Save(ctx, userMsg)
if err != nil {
logger.Error(ctx, err, "ユーザーメッセージの保存に失敗", nil)
a.lastUserMessage = nil
} else {
a.lastUserMessage = userInfo
}
// 3. 状態初期化 & agent_start
state := NewAgentState(a.config)
_ = w.Write(ctx, streaming.StreamEvent{
Type: streaming.EventTypeAgentStart,
Data: map[string]interface{}{
"thread_id": state.ThreadID,
"mode": a.config.Mode,
},
})
// 4. ReAct ループ
for {
ok, reason := a.conditions.ShouldContinue(state)
if !ok {
_ = w.Write(ctx, streaming.StreamEvent{
Type: streaming.EventTypeAgentStop,
Data: map[string]interface{}{"reason": reason},
})
break
}
_ = w.Write(ctx, streaming.StreamEvent{
Type: streaming.EventTypeIterationStart,
Data: map[string]interface{}{"iteration": state.Iteration},
})
// モデルが StreamingModel かどうかで分岐
streamingModel, ok := a.model.(models.StreamingModel)
if !ok {
// 非ストリーミング版(省略)
}
eventChan, err := streamingModel.GenerateStream(ctx, messages,
ai.WithModelMaxTokens(a.config.MaxTokens),
ai.WithTemperature(a.config.Temperature),
ai.WithModel(a.config.Model),
)
if err != nil {
return fmt.Errorf("failed to generate stream: %w", err)
}
var (
toolCalls []ai.ToolCall
hasContent bool
collectedTxt string
)
// OpenAI からのストリームを処理
for ev := range eventChan {
if ev.Type == streaming.EventTypeError {
return fmt.Errorf("stream error: %v", ev.Data["error"])
}
if ev.Type == streaming.EventTypeToolStart {
// tool_calls を JSON 化→構造体に戻す
if raw, ok := ev.Data["tool_calls"]; ok {
b, _ := json.Marshal(raw)
_ = json.Unmarshal(b, &toolCalls)
}
}
if ev.Type == streaming.EventTypeTextDelta {
hasContent = true
if delta, ok := ev.Data["delta"].(string); ok {
collectedTxt += delta
}
// テキストはそのままフロントに流す
if err := w.Write(ctx, ev); err != nil {
return fmt.Errorf("failed to write event: %w", err)
}
}
}
// 4-1. ツール呼び出しがある場合 → ツール実行して次ループ
if len(toolCalls) > 0 {
// assistant メッセージ(tool_calls)を保存
assistantMsg := ai.Message{
Role: "assistant",
Content: map[string]interface{}{},
ToolCalls: toolCalls,
}
messages = append(messages, assistantMsg)
if _, err := a.memory.Save(ctx, assistantMsg); err != nil {
logger.Error(ctx, err, "アシスタントメッセージの保存に失敗", nil)
}
for _, tc := range toolCalls {
_ = a.executeToolCall(ctx, state, tc, &messages, w)
}
_ = w.Write(ctx, streaming.StreamEvent{
Type: streaming.EventTypeIterationEnd,
Data: map[string]interface{}{
"iteration": state.Iteration,
"tool_calls": len(toolCalls),
},
})
state.UpdateProgress()
state.Iteration++
continue
}
// 4-2. ツール呼び出しなし + テキストあり → 最終回答
if hasContent {
assistantMsg := ai.Message{
Role: "assistant",
Content: map[string]interface{}{"text": collectedTxt},
}
messages = append(messages, assistantMsg)
info, err := a.memory.Save(ctx, assistantMsg)
if err != nil {
logger.Error(ctx, err, "最終回答メッセージの保存に失敗", nil)
a.lastAssistantMessage = nil
} else {
a.lastAssistantMessage = info
}
_ = w.Write(ctx, streaming.StreamEvent{
Type: streaming.EventTypeIterationEnd,
Data: map[string]interface{}{
"iteration": state.Iteration,
"final_answer": true,
},
})
// agent_end イベント(メッセージID付き)
_ = w.Write(ctx, streaming.NewAgentEndEvent(
state.ThreadID,
state.Iteration,
state.TotalToolCalls,
a.lastUserMessage,
a.lastAssistantMessage,
))
return nil
}
state.Iteration++
}
return nil
}
ここで、
- 「ReAct ループ自体」は
ReactAgentに閉じ込める - メッセージの保存やイベント出力は
MemoryとStreamWriterに委譲する
という分離をすると、テスト・差し替えがかなり楽になります。
3-5. executeToolCall:ツール呼び出しの去り際をデザインする
ツール側の実装も少し見ておきます。
func (a *ReactAgent) executeToolCall(
ctx context.Context,
state *AgentState,
tc ai.ToolCall,
messages *[]ai.Message,
w streaming.StreamWriter,
) error {
start := time.Now()
argsHash := a.computeArgsHash(tc.Function.Name, tc.Function.Arguments)
// 同じ引数の重複呼び出しを抑制
if state.IsDuplicateToolCall(argsHash, 2) {
_ = w.Write(ctx, streaming.NewToolDuplicateEvent(tc.Function.Name, tc.ID))
*messages = append(*messages, ai.Message{
Role: "tool",
Content: map[string]interface{}{
"tool_call_id": tc.ID,
"result": "このツールは既に実行済みです。前回の結果を参照してください。",
},
})
return nil
}
// args をパースしてログに載せやすくする
var args map[string]interface{}
if err := json.Unmarshal([]byte(tc.Function.Arguments), &args); err != nil {
args = map[string]interface{}{"raw": tc.Function.Arguments}
}
_ = w.Write(ctx, streaming.NewToolStartEvent(tc.Function.Name, tc.ID, args))
// 実際のツール実行(外部APIやDB検索など)
result, err := a.registry.Execute(ctx, tc.Function.Name, json.RawMessage(tc.Function.Arguments))
latency := time.Since(start)
toolResult := ToolResult{
Name: tc.Function.Name,
ArgsHash: argsHash,
Args: args,
Result: result,
LatencyMS: int(latency.Milliseconds()),
Success: err == nil,
Timestamp: time.Now(),
}
if err != nil {
toolResult.Error = err.Error()
toolResult.Summary = fmt.Sprintf("エラー: %s", err.Error())
_ = w.Write(ctx, streaming.NewToolErrorEvent(tc.Function.Name, tc.ID, err.Error()))
} else {
toolResult.Summary = a.generateSummary(tc.Function.Name, result)
_ = w.Write(ctx, streaming.NewToolResultEvent(
tc.Function.Name,
tc.ID,
result,
toolResult.Summary,
latency.Milliseconds(),
))
}
// 状態更新
state.AddToolResult(toolResult)
// tool メッセージとして履歴に追加・永続化
toolMsg := ai.Message{
Role: "tool",
Content: map[string]interface{}{
"tool_call_id": tc.ID,
"result": result,
},
}
*messages = append(*messages, toolMsg)
if _, saveErr := a.memory.Save(ctx, toolMsg); saveErr != nil {
logger.Error(ctx, saveErr, "ツールメッセージの保存に失敗", nil)
}
return err
}
ここでやっていることは、
- ツール呼び出しの重複検知(同じ引数で何度も呼ばない)
- ツールの latency, 成否を
ToolResultにまとめてトレース可能にする -
role=toolメッセージとして LLM に返す用の履歴を残す
という、ReAct の「Act → Observation」の部分の実装です。
4. 結果と考察:実際にどう動くのか
4-1. 典型的なイベントフロー
例えば「このプロジェクトのゴールと関連タスクを一覧にして」といった質問を投げると、
イベント的にはざっくり以下のような流れになります。
agent_start-
iteration_start(iteration=0) -
text_delta… LLM がツールを使う前に少し説明し始める -
tool_start(search_goals, args={...}) -
tool_result(search_goals, summary="10件のゴールを検索") -
iteration_end(iteration=0, tool_calls=1) -
iteration_start(iteration=1) -
text_delta… 検索結果を踏まえて回答を生成 -
iteration_end(iteration=1, final_answer=true) -
agent_end(ユーザー/アシスタントメッセージの ID 付き)
フロント側から見ると、
- ユーザー入力を送った瞬間に「pending の user メッセージ」を表示
-
text_deltaを受け取るたびに assistant メッセージを伸ばしていく -
tool_start/tool_resultでサイドバーにツールログを表示する - 最後に
agent_endで確定し、スレッド一覧に反映
という UI が組めます。
4-2. ReAct を自前実装してみて良かった点・難しかった点
良かった点:
- Go から直接 ReAct の挙動をコントロールできるので、「ツール重複の抑制」「NoProgress 検知」「タイムアウト」などを細かく設計できた
- すべてのメッセージ/ツール結果が自前の DB に乗るので、後続の分析(エラー原因分析、UX 改善、プロンプト改善)がやりやすくなる
- OpenAI Go SDK が素直な API なので、streaming / tool_calls の処理に集中できた
難しかった点:
- ストリーミング時の
tool_callsは delta で流れてくるので、正しく復元する処理を書かないといけない - 「どこまで抽象化するか」の線引きが難しく、
- 最初から作り込みすぎると実装コストが膨らむ
- かといってベタ書きすると変更に弱くなる
あたりはトレードオフでした。
5. 振り返りと今後の展望
5-1. 結局何が分かったか
今回の実装を通して整理できたポイントはだいたい次の 3 つです。
- ReAct 自体はシンプルなループだが、
「どこで止めるか」「どうトレースするか」を設計しないと本番運用は厳しい - OpenAI Go SDK だけでも、
- Tool Calling
- Streaming
を組み合わせれば、LangChain 的なエージェントは十分実装できる
- Model / Memory / Tool / Agent / Streaming を interface で切っておくと、
モデル乗り換えやツール追加に強い基盤になる
5-2. 応用できそうな方向
この ReAct Agent を土台にして、次のような拡張が素直に載せられます。
- RAG をツールとして追加(
search_documents,get_documentなど) - 複数エージェントを組み合わせる Planner-Executor パターン
- AgentState をそのままトレースに出して、Observability 基盤と連携
- MCP などプロトコルベースの外部ツールを
Tool実装として統合
いずれも、
「エージェントのコアは
ReactAgentに閉じ込め、周辺を差し替える」
という設計を守っておけば、段階的に導入しやすいはずです。
Discussion