🚀

React + Golang + websocketでリアルタイムチャットアプリを作る -Part2/Redis, Pub/Sub編-

2022/07/28に公開

はじめに

※この記事はPart1の続きです

前回はwebsocketを利用してリアルタイムチャットアプリを作成しました
ただ前回の実装だと、同一のサーバーに接続しているユーザー同士でしかチャットを行うことができません

そこで今回、本番環境でサーバーをスケールアウトさせるケースも意識して、
各ユーザーがどのサーバーへ接続していてもチャットが行えるように改善していきます
具体的にはインメモリのKVSであるRedisが提供しているPub/Sub機能を利用します

利用する技術とか

Client
React 18
TypeScript
Vite
Recoil
Server
Golang 1.18
gorilla/websocket
・  go-redis
その他ミドルウェア
Redis7.0.2
Docker,docker-compose

※ソースコード全文はこちら
https://github.com/TadayoshiOtsuka/simple_chat/tree/v2

最終的なディレクトリ構成はこんな感じ(一部省略)

.
├── docker-compose.yaml
├── Makefile
├── client
│   ├── Dockerfile
│   ├── package.json
│   ├── package-lock.json
│   ├── vite.config.ts
│   ├── index.html
│   └── src
│       ├── App.tsx
│       ├── main.tsx
│       ├── components
│       ├── hooks
│       ├── models
│       └── state
└── server
    ├── go.mod
    ├── go.sum
    ├── Dockerfile
    ├── .air.toml
    └── src
        ├── main.go
        ├── services
        ├── domain
        └── handlers

一度試してみる

一旦同一サーバーで接続しているユーザーしかチャットが行えないことを確認します
必要ない方は次のセクションに進んでもらって大丈夫です!

手順

docker-compose.yamlを以下のように編集します

docker-compose.yaml
version: "3.8"

services:
  client:
    build:
      context: ./client
      dockerfile: Dockerfile
    tty: true
    stdin_open: true
    ports:
      - 3000:5173
+   environment:
+     - VITE_WS_PORT=80
    volumes:
      - type: bind
        source: "client/src"
        target: "/usr/app/src"
    depends_on:
      - server

  server:
    build:
      context: ./server
      dockerfile: Dockerfile
    stdin_open: true
    tty: true
    ports:
      - 80:80
    volumes:
      - type: bind
        source: server/src
        target: /go/app/src

+ client_2:
+   build:
+     context: ./client
+     dockerfile: Dockerfile
+   tty: true
+   stdin_open: true
+   ports:
+     - 3001:5173
+   environment:
+     - VITE_WS_PORT=81
+   volumes:
+     - type: bind
+       source: "client/src"
+       target: "/usr/app/src"
+   depends_on:
+     - server_2

+ server_2:
+   build:
+     context: ./server
+     dockerfile: Dockerfile
+   stdin_open: true
+   tty: true
+   ports:
+     - 81:80
+   volumes:
+     - type: bind
+       source: server/src
+       target: /go/app/src

vite-env.d.tsに以下を追記します

client/src/vite-env.d.ts
/// <reference types="vite/client" />

+ interface ImportMetaEnv {
+   VITE_WS_PORT: string;
+ }

websocket.tsを以下のように編集します

client/src/state/websocket.ts
import { atom, selector } from "recoil";
import * as WebSocket from "websocket";

const connect = (): Promise<WebSocket.w3cwebsocket> => {
  return new Promise((resolve, reject) => {
+   const port = import.meta.env.VITE_WS_PORT;
+   const url = "ws://localhost:" + port + "/ws";
+   const socket = new WebSocket.w3cwebsocket(url);
+
+   socket.onopen = () => {
+     console.log("connected", port);
      resolve(socket);
    };
    socket.onclose = () => {
      console.log("reconnecting...");
      connect();
    };
    socket.onerror = (err) => {
      console.log("connection error:", err);
      reject(err);
    };
  });
};

const connectWebsocketSelector = selector({
  key: "connectWebsocket",
  get: async (): Promise<WebSocket.w3cwebsocket> => {
    return await connect();
  },
});

export const websocketAtom = atom<WebSocket.w3cwebsocket>({
  key: "websocket",
  default: connectWebsocketSelector,
});

上記が完了したらコンソールから
docker-compose up --buildを実行してコンテナを立ち上げます
ブラウザで、
http://localhost:3000http://localhost:3001
を開いてチャットを行ってみます
30003001は別々のサーバーと接続しているのでうまくチャットが行えないことが確認できると思います

Pub/Subの実装

それではPub/Subを実装して、同一サーバーでなくてもチャットできるようにします
もうちょっと書き下すと、
各サーバーでRedisの特定channelsubscribeしておき、
そのchannelに値がpublishされたこと(チャットが送信されたこと)を、
subscribeしているサーバー全てに通知できるようにします

図にするとこんな感じです

Redisの準備

まずはRedisの環境を準備します
前回作成したdocker-compose.yamlを以下のように編集します

docker-compose.yaml
version: "3.8"

services:
 client:
   build:
     context: ./client
     dockerfile: Dockerfile
   tty: true
   stdin_open: true
   ports:
     - 3000:5173
   volumes:
     - type: bind
       source: "client/src"
       target: "/usr/app/src"
   depends_on:
     - server

 server:
   build:
     context: ./server
     dockerfile: Dockerfile
   stdin_open: true
   tty: true
   ports:
     - 80:80
   volumes:
     - type: bind
       source: server/src
       target: /go/app/src
+
+ redis:
+   image: redis:7.0.2
+   command: redis-server --appendonly yes
+   restart: always
+   ports:
+     - 6379:6379
+   volumes:
+     - type: volume
+       source: redis-volume
+       target: /data

+ volumes:
+  redis-volume:

Redisのクライアントにはgo-redisv8を利用します

go.modはこんな感じです

module github.com/TadayoshiOtsuka/simple_chat

go 1.18

require github.com/go-redis/redis/v8 v8.11.5

require github.com/gorilla/websocket v1.5.0

require (
	github.com/cespare/xxhash/v2 v2.1.2 // indirect
	github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
)

PubSubServiceの実装

go-redisをラップするPubSubServiceを実装します
以下のようになります

server/src/services
package services

import (
	"context"

	"github.com/go-redis/redis/v8"
)

type PubSubService struct {
	client *redis.Client
}

func NewPubSubService() *PubSubService {
	c := redis.NewClient(&redis.Options{
		Addr:     "redis:6379",
		Password: "",
		DB:       0,
	})

	return &PubSubService{
		client: c,
	}
}

func (s *PubSubService) Publish(ctx context.Context, channel string, payload any) {
	s.client.Publish(ctx, channel, payload)
}

func (s *PubSubService) Subscribe(ctx context.Context, channel string) <-chan *redis.Message {
	return s.client.Subscribe(ctx, channel).Channel()
}

Publishメソッド
=>
Redisへ、任意の名前のchannelを指定してpayloadをpublishします

Subscribeメソッド
=>
任意の名前のchannelを引数として渡す事で、渡されたchannelをsubscribeします
.Channel()とメソッドチェーンしているので、返り値は<-chan *redis.Messageになります
subscribeしているchannelと同じchannelに値がpublishされる度、
publishされたデータ(*redis.Message)がこの<-chan *redis.Messageに送信されます
以下みたいな感じです

// goroutineA
> msgCh := s.Subscribe(ctx, "hoge") // ①値が送信されるまで処理をブロック
> fmt.Println(msgCh.Payload) // ③値が送信されたので処理が進む

// goroutineB
> s.Publish(ctx, "hoge", "huga") // ② hoge channelへpublish

Hub構造体の編集

作成したPubSubServiceを前回作成したHubから利用するようにします

server/src/domain/hub.go
package domain

import (
	"context"

+	"github.com/TadayoshiOtsuka/simple_chat/src/services"
)

type Hub struct {
	Clients      map[*Client]bool
	RegisterCh   chan *Client
	UnRegisterCh chan *Client
	BroadcastCh  chan []byte
+	pubsub       *services.PubSubService
}

+const broadCastChan = "broadcast"

+func NewHub(pubsub *services.PubSubService) *Hub {
	return &Hub{
		Clients:      make(map[*Client]bool),
		RegisterCh:   make(chan *Client),
		UnRegisterCh: make(chan *Client),
		BroadcastCh:  make(chan []byte),
+		pubsub:       pubsub,
	}
}

func (h *Hub) RunLoop() {
	for {
		select {
		case client := <-h.RegisterCh:
			h.register(client)

		case client := <-h.UnRegisterCh:
			h.unregister(client)

		case msg := <-h.BroadcastCh:
+			h.publishMessage(msg)
		}
	}
}

+func (h *Hub) SubscribeMessages() {
+	ch := h.pubsub.Subscribe(context.TODO(), broadCastChan)
+
+	for msg := range ch {
+		h.broadCastToAllClient([]byte(msg.Payload))
+	}
+}

+func (h *Hub) publishMessage(msg []byte) {
+	h.pubsub.Publish(context.TODO(), broadCastChan, msg)
+}

func (h *Hub) register(c *Client) {
	h.Clients[c] = true
}

func (h *Hub) unregister(c *Client) {
	delete(h.Clients, c)
}

func (h *Hub) broadCastToAllClient(msg []byte) {
	for c := range h.Clients {
		c.sendCh <- msg
	}
}

SubscribeMessagesメソッド
=>
RedisbroadCastChanに値がpublishされると、その値を取り出して、
HubClientsに対してpublishされた値を送信します

publishMessageメソッド
=>
RedisbroadCastChanへ引数で受け取っているmsg []byteをpublishします


main.goの編集

PubSubServiceの初期化、Hubの初期化部分の追記、
goroutineSubscribeMessagesメソッドを実行するよう編集します

server/src/main.go
package main

import (
	"fmt"
	"log"
	"net/http"

	"github.com/TadayoshiOtsuka/simple_chat/src/domain"
	"github.com/TadayoshiOtsuka/simple_chat/src/handlers"
+	"github.com/TadayoshiOtsuka/simple_chat/src/services"
)

func main() {
+	pubsub := services.NewPubSubService()
+	hub := domain.NewHub(pubsub)
+	go hub.SubscribeMessages()
	go hub.RunLoop()

	http.HandleFunc("/ws", handlers.NewWebsocketHandler(hub).Handle)

	port := "80"
	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(fmt.Sprintf(":%v", port), nil); err != nil {
		log.Panicln("Serve Error:", err)
	}
}

今回FEの修正はないため、ここまでで実装は完了です!
一度docker-compose up --buildして正常に動くか試してみましょう🎉

別々のサーバーに接続している場合の動作確認

今回の目的でもあるので、
異なるサーバーへ接続していても問題なくチャットできるかどうか実際にテストしてみます

docker-compose.yamlを以下のように編集します

docker-compose.yaml
version: "3.8"

services:
  client:
    build:
      context: ./client
      dockerfile: Dockerfile
    tty: true
    stdin_open: true
    ports:
      - 3000:5173
+   environment:
+     - VITE_WS_PORT=80
    volumes:
      - type: bind
        source: "client/src"
        target: "/usr/app/src"
    depends_on:
      - server

  server:
    build:
      context: ./server
      dockerfile: Dockerfile
    stdin_open: true
    tty: true
    ports:
      - 80:80
    volumes:
      - type: bind
        source: server/src
        target: /go/app/src

+ client_2:
+   build:
+     context: ./client
+     dockerfile: Dockerfile
+   tty: true
+   stdin_open: true
+   ports:
+     - 3001:5173
+   environment:
+     - VITE_WS_PORT=81
+   volumes:
+     - type: bind
+       source: "client/src"
+       target: "/usr/app/src"
+   depends_on:
+     - server_2

+ server_2:
+   build:
+     context: ./server
+     dockerfile: Dockerfile
+   stdin_open: true
+   tty: true
+   ports:
+     - 81:80
+   volumes:
+     - type: bind
+       source: server/src
+       target: /go/app/src

  redis:
    image: redis:7.0.2
    command: redis-server --appendonly yes
    restart: always
    ports:
      - 6379:6379
    volumes:
      - type: volume
        source: redis-volume
        target: /data
volumes:
  redis-volume:

vite-env.d.tsに以下を追記します

client/src/vite-env.d.ts
/// <reference types="vite/client" />

+ interface ImportMetaEnv {
+   VITE_WS_PORT: string;
+ }

websocket.tsを以下のように編集します

client/src/state/websocket.ts
import { atom, selector } from "recoil";
import * as WebSocket from "websocket";

const connect = (): Promise<WebSocket.w3cwebsocket> => {
  return new Promise((resolve, reject) => {
+   const port = import.meta.env.VITE_WS_PORT;
+   const url = "ws://localhost:" + port + "/ws";
+   const socket = new WebSocket.w3cwebsocket(url);
+
+   socket.onopen = () => {
+     console.log("connected", port);
      resolve(socket);
    };
    socket.onclose = () => {
      console.log("reconnecting...");
      connect();
    };
    socket.onerror = (err) => {
      console.log("connection error:", err);
      reject(err);
    };
  });
};

const connectWebsocketSelector = selector({
  key: "connectWebsocket",
  get: async (): Promise<WebSocket.w3cwebsocket> => {
    return await connect();
  },
});

export const websocketAtom = atom<WebSocket.w3cwebsocket>({
  key: "websocket",
  default: connectWebsocketSelector,
});

上記が完了したら、
コンソールにてdocker-compose up --buildを実行して、
ブラウザでhttp://localhost:3000http://localhost:3001を開いてチャットしてみます
※それぞれ別々のコンテナへリクエストするようになっています


上記画像のように30003001で期待通りチャットが行えていれば成功です🎉

おわりに

ここまでで一旦websocket, Redis,Pub/Subによるリアルタイムチャットアプリの実装はおわりです
説明不足の点もあったかと思いますがここまで読んでいただきありがとうございました🙇
次はwebsocketでの認証関連やgRPCで同じような要件を実装してみた場合の比較とかを記事にできたらいいかなぁと思ってます

参考にさせていただいたリンク

https://github.com/go-redis/redis
https://dev.to/jeroendk/building-a-simple-chat-application-with-websockets-in-go-and-vue-js-gao

Discussion