GoでTopicごとにPubsubするWebsocket
はじめに
こんにちは
毎秒定期的に送られてくるデータをリアルタイム表示する開発があり、AppSync
を検討していたのですが、コストがかかるため、Websocketで代用しました。
PubsubをGolangで実装してみたので、簡単にシェアします
環境
Golang
とgorilla/websocket
, echo
を使います。
go version go1.16.5 darwin/amd64
github.com/gorilla/websocket v1.5.0
github.com/labstack/echo
Github
環境構築編
ファイル構成は最終的にこの様にします
.
├── README.md
├── cmd
│ └── main.go
├── go.mod
├── go.sum
├── internal
│ └── websocket
│ ├── client.go
│ ├── handler.go
│ └── room.go
└── web
└── index.html
適当にファイルを作成し、Go環境の準備
mkdir lil-pubsub && cd lil-pubsub
go mod init <module name なんでも良き> # e.g. go mod init github.com/lil-shimon/lil-pubsub
gorilla/websocket
とecho
の導入
go get github.com/gorilla/websocket
go get github.com/labstack/echo
まずは簡単なhttpでアクセスできるかのテストのための最低限の記述
mkdir cmd && touch cmd/main.go
main.go
package main
import (
"net/http"
"github.com/labstack/echo"
)
func healthCheck(c echo.Context) error {
return c.String(http.StatusOK, "OK")
}
func main() {
e := echo.New()
e.GET("/healthcheck", healthCheck)
e.Logger.Fatal(e.Start(":1323"))
}
にアクセスしてOKと表示されているかを確認します。
以上で環境構築は終了です。
Pubsub作成編
ここからがWebsocket本番です。
まずはTopicごとにではなく、全クライアントにメッセージをPublishするWebsocketを構築します
internal/websocket/client.go
を作成し、クライアント情報を保持するClient
の構造体を定義します。
package websocket
import "github.com/gorilla/websocket"
type Client struct {
Ws *websocket.Conn
}
次に全てのクライアント情報を格納するRoom
の構造体を作ります。
internal/websocket/room.go
package websocket
type Rooms struct {
Clients []*Client
}
Roomに接続してきたクライアントを追加するメソッド・取得するメソッドを定義します。
internal/websocket/room.go
func (rooms *Rooms) AddClient(client *Client) {
rooms.Clients = append(rooms.Clients, client)
}
func (room *Room) GetClients() []Client {
var cs []Client
for _, client := range room.Clients {
cs = append(cs, *client)
}
return cs
}
クライアントの追加と取得はできたので、Roomに格納されているClientに対してメッセージをPublishするメソッドを定義します。
func (room *Room) Publish(msg []byte) {
for _, client := range room.Clients {
client.Send(msg) // この後に作る
}
}
internal/websocket/client.go
に一つのクライアントに対してメッセージを送信するメソッドを追加します。
func (client *Client) Send (msg []byte) error {
return client.Ws.WriteMessage(websocket.TextMessage, msg)
}
最後にこれらのメソッドを使うため、Websocket接続からメッセージ送信までを実行するメソッドを作成します。
internal/websocket/handler.go
package websocket
import (
"github.com/gorilla/websocket"
"github.com/labstack/echo"
)
var upgrader = websocket.Upgrader{}
var rooms = Room{}
func ServeWs(c echo.Context) error {
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
c.Logger().Error(err)
}
defer ws.Close()
client := &Client{Ws: ws}
rooms.AddClient(client)
for {
_, msg, err := ws.ReadMessage()
if err != nil {
c.Logger().Error(err)
break
}
rooms.Publish(msg)
}
return nil
}
少し解説します。
まず、最初のここでwebsocketに接続します。
一行目は"message":"websocket: request origin not allowed by Upgrader.CheckOrigin"}
のエラーを回避するために追加してます。
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) // 接続
その後、接続してきたクライアントを宣言し、Roomにそのクライアントを追加します
client := &Client{Ws: ws}
rooms.AddClient(client)
その下はメッセージを受信した時の処理です。
for {
_, msg, err := ws.ReadMessage() // メッセージを受信 messageType, message, error
if err != nil {
c.Logger().Error(err)
break
}
rooms.Publish(msg) // メッセージをクライアントへ送信
}
main.go
にwebsocket用のRouteを追加します。
import (
"net/http"
"github.com/labstack/echo"
"github.com/lil-shimon/lil-pubsub.git/internal/websocket" // 追加
)
func main() {
e := echo.New()
e.GET("/healthcheck", healthCheck)
e.GET("/ws", websocket.ServeWs) // 追加
e.Logger.Fatal(e.Start(":1323"))
}
ws://localhost:1323/wsをテストします。
私はいつもwebsocketのテストでGoogle拡張機能のWebsocket Client Test
を使っているので、今回もそちらを使ってテストします。
適当にメッセージを送信したら送信、受信できていることを確認できますね!
全クライアントでのPubはできました
TopicごとのPubsub編
Topicごとにメッセージを受け取れるように改善していきます
この様な形でパスパラメータにTopicを入れ、接続するWebsocketを想定しています
ws/{topic}
まず、main.go
のrouteを変更
e.GET("/ws/:topic", websocket.ServeWs)
Topicを追加するため、クライアントとTopicを管理する構造体を新しく定義します。
type Subscription struct {
Topic string
Client *Client
}
Roomにそのサブスクを格納したいので、クライアントではなく、サブスクに修正
type Room struct {
Subscription []*Subscription
}
クライアントの追加や取得をしていたメソッドをサブスクへ変更
func (room *Room) AddSubscription(subscription *Subscription) {
room.Subscriptions = append(room.Subscriptions, subscription)
}
取得時はTopicを受け取り、一致するTopicを持っているサブスクを返す様にしています
func (room *Room) GetSubscription(topic string) []Subscription {
var subs []Subscription
for _, sub := range room.Subscriptions {
if sub.Topic == topic {
subs = append(subs, *sub)
}
}
return subs
}
また、Publishメソッドも改良
func (room *Room) Publish(msg []byte, topic string) {
subs := room.GetSubscription(topic)
for _, sub := range subs {
err := sub.Client.Send(msg)
if err != nil {
return
}
}
}
最後にWebsocket Handlerを修正します。
func ServeWs(c echo.Context) error {
topic := c.Param("topic") // parameterのtopicを取得
upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)
if err != nil {
c.Logger().Error(err)
}
defer ws.Close()
client := &Client{Ws: ws}
rooms.AddSubscription(&Subscription{Topic: topic, Client: client}) // サブスクを登録する様に修正
for {
_, msg, err := ws.ReadMessage()
if err != nil {
c.Logger().Error(err)
break
}
rooms.Publish(msg, topic) // PublishにTopicを渡す
}
今回はトピックごとにメッセージ配信が切り分けられているかをテストしたいため、クライアントテストを追加
mkdir web && touch web/index.html
web/index.html
<!doctype html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>WebSocket</title>
</head>
<body>
<p>トピックを入力</p>
<label for="topic"></label><input type="text" id="topic"/>
<button type="button" id="button" onclick="sendMsg()">テスト</button>
<p id="output"></p>
<script>
function sendMsg() {
const topic = document.getElementById('topic').value;
const loc = window.location;
let uri = 'ws:';
if (loc.protocol === 'https:') {
uri = 'wss:';
}
uri += '//' + loc.host;
uri += `/ws/${topic}`;
console.log("uri: " + uri);
const ws = new WebSocket(uri);
ws.onopen = function () {
console.log('Connected')
}
ws.onmessage = function (evt) {
const out = document.getElementById('output');
out.innerHTML += evt.data + '<br>';
}
const msg = {
topic: topic,
message: "topic is test"
}
setInterval(function () {
ws.send(JSON.stringify(nvData));
}, 1000);
}
</script>
</body>
</html>
clientテストのindex.html用のRouteを追加
func main() {
e := echo.New()
e.GET("/healthcheck", healthCheck)
e.GET("/ws/:topic", websocket.ServeWs)
e.Static("/ws-client", "web") // 追加
e.Logger.Fatal(e.Start(":1323"))
}
go run server/main.go
でサーバーを立ち上げ、
にアクセス
Topicを入力し、送信
同じトピックでサブスクしているので、こちらでも受け取れています。
topicを変えると
こっちは違うトピックなので受け取りません
最後に
簡単にですが、GolangでPubsubを実装してみました。本来ならmessageTypeによってPublishやUnsubscriptionなどの他処理も入っていますが、今回は省略させていただきます。
GolangでWebsocketを開発するイメージでも掴めていただけたら幸いです!
参考
Discussion