🎃

GoでTopicごとにPubsubするWebsocket

2022/07/29に公開

はじめに

こんにちは

毎秒定期的に送られてくるデータをリアルタイム表示する開発があり、AppSyncを検討していたのですが、コストがかかるため、Websocketで代用しました。

PubsubをGolangで実装してみたので、簡単にシェアします

環境

Golanggorilla/websocket, echoを使います。

go version go1.16.5 darwin/amd64
github.com/gorilla/websocket v1.5.0
github.com/labstack/echo

Github

https://github.com/lil-shimon/lil-pubsub

環境構築編

ファイル構成は最終的にこの様にします

.
├── README.md
├── cmd
│   └── main.go
├── go.mod
├── go.sum
├── internal
│   └── websocket
│       ├── client.go
│       ├── handler.go
│       └── room.go
└── web
    └── index.html

適当にファイルを作成し、Go環境の準備

mkdir lil-pubsub && cd lil-pubsub
go mod init <module name なんでも良き> # e.g. go mod init github.com/lil-shimon/lil-pubsub

gorilla/websocketechoの導入

go get github.com/gorilla/websocket
go get github.com/labstack/echo

まずは簡単なhttpでアクセスできるかのテストのための最低限の記述

mkdir cmd && touch cmd/main.go

main.go

package main

import (
	"net/http"

	"github.com/labstack/echo"
)

func healthCheck(c echo.Context) error {
	return c.String(http.StatusOK, "OK")
}

func main() {
	e := echo.New()

	e.GET("/healthcheck", healthCheck)

	e.Logger.Fatal(e.Start(":1323"))
}

http://localhost:1323/healthcheck
にアクセスしてOKと表示されているかを確認します。

以上で環境構築は終了です。

Pubsub作成編

ここからがWebsocket本番です。
まずはTopicごとにではなく、全クライアントにメッセージをPublishするWebsocketを構築します

internal/websocket/client.goを作成し、クライアント情報を保持するClientの構造体を定義します。

package websocket

import "github.com/gorilla/websocket"

type Client struct {
	Ws *websocket.Conn
}

次に全てのクライアント情報を格納するRoomの構造体を作ります。
internal/websocket/room.go

package websocket

type Rooms struct {
	Clients []*Client
}

Roomに接続してきたクライアントを追加するメソッド・取得するメソッドを定義します。
internal/websocket/room.go

func (rooms *Rooms) AddClient(client *Client) {
	rooms.Clients = append(rooms.Clients, client)
}

func (room *Room) GetClients() []Client {
	var cs []Client

	for _, client := range room.Clients {
		cs = append(cs, *client)
	}

	return cs
}

クライアントの追加と取得はできたので、Roomに格納されているClientに対してメッセージをPublishするメソッドを定義します。

func (room *Room) Publish(msg []byte) {
	for _, client := range room.Clients {
		client.Send(msg) // この後に作る
	}
}

internal/websocket/client.goに一つのクライアントに対してメッセージを送信するメソッドを追加します。

func (client *Client) Send (msg []byte) error {
	return client.Ws.WriteMessage(websocket.TextMessage, msg)
}

最後にこれらのメソッドを使うため、Websocket接続からメッセージ送信までを実行するメソッドを作成します。

internal/websocket/handler.go

package websocket

import (
	"github.com/gorilla/websocket"
	"github.com/labstack/echo"
)

var upgrader = websocket.Upgrader{}
var rooms = Room{}

func ServeWs(c echo.Context) error {
     upgrader.CheckOrigin = func(r *http.Request) bool { return true }
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)

	if err != nil {
		c.Logger().Error(err)
	}

	defer ws.Close()

	client := &Client{Ws: ws}

	rooms.AddClient(client)

	for {
		_, msg, err := ws.ReadMessage()

		if err != nil {
			c.Logger().Error(err)
			break
		}

		rooms.Publish(msg)
	}

	return nil
}

少し解説します。
まず、最初のここでwebsocketに接続します。
一行目は"message":"websocket: request origin not allowed by Upgrader.CheckOrigin"}のエラーを回避するために追加してます。

upgrader.CheckOrigin = func(r *http.Request) bool { return true }
ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil) // 接続

その後、接続してきたクライアントを宣言し、Roomにそのクライアントを追加します

client := &Client{Ws: ws}
rooms.AddClient(client)

その下はメッセージを受信した時の処理です。

	for {
		_, msg, err := ws.ReadMessage() // メッセージを受信 messageType, message, error

		if err != nil {
			c.Logger().Error(err)
			break
		}

		rooms.Publish(msg) // メッセージをクライアントへ送信
	}

main.goにwebsocket用のRouteを追加します。

import (
	"net/http"

	"github.com/labstack/echo"
	"github.com/lil-shimon/lil-pubsub.git/internal/websocket" // 追加
)

func main() {
	e := echo.New()

	e.GET("/healthcheck", healthCheck)
	e.GET("/ws", websocket.ServeWs) // 追加

	e.Logger.Fatal(e.Start(":1323"))
}

ws://localhost:1323/wsをテストします。
私はいつもwebsocketのテストでGoogle拡張機能のWebsocket Client Testを使っているので、今回もそちらを使ってテストします。

https://chrome.google.com/webstore/detail/websocket-test-client/fgponpodhbmadfljofbimhhlengambbn?q=Gecko&hl=ja


適当にメッセージを送信したら送信、受信できていることを確認できますね!

全クライアントでのPubはできました

TopicごとのPubsub編

Topicごとにメッセージを受け取れるように改善していきます
この様な形でパスパラメータにTopicを入れ、接続するWebsocketを想定しています

ws/{topic}

まず、main.goのrouteを変更

e.GET("/ws/:topic", websocket.ServeWs)

Topicを追加するため、クライアントとTopicを管理する構造体を新しく定義します。

type Subscription struct {
	Topic string
	Client *Client
}

Roomにそのサブスクを格納したいので、クライアントではなく、サブスクに修正

type Room struct {
	Subscription []*Subscription
}

クライアントの追加や取得をしていたメソッドをサブスクへ変更

func (room *Room) AddSubscription(subscription *Subscription) {
	room.Subscriptions = append(room.Subscriptions, subscription)
}

取得時はTopicを受け取り、一致するTopicを持っているサブスクを返す様にしています

func (room *Room) GetSubscription(topic string) []Subscription {
	var subs []Subscription

	for _, sub := range room.Subscriptions {
		if sub.Topic == topic {
			subs = append(subs, *sub)
		}
	}

	return subs
}

また、Publishメソッドも改良

func (room *Room) Publish(msg []byte, topic string) {
	subs := room.GetSubscription(topic)
	for _, sub := range subs {
		err := sub.Client.Send(msg)
		if err != nil {
			return
		}
	}
}

最後にWebsocket Handlerを修正します。

func ServeWs(c echo.Context) error {

	topic := c.Param("topic") // parameterのtopicを取得

	upgrader.CheckOrigin = func(r *http.Request) bool { return true }
	ws, err := upgrader.Upgrade(c.Response(), c.Request(), nil)

	if err != nil {
		c.Logger().Error(err)
	}

	defer ws.Close()

	client := &Client{Ws: ws}

	rooms.AddSubscription(&Subscription{Topic: topic, Client: client}) // サブスクを登録する様に修正

	for {
		_, msg, err := ws.ReadMessage()

		if err != nil {
			c.Logger().Error(err)
			break
		}

		rooms.Publish(msg, topic) // PublishにTopicを渡す
	}

今回はトピックごとにメッセージ配信が切り分けられているかをテストしたいため、クライアントテストを追加

mkdir web && touch web/index.html

web/index.html

<!doctype html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>WebSocket</title>
</head>

<body>
<p>トピックを入力</p>
<label for="topic"></label><input type="text" id="topic"/>
<button type="button" id="button" onclick="sendMsg()">テスト</button>
<p id="output"></p>

<script>
    function sendMsg() {
        const topic = document.getElementById('topic').value;
        const loc = window.location;
        let uri = 'ws:';

        if (loc.protocol === 'https:') {
            uri = 'wss:';
        }
        uri += '//' + loc.host;
        uri += `/ws/${topic}`;

        console.log("uri: " + uri);
        const ws = new WebSocket(uri);

        ws.onopen = function () {
            console.log('Connected')
        }

        ws.onmessage = function (evt) {
            const out = document.getElementById('output');
            out.innerHTML += evt.data + '<br>';
        }

        const msg = {
          topic: topic,
          message: "topic is test"
        }

        setInterval(function () {
            ws.send(JSON.stringify(nvData));
        }, 1000);
    }
</script>
</body>

</html>

clientテストのindex.html用のRouteを追加

func main() {
	e := echo.New()

	e.GET("/healthcheck", healthCheck)
	e.GET("/ws/:topic", websocket.ServeWs)
	e.Static("/ws-client", "web") // 追加

	e.Logger.Fatal(e.Start(":1323"))
}

go run server/main.goでサーバーを立ち上げ、
http://localhost:1323/ws-client
にアクセス
Topicを入力し、送信

同じトピックでサブスクしているので、こちらでも受け取れています。

topicを変えると

こっちは違うトピックなので受け取りません

最後に

簡単にですが、GolangでPubsubを実装してみました。本来ならmessageTypeによってPublishやUnsubscriptionなどの他処理も入っていますが、今回は省略させていただきます。

GolangでWebsocketを開発するイメージでも掴めていただけたら幸いです!

参考

https://pkg.go.dev/github.com/gorilla/websocket

Discussion