React + Golang + websocketでリアルタイムチャットアプリを作る -Part2/Redis, Pub/Sub編-
はじめに
※この記事はPart1の続きです
前回はwebsocketを利用してリアルタイムチャットアプリを作成しました
ただ前回の実装だと、同一のサーバーに接続しているユーザー同士でしかチャットを行うことができません
そこで今回、本番環境でサーバーをスケールアウトさせるケースも意識して、
各ユーザーがどのサーバーへ接続していてもチャットが行えるように改善していきます
具体的にはインメモリのKVS
であるRedis
が提供しているPub/Sub機能を利用します
利用する技術とか
Client
・ React 18
・ TypeScript
・ Vite
・ Recoil
Server
・ Golang 1.18
・ gorilla/websocket
・ go-redis
その他ミドルウェア
・ Redis7.0.2
・ Docker,docker-compose
※ソースコード全文はこちら
最終的なディレクトリ構成はこんな感じ(一部省略)
.
├── docker-compose.yaml
├── Makefile
├── client
│ ├── Dockerfile
│ ├── package.json
│ ├── package-lock.json
│ ├── vite.config.ts
│ ├── index.html
│ └── src
│ ├── App.tsx
│ ├── main.tsx
│ ├── components
│ ├── hooks
│ ├── models
│ └── state
└── server
├── go.mod
├── go.sum
├── Dockerfile
├── .air.toml
└── src
├── main.go
├── services
├── domain
└── handlers
一度試してみる
一旦同一サーバーで接続しているユーザーしかチャットが行えないことを確認します
必要ない方は次のセクションに進んでもらって大丈夫です!
手順
docker-compose.yaml
を以下のように編集します
version: "3.8"
services:
client:
build:
context: ./client
dockerfile: Dockerfile
tty: true
stdin_open: true
ports:
- 3000:5173
+ environment:
+ - VITE_WS_PORT=80
volumes:
- type: bind
source: "client/src"
target: "/usr/app/src"
depends_on:
- server
server:
build:
context: ./server
dockerfile: Dockerfile
stdin_open: true
tty: true
ports:
- 80:80
volumes:
- type: bind
source: server/src
target: /go/app/src
+ client_2:
+ build:
+ context: ./client
+ dockerfile: Dockerfile
+ tty: true
+ stdin_open: true
+ ports:
+ - 3001:5173
+ environment:
+ - VITE_WS_PORT=81
+ volumes:
+ - type: bind
+ source: "client/src"
+ target: "/usr/app/src"
+ depends_on:
+ - server_2
+ server_2:
+ build:
+ context: ./server
+ dockerfile: Dockerfile
+ stdin_open: true
+ tty: true
+ ports:
+ - 81:80
+ volumes:
+ - type: bind
+ source: server/src
+ target: /go/app/src
vite-env.d.ts
に以下を追記します
/// <reference types="vite/client" />
+ interface ImportMetaEnv {
+ VITE_WS_PORT: string;
+ }
websocket.ts
を以下のように編集します
import { atom, selector } from "recoil";
import * as WebSocket from "websocket";
const connect = (): Promise<WebSocket.w3cwebsocket> => {
return new Promise((resolve, reject) => {
+ const port = import.meta.env.VITE_WS_PORT;
+ const url = "ws://localhost:" + port + "/ws";
+ const socket = new WebSocket.w3cwebsocket(url);
+
+ socket.onopen = () => {
+ console.log("connected", port);
resolve(socket);
};
socket.onclose = () => {
console.log("reconnecting...");
connect();
};
socket.onerror = (err) => {
console.log("connection error:", err);
reject(err);
};
});
};
const connectWebsocketSelector = selector({
key: "connectWebsocket",
get: async (): Promise<WebSocket.w3cwebsocket> => {
return await connect();
},
});
export const websocketAtom = atom<WebSocket.w3cwebsocket>({
key: "websocket",
default: connectWebsocketSelector,
});
上記が完了したらコンソールから
docker-compose up --build
を実行してコンテナを立ち上げます
ブラウザで、
http://localhost:3000
とhttp://localhost:3001
を開いてチャットを行ってみます
3000
と3001
は別々のサーバーと接続しているのでうまくチャットが行えないことが確認できると思います
Pub/Subの実装
それではPub/Subを実装して、同一サーバーでなくてもチャットできるようにします
もうちょっと書き下すと、
各サーバーでRedis
の特定channel
をsubscribe
しておき、
そのchannel
に値がpublish
されたこと(チャットが送信されたこと)を、
subscribe
しているサーバー全てに通知できるようにします
図にするとこんな感じです
Redisの準備
まずはRedis
の環境を準備します
前回作成したdocker-compose.yaml
を以下のように編集します
version: "3.8"
services:
client:
build:
context: ./client
dockerfile: Dockerfile
tty: true
stdin_open: true
ports:
- 3000:5173
volumes:
- type: bind
source: "client/src"
target: "/usr/app/src"
depends_on:
- server
server:
build:
context: ./server
dockerfile: Dockerfile
stdin_open: true
tty: true
ports:
- 80:80
volumes:
- type: bind
source: server/src
target: /go/app/src
+
+ redis:
+ image: redis:7.0.2
+ command: redis-server --appendonly yes
+ restart: always
+ ports:
+ - 6379:6379
+ volumes:
+ - type: volume
+ source: redis-volume
+ target: /data
+ volumes:
+ redis-volume:
Redis
のクライアントにはgo-redis
のv8
を利用します
go.mod
はこんな感じです
module github.com/TadayoshiOtsuka/simple_chat
go 1.18
require github.com/go-redis/redis/v8 v8.11.5
require github.com/gorilla/websocket v1.5.0
require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)
PubSubServiceの実装
go-redis
をラップするPubSubService
を実装します
以下のようになります
package services
import (
"context"
"github.com/go-redis/redis/v8"
)
type PubSubService struct {
client *redis.Client
}
func NewPubSubService() *PubSubService {
c := redis.NewClient(&redis.Options{
Addr: "redis:6379",
Password: "",
DB: 0,
})
return &PubSubService{
client: c,
}
}
func (s *PubSubService) Publish(ctx context.Context, channel string, payload any) {
s.client.Publish(ctx, channel, payload)
}
func (s *PubSubService) Subscribe(ctx context.Context, channel string) <-chan *redis.Message {
return s.client.Subscribe(ctx, channel).Channel()
}
・Publish
メソッド
=>
Redisへ、任意の名前のchannel
を指定してpayload
をpublishします
・Subscribe
メソッド
=>
任意の名前のchannel
を引数として渡す事で、渡されたchannel
をsubscribeします
.Channel()
とメソッドチェーンしているので、返り値は<-chan *redis.Message
になります
subscribeしているchannel
と同じchannel
に値がpublishされる度、
publishされたデータ(*redis.Message
)がこの<-chan *redis.Message
に送信されます
以下みたいな感じです
// goroutineA
> msgCh := s.Subscribe(ctx, "hoge") // ①値が送信されるまで処理をブロック
> fmt.Println(msgCh.Payload) // ③値が送信されたので処理が進む
// goroutineB
> s.Publish(ctx, "hoge", "huga") // ② hoge channelへpublish
Hub構造体の編集
作成したPubSubService
を前回作成したHub
から利用するようにします
package domain
import (
"context"
+ "github.com/TadayoshiOtsuka/simple_chat/src/services"
)
type Hub struct {
Clients map[*Client]bool
RegisterCh chan *Client
UnRegisterCh chan *Client
BroadcastCh chan []byte
+ pubsub *services.PubSubService
}
+const broadCastChan = "broadcast"
+func NewHub(pubsub *services.PubSubService) *Hub {
return &Hub{
Clients: make(map[*Client]bool),
RegisterCh: make(chan *Client),
UnRegisterCh: make(chan *Client),
BroadcastCh: make(chan []byte),
+ pubsub: pubsub,
}
}
func (h *Hub) RunLoop() {
for {
select {
case client := <-h.RegisterCh:
h.register(client)
case client := <-h.UnRegisterCh:
h.unregister(client)
case msg := <-h.BroadcastCh:
+ h.publishMessage(msg)
}
}
}
+func (h *Hub) SubscribeMessages() {
+ ch := h.pubsub.Subscribe(context.TODO(), broadCastChan)
+
+ for msg := range ch {
+ h.broadCastToAllClient([]byte(msg.Payload))
+ }
+}
+func (h *Hub) publishMessage(msg []byte) {
+ h.pubsub.Publish(context.TODO(), broadCastChan, msg)
+}
func (h *Hub) register(c *Client) {
h.Clients[c] = true
}
func (h *Hub) unregister(c *Client) {
delete(h.Clients, c)
}
func (h *Hub) broadCastToAllClient(msg []byte) {
for c := range h.Clients {
c.sendCh <- msg
}
}
・SubscribeMessages
メソッド
=>
Redis
のbroadCastChan
に値がpublishされると、その値を取り出して、
Hub
のClients
に対してpublishされた値を送信します
・publishMessage
メソッド
=>
Redis
のbroadCastChan
へ引数で受け取っているmsg []byte
をpublishします
main.go
の編集
PubSubService
の初期化、Hub
の初期化部分の追記、
goroutine
でSubscribeMessages
メソッドを実行するよう編集します
package main
import (
"fmt"
"log"
"net/http"
"github.com/TadayoshiOtsuka/simple_chat/src/domain"
"github.com/TadayoshiOtsuka/simple_chat/src/handlers"
+ "github.com/TadayoshiOtsuka/simple_chat/src/services"
)
func main() {
+ pubsub := services.NewPubSubService()
+ hub := domain.NewHub(pubsub)
+ go hub.SubscribeMessages()
go hub.RunLoop()
http.HandleFunc("/ws", handlers.NewWebsocketHandler(hub).Handle)
port := "80"
log.Printf("Listening on port %s", port)
if err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil); err != nil {
log.Panicln("Serve Error:", err)
}
}
今回FEの修正はないため、ここまでで実装は完了です!
一度docker-compose up --build
して正常に動くか試してみましょう🎉
別々のサーバーに接続している場合の動作確認
今回の目的でもあるので、
異なるサーバーへ接続していても問題なくチャットできるかどうか実際にテストしてみます
docker-compose.yaml
を以下のように編集します
version: "3.8"
services:
client:
build:
context: ./client
dockerfile: Dockerfile
tty: true
stdin_open: true
ports:
- 3000:5173
+ environment:
+ - VITE_WS_PORT=80
volumes:
- type: bind
source: "client/src"
target: "/usr/app/src"
depends_on:
- server
server:
build:
context: ./server
dockerfile: Dockerfile
stdin_open: true
tty: true
ports:
- 80:80
volumes:
- type: bind
source: server/src
target: /go/app/src
+ client_2:
+ build:
+ context: ./client
+ dockerfile: Dockerfile
+ tty: true
+ stdin_open: true
+ ports:
+ - 3001:5173
+ environment:
+ - VITE_WS_PORT=81
+ volumes:
+ - type: bind
+ source: "client/src"
+ target: "/usr/app/src"
+ depends_on:
+ - server_2
+ server_2:
+ build:
+ context: ./server
+ dockerfile: Dockerfile
+ stdin_open: true
+ tty: true
+ ports:
+ - 81:80
+ volumes:
+ - type: bind
+ source: server/src
+ target: /go/app/src
redis:
image: redis:7.0.2
command: redis-server --appendonly yes
restart: always
ports:
- 6379:6379
volumes:
- type: volume
source: redis-volume
target: /data
volumes:
redis-volume:
vite-env.d.ts
に以下を追記します
/// <reference types="vite/client" />
+ interface ImportMetaEnv {
+ VITE_WS_PORT: string;
+ }
websocket.ts
を以下のように編集します
import { atom, selector } from "recoil";
import * as WebSocket from "websocket";
const connect = (): Promise<WebSocket.w3cwebsocket> => {
return new Promise((resolve, reject) => {
+ const port = import.meta.env.VITE_WS_PORT;
+ const url = "ws://localhost:" + port + "/ws";
+ const socket = new WebSocket.w3cwebsocket(url);
+
+ socket.onopen = () => {
+ console.log("connected", port);
resolve(socket);
};
socket.onclose = () => {
console.log("reconnecting...");
connect();
};
socket.onerror = (err) => {
console.log("connection error:", err);
reject(err);
};
});
};
const connectWebsocketSelector = selector({
key: "connectWebsocket",
get: async (): Promise<WebSocket.w3cwebsocket> => {
return await connect();
},
});
export const websocketAtom = atom<WebSocket.w3cwebsocket>({
key: "websocket",
default: connectWebsocketSelector,
});
上記が完了したら、
コンソールにてdocker-compose up --build
を実行して、
ブラウザでhttp://localhost:3000
とhttp://localhost:3001
を開いてチャットしてみます
※それぞれ別々のコンテナへリクエストするようになっています
上記画像のように3000
と3001
で期待通りチャットが行えていれば成功です🎉
おわりに
ここまでで一旦websocket
, Redis,Pub/Sub
によるリアルタイムチャットアプリの実装はおわりです
説明不足の点もあったかと思いますがここまで読んでいただきありがとうございました🙇
次はwebsocket
での認証関連やgRPC
で同じような要件を実装してみた場合の比較とかを記事にできたらいいかなぁと思ってます
参考にさせていただいたリンク
Discussion