🤖

Go で gRPC streaming 通信(Win)

2022/08/03に公開

前回は Hello World サンプルで最も基本的な Unary RPC 通信を確認しました。

今回は streaming 通信を試していきます。
開発環境は Windows10(64bit) + Visual Studio Code です。

streaming を使うと、その通信内で連続したメッセージを送受信し続けることができます。
streaming RPC には3種あります。

  1. Server streaming RPC
    • サーバからクライアントへ streaming メッセージを返します。
  2. Client streaming RPC
    • クライアントからサーバへ streaming メッセージを返します。
  3. Bidirectional streaming RPC
    • 双方向で streaming メッセージをやり取りします。

公式の route_guide という example がとてもわかりやすいです。
ゲームで UDP 通信の代わりに gRPC streaming を使えないかなぁと思っているので試していきます。

Streaming RPC

game-server

作業用フォルダ準備

game-server フォルダを作って Visual Studio Code で開きます。

protocol buffer ファイルの作成

grpc フォルダを作成し、その下に game.proto を用意します。
オンラインゲームでの通信を想定して Vector3 なデータをやり取りしてみます。

// grpc/game.proto
syntax = "proto3";

option go_package = ".;grpc";

package grpc;

service GameService {
  // A server-to-client streaming RPC.
  rpc ListTransform(Transform) returns (stream Transform) {}

  // A client-to-server streaming RPC.
  rpc RecordTransform(stream Transform) returns (RecordReply) {}

  // A Bidirectional streaming RPC.
  rpc BidirectTransform(stream Transform) returns (stream Transform) {}
}

message Vector3 {
  float x = 1;
  float y = 2;
  float z = 3;
}

message Transform {
  Vector3 position = 1;
  Vector3 rotation = 2;
}

message RecordReply {
  string message = 1;
}

コードの自動生成

PowerShell を開いて protoc を実行

protoc --go_out=. --go_opt=paths=source_relative `
    --go-grpc_out=. --go-grpc_opt=paths=source_relative `
    grpc/game.proto

game_grpc.pb.go と game.pb.go が生成されました。

作業フォルダの状態

grpc\
    + game_grpc.pb.go
    + game.pb.go
    + game.proto

サーバ開発

go mod init でモジュール初期化しておきます。

github.com/hidingfox は適切な値に変更してください。

go mod init github.com/hidingfox

main.go を記載します。

// main.go
package main

import (
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"net"
	"time"

	pb "github.com/hidingfox/grpc"
	"google.golang.org/grpc"
)

var (
	port = flag.Int("port", 50051, "The server port")
)

type server struct {
	pb.UnimplementedGameServiceServer
}

// ListTransform プレイヤーの Transform をサーバからクライアントへ stream で送信する
func (s *server) ListTransform(rect *pb.Transform, stream pb.GameService_ListTransformServer) error {
	for i := 0; i < 100; i++ {
		p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
		r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
		t := pb.Transform{Position: &p, Rotation: &r}
		if err := stream.Send(&t); err != nil {
			return err
		}

	}
	return nil
}

// RecordTransform プレイヤーから stream で Transform 情報を受信する
func (s *server) RecordTransform(stream pb.GameService_RecordTransformServer) error {
	for {
		t, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.RecordReply{
				Message: "OK",
			})
		}
		if err != nil {
			return err
		}
		log.Printf("Received: %v", t)
	}
}

// BidirectTransform サーバ・クライアント双方向 stream で Transform 情報を送受信する
func (s *server) BidirectTransform(stream pb.GameService_BidirectTransformServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		log.Printf("Received: %v", in)
		for i := 0; i < 10; i++ {
			p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
			r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
			t := pb.Transform{Position: &p, Rotation: &r}
			if err := stream.Send(&t); err != nil {
				return err
			}
		}
	}
}

func main() {
	flag.Parse()
	rand.Seed(time.Now().UnixNano())
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGameServiceServer(s, &server{})
	log.Printf("server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

go mod tidy を実行して依存モジュールをインストールします。

go mod tidy

作業フォルダの状態

grpc\
    + game_grpc.pb.go
    + game.pb.go
    + game.proto
go.mod
go.sum
main.go

サーバ起動

go run して起動すれば OK

go run .\main.go

# 2022/08/02 23:47:02 server listening at [::]:50051

クライアント開発

作業用フォルダ準備

game-client フォルダを作って Visual Studio Code で開きます。

protocol buffer ファイルの作成

game-server の grpc フォルダをそのままコピーすれば OK

クライアント開発

go mod init を実行

github.com/hidingfox は適切な値に変更してください。

go mod init github.com/hidingfox

main.go を記載します。
routeguide よりもシンプルにしたつもり・・・。
mode パラメータで通信タイプを切り替えられるようにしました。

// main.go
package main

import (
	"context"
	"flag"
	"io"
	"log"
	"math/rand"
	"time"

	pb "github.com/hidingfox/grpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

var (
	addr = flag.String("addr", "localhost:50051", "the address to connect to")
	mode = flag.String("mode", "s", "RPC mode")
)

func serverStream(c pb.GameServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	p := pb.Vector3{X: 0, Y: 0, Z: 0}
	r := pb.Vector3{X: 0, Y: 0, Z: 0}
	t := pb.Transform{Position: &p, Rotation: &r}
	stream, err := c.ListTransform(ctx, &t)
	if err != nil {
		log.Fatalf("ListTransform failed: %v", err)
	}
	for {
		t, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("ListTransform failed: %v", err)
		}
		log.Printf("Receive: %v", t)
	}
}

func clientStream(c pb.GameServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := c.RecordTransform(ctx)
	if err != nil {
		log.Fatalf("RecordTransform failed: %v", err)
	}
	for i := 0; i < 100; i++ {
		p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
		r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
		t := pb.Transform{Position: &p, Rotation: &r}
		if err := stream.Send(&t); err != nil {
			log.Fatalf("RecordTransform: stream.Send(%v) failed: %v", t, err)
		}
	}
	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("RecordTransform failed: %v", err)
	}
	log.Printf("RecordReply: %v", reply)
}

func bidirectStream(c pb.GameServiceClient) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	stream, err := c.BidirectTransform(ctx)
	if err != nil {
		log.Fatalf("BidirectTransform failed: %v", err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				// read done.
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("BidirectTransform failed: %v", err)
			}
			log.Printf("Got message %v", in)
		}
	}()
	for i := 0; i < 10; i++ {
		p := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
		r := pb.Vector3{X: rand.Float32(), Y: rand.Float32(), Z: rand.Float32()}
		t := pb.Transform{Position: &p, Rotation: &r}
		if err := stream.Send(&t); err != nil {
			log.Fatalf("BidirectTransform: stream.Send(%v) failed: %v", t, err)
		}
	}
	stream.CloseSend()
	<-waitc
}

func main() {
	flag.Parse()
	// Set up a connection to the server.
	conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGameServiceClient(conn)
	switch *mode {
	case "s":
		serverStream(c)
	case "c":
		clientStream(c)
	case "b":
		bidirectStream(c)
	}
}

go mod tidy を実行

go mod tidy

作業フォルダの状態

grpc\
    + game_grpc.pb.go
    + game.pb.go
    + game.proto
go.mod
go.sum
main.go

クライアント実行

サーバが起動している状態で、まずは Server streaming RPC モードを試します。

go run .\main.go --mode s
# 2022/08/03 00:28:39 Receive: position:{x:0.047545657 y:0.7887996 z:0.66850066} rotation:{x:0.036283173 y:0.49585515 z:0.57031566}
# 2022/08/03 00:28:39 Receive: position:{x:0.18386358 y:0.19838057 z:0.6264753} rotation:{x:0.94531596 y:0.5830957 z:0.82607824}
# .
# (snip 100件続く)
# .

100 件 Transform が返ってきました。

次は Client streaming RPC モードを試します。

go run .\main.go --mode c

# 2022/08/03 00:32:05 RecordReply: message:"OK"

クライアント側は OK が返ってきました。

サーバ側のログを見ると

go run .\main.go

# 2022/08/03 00:31:53 server listening at [::]:50051
# 2022/08/03 00:32:05 Received: position:{x:0.6046603  y:0.9405091  z:0.6645601}  rotation:{x:0.4377142  y:0.4246375  z:0.68682307}
# 2022/08/03 00:32:05 Received: position:{x:0.06563702  y:0.15651925  z:0.09696952}  rotation:{x:0.30091187  y:0.51521266  z:0.81363994}
# .
# (snip 100件続く)
# .

ちゃんとクライアントから 100 件分のデータが送られています。

最後に Bidirectional streaming RPC モードを試します。

クライアント側ログ

go run .\main.go --mode b

# 2022/08/03 00:40:14 Got message position:{x:0.29380703 y:0.15668243 z:0.6852979} rotation:{x:0.9824183 y:0.8063538 z:0.24426582}
# 2022/08/03 00:40:14 Got message position:{x:0.3163075 y:0.96565104 z:0.3074106} rotation:{x:0.7000072 y:0.86846733 z:0.939908}
# 2022/08/03 00:40:14 Got message position:{x:0.73445493 y:0.10519899 z:0.49200013} rotation:{x:0.9716718 y:0.20778501 z:0.79693836}
# .
# (snip 100件続く)
# .

サーバ側ログ

go run .\main.go

# 2022/08/03 00:40:02 server listening at [::]:50051
# 2022/08/03 00:40:14 Received: position:{x:0.6046603  y:0.9405091  z:0.6645601}  rotation:{x:0.4377142  y:0.4246375  z:0.68682307}
# 2022/08/03 00:40:14 Received: position:{x:0.06563702  y:0.15651925  z:0.09696952}  rotation:{x:0.30091187  y:0.51521266  z:0.81363994}
# 2022/08/03 00:40:14 Received: position:{x:0.21426387  y:0.3806572  z:0.31805816}  rotation:{x:0.46888983  y:0.28303415  z:0.29310185}
# 2022/08/03 00:40:14 Received: position:{x:0.67908466  y:0.21855305  z:0.20318687}  rotation:{x:0.3608714  y:0.5706733  z:0.8624914}
# 2022/08/03 00:40:14 Received: position:{x:0.29311424  y:0.29708257  z:0.752573}  rotation:{x:0.20658267  y:0.865335  z:0.69671917}
# 2022/08/03 00:40:14 Received: position:{x:0.5238203  y:0.028303083  z:0.15832828}  rotation:{x:0.60725343  y:0.9752416  z:0.079453625}
# 2022/08/03 00:40:14 Received: position:{x:0.5948086  y:0.05912065  z:0.6920246}  rotation:{x:0.30152267  y:0.17326623  z:0.54109985}
# 2022/08/03 00:40:14 Received: position:{x:0.5441556  y:0.27850762  z:0.4231522}  rotation:{x:0.5305857  y:0.2535405  z:0.282081}
# 2022/08/03 00:40:14 Received: position:{x:0.7886049  y:0.36180547  z:0.8805431}  rotation:{x:0.29711226  y:0.89436173  z:0.097454615}
# 2022/08/03 00:40:14 Received: position:{x:0.97691685  y:0.074291  z:0.22228941}  rotation:{x:0.6810783  y:0.24151509  z:0.31152245}

クライアントからはサーバに 10 件 Send しているだけですが、クライアント側の受信ログは 100 件出力されています。
サーバ側でクライアント 1 メッセージに対して 10 件送信しているからですね。

問題なさそうですね!

Discussion