redisとmysqlにwebチャットのデータ管理を任せる
今回も次のqiita記事のWebチャットを触ってみます。ここに公開されていています。
コードは前回は、Dockerfileを修正して、実行できるようにする方法について書きましたが、今回は内容を少し変更してみます。
grpc-web-react-hooks/server/main.go
のGetMessages
とCreateMessage
を見ると分かるのですが、server
のフィールドのrequests
という配列で、全メッセージを保持して管理しています。ループしながらrequests
が更新されているかを見続けるのも苦しいので、redisのpubsubを使って、mysqlで永続化にしてみます。diffで示しても変更が多いので、server
サービス全体のコードを表示しておきます。
package main
import (
"context"
"log"
"net"
"time"
"github.com/golang/protobuf/ptypes/empty"
pb "github.com/okmttdhr/grpc-web-react-hooks/messenger"
"encoding/json"
"github.com/go-redis/redis/v7"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
const (
port = ":9090"
chatChannel = "example"
)
type server struct {
pb.UnimplementedMessengerServer
}
func (s *server) GetMessages(_ *empty.Empty, stream pb.Messenger_GetMessagesServer) error {
msgs, err := getMessages()
if err != nil {
return err
}
for _, msg := range msgs {
if err := stream.Send(&pb.MessageResponse{Message: msg}); err != nil {
return err
}
}
client := newRedisClient()
pubsub := client.Subscribe(chatChannel)
defer pubsub.Close()
_, err = pubsub.Receive()
if err != nil {
log.Fatal(err)
}
ch := pubsub.Channel()
for msg := range ch {
var r pb.MessageRequest
err := json.Unmarshal([]byte(msg.Payload), &r)
if err != nil {
return err
}
if err := stream.Send(&pb.MessageResponse{Message: r.GetMessage()}); err != nil {
return err
}
}
return nil
}
func (s *server) CreateMessage(ctx context.Context, r *pb.MessageRequest) (*pb.MessageResponse, error) {
client := newRedisClient()
log.Printf("Received: %v", r.GetMessage())
rawMassage := r.GetMessage() + ": " + time.Now().Format("2006-01-02 15:04:05")
msg, err := json.Marshal(pb.MessageRequest{Message: rawMassage})
if err != nil {
return nil, err
}
save(rawMassage)
if err != nil {
return nil, err
}
client.Publish(chatChannel, msg)
return &pb.MessageResponse{Message: r.GetMessage()}, nil
}
func newRedisClient() *redis.Client {
client := redis.NewClient(&redis.Options{
Addr: "redis:6379",
Password: "",
DB: 0,
})
return client
}
func main() {
err := dbOpen()
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer dbClose()
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterMessengerServer(s, &server{})
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
package main
import (
_ "github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
var db *sqlx.DB
func dbOpen() error {
var err error
db, err = sqlx.Open("mysql", "root:@tcp(docker.for.mac.localhost:3306)/chat_db?parseTime=true")
return err
}
func dbClose() {
if db != nil {
db.Close()
}
}
func getMessages() ([]string, error) {
sql := "SELECT message FROM messages ORDER BY created_at;"
messages := make([]string, 0, 100)
rows, err := db.Query(sql)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
var msg string
rows.Scan(&msg)
messages = append(messages, msg)
}
return messages, nil
}
func save(message string) error {
sql := `INSERT INTO messages
(message, created_at, updated_at)
VALUES
(:message, NOW(), NOW());
`
query, params, err := sqlx.Named(sql, map[string]interface{}{"message": message})
if err != nil {
return err
}
_, err = db.Exec(query, params...)
return err
}
docker.for.mac.localhost
はdockerコンテナからホストOSのlocalhostに繋ぐときの指定の仕方です。確認していませんが、OSがWindowsの場合は、docker.for.win.localhost
にすれば良いらしいです。redisを追加するので、docker-compose.yml
も変更してみます。
@@ -7,6 +7,10 @@ services:
dockerfile: DockerfileProto
volumes:
- .:/proto
+ redis:
+ image: "redis:latest"
+ volumes:
+ - "./redis/data:/data"
server:
command: ./scripts/server.sh
build:
@@ -18,6 +22,7 @@ services:
- "9090:9090"
depends_on:
- proto
+ - redis
envoy:
build:
context: .
pubsubを使ったことで、pubsubのチャンネルとチャットのルームIDを関連付けることで複数のチャットルームをサポートする拡張も簡単になっていると思います。ですが実は5分程度するとGetMessages
の接続が切れてしまいます。envoyの設定の修正で直ると思いますが、そこはまだ確かめていません。かわりにimprobable-eng/grpc-web/tree/master/go/grpcweb
を使ってgrpc-webのプロキシとしてenvoyを使わなかった場合を試してみましたが、この場合は9時間経過しても接続が切れなかったことは確認しています。
Discussion