👏

Server Streaming RPCとRedigoでプッシュ通知を実装する

2020/09/29に公開

Server Streaming RPCとRedigoでプッシュ通知を実装する

例えば「何か注文が入ったらリアルタイムで注文リストを更新する」という仕組みを作りたいとき、この技術を使う。

構造

クライアント(客、スタッフ)とバックエンドサーバとの通信にServer Streaming RPCを使い、バックエンドサーバとRedisとの通信にRedigoを使う。

Server Streaming RPC

Restでは1つのリクエストに対して1つのレスポンスを返すのが基本だが、gRPCのServer Streaming RPCを使うと1つのリクエストに対してサーバ側の選ぶタイミングで何度もレスポンスを返せる。これによってリアルタイム更新、プッシュ通知のような仕組みを作ることができる。

Redigo

注文を追加するメソッドが呼ばれたときに反応して、注文リストを送るメソッドを呼ぶ仕組みをゴルーチンやチャネルで作ることが難しかった。これを解決するためにRedisのPub/Sub機能を利用した。

RedigoはGoでRedisを操作するためのライブラリ。

ソース

Protocol Buffers

service BackendService {
  // 注文に反応して注文リストを返すメソッド。streamが付いている点が違う。
  rpc ListOrders (ListOrdersMessage) returns (stream ListOrdersMessageResponse){}
  // 注文するメソッド。
  rpc CreateOrder (CreateOrderMessage) returns (CreateOrderResponse){}
}

message ListOrdersMessage{
  // RedisのPubSubチャンネルを識別するためのid
  uint32 connection_id = 1;
}

message ListOrdersResponse {
  repeated string orders = 1;
}

message CreateOrderMessage{
  uint32 connection_id = 1;
  string order = 2;
}

message CreateOrderResponse{
  string create_order = 1;
}

Go

func (sc *SampleController) ListOrders(in *pb.ListOrdersMessage, server pb.Sample_ListOrdersServer) (err error) {
	// Redisに接続
	conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%s", os.Getenv("REDIS_HOST"), os.Getenv("REDIS_PORT")))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	psc := redis.PubSubConn{Conn: conn}
	psc.Subscribe(strconv.Itoa(int(in.ConnectionId)))

	for {
		switch psc.Receive().(type) {
		case redis.Message, redis.Subscription:
			log.Printf("responsed orders\n")
			readModel, err := (略:データベースから注文リストを取得する)
			if err != nil {
				return err
			}
			err = server.Send(&pb.ListOrdersResponse{
				Orders: readModel,
			})
			if err != nil {
				log.Printf("%s¥n", err)
			}
		case error:
			return
		}
	}
	return nil
}

func (sc *SampleController) CreateOrder(ctx context.Context, in *pb.CreateOrderMessage) (response *pb.CreateOrderResponse, err error) {
	ress, err := (略:データベースに注文を追加する)
	if err != nil {
		return
	}
	// 接続
	conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%s", os.Getenv("REDIS_HOST"), os.Getenv("REDIS_PORT")))
	if err != nil {
		panic(err)
	}
	defer conn.Close()
	// パブリッシュ "hello"の部分は何でもいい
	_, err = redis.Int(conn.Do("PUBLISH", strconv.Itoa(int(in.ConnectionId)), "hello"))
	if err != nil {
		panic(err)
	}
	return &pb.CreateOrderResponse{
		CreateOrder: "successful",
	}, nil
}

参考

Discussion