✏️
WebSocket通信によるブロードキャスト
WebSocket通信におけるチャネルとゴルーチン
今回はチャネルはmessageQueue,ゴルーチンとして動作させるのはstartBroadcastWorkerとなります。
var messageQueue = make(chan string, 100)
func startBroadcastWorker() {
for msg := range messageQueue {
clients.Lock()
for conn := range clients.conns {
err := websocket.Message.Send(conn, msg)
if err != nil {
log.Println("Error sending message:", err)
conn.Close()
delete(clients.conns, conn)
}
}
clients.Unlock()
}
}
チャネル(channel)でメッセージを管理する
チャネルの役割
チャネルは Go におけるデータの送受信のための仕組みです。
今回は、messageQueue を通じてメッセージを送信し、startBroadcastWorker 内で受信しています。
メッセージの中継点としての役割を担っています。
バッファ付きチャネル: make(chan string, 100) によって、最大100件のメッセージをキューに保持できるようになっています。
送信されたメッセージは非同期にキューに追加され、別の処理(ゴルーチン)がそれを処理する仕組みです。
// websocket通信によるメッセージ受信、
// チャネル(キュー)へのメッセージ追加
for {
var msg string
// クライアントからのメッセージを受信
err := websocket.Message.Receive(ws, &msg)
if err != nil {
log.Println("Error receiving message:", err)
break
}
// メッセージをキューに追加
messageQueue <- fmt.Sprintf("Client: %s", msg)
}
ゴルーチン(goroutine)でメッセージをブロードキャストする
main関数に
go startBroadcastWorker()
のように実行することで別スレッドでゴルーチンとして動作させています。
ゴルーチンの役割
ゴルーチンは Go の軽量スレッドです。
なので、非同期に並列処理を実行できます。
今回は、startBroadcastWorker をゴルーチンとして実行し、messageQueue からメッセージを受け取って、全クライアント (clients.conns) にブロードキャストしています。
非同期処理の利点は、やはりメッセージが受信されるたびにリアルタイムでクライアントに送信できることです。
また、ゴルーチンを使用することで、メインスレッドがブロックせずに動作します。
他のメインの動作が止まってしまっては困りますからね。
実装
WebSocketによるメッセージ送受信の実装は、pentechさんの、Go による WebSocket 通信を参考にさせていただきました。
Githubにあげていますが、一応こちらにも貼っておきます。
サーバー側
main.go
package main
import (
"fmt"
"log"
"sync"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
"golang.org/x/net/websocket"
)
// クライアントの接続を管理
var clients = struct {
sync.Mutex
conns map[*websocket.Conn]bool
}{conns: make(map[*websocket.Conn]bool)}
// メッセージキュー(チャネル)
var messageQueue = make(chan string, 100)
// メッセージをブロードキャストするゴルーチン
// ゴルーチン:軽量スレッド
func startBroadcastWorker() {
for msg := range messageQueue {
clients.Lock()
for conn := range clients.conns {
err := websocket.Message.Send(conn, msg)
if err != nil {
log.Println("Error sending message:", err)
conn.Close()
delete(clients.conns, conn)
}
}
clients.Unlock()
}
}
func handleWebSocket(c echo.Context) error {
websocket.Handler(func(ws *websocket.Conn) {
// クライアントを登録
clients.Lock()
clients.conns[ws] = true
clients.Unlock()
defer func() {
// クライアントを削除
clients.Lock()
delete(clients.conns, ws)
clients.Unlock()
ws.Close()
}()
for {
var msg string
// クライアントからのメッセージを受信
err := websocket.Message.Receive(ws, &msg)
if err != nil {
log.Println("Error receiving message:", err)
break
}
// メッセージをキューに追加
messageQueue <- fmt.Sprintf("Client: %s", msg)
}
}).ServeHTTP(c.Response(), c.Request())
return nil
}
func main() {
e := echo.New()
e.Use(middleware.Logger())
e.Static("/", "public")
e.GET("/ws", handleWebSocket)
// ブロードキャスト用のゴルーチンを起動
go startBroadcastWorker()
e.Logger.Fatal(e.Start(":8080"))
}
クライアント側
index.html
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>WebSocket</title>
<script src="main.js"></script>
</head>
<body>
<input type="text" id="input"></p>
<p><input type="submit" class="btn" value="送信"></p>
<p id="output"></p>
</body>
</html>
main.js
document.addEventListener('DOMContentLoaded', () => {
let loc = window.location;
let uri = (loc.protocol === 'https:' ? 'wss:' : 'ws:') + '//' + loc.host + '/ws';
const ws = new WebSocket(uri);
ws.onopen = function () {
console.log('Connected to WebSocket server');
};
ws.onmessage = function (evt) {
let out = document.getElementById('output');
out.innerHTML += evt.data + '<br>';
};
ws.onclose = function () {
console.log('Connection closed. Attempting to reconnect...');
setTimeout(() => {
location.reload(); // ページを再読込
}, 1000);
};
ws.onerror = function (err) {
console.error('WebSocket error:', err);
};
const btn = document.querySelector('.btn');
btn.addEventListener('click', () => {
const input = document.getElementById('input').value;
if (ws.readyState === WebSocket.OPEN) {
ws.send(input);
} else {
console.warn('WebSocket is not open. Message not sent.');
}
});
});
動作イメージ
送信したメッセージが他のクライアントにも反映されました!
Discussion