🌟

Ginを用いたSSE (Server-Sent Events) の実装方法

に公開

はじめに

この記事は Atrae Engineers Advent Calendar 2024 の11日目の記事です。

こんにちは!株式会社アトラエでエンジニアをしております、takenokoyaです。
普段は求人メディアGreenのバックエンドを担当していますが、最近ではLLMを活用した機能開発がトピックに挙がる機会が増えてきました。

このような機会の変化に対応するため、GreenではメインのGoバックエンドサーバーの他に、機械学習やLLMとの接続の役割をもつPython(FastAPI)サーバーを用意しています。
当初から、FastAPIサーバーはLLMなどの外部APIコールや周辺ライブラリ活用の役割に限定し、ドメインに関する知識や処理は極力受け持たないよう薄く作ってきました。

LLMへのリクエストとレスポンス処理のイメージです。

LLMを活用する際に、しばしばLLMのストリーミングモードへの対応が必要になるケースがあるかと思いますが、上図のような多段の構成になる場合、サーバー間でリアルタイムなデータストリームの中継を行う方法が必要になります。

このようなリアルタイム通信を支える技術には以下のような選択肢があります。

  • WebSocket : 双方向通信のためのプロトコル
  • HTTPストリーミング : HTTP/1.1標準のチャンク転送
    • Server-Sent Events (SSE) : HTTPストリーミングを利用した単方向イベントストリーミングの仕様

ユースケースとして双方向通信が必要な場合はWebSocketを用いる必要がありますが、今回のように通信の流れがサーバーからクライアントへの単方向で済む場合、HTTPストリーミングを用いることで比較的簡単に実装を行うことができます。特にブラウザ(フロントエンド)との通信ではSSEを利用することで、フロントエンド側はEventSource APIによる簡単な実装が可能です。
(実際にChatGPTやClaudeといったAIチャットサービスもブラウザとの通信にSSEを活用しています)

サーバー間でのHTTPストリーミングとブラウザとのSSE通信を用いたリクエスト処理の全体像のイメージとしては以下のようになります。

  • フロントエンド - Goバックエンド間:SSE
    • ブラウザ標準のEventSource APIを利用した単方向イベントストリーミング
    • Content-Type: text/event-streamヘッダーを使用
    • イベントベースのデータ転送形式
  • サーバー間通信:HTTPストリーミング
    • HTTP/1.1標準のチャンク転送
    • Transfer-Encoding: chunkedヘッダーを使用
    • バイトストリームとしてのデータ転送

このように、サーバー間ではHTTPのチャンク転送でストリーミングを行い、フロントエンドへの送信時にはSSEフォーマットに変換して送信します。
最終的にLLMからのレスポンスストリームが完了した後に、必要なビジネスロジックを適用する処理の流れを実装できます。

なお、HTTPストリーミング(SSEを含む)はHTTPプロトコル上に構築された仕様であり、通常のHTTPロードバランシングが利用可能です。そのため、WebSocketで必要となるような特別なインフラ設定(Sticky Sessionなど)が不要である点も利点の一つです。

Ginでのストリーミング実装

サーバー間のHTTPストリーミングとブラウザへのSSE配信を実装する方法について説明します。

FastAPIでのHTTPストリーミングの実装にはStreamingResponseが利用できます。これについては公式をはじめ多くの記事で実装方法が紹介されているため、本記事では説明を省略します。

また、Go標準パッケージを用いたSSEの実装についてはこちらの記事が参考になります。

本記事では、GoのWebフレームワークGinを用いて、特にブラウザとの通信におけるSSEの実装方法に焦点を当てて説明します。

GinのContext構造体がもつ2つのメソッドについて

まずは、Ginを用いてSSEを実装する際に用いる2つのメソッドについてみていきます。

SSEventメソッド

// SSEvent writes a Server-Sent Event into the body stream.
func (c *Context) SSEvent(name string, message any) {
	c.Render(-1, sse.Event{
		Event: name,
		Data:  message,
	})
}
  • name: 第一引数のnameにはイベントの名前を受け取ります
  • message: 第二引数のmessageには実際に送信するデータを受け取ります。ここで注目すべきはmessageのデータ型がanyであることです。つまりmessageには単純なstring型だけでなく、任意の型や構造体を渡すことも可能です。
  • メソッド内部ではRenderメソッドを呼び出し、sse.Event構造体を使ってイベントをフォーマットします。

Streamメソッド

// Stream sends a streaming response and returns a boolean
// indicates "Is client disconnected in middle of stream"
func (c *Context) Stream(step func(w io.Writer) bool) bool {
	w := c.Writer
	clientGone := w.CloseNotify()
	for {
		select {
		case <-clientGone:
			return true
		default:
			keepOpen := step(w)
			w.Flush()
			if !keepOpen {
				return false
			}
		}
	}
}
  • step: 引数のstepにはstep関数を受け取ります。step関数はio.Writerを渡してboolを返す関数である必要があります。
  • メソッド内部で行っているclientGone := w.CloseNotify()では、クライアントが接続を切断した時(ex. ブラウザを閉じる、ページを離れるなど)に通知を受け取るためのチャネルを取得しています。
  • メインのループ処理では以下の処理が行われます。
    • 切断検知: trueを返してストリーミングを異常終了
    • データ送信: step関数を実行し、関数の戻り値に応じて継続/終了(送信するデータがなくなるとfalseを返してストリーミングを正常終了)

実装例

ハンドラーの実装

先ほど紹介した2つのメソッドを用いた実装例を紹介します。

type ChatRequest struct {
    Message string `json:"message"`
}

type ChatResponse struct {
    Message string `json:"message"`
}

func ChatHandler(c *gin.Context) {
    var req ChatRequest
    if err := c.Bind(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
        return
    }

    messageChan := make(chan string)

    go processMessage(req.Message, messageChan)

    c.Stream(func(w io.Writer) bool {
        if msg, ok := <-messageChan; ok {
            c.SSEvent("chat", ChatResponse{
                Message: msg,
            })
            return true
        }
        return false
    })
}

メッセージ処理関数の実装

func processMessage(message string, ch chan<- string) {
    defer close(ch)
    
    // 例: 外部APIをコールする関数の実装は省略
    responses := callExternalAPI(message)
    // 外部APIからのレスポンスをチャネルを介して呼び出し元に逐次的に送信
    for _, response := range responses {
        ch <- response
    }
}

実装のポイント

// チャネルの初期化
messageChan := make(chan string)

// goroutineを使った非同期処理の開始
go processMessage(req.Message, messageChan)

// SSEストリーミングの開始
c.Stream(func(w io.Writer) bool {
    if msg, ok := <-messageChan; ok {
        // イベント名"chat"でメッセージをストリーミング
        c.SSEvent("chat", ChatResponse{
            Message: msg,
        })
        return true
    }
    return false
})
  • goroutineをつかって、processMessage関数内でチャネルmessageChanが更新されるたびに、クライアントにストリーミングレスポンス(SSEvent)を返しています。
  • Streamメソッドに渡しているstep関数は、内部でSSEventメソッドを実行し、レスポンスをSSE形式にフォーマットします。また、step関数の戻り値のboolで継続/終了を制御しています。
  • SSE形式にフォーマットされたレスポンスは以下のようになります。
event: chat
data: {"message":"こ"}

event: chat
data: {"message":"ん"}

event: chat
data: {"message":"に"}

event: chat
data: {"message":"ち"}

event: chat
data: {"message":"は"}

おわりに

今回はLLMストリーミングを例に、GinによるSSE実装の解説を行いました。
LLMを活用した機能開発に関して、また別記事でご紹介できればなと思います。

最後まで読んでいただきありがとうございました。
引き続き Atrae Engineers Advent Calendar 2024 をお楽しみください。

Discussion