redisとmysqlにwebチャットのデータ管理を任せる

5 min read読了の目安(約4500字

今回も次のqiita記事のWebチャットを触ってみます。

https://qiita.com/okmttdhr/items/a37563047904ac98f3ed
コードはここに公開されていています。

前回は、Dockerfileを修正して、実行できるようにする方法について書きましたが、今回は内容を少し変更してみます。

https://zenn.dev/nnabeyang/articles/79dd186f4bc1339296ec

grpc-web-react-hooks/server/main.goGetMessagesCreateMessageを見ると分かるのですが、serverのフィールドのrequestsという配列で、全メッセージを保持して管理しています。ループしながらrequestsが更新されているかを見続けるのも苦しいので、redisのpubsubを使って、mysqlで永続化にしてみます。diffで示しても変更が多いので、serverサービス全体のコードを表示しておきます。

grpc-web-react-hooks/server/main.go
package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/golang/protobuf/ptypes/empty"
	pb "github.com/okmttdhr/grpc-web-react-hooks/messenger"

	"encoding/json"

	"github.com/go-redis/redis/v7"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

const (
	port        = ":9090"
	chatChannel = "example"
)

type server struct {
	pb.UnimplementedMessengerServer
}

func (s *server) GetMessages(_ *empty.Empty, stream pb.Messenger_GetMessagesServer) error {
	msgs, err := getMessages()
	if err != nil {
		return err
	}
	for _, msg := range msgs {
		if err := stream.Send(&pb.MessageResponse{Message: msg}); err != nil {
			return err
		}
	}
	client := newRedisClient()
	pubsub := client.Subscribe(chatChannel)
	defer pubsub.Close()

	_, err = pubsub.Receive()
	if err != nil {
		log.Fatal(err)
	}

	ch := pubsub.Channel()

	for msg := range ch {
		var r pb.MessageRequest
		err := json.Unmarshal([]byte(msg.Payload), &r)
		if err != nil {
			return err
		}
		if err := stream.Send(&pb.MessageResponse{Message: r.GetMessage()}); err != nil {
			return err
		}
	}

	return nil
}

func (s *server) CreateMessage(ctx context.Context, r *pb.MessageRequest) (*pb.MessageResponse, error) {
	client := newRedisClient()
	log.Printf("Received: %v", r.GetMessage())
	rawMassage := r.GetMessage() + ": " + time.Now().Format("2006-01-02 15:04:05")
	msg, err := json.Marshal(pb.MessageRequest{Message: rawMassage})
	if err != nil {
		return nil, err
	}
	save(rawMassage)
	if err != nil {
		return nil, err
	}
	client.Publish(chatChannel, msg)
	return &pb.MessageResponse{Message: r.GetMessage()}, nil
}

func newRedisClient() *redis.Client {
	client := redis.NewClient(&redis.Options{
		Addr:     "redis:6379",
		Password: "",
		DB:       0,
	})

	return client
}

func main() {
	err := dbOpen()
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	defer dbClose()
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterMessengerServer(s, &server{})
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
grpc-web-react-hooks/server/database.go
package main

import (
	_ "github.com/go-sql-driver/mysql"
	"github.com/jmoiron/sqlx"
)

var db *sqlx.DB

func dbOpen() error {
	var err error
	db, err = sqlx.Open("mysql", "root:@tcp(docker.for.mac.localhost:3306)/chat_db?parseTime=true")
	return err
}

func dbClose() {
	if db != nil {
		db.Close()
	}
}

func getMessages() ([]string, error) {
	sql := "SELECT message FROM messages ORDER BY created_at;"
	messages := make([]string, 0, 100)
	rows, err := db.Query(sql)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	for rows.Next() {
		var msg string
		rows.Scan(&msg)
		messages = append(messages, msg)
	}
	return messages, nil
}

func save(message string) error {
	sql := `INSERT INTO messages
	(message, created_at, updated_at)
	VALUES
	(:message, NOW(), NOW());
	`
	query, params, err := sqlx.Named(sql, map[string]interface{}{"message": message})
	if err != nil {
		return err
	}
	_, err = db.Exec(query, params...)
	return err
}

docker.for.mac.localhostはdockerコンテナからホストOSのlocalhostに繋ぐときの指定の仕方です。確認していませんが、OSがWindowsの場合は、docker.for.win.localhostにすれば良いらしいです。redisを追加するので、docker-compose.ymlも変更してみます。

docker-compose.yml
@@ -7,6 +7,10 @@ services:
       dockerfile: DockerfileProto
     volumes:
       - .:/proto
+  redis:
+    image: "redis:latest"
+    volumes:
+      - "./redis/data:/data"
   server:
     command: ./scripts/server.sh
     build:
@@ -18,6 +22,7 @@ services:
       - "9090:9090"
     depends_on:
       - proto
+      - redis
   envoy:
     build:
       context: .

pubsubを使ったことで、pubsubのチャンネルとチャットのルームIDを関連付けることで複数のチャットルームをサポートする拡張も簡単になっていると思います。ですが実は5分程度するとGetMessagesの接続が切れてしまいます。envoyの設定の修正で直ると思いますが、そこはまだ確かめていません。かわりにimprobable-eng/grpc-web/tree/master/go/grpcwebを使ってgrpc-webのプロキシとしてenvoyを使わなかった場合を試してみましたが、この場合は9時間経過しても接続が切れなかったことは確認しています。