Go で gRPC streaming 通信(Win)
前回は Hello World サンプルで最も基本的な Unary RPC 通信を確認しました。
今回は streaming 通信を試していきます。
開発環境は Windows10(64bit) + Visual Studio Code です。
streaming を使うと、その通信内で連続したメッセージを送受信し続けることができます。
streaming RPC には3種あります。
- Server streaming RPC
- サーバからクライアントへ streaming メッセージを返します。
- Client streaming RPC
- クライアントからサーバへ streaming メッセージを返します。
- Bidirectional streaming RPC
- 双方向で streaming メッセージをやり取りします。
公式の route_guide という example がとてもわかりやすいです。
ゲームで UDP 通信の代わりに gRPC streaming を使えないかなぁと思っているので試していきます。
Streaming RPC
game-server
作業用フォルダ準備
game-server フォルダを作って Visual Studio Code で開きます。
protocol buffer ファイルの作成
grpc フォルダを作成し、その下に game.proto を用意します。
オンラインゲームでの通信を想定して Vector3 なデータをやり取りしてみます。
// grpc/game.proto
syntax = "proto3";
option go_package = ".;grpc";
package grpc;
service GameService {
// A server-to-client streaming RPC.
rpc ListTransform(Transform) returns (stream Transform) {}
// A client-to-server streaming RPC.
rpc RecordTransform(stream Transform) returns (RecordReply) {}
// A Bidirectional streaming RPC.
rpc BidirectTransform(stream Transform) returns (stream Transform) {}
}
message Vector3 {
float x = 1;
float y = 2;
float z = 3;
}
message Transform {
Vector3 position = 1;
Vector3 rotation = 2;
}
message RecordReply {
string message = 1;
}
コードの自動生成
PowerShell を開いて protoc を実行
protoc --go_out=. --go_opt=paths=source_relative `
--go-grpc_out=. --go-grpc_opt=paths=source_relative `
grpc/game.proto
game_grpc.pb.go と game.pb.go が生成されました。
作業フォルダの状態
grpc\
+ game_grpc.pb.go
+ game.pb.go
+ game.proto
サーバ開発
go mod init でモジュール初期化しておきます。
github.com/hidingfox
は適切な値に変更してください。
go mod init github.com/hidingfox
main.go を記載します。
// main.go
package main
import (
"flag"
"fmt"
"io"
"log"
"math/rand"
"net"
"time"
pb "github.com/hidingfox/grpc"
"google.golang.org/grpc"
)
var (
port = flag.Int("port", 50051, "The server port")
)
type server struct {
pb.UnimplementedGameServiceServer
}
// ListTransform プレイヤーの Transform をサーバからクライアントへ stream で送信する
func (s *server) ListTransform(rect *pb.Transform, stream pb.GameService_ListTransformServer) error {
for i := 0; i < 100; i++ {
p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
t := pb.Transform{Position: &p, Rotation: &r}
if err := stream.Send(&t); err != nil {
return err
}
}
return nil
}
// RecordTransform プレイヤーから stream で Transform 情報を受信する
func (s *server) RecordTransform(stream pb.GameService_RecordTransformServer) error {
for {
t, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.RecordReply{
Message: "OK",
})
}
if err != nil {
return err
}
log.Printf("Received: %v", t)
}
}
// BidirectTransform サーバ・クライアント双方向 stream で Transform 情報を送受信する
func (s *server) BidirectTransform(stream pb.GameService_BidirectTransformServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
log.Printf("Received: %v", in)
for i := 0; i < 10; i++ {
p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
t := pb.Transform{Position: &p, Rotation: &r}
if err := stream.Send(&t); err != nil {
return err
}
}
}
}
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGameServiceServer(s, &server{})
log.Printf("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
go mod tidy を実行して依存モジュールをインストールします。
go mod tidy
作業フォルダの状態
grpc\
+ game_grpc.pb.go
+ game.pb.go
+ game.proto
go.mod
go.sum
main.go
サーバ起動
go run して起動すれば OK
go run .\main.go
# 2022/08/02 23:47:02 server listening at [::]:50051
クライアント開発
作業用フォルダ準備
game-client フォルダを作って Visual Studio Code で開きます。
protocol buffer ファイルの作成
game-server の grpc フォルダをそのままコピーすれば OK
クライアント開発
go mod init を実行
github.com/hidingfox
は適切な値に変更してください。
go mod init github.com/hidingfox
main.go を記載します。
routeguide よりもシンプルにしたつもり・・・。
mode パラメータで通信タイプを切り替えられるようにしました。
// main.go
package main
import (
"context"
"flag"
"io"
"log"
"math/rand"
"time"
pb "github.com/hidingfox/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
var (
addr = flag.String("addr", "localhost:50051", "the address to connect to")
mode = flag.String("mode", "s", "RPC mode")
)
func serverStream(c pb.GameServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
p := pb.Vector3{X: 0, Y: 0, Z: 0}
r := pb.Vector3{X: 0, Y: 0, Z: 0}
t := pb.Transform{Position: &p, Rotation: &r}
stream, err := c.ListTransform(ctx, &t)
if err != nil {
log.Fatalf("ListTransform failed: %v", err)
}
for {
t, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("ListTransform failed: %v", err)
}
log.Printf("Receive: %v", t)
}
}
func clientStream(c pb.GameServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := c.RecordTransform(ctx)
if err != nil {
log.Fatalf("RecordTransform failed: %v", err)
}
for i := 0; i < 100; i++ {
p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
t := pb.Transform{Position: &p, Rotation: &r}
if err := stream.Send(&t); err != nil {
log.Fatalf("RecordTransform: stream.Send(%v) failed: %v", t, err)
}
}
reply, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("RecordTransform failed: %v", err)
}
log.Printf("RecordReply: %v", reply)
}
func bidirectStream(c pb.GameServiceClient) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
stream, err := c.BidirectTransform(ctx)
if err != nil {
log.Fatalf("BidirectTransform failed: %v", err)
}
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("BidirectTransform failed: %v", err)
}
log.Printf("Got message %v", in)
}
}()
for i := 0; i < 10; i++ {
p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
t := pb.Transform{Position: &p, Rotation: &r}
if err := stream.Send(&t); err != nil {
log.Fatalf("BidirectTransform: stream.Send(%v) failed: %v", t, err)
}
}
stream.CloseSend()
<-waitc
}
func main() {
flag.Parse()
// Set up a connection to the server.
conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewGameServiceClient(conn)
switch *mode {
case "s":
serverStream(c)
case "c":
clientStream(c)
case "b":
bidirectStream(c)
}
}
go mod tidy を実行
go mod tidy
作業フォルダの状態
grpc\
+ game_grpc.pb.go
+ game.pb.go
+ game.proto
go.mod
go.sum
main.go
クライアント実行
サーバが起動している状態で、まずは Server streaming RPC モードを試します。
go run .\main.go --mode s
# 2022/08/03 00:28:39 Receive: position:{x:0.047545657 y:0.7887996 z:0.66850066} rotation:{x:0.036283173 y:0.49585515 z:0.57031566}
# 2022/08/03 00:28:39 Receive: position:{x:0.18386358 y:0.19838057 z:0.6264753} rotation:{x:0.94531596 y:0.5830957 z:0.82607824}
# .
# (snip 100件続く)
# .
100 件 Transform が返ってきました。
次は Client streaming RPC モードを試します。
go run .\main.go --mode c
# 2022/08/03 00:32:05 RecordReply: message:"OK"
クライアント側は OK が返ってきました。
サーバ側のログを見ると
go run .\main.go
# 2022/08/03 00:31:53 server listening at [::]:50051
# 2022/08/03 00:32:05 Received: position:{x:0.6046603 y:0.9405091 z:0.6645601} rotation:{x:0.4377142 y:0.4246375 z:0.68682307}
# 2022/08/03 00:32:05 Received: position:{x:0.06563702 y:0.15651925 z:0.09696952} rotation:{x:0.30091187 y:0.51521266 z:0.81363994}
# .
# (snip 100件続く)
# .
ちゃんとクライアントから 100 件分のデータが送られています。
最後に Bidirectional streaming RPC モードを試します。
クライアント側ログ
go run .\main.go --mode b
# 2022/08/03 00:40:14 Got message position:{x:0.29380703 y:0.15668243 z:0.6852979} rotation:{x:0.9824183 y:0.8063538 z:0.24426582}
# 2022/08/03 00:40:14 Got message position:{x:0.3163075 y:0.96565104 z:0.3074106} rotation:{x:0.7000072 y:0.86846733 z:0.939908}
# 2022/08/03 00:40:14 Got message position:{x:0.73445493 y:0.10519899 z:0.49200013} rotation:{x:0.9716718 y:0.20778501 z:0.79693836}
# .
# (snip 100件続く)
# .
サーバ側ログ
go run .\main.go
# 2022/08/03 00:40:02 server listening at [::]:50051
# 2022/08/03 00:40:14 Received: position:{x:0.6046603 y:0.9405091 z:0.6645601} rotation:{x:0.4377142 y:0.4246375 z:0.68682307}
# 2022/08/03 00:40:14 Received: position:{x:0.06563702 y:0.15651925 z:0.09696952} rotation:{x:0.30091187 y:0.51521266 z:0.81363994}
# 2022/08/03 00:40:14 Received: position:{x:0.21426387 y:0.3806572 z:0.31805816} rotation:{x:0.46888983 y:0.28303415 z:0.29310185}
# 2022/08/03 00:40:14 Received: position:{x:0.67908466 y:0.21855305 z:0.20318687} rotation:{x:0.3608714 y:0.5706733 z:0.8624914}
# 2022/08/03 00:40:14 Received: position:{x:0.29311424 y:0.29708257 z:0.752573} rotation:{x:0.20658267 y:0.865335 z:0.69671917}
# 2022/08/03 00:40:14 Received: position:{x:0.5238203 y:0.028303083 z:0.15832828} rotation:{x:0.60725343 y:0.9752416 z:0.079453625}
# 2022/08/03 00:40:14 Received: position:{x:0.5948086 y:0.05912065 z:0.6920246} rotation:{x:0.30152267 y:0.17326623 z:0.54109985}
# 2022/08/03 00:40:14 Received: position:{x:0.5441556 y:0.27850762 z:0.4231522} rotation:{x:0.5305857 y:0.2535405 z:0.282081}
# 2022/08/03 00:40:14 Received: position:{x:0.7886049 y:0.36180547 z:0.8805431} rotation:{x:0.29711226 y:0.89436173 z:0.097454615}
# 2022/08/03 00:40:14 Received: position:{x:0.97691685 y:0.074291 z:0.22228941} rotation:{x:0.6810783 y:0.24151509 z:0.31152245}
クライアントからはサーバに 10 件 Send しているだけですが、クライアント側の受信ログは 100 件出力されています。
サーバ側でクライアント 1 メッセージに対して 10 件送信しているからですね。
問題なさそうですね!
Discussion