Chapter 08

サーバーストリーミングの実装

さき(H.Saki)
さき(H.Saki)
2022.06.19に更新

この章について

ここからは、実際にストリーミング処理を実装して動かしていきます。
まずこの章では、サーバーストリーミングから見ていきます。

具体的には、Unary RPCだったHelloメソッドの他に、サーバーストリーミングを行うHelloServerStreamメソッドを作っていきましょう。

メソッドの追加処理

protoファイルでの定義

まずは、protoファイルにHelloServerStreamメソッドの定義を記述します。

api/hello.proto
service GreetingService {
	// サービスが持つメソッドの定義
	rpc Hello (HelloRequest) returns (HelloResponse);
+	// サーバーストリーミングRPC
+	rpc HelloServerStream (HelloRequest) returns (stream HelloResponse);
}

今回はサーバーストリーミングですので、一つのリクエストに複数個のレスポンスが返ってくる形態です。
それを表現するために、レスポンスを表す戻り値の定義のところにstreamとつけています。

サーバーストリーミングメソッド用のコードを自動生成させる

protoファイルの修正が終わったところで、HelloServerStreamメソッド用のコードを自動生成で作りましょう。
もう一度以下のprotocコマンドを実行します。

$ cd api
$ protoc --go_out=../pkg/grpc --go_opt=paths=source_relative \
	--go-grpc_out=../pkg/grpc --go-grpc_opt=paths=source_relative \
	hello.proto

サーバーサイドの実装

ここからは、gRPCサーバーの中にHelloServerStreamメソッドを付け加えるように実装を追加していきます。

自動生成されたコード

自動生成されたコードは、元々あったGreetingServiceServerサービスにHelloServerStreamメソッドが追加されたものになります。

pkg/grpc/hello_grpc.pb.go
type GreetingServiceServer interface {
	// サービスが持つメソッドの定義
	Hello(context.Context, *HelloRequest) (*HelloResponse, error)
+	// サーバーストリーミングRPC
+	HelloServerStream(*HelloRequest, GreetingService_HelloServerStreamServer) error
	mustEmbedUnimplementedGreetingServiceServer()
}

引数としてHelloRequest型を渡すところはUnary RPCであるHelloメソッドと同じです。
ただ、ストリーミングにした戻り値からHelloResponse型がなくなりエラーだけになっています。

その代わりに、第二引数にGreetingService_HelloServerStreamServerインターフェースというものが加わりました。

pkg/grpc/hello_grpc.pb.go
// 自動生成された、サーバーストリーミングのためのインターフェース(for サーバー)
type GreetingService_HelloServerStreamServer interface {
	Send(*HelloResponse) error
	grpc.ServerStream
}

このGreetingService_HelloServerStreamServerインターフェースを使って、どのようにレスポンスをクライアントに返していくのかについては後ほど説明します。

サーバーサイドのビジネスロジックを実装する

それでは、gRPCサービスの実態である自作構造体myServer型にもHelloServerStreamメソッドを実装していきましょう。
HelloServerStreamメソッドのシグネチャは、自動生成されたGreetingServiceServerインターフェースに含まれていたHelloServerStreamメソッドに従います。

cmd/server/main.go
func (s *myServer) HelloServerStream(req *hellopb.HelloRequest, stream hellopb.GreetingService_HelloServerStreamServer) error {
	resCount := 5
	for i := 0; i < resCount; i++ {
		if err := stream.Send(&hellopb.HelloResponse{
			Message: fmt.Sprintf("[%d] Hello, %s!", i, req.GetName()),
		}); err != nil {
			return err
		}
		time.Sleep(time.Second * 1)
	}
	return nil
}

レスポンス送信処理

特筆するべきなのは、クライアントにレスポンスを返す部分の記述が、第二引数として受け取ったstreamSendメソッドになっているところです。
レスポンスを返したいときには、Sendメソッドの引数にHelloResponse型を渡すことでそれがクライアントに送信されます。

// Unary RPCがレスポンスを返すところ
func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
	// HelloResponse型を直接returnする
	return &hellopb.HelloResponse{
		Message: fmt.Sprintf("Hello, %s!", req.GetName()),
	}, nil
}

// Server Stream RPCがレスポンスを返すところ
func (s *myServer) HelloServerStream(req *hellopb.HelloRequest, stream hellopb.GreetingService_HelloServerStreamServer) error {
	// (一部抜粋)
	// streamのSendメソッドを使っている
	stream.Send(&hellopb.HelloResponse{
		Message: fmt.Sprintf("[%d] Hello, %s!", i, req.GetName()),
	})
}

Sendメソッドを何度も実行することで何度もクライアントにレスポンスを返すことができ、これにてサーバーからのストリーミングを実現しています。
これがUnary RPCのときとの違いです。

ストリームの終端

サーバー側から全てのデータを送信し終わったときには、HelloServerStreamメソッドをreturn文で終わらせることでストリームを終わらせることができます。

// Unary RPCの通信終了時
func (s *myServer) Hello(ctx context.Context, req *hellopb.HelloRequest) (*hellopb.HelloResponse, error) {
	// HelloResponse型を1つreturnする
	// (Unaryなので、レスポンスを一つ返せば終わり)
	return &hellopb.HelloResponse{
		Message: fmt.Sprintf("Hello, %s!", req.GetName()),
	}, nil
}

// Server Stream RPCの通信終了時
func (s *myServer) HelloServerStream(req *hellopb.HelloRequest, stream hellopb.GreetingService_HelloServerStreamServer) error {
	// (略: レスポンス送信処理)

	// return文でメソッドを終了させる=ストリームの終わり
	return nil
}

gRPCurlを用いたサーバーサイドの動作確認

それでは、このHelloServerStreamメソッドの動作確認をgRPCurlでやってみましょう。
サーバー起動を行った後に、以下のようにリクエストを送信します。

$ grpcurl -plaintext -d '{"name": "hsaki"}' localhost:8080 myapp.GreetingService.HelloServerStream
{
  "message": "[0] Hello, hsaki!"
}
{
  "message": "[1] Hello, hsaki!"
}
{
  "message": "[2] Hello, hsaki!"
}
{
  "message": "[3] Hello, hsaki!"
}
{
  "message": "[4] Hello, hsaki!"
}

一度'{"name": "hsaki"}'というリクエストを送っただけで、サーバーからは5回レスポンスが続けて送られてきました。
これがサーバーストリーミングの挙動です。

クライアントコードの実装

それでは今度はHelloServerStreamメソッドを呼び出すようなクライアントコードを書いていきましょう。

自動生成されたコード

自動生成されたGreetingService用のクライアントにも、HelloServerStreamメソッドを呼び出すためのメソッドが追加されています。

pkg/grpc/hello_grpc.pb.go
type GreetingServiceClient interface {
	// サービスが持つメソッドの定義
	Hello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloResponse, error)
+	// サーバーストリーミングRPC
+	HelloServerStream(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (GreetingService_HelloServerStreamClient, error)
}

引数としてHelloRequest型を渡すところは、Unary RPCであるHelloメソッドと同じです。
ただ、サーバーから送られてくる複数個のレスポンスを受け取るために、戻り値がGreetingService_HelloServerStreamClientに変わっています。

pkg/grpc/hello_grpc.pb.go
// 自動生成された、サーバーストリーミングのためのインターフェース(for クライアント)
type GreetingService_HelloServerStreamClient interface {
	Recv() (*HelloResponse, error)
	grpc.ClientStream
}

このGreetingService_HelloServerStreamClientインターフェースを使って、どのようにサーバーから返ってくる複数個レスポンス受け取るのかは後ほど説明します。

クライアントの実装

それでは、クライアントに新しく追加されたHelloServerStreamメソッドを使って、gRPCサーバー上にあるHelloServerStreamメソッドを呼び出す処理を書いていきましょう。

cmd/client/main.go
func main() {
	// (前略)
	for {
		fmt.Println("1: send Request")
+		fmt.Println("2: HelloServerStream")
-		fmt.Println("2: exit")
+		fmt.Println("3: exit")
		fmt.Print("please enter >")

		// (略)

		switch in {
		case "1":
			(略)

+		case "2":
+			HelloServerStream()

-		case "2":
+		case "3":
			fmt.Println("bye.")
			goto M
		}
	}
M:
}

+func HelloServerStream() {
+	fmt.Println("Please enter your name.")
+	scanner.Scan()
+	name := scanner.Text()
+
+	req := &hellopb.HelloRequest{
+		Name: name,
+	}
+	stream, err := client.HelloServerStream(context.Background(), req)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+
+	for {
+		res, err := stream.Recv()
+		if errors.Is(err, io.EOF) {
+			fmt.Println("all the responses have already received.")
+			break
+		}
+
+		if err != nil {
+			fmt.Println(err)
+		}
+		fmt.Println(res)
+	}
+}

特筆するべき点について説明します。

レスポンス受信処理

Unary RPCのときは、サーバーからレスポンスは1回しか送られてこないので、gRPCクライアントが持つHelloメソッドを一回呼ぶだけで直接レスポンスを得ることができました。
しかし、サーバーストリーミングの場合は、

  1. クライアントが持つHelloServerStreamメソッドを呼んで、サーバーからレスポンスが送られてくるストリーム(GreetingService_HelloServerStreamClientインターフェース型)を取得
  2. そのストリームのRecvメソッドを呼ぶことでレスポンスを得る

という2ステップが必要になります。

// Unary RPCがレスポンスを受け取るところ
func Hello() {
	// (一部抜粋)
	// Helloメソッドの実行 -> HelloResponse型のレスポンスresを入手
	res, err := client.Hello(context.Background(), req)
}

// Server Stream RPCがレスポンスを受け取るところ
func HelloServerStream() {
	// (一部抜粋)
	// サーバーから複数回レスポンスを受け取るためのストリームを得る
	stream, err := client.HelloServerStream(context.Background(), req)

	for {
		// ストリームからレスポンスを得る
		res, err := stream.Recv()
	}
}

ストリームの終端

サーバーストリーミングといっても、いつまでも無限にレスポンスを受け取るわけではありません。
サーバーからもうこれ以上レスポンスは送られてきませんというタイミングは絶対に訪れます。

クライアントの方で「全てのレスポンスを受け取った」とどう判断するのでしょうか。
実際にその判断を行っているのは以下の箇所です。

res, err := stream.Recv()
if errors.Is(err, io.EOF) {
	fmt.Println("all the responses have already received.")
	break
}

Recvメソッドでレスポンスを受け取るとき、これ以上受け取るレスポンスがないという状態なら、第一戻り値にはnil、第二戻り値のerrにはio.EOFが格納されています。

var EOF = errors.New("EOF")

そのため、errors.Is関数を用いて「エラーを受け取ったか&受け取ったエラーがio.EOFか」を確かめることで、後続のレスポンスの有無を判定することができます。

実装したクライアントの挙動確認

それでは、今作ったクライアントコードの挙動を確認してみます。

$ cd cmd/client
$ go run main.go
start gRPC Client.

1: Hello
2: HelloServerStream
3: exit
please enter >2

Please enter your name.
hsaki
message:"[0] Hello, hsaki!"
message:"[1] Hello, hsaki!"
message:"[2] Hello, hsaki!"
message:"[3] Hello, hsaki!"
message:"[4] Hello, hsaki!"

1: Hello
2: HelloServerStream
3: exit
please enter >3

bye.

このように、ターミナルを通じてリクエスト送信・レスポンスの表示ができれば成功です。
きちんと複数個のレスポンスを受け取ることができました。