📶

GolangによるProduction ReadyなWebSocketサーバ実装の課題と解決

に公開

はじめに

PortalKeyのCTOの植森です。
今回はPortalKeyの通信ので利用しているGolangでのWebSocketサーバーの実装について解説していきたいと思います。

背景

PortalKeyでは、ユーザー間のリアルタイムなコミュニケーションを実現するために、WebSocketを使ったリアルタイム通信を実装しています。
PortalKeyにおけるリアルタイム通信では、ユーザーのオンライン状態やボイスチャットの状態など、様々な状態の変更をリアルタイムに通知する重要な役割を担っています。

このような通信基盤を実装する上で、以下のような課題がありました:

  1. リアルタイム通信の接続管理やエラーハンドリングを適切に行いたい
  2. 非同期なメッセージングを安全に処理したい
  3. アプリケーションロジックとリアルタイム通信処理を分離したい

これらの課題を解決するための技術としてWebSocketを採用しています。

GolangにおけるWebSocketサーバーの選定

Golangでは、WebSocketサーバーの実装には gorilla/websocket をはじめいくつかのライブラリが存在します。

このうち広く使われているのは低レベルAPIのライブラリであり、Web上で検索するときに出てくる情報も多くが低レベルAPIのライブラリに関するものです。

しかし、これらのライブラリを利用して実装をするには以下のような課題があり、productionで利用するにはそれなりの実装量とWebSocketの理解が必要です。

  1. 低レベルなAPIへの理解が要求される

    • WebSocketの接続やメッセージの送受信が低レベルな操作として提供される
    • Socket.IOやws(Node.js)のような高レベルな抽象化が存在しない
    • 切断時のクローズコードの処理
  2. イベント駆動型APIが存在せず、メッセージの読み書きやイベントを手動で制御する必要がある

    • 他言語のライブラリでよくあるような on('message') のようなイベントリスナーベースのAPIがなく、メッセージの読み書きを手動で制御する必要がある
  3. その他、WebSocketの実装に要求されるものの考慮

    • Graceful Shutdownの実装
    • 接続中コネクションの管理

これらを考慮して実装するのは難易度が高く、またインターネット上にもあまり情報として存在しません。
そのため、今回は低レベルAPIのライブラリを採用した実装について解説していきます。

実装サンプル

例えば、Node.jsのws(WebSocket)ライブラリでは以下のように簡単に実装できます:

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', function connection(ws) {
  ws.on('message', function incoming(message) {
    console.log('received: %s', message);
  });

  ws.send('something');
});

今回採用した gorilla/websocket を利用する場合は以下のようなコードから始まります:

var upgrader = websocket.Upgrader{}

func handler(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return
    }
    defer conn.Close()

    for {
        mt, message, err := conn.ReadMessage()
        if err != nil {
            break
        }
        
        err = conn.WriteMessage(mt, message)
        if err != nil {
            break
        }
    }
}

このように、低レベルAPIのライブラリを利用する場合は、メッセージの読み書きを手動で制御する必要があり、また接続管理やエラーハンドリングなども自前で実装する必要があります。

そのため、今回は以下のような機能を持つ高レベルなAPIを実装していきます:

  1. イベントハンドラベースのAPI

    • OnOpenOnMessageOnCloseなどのイベントハンドラを提供
    • メッセージの読み書きを抽象化し、アプリケーションロジックに集中できる
  2. 安全な非同期メッセージング

    • チャネルを利用した非同期なメッセージの送受信
    • 並行処理の排他制御による安全な実装
    • Ping/Pongによる接続維持
  3. 適切な接続管理とエラーハンドリング

    • コネクションのライフサイクル管理
    • クローズコードに応じた適切な終了処理
    • Graceful Shutdownの実装

これらの機能を実装することで、アプリケーションロジックとWebSocket処理を分離し、メンテナンス性の高いコードを実現することができます。

Server構造体

コネクションやイベント管理の実装

まず、サーバとして高レベルなAPIを提供するために以下のような構造体を用意します:

type Server struct {
    connections map[core.ConnectionID]*Conn
    mu          sync.Mutex
    onOpen      func(c *Conn) *status.CloseError
    onMessage   func(c *Conn, data []byte) *status.CloseError
    onClose     func(c *Conn)
    isShutdown  atomic.Bool
}

func NewServer() *Server {
    return &Server{
        connections: make(map[core.ConnectionID]*Conn),
    }
}

func (s *Server) OnOpenHandler(fn func(c *Conn) *status.CloseError) {
    s.onOpen = fn
}

func (s *Server) OnMessageHandler(fn func(c *Conn, data []byte) *status.CloseError) {
    s.onMessage = fn
}

func (s *Server) OnCloseHandler(fn func(c *Conn)) {
    s.onClose = fn
}

この構造体は以下のような機能を提供します。

  • コネクションの追加・削除を安全に行うための排他制御
  • 各コネクションのライフサイクル管理
  • イベントハンドラの登録と実行
  • シャットダウン時の安全な終了処理

また、現状のアプリケーションでは必要がないため実装していませんが、コネクションプールを使った一斉送信や個別送信の機能を実装することも可能です。

http.Handlerの実装

次に、WebSocketサーバーをhttp.Handlerインターフェースに対応するために、ServeHTTPメソッドを実装します。

func (s *Server) ServeHTTP(res http.ResponseWriter, req *http.Request) {
    if s.isShutdown.Load() {
        http.Error(res, "Server is shutting down", http.StatusServiceUnavailable)
        return
    }

    // WebSocketへのアップグレード
    ws, err := upgrader.Upgrade(res, req, nil)
    if err != nil {
        return
    }

    // 新しい接続のためのユニークIDを生成
    connID := core.NewConnectionID()

    // 接続の管理
    s.mu.Lock()
    if _, ok := s.connections[connID]; ok {
        s.mu.Unlock()
        ws.Close()
        return
    }

    // 新しい接続を作成
    conn := newConn(req.Context(), connID, ws)
    s.connections[connID] = conn
    s.mu.Unlock()

    // onOpenハンドラの呼び出し
    if s.onOpen != nil {
        err := s.onOpen(conn)
        if err != nil {
            conn.Disconnect(err)
            return
        }
    }

    // メッセージの受信ループ
    for {
        message, ok := <-conn.recv()
        if !ok {
            break
        }

        err := s.handleMessage(conn, message)
        if err != nil {
            conn.Disconnect(err)
            break
        }
    }

    // クリーンアップ
    if s.onClose != nil {
        s.onClose(conn)
    }

    s.mu.Lock()
    delete(s.connections, connID)
    s.mu.Unlock()
}

Conn構造体

コネクションの抽象化

個々のWebSocket接続を管理するためのConn構造体を実装しています:

type Conn struct {
    ctx          context.Context
    connectionID core.ConnectionID
    conn         *websocket.Conn
    send         chan []byte
    read         chan []byte
    closed       atomic.Bool
    onClose      []func(*status.CloseError)
    mu           sync.Mutex
}

func newConn(ctx context.Context, id core.ConnectionID, conn *websocket.Conn) *Conn {
    // メッセージサイズの設定
    conn.SetReadLimit(maxMessageSize)
    // タイムアウト時間の設定
    conn.SetReadDeadline(time.Now().Add(pongWait))
    // Pongメッセージのハンドラを設定
    conn.SetPongHandler(func(string) error {
        conn.SetReadDeadline(time.Now().Add(pongWait))
        return nil
    })

    c := &Conn{
        ctx:          ctx,
        connectionID: id,
        conn:         conn,
        send:         make(chan []byte, 256),
        read:         make(chan []byte, 256),
        onClose:      make([]func(*status.CloseError), 0),
    }
    c.run()
    return c
}

このConn構造体は以下のような機能を提供します:

  • コネクションのライフサイクル管理

    • 接続開始時の初期化処理
    • 接続終了時のクリーンアップ処理
    • 接続状態の監視
  • メッセージの送受信制御

    • 非同期なメッセージの送信キュー
    • メッセージの読み込みバッファ
    • 並行処理の排他制御
  • エラーハンドリング

    • クローズコードに応じた適切な終了処理
    • エラー発生時のコールバック実行
    • コネクションの安全な終了
  • WebSocketプロトコルの制御

    • Ping/Pongによる接続維持
    • メッセージサイズの制限
    • タイムアウト制御

メッセージの送受信処理

Conn構造体はメッセージの送受信処理の役割を担っており、具体的には以下のような実装を行っています。

// メッセージの送信
func (c *Conn) Send(data []byte) {
    if c.closed.Load() {
        return
    }
    c.send <- data
}

// 読み書きの処理を開始
func (c *Conn) run() {
    // 読み込みゴルーチン
    go func() {
        var closeError *status.CloseError
        defer func() {
            close(c.read)
            if closeError != nil {
                c.close(closeError)
            }
        }()

        for {
            mt, message, err := c.conn.ReadMessage()
            if err != nil {
                // クライアントが切断した場合もエラーが返却される
                if e, ok := err.(*websocket.CloseError); ok {
                    closeError = status.ClientError(e, codes.WebSocketCloseCode(e.Code), e.Text)
                    return
                }
                closeError = status.Unknown(err, "Failed to read message")
                return
            }

            // 今回の実装で扱うのはバイナリメッセージのみ。テキストメッセージが必要な場合、MessageTypeを見て分岐する
            if mt == websocket.BinaryMessage {
                c.read <- message
            }
        }
    }()

    // 書き込みゴルーチン
    go func() {
        var closeError *status.CloseError
        pingTicker := time.NewTicker(pingPeriod)
        defer func() {
            pingTicker.Stop()
            if closeError != nil {
                c.close(closeError)
            }
        }()

        for {
            select {
            // sendチャンネルにメッセージがあればクライアントにメッセージを送信する
            case message, ok := <-c.send:
                if !ok {
                    return
                }
                // メッセージの送信前にWriteDeadlineを設定し、タイムアウト時にエラーを返すようにする
                c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
                if err := c.conn.WriteMessage(websocket.BinaryMessage, message); err != nil {
                    closeError = status.Unknown(err, "Failed to write message")
                    return
                }
            // 定期的にPingメッセージを送信する必要があるため、Tickerを使用して定期的にメッセージを送信する
            case <-pingTicker.C:
                c.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
                if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                    closeError = status.Unknown(err, "Failed to write ping message")
                    return
                }
            }
        }
    }()
}

接続の終了処理

WebSocket接続を終了する際は、適切なクローズコードを使用して接続を終了します:

func (c *Conn) close(err *status.CloseError) {
    c.mu.Lock()
    defer c.mu.Unlock()

    if c.closed.Load() {
        return
    }

    c.closed.Store(true)

    // クローズハンドラの実行
    for _, callback := range c.onClose {
        callback(err)
    }

    // クローズメッセージの送信
    if err.WriteControl() {
        // Close Frameの送信
        c.conn.WriteControl(
            websocket.CloseMessage,
            // Reasonは125byteまでという制限があるため、必要に応じて制御すること
            websocket.FormatCloseMessage(int(err.Code()), err.Reason()),
            time.Now().Add(closeTimeout),
        )
    }

    close(c.send)
    c.conn.Close()
}

終了時のポイントとして、低レベルなライブラリを利用する場合はClose Frameを送信する必要があります。
WebSocketにはControl FrameというWebSocketの制御を行うためのメッセージがあり、Close FrameはWebSocketの接続を終了するためのメッセージです。
Close Frameを送信しない場合は異常終了として扱われるため、適切なClose Codeを送信しましょう。

WebSocketのクローズコードの仕様については以下のドキュメントが参考になります。
https://developer.mozilla.org/ja/docs/Web/API/CloseEvent/code
https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1

クローズコードのハンドリングのポイントは以下の通りです。

  • サーバからだけではなく、クライアントが切断した場合もエラーが返却されるため必要に応じてハンドリングを行う
  • クローズコードの範囲には以下の種類があり、4000番台を利用することで任意のエラーコードをアプリケーションレベルで取り扱うことが出来る
    • 1000-1015: WebSocketプロトコルで定義済みのもの
    • 1016-2999: WebSocketプロトコルの今後の改定による定義、および拡張仕様による定義用で確保されているもの
    • 3000-3999: IANAに登録されているもの
    • 4000-4999: アプリケーション間の事前の合意により利用できるもの

また、1000番台のもののうち、以下のクローズコードについては覚えておくといいでしょう。

  • CloseNormalClosure (1000): 正常な切断
  • CloseGoingAway (1001): サーバーが停止する、ページを離れるなどを表す。ブラウザのタブやウィンドウを閉じるなどの操作によって発生する
  • CloseAbnormalClosure (1006): 異常な切断(送信不可)。通信経路でエラーが起きた場合などに発生する。また、Close Frameを送信せずに切断した場合もこのコードが設定される

Graceful Shutdownの実装

サーバを本番で運用するためにGraceful Shutdownの実装が必要だったため、以下のような実装を行いました。
ポイントとしてはクライアントに対しGoing Awayを送信して正常な切断を通知することです。

このクローズコードはクライアント側で受け取れるため、サーバが再起動した際にクライアント側で再接続することが可能です。

func (s *Server) Shutdown(ctx context.Context) error {
    // シャットダウンフラグを立て、新規接続の受け付けを停止
    s.isShutdown.Store(true)

    s.mu.Lock()
    connections := make([]*Conn, 0, len(s.connections))
    for _, conn := range s.connections {
        connections = append(connections, conn)
    }
    s.mu.Unlock()

    // 既存の接続を順次切断
    for _, conn := range connections {
        // Going Awayを送信して正常な切断を通知
        conn.Disconnect(status.New(codes.CloseGoingAway, "Server is shutting down"))
    }

    // すべての接続が終了するまで待機
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-ticker.C:
            s.mu.Lock()
            count := len(s.connections)
            s.mu.Unlock()
            if count == 0 {
                return nil
            }
        }
    }
}

実際のアプリケーションでは、以下のように利用します。

func main() {
    // WebSocketサーバーの初期化
    ws := NewServer()
    
    // HTTPサーバーの設定
    server := &http.Server{
        Addr:    ":8080",
        Handler: ws,
    }

    // シグナルの受信設定
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGTERM, syscall.SIGINT)

    // サーバーの起動
    go func() {
        if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
            log.Fatalf("Failed to start server: %v", err)
        }
    }()

    // シグナルの待機
    <-quit
    log.Println("Shutting down server...")

    // Graceful Shutdownの実行
    ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
    defer cancel()

    // まずWebSocketサーバーのシャットダウン
    if err := ws.Shutdown(ctx); err != nil {
        log.Printf("WebSocket server forced to shutdown: %v", err)
    }

    // 次にHTTPサーバーのシャットダウン
    if err := server.Shutdown(ctx); err != nil {
        log.Printf("HTTP server forced to shutdown: %v", err)
    }

    log.Println("Server gracefully stopped")
}

まとめ

今回は、GolangでのWebSocketサーバーの実装について、特に低レベルなライブラリを使用する際の課題と解決方法について解説しました。

実装のポイント

  1. イベントハンドラベースのAPIによる実装の簡素化

    • Socket.IOライクなイベントハンドラの提供
    • OnOpenOnMessageOnCloseによる直感的なAPI
    • アプリケーションロジックとWebSocket処理の明確な分離
  2. 非同期メッセージングの安全な実装

    • チャネルを活用した非同期処理
    • 読み込み/書き込みの分離による並行処理の制御
    • Ping/Pongによる接続維持の自動化
  3. 接続管理とエラーハンドリング

    • コネクションプールによる接続の一元管理
    • WebSocketプロトコルに準拠したクローズコードの実装
    • Graceful Shutdownへの対応

得られた知見

低レベルなWebSocketライブラリを使用する際は、以下の点に注意を払う必要があります:

  1. プロトコルレベルの理解

    • WebSocketのControl Frameの適切な処理
    • クローズコードの仕様への準拠
    • メッセージタイプ(バイナリ/テキスト)の区別
  2. 並行処理の制御

    • Mutexによる排他制御
    • チャネルを使用した安全なメッセージング
    • デッドロックやレースコンディションの防止
  3. エラー処理とリソース管理

    • 適切なタイムアウト設定
    • リソースリークの防止
    • クリーンな終了処理の実装

今後の展望

この実装は以下のような拡張性を持っています:

  1. スケーラビリティ

    • コネクションプールを利用した効率的なブロードキャスト
    • 複数サーバー間での状態同期
    • 負荷分散への対応
  2. 機能拡張

    • メッセージのバッファリングやリトライ
    • 認証・認可の統合
    • メトリクスの収集
  3. 運用性

    • 詳細なログ出力
    • モニタリングの容易さ
    • トラブルシューティングのしやすさ

次回は、この通信基盤を使ってクライアントとどうやって通信しているのかについて、より詳しく解説していきたいと思います。
特に、PortalKeyではProtobufを使ったメッセージのシリアライズ/デシリアライズ、およびIDLを使ったメッセージの定義を行っているため、その辺りの実装についても解説していきたいと思います。
また、実運用における課題とその解決方法についても触れていく予定です。

おまけ

今回、このブログ記事はCursorで作成しました。
今まで書いたブログ記事とプロジェクトのコードを読ませて、そのコードをもとにブログ記事を作成させてみましたが、7割ぐらいの文章を書いてもらっています。
特にサンプルコードについてはプロジェクトのコードから必要なものをかなりの精度で抽出してくれるので、とても便利でした。

実装作業でコーディングエージェントを利用しておらずどこかの機会で使いたいと考えていたので、こういったユースケースでもいいので少しずつ触ってみようと思います。

PortalKey Tech Blog

Discussion