📘

connect-goでgRPCの通信方式4種類を網羅!テスト付き実装ガイド

2023/03/29に公開

最近、いくつかの言語でサポートを発表し、Connectが色々と騒がれていました!

Jan 18, 2023
Announcing Connect-Swift: You’ll actually want to use Protobuf on iOS
https://buf.build/blog/announcing-connect-swift/

Feb 22, 2023
Announcing Connect-Kotlin: Connect is now fully supported on mobile!
https://buf.build/blog/announcing-connect-kotlin/

Feb 28, 2023
Connect for Node.js is now available
https://buf.build/blog/connect-node-beta

ついにはモバイル系をフルサポート。
またNode.jsまでサポートを発表し、フロントエンド/サーバーサイド共にConnectによって、容易にTypeScriptを出力できるようになり、勢いが止まりません!!

https://twitter.com/__syumai/status/1630695749007855616?s=20

従来のgRPCを使うには、複雑なコマンドや各種ライブラリの調整など様々な準備が必要でした。
ところが、Connectがその複雑さを取り除き、桁違いに扱いやすくしたことで、界隈が大盛り上がりを見せていました。

そんな勢いが止まらないConnectが管理している、「 connect-go 」について、テストを書きながら、環境構築〜4種類の通信方式実装までを実装ガイドとしてまとめています。

書いていること

🔻サンプルコードはコチラにございます🔻
https://github.com/Hirochon/connect-go-test

connect-goを使ったgRPCサーバーの環境構築と、connect-goのリポジトリにて書かれているテストを参考に、gPRCに存在する4種類の通信方式の実装とそのテストについて書いています。

特に下記について、重点的にまとめています。

  • gRPCサーバーを作るための最小構成
  • connect-goを使った、4つの通信方式の実装とテストガイド

序章 Goが実行できる環境を整える

Goが最低限実行できる環境を整えていきます。
こちら(connect-go-test/issues/1)に実装があります。

.
├── server
│   ├── Dockerfile
│   ├── main.go
│   └── go.mod
├── docker-compose.yml
└── README.md

上記のディレクトリで、docker-composeを使って実行していきます。
まずは片方のshellのタブでupします。

docker compose up

そしてもう一つのタブでexecとrunします。

docker compose exec connect-go-test ash
go run ./...

これによって、Hello Worldと出力されたら成功です。

1章 connect-goでコード生成ができる環境作り

次に最低限、connect-goでコード生成が可能な環境を作ります。
こちら(connect-go-test/issues/2)に実装があります。

前提として、フロントエンドやバックエンドなどの実装を、モノレポで組んでいることが多いと思うので、それぞれのディレクトリに生成する想定で進めていきます。
結果としてこの章では、以下の様なディレクトリになります。

.
├── Makefile
├── README.md
├── buf.gen.yaml
├── buf.yaml
├── docker-compose.yml
├── protocolbuffers
│   ├── Dockerfile
│   └── greet
│       └── v1
│           └── greet.proto
└── server
    ├── Dockerfile
    ├── go.mod
    ├── main.go
    └── protocolbuffers
        └── greet
            └── v1
                ├── greet.pb.go
                └── greetv1connect
                    └── greet.connect.go

1.1 Docker環境周りを整える

protocolbuffersディレクトリのDockerfileに注目します。
コード生成するためのgo installをいくつか書いています。

RUN go install github.com/bufbuild/buf/cmd/buf@latest && \
    go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest && \
    go install google.golang.org/protobuf/cmd/protoc-gen-go@latest && \
    go install github.com/bufbuild/connect-go/cmd/protoc-gen-connect-go@latest

上記はこちらの公式ページで記載されているコマンドを参考に書いており、主にコード生成時に必要となるツール群です。

1.2 buf.yamlとbuf.gen.yamlを設定する

コードを生成するための設定ファイルを配置する必要があります。

ここでbuf.yamlbuf.gen.yamlは下記のように書きました。

buf.yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT
buf.gen.yaml
version: v1
plugins:
  - name: go
    out: server
    opt: paths=source_relative
  - name: connect-go
    out: server
    opt: paths=source_relative

上記はこちらの公式ページで記載されている見本を参考に書いています。

1.3 protocol buffersの定義

コードを生成する際には、まず.proto拡張子を持つファイルにProtocol Buffers定義を記述する必要があります。この定義には、データ型、メッセージ、フィールド、およびその他のProtocol Buffersの構成要素が含まれます。

今回は、以下のようなgreet.protoというファイルのProtocol Buffersを定義しました。

greet.proto
syntax = "proto3";

package protocolbuffers.greet.v1;

option go_package = "github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1;greetv1";

message GreetUnaryRequest {
  string name = 1;
}

message GreetUnaryResponse {
  string greeting = 1;
}

service GreetService {
  rpc GreetUnary(GreetUnaryRequest) returns (GreetUnaryResponse) {}
}

GreetUnaryRequestGreetUnaryResponseがそれぞれmessageとして定義していて、Protocol Buffersで扱うデータ構造となります。その中でstring型のnamegreetingがフィールドとして定義されています。
GreetServiceserviceとして定義していて、これはRPCの実装範囲とも捉えることができます。GoにおけるRPCは同じpackage内のインターフェースにまとめられるので、その制御が行える範囲で定義する必要があります。
後述しますが、通信方式はUnaryとなっています。

1.4 コード生成

いくつか端折った部分もありますが、こちら(commit)のような差分になり、いよいよコード生成ができる準備ができました。

ここで下記のようにdockerコマンドでコードを生成することができます。

docker compose up protocolbuffers

実際には、公式ドキュメントにもある、下記コマンドを実行しています。

buf lint
buf generate

よってserver/protocolbuffersディレクトリの下に、gRPCで通信を行うためのGoのコードが生成されました。

└── server
    ├── ...
    └── protocolbuffers
        └── greet
            └── v1
                ├── greet.pb.go
                └── greetv1connect
                    └── greet.connect.go

第2章 main.goでmultiplexerとListenAndServeする

公式ドキュメントを参考に構成しています。
こちら(connect-go-test/issues/3)に実装があります。
差分といえば"localhost:5050"ではなく、":5050"にしているところです。

package main

import (
	"net/http"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"

	"github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1/greetv1connect"
)

func server() http.Handler {
	mux := http.NewServeMux()
	mux.Handle(greetv1connect.NewGreetServiceHandler(nil))
	return mux
}

func main() {
	mux := server()
	http.ListenAndServe(
		":5050",
		h2c.NewHandler(mux, &http2.Server{}),
	)
}

まだRPCの実装を書いていないので、NewGreetServiceHandlerの引数はnilになっています。
ちなみにこの状態でgo runすると、問答無用でnil pointerします。

ここで環境づくりは終了です。次はconnect-goのリポジトリにあるテストを眺めていきます!

第3章 connect-goでテストに使われている実装を観察する

connect-goのリポジトリから4種類のRPCがテストされている箇所を見つけ出します!
まずはsuffixにtestが付いているファイルをざっと流し読みして、それっぽいものを見つけます。

...

3.1 connect_ext_test.goをじっくり見る

ありました。connect_ext_test.goで、どうやら幅広いテスト実装&4種類のRPCを実行していることがわかかります!

さらに見ていくと、4種類のRPCの実装を行なっている箇所が見えてきます。
また、これらの引数や戻り値を見てみると、pingv1というパッケージから参照されています。

3.2 internal/gen/connect/ping/v1/ping.pb.goをじっくり見る

さらに見ていくと、Ping関数の引数となっているところPing関数の戻り値となっているところが見つかります。
ここでPublicなフィールドを見てみると、ProtocolBuffersに定義したmessageに当たるものが見つかります。つまりコード生成によって、引数と戻り値が作られた訳ですね。
同ファイルの下の方を見ていくと、SumやCountUp、CumSumの引数や戻り値も定義されていることがわかります。

3.3 internal/gen/connect/ping/v1/pingv1connect/ping.connect.goをじっくり見る

引数や戻り値は見つかったけど、どこでメソッドを抽象化しているんだと思った方もいるでしょう!
これを探す方法は、connect_ext_test.goを再び眺め観察していると発見できます。
4種類のRPCの実装を行なっていたものは全てpingServerという構造体のメソッドとして定義されています。
これは怪しいと思い、その構造体のフィールドを見てみると、pingv1connect.UnimplementedPingServiceHandlerがあります。
これの実装を見てみると、何も値を持たない構造体に、何も返さないメソッドがぶら下がっていることがわかります。
つまり、何も実装しなかった場合にこちらが適用されるということです。

少し上の方のNewPingServiceHandlerというサーバーの初期化関数ぽいのがあります。実際にconnect_ext_test.goで検索してみるとたくさん初期化しているので、目星がつきます。
また、NewPingServiceHandlerの第1引数にあるPingServiceHandlerでは、interfaceを使ったメソッドの抽象化を行なっており、これらを実装しないと動かないことがわかります。

3.4 テーブル形式でまとめ

Unary RPC Client streaming RPC Server streaming RPC Bidirectional streaming RPC
Pingの実装 Sumの実装 CountUpの実装 CumSumの実装
Pingの引数 Sumの引数 CountUpの引数 CumSumの引数
Pingの戻り値 Sumの戻り値 CountUpの戻り値 CumSumの戻り値
Pingの仮実装 Sumの仮実装 CountUpの仮実装 CumSumの仮実装
Pingの抽象化 Sumの抽象化 CountUpの抽象化 CumSumの抽象化

これらをProtocolBuffersから全て生成できる且つ、メソッドを実装するだけで使えると考えると、手間も省けてとても便利ですね!

第4章 Unary RPCをテストして実装する

こちら(connect-go-test/issues/4)に実装があります。
Unary RPCとは、クライアントが1つのリクエストを送信し、1つのレスポンスを返すという最も単純なタイプのRPCです。シンプルなリクエストとレスポンスという認識で問題ないと思います。

Unary RPC

4.1 Unary RPCのテストを書く

1.3 protocol buffersの定義にて書いているのが、Unary RPCの形式になります。
まず、ここで生成するコードのテストを書いていきます。

着目する点としては以下になります。

  1. http2でサーバーをスタートさせているところの流れ
  2. NewPingServiceClientによって、clientを生成する
  3. リクエスト作成からレスポンス受け取りまで

これらの特徴から似たような書き方で下記のテストを用意してみました。

handler_test.go
package main

import (
	"context"
	"net/http/httptest"
	"testing"

	greetv1 "github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1"
	"github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1/greetv1connect"
	"github.com/bufbuild/connect-go"
)

func TestGreetUnaryHandler(t *testing.T) {
	t.Parallel()
	mux := server()
	server := httptest.NewUnstartedServer(mux)
	server.EnableHTTP2 = true
	server.StartTLS()
	t.Cleanup(server.Close)
	cases := []struct {
		scenario string
		name     string
		want     string
	}{
		{
			scenario: "Twitterのユーザー名",
			name:     "heacet43",
			want:     "Hello, heacet43!",
		},
		{
			scenario: "GitHubのユーザー名",
			name:     "Hirochon",
			want:     "Hello, Hirochon!",
		},
		{
			scenario: "今の気持ち",
			name:     "お腹すいた",
			want:     "Hello, お腹すいた!",
		},
	}
	for _, c := range cases {
		c := c
		t.Run(c.scenario, func(t *testing.T) {
			t.Parallel()
			client := greetv1connect.NewGreetServiceClient(
				server.Client(),
				server.URL,
			)
			res, err := client.GreetUnary(context.Background(), connect.NewRequest(&greetv1.GreetUnaryRequest{
				Name: c.name,
			}))
			if err != nil {
				t.Error(err)
			}
			if res.Msg.GetGreeting() != c.want {
				t.Errorf("greeting got: %s, want: %s", res.Msg.GetGreeting(), c.want)
			}
		})
	}
}

実行すると、もちろん落ちます。

=== RUN   TestGreetUnaryHandler
=== PAUSE TestGreetUnaryHandler
=== CONT  TestGreetUnaryHandler
--- FAIL: TestGreetUnaryHandler (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
        panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x2 addr=0x0 pc=0x10109ce94]

4.2 Unary RPCの実装を書く

テストを通すために実装を書いていきます。

先ほど紹介したPingの実装を参考に以下のように組みました。

handler.go
package main

import (
	"context"
	"fmt"

	greetv1 "github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1"
	"github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1/greetv1connect"
	"github.com/bufbuild/connect-go"
)

type GreetServer struct {
	greetv1connect.UnimplementedGreetServiceHandler
}

func (s *GreetServer) GreetUnary(
	ctx context.Context,
	req *connect.Request[greetv1.GreetUnaryRequest],
) (*connect.Response[greetv1.GreetUnaryResponse], error) {
	res := connect.NewResponse(&greetv1.GreetUnaryResponse{
		Greeting: fmt.Sprintf("Hello, %s!", req.Msg.Name),
	})
	return res, nil
}

さらに、NewGreetServiceHandlerへ構造体を渡します。

main.go
package main

import (
	"net/http"

	"golang.org/x/net/http2"
	"golang.org/x/net/http2/h2c"

	"github.com/Hirochon/connect-go-test/server/protocolbuffers/greet/v1/greetv1connect"
)

func server() http.Handler {
	mux := http.NewServeMux()
-	mux.Handle(greetv1connect.NewGreetServiceHandler(nil))
+	mux.Handle(greetv1connect.NewGreetServiceHandler(&GreetServer{}))
	return mux
}

func main() {
	mux := server()
	http.ListenAndServe(
		":5050",
		h2c.NewHandler(mux, &http2.Server{}),
	)
}

これで無事にテストが通りました🎉

=== RUN   TestGreetUnaryHandler
=== PAUSE TestGreetUnaryHandler
=== CONT  TestGreetUnaryHandler
=== RUN   TestGreetUnaryHandler/Twitterのユーザー名
=== PAUSE TestGreetUnaryHandler/Twitterのユーザー名
=== RUN   TestGreetUnaryHandler/GitHubのユーザー名
=== PAUSE TestGreetUnaryHandler/GitHubのユーザー名
=== RUN   TestGreetUnaryHandler/今の気持ち
=== PAUSE TestGreetUnaryHandler/今の気持ち
=== CONT  TestGreetUnaryHandler/Twitterのユーザー名
=== CONT  TestGreetUnaryHandler/今の気持ち
=== CONT  TestGreetUnaryHandler/GitHubのユーザー名
--- PASS: TestGreetUnaryHandler/今の気持ち (0.01s)
--- PASS: TestGreetUnaryHandler/GitHubのユーザー名 (0.01s)
--- PASS: TestGreetUnaryHandler/Twitterのユーザー名 (0.01s)
--- PASS: TestGreetUnaryHandler (0.00s)
PASS
ok      github.com/Hirochon/connect-go-test/server      0.272s

第5章 Server streaming RPCをテストして実装する

こちら(connect-go-test/issues/5)に実装があります。
Server streaming RPCは、サーバーがクライアントのリクエストに応答してメッセージのストリームを返し、すべてのメッセージを送信した後、サーバーのステータス詳細とoptionの末尾のメタデータがクライアントに送信します。
つまり下記画像のように1つのリクエストに対して、複数のレスポンスを返すことができます。
この特徴から一気に大量のデータを送るのではなく、分割して送ることが可能になります。
よって局所的な負荷をサーバーへかけない仕組みや、少しずつ結果を返して初期表示スピードを上げて、ユーザー体験を向上させる仕組みなどが構築できます。

Server streaming RPC

5.1 Server streaming RPCのProtocolBuffers定義

protocolbuffers/greet/v1/greet.protoにServer streaming RPCを定義します。

message GreetServerStreamRequest {
  string name = 1;
}

message GreetServerStreamResponse {
  string greeting = 1;
}

service GreetService {
  ...
  rpc GreetServerStream(GreetServerStreamRequest) returns (stream GreetServerStreamResponse) {}
}

変更点として、戻り値の先頭にstreamが付いています。たったこれだけでServer streaming RPCのコード生成が可能になります。

docker compose up protocolbuffers

上記コマンドでコードが生成されます。

5.2 Server streaming RPCのテストを書く

着目点としては以下になります。

  1. http2でサーバーをスタートさせているところの流れ
  2. NewPingServiceClientによって、clientを生成する
  3. connect.NewRequest()でリクエストを作成する
  4. client.CountUp()でサーバーストリームするためのリクエスト
  5. for文でstream.Receive()が途切れるまで取得し続ける

これらの特徴から下記のテストを実装しました。

handler_test.go
func TestGreetServerStreamHandler(t *testing.T) {
	t.Parallel()
	mux := server()
	server := httptest.NewUnstartedServer(mux)
	server.EnableHTTP2 = true
	server.StartTLS()
	t.Cleanup(server.Close)
	cases := []struct {
		scenario string
		name     string
	}{
		{
			scenario: "Twitterのユーザー名",
			name:     "heacet43",
		},
		{
			scenario: "GitHubのユーザー名",
			name:     "Hirochon",
		},
	}
	for _, c := range cases {
		c := c
		t.Run(c.scenario, func(t *testing.T) {
			t.Parallel()
			client := greetv1connect.NewGreetServiceClient(
				server.Client(),
				server.URL,
			)
			stream, err := client.GreetServerStream(context.Background(), connect.NewRequest(&greetv1.GreetServerStreamRequest{
				Name: c.name,
			}))
			if err != nil {
				t.Error(err)
			}
			i := 0
			for stream.Receive() {
				greeting := stream.Msg().GetGreeting()
				if greeting != fmt.Sprintf("Hello, %s! (%d)", c.name, i) {
					t.Errorf("greeting got: %s, want: %s", greeting, fmt.Sprintf("Hello, %s! (%d)", c.name, i))
				}
				i++
			}
		})
	}
}

5.3 Server streaming RPCの実装を書く

先ほど紹介したCountUpの実装(主にfor文のところ)を参考に以下の実装をしました。

handler.go
func (s *GreetServer) GreetServerStream(
	ctx context.Context,
	req *connect.Request[greetv1.GreetServerStreamRequest],
	stream *connect.ServerStream[greetv1.GreetServerStreamResponse],
) error {
	for i := 0; i < 10; i++ {
		if err := stream.Send(&greetv1.GreetServerStreamResponse{
			Greeting: fmt.Sprintf("Hello, %s! (%d)", req.Msg.Name, i),
		}); err != nil {
			return err
		}
	}
	return nil
}

これで無事にテストが通りました🎉

=== RUN   TestGreetServerStreamHandler
=== PAUSE TestGreetServerStreamHandler
=== CONT  TestGreetServerStreamHandler
=== RUN   TestGreetServerStreamHandler/Twitterのユーザー名
=== PAUSE TestGreetServerStreamHandler/Twitterのユーザー名
=== RUN   TestGreetServerStreamHandler/GitHubのユーザー名
=== PAUSE TestGreetServerStreamHandler/GitHubのユーザー名
=== CONT  TestGreetServerStreamHandler/Twitterのユーザー名
=== CONT  TestGreetServerStreamHandler/GitHubのユーザー名
--- PASS: TestGreetServerStreamHandler/GitHubのユーザー名 (0.01s)
--- PASS: TestGreetServerStreamHandler/Twitterのユーザー名 (0.01s)
--- PASS: TestGreetServerStreamHandler (0.00s)
PASS
ok      github.com/Hirochon/connect-go-test/server      0.290s

第6章 Client streaming RPCをテストして実装する

こちら(connect-go-test/issues/6)に実装があります。
Client streaming RPCはクライアントが単一のメッセージではなく、メッセージのストリームをサーバーに送信します。サーバーは、通常、クライアントのメッセージをすべて受信した後に、1つのメッセージ(ステータスの詳細とオプションの末尾のメタデータとともに)で応答する。
つまり、クライアントが複数リクエストを行い、サーバーが1回のレスポンスを行う形を取ることができます。

Client streaming RPC

6.1 Client streaming RPCのProtocolBuffers定義

protocolbuffers/greet/v1/greet.protoにClient streaming RPCを定義します。

message GreetClientStreamRequest {
  string name = 1;
}

message GreetClientStreamResponse {
  string greeting = 1;
}

service GreetService {
  ...
  rpc GreetClientStream(stream GreetClientStreamRequest) returns (GreetClientStreamResponse) {}
}

変更点として、引数の先頭にstreamが付いています。たったこれだけでClient streaming RPCのコード生成が可能になります。

docker compose up protocolbuffers

上記コマンドでコードが生成されます。

6.2 Client streaming RPCのテストを書く

着目点としては以下になります。

  1. http2でサーバーをスタートさせているところの流れ
  2. NewPingServiceClientによって、clientを生成する
  3. for文でstream.Send()を何回も呼んでClient側のリクエストをたくさん送っている
  4. stream.CloseAndReceive()でstreamをcloseしてサーバーから1つのレスポンスを受け取る

これらの特徴から下記のテストを実装しました。

handler_test.go
func TestGreetClientStreamHandler(t *testing.T) {
	t.Parallel()
	mux := server()
	server := httptest.NewUnstartedServer(mux)
	server.EnableHTTP2 = true
	server.StartTLS()
	t.Cleanup(server.Close)
	cases := []struct {
		scenario string
		name     string
		want     string
	}{
		{
			scenario: "Twitterのユーザー名",
			name:     "heacet43",
			want:     "Hello, heacet43 (0), heacet43 (1), heacet43 (2), heacet43 (3), heacet43 (4), heacet43 (5), heacet43 (6), heacet43 (7), heacet43 (8), heacet43 (9)!",
		},
		{
			scenario: "GitHubのユーザー名",
			name:     "Hirochon",
			want:     "Hello, Hirochon (0), Hirochon (1), Hirochon (2), Hirochon (3), Hirochon (4), Hirochon (5), Hirochon (6), Hirochon (7), Hirochon (8), Hirochon (9)!",
		},
	}
	for _, c := range cases {
		c := c
		t.Run(c.scenario, func(t *testing.T) {
			t.Parallel()
			client := greetv1connect.NewGreetServiceClient(
				server.Client(),
				server.URL,
			)
			stream := client.GreetClientStream(context.Background())
			for i := 0; i < 10; i++ {
				msg := fmt.Sprintf("%s (%d)", c.name, i)
				if err := stream.Send(&greetv1.GreetClientStreamRequest{
					Name: msg,
				}); err != nil {
					t.Error(err)
				}
			}
			res, err := stream.CloseAndReceive()
			if err != nil {
				t.Error(err)
			}
			if res.Msg.GetGreeting() != c.want {
				t.Errorf("greeting got: %s, want: %s", res.Msg.GetGreeting(), c.want)
			}
		})
	}
}

6.3 Client streaming RPCの実装を書く

先ほど紹介したSumの着目点は以下です。

  1. クライアントからstream.Receive()が尽きるまで取得し続ける
  2. 最後に1度だけクライアントへのレスポンスを返す

これらの特徴から下記の実装しました。

handler.go
func (s *GreetServer) GreetClientStream(
	ctx context.Context,
	req *connect.ClientStream[greetv1.GreetClientStreamRequest],
) (*connect.Response[greetv1.GreetClientStreamResponse], error) {
	var names string
	for req.Receive() {
		if len(names) > 0 {
			names += ", "
		}
		names += req.Msg().Name
	}
	res := connect.NewResponse(&greetv1.GreetClientStreamResponse{
		Greeting: fmt.Sprintf("Hello, %s!", names),
	})
	return res, nil
}

これで無事にテストが通りました🎉

=== RUN   TestGreetClientStreamHandler
=== PAUSE TestGreetClientStreamHandler
=== CONT  TestGreetClientStreamHandler
=== RUN   TestGreetClientStreamHandler/Twitterのユーザー名
=== PAUSE TestGreetClientStreamHandler/Twitterのユーザー名
=== RUN   TestGreetClientStreamHandler/GitHubのユーザー名
=== PAUSE TestGreetClientStreamHandler/GitHubのユーザー名
=== CONT  TestGreetClientStreamHandler/Twitterのユーザー名
=== CONT  TestGreetClientStreamHandler/GitHubのユーザー名
--- PASS: TestGreetClientStreamHandler/GitHubのユーザー名 (0.01s)
--- PASS: TestGreetClientStreamHandler/Twitterのユーザー名 (0.01s)
--- PASS: TestGreetClientStreamHandler (0.00s)
PASS
ok      github.com/Hirochon/connect-go-test/server      0.280s

第7章 Bidirectional streaming RPCをテストして実装する

こちら(connect-go-test/issues/7)に実装があります。
Bidirectional streaming RPCでは、クライアントがメソッドを呼び出し、サーバーがクライアントのメタデータ、メソッド名、期限を受信することで呼び出しが開始されます。サーバーは、初期メタデータを送り返すか、クライアントがメッセージのストリーミングを開始するのを待つかを選択することができます。
クライアントとサーバーサイドのストリーム処理は、アプリケーションに依存します。2つのストリームは独立しているため、クライアントとサーバーは任意の順序でメッセージを読み書きすることができます。

つまりクライアントとサーバーどちらも任意のタイミングで且つ、複数のリクエスト/レスポンスを受信/送信することができます。

Bidirectional streaming RPC

7.1 Bidirectional streaming RPCのProtocolBuffers定義

protocolbuffers/greet/v1/greet.protoにBidirectional streaming RPCを定義します。

message GreetBidiStreamRequest {
  string name = 1;
}

message GreetBidiStreamResponse {
  string greeting = 1;
}

service GreetService {
  ...
  rpc GreetBidiStream(stream GreetBidiStreamRequest) returns (stream GreetBidiStreamResponse) {}
}

変更点として、引数と戻り値の先頭にstreamが付いています。たったこれだけでBidirectional streaming RPCのコード生成が可能になります。

docker compose up protocolbuffers

上記コマンドでコードが生成されます。

7.2 Bidirectional streaming RPCのテストを書く

着目点としては以下になります。

  1. http2でサーバーをスタートさせているところの流れ
  2. NewPingServiceClientによって、clientを生成する
  3. sync.WaitGroupを使って、リクエスト/レスポンスを独立して進める
  4. エラーハンドリングにerrors.Isを使っている(ラップされているEOFを検知できたりする)

これらの特徴から似たような書き方で下記のテストを用意してみました。(同じfor内だと面白くないので、goroutineで分けてリクエスト/レスポンスしてみました)

handler_test.go
func TestGreetBidiStreamHandler(t *testing.T) {
	t.Parallel()
	mux := server()
	server := httptest.NewUnstartedServer(mux)
	server.EnableHTTP2 = true
	server.StartTLS()
	t.Cleanup(server.Close)
	cases := []struct {
		scenario string
		name     string
	}{
		{
			scenario: "Twitterのユーザー名",
			name:     "heacet43",
		},
		{
			scenario: "GitHubのユーザー名",
			name:     "Hirochon",
		},
	}
	for _, c := range cases {
		c := c
		t.Run(c.scenario, func(t *testing.T) {
			t.Parallel()
			client := greetv1connect.NewGreetServiceClient(
				server.Client(),
				server.URL,
			)
			stream := client.GreetBidiStream(context.Background())
			var wg sync.WaitGroup
			wg.Add(2)
			go func() {
				defer wg.Done()
				for i := 0; i < 10; i++ {
					if err := stream.Send(&greetv1.GreetBidiStreamRequest{
						Name: fmt.Sprintf("%s (%d)", c.name, i),
					}); err != nil {
						t.Error(err)
					}
				}
				stream.CloseRequest()
			}()
			go func() {
				defer wg.Done()
				for i := 0; i < 10; i++ {
					msg, err := stream.Receive()
					if errors.Is(err, io.EOF) {
						break
					}
					if err != nil {
						t.Error(err)
					}
					if msg.GetGreeting() != fmt.Sprintf("Hello, %s (%d)!", c.name, i) {
						t.Errorf("greeting got: %s, want: %s", msg.GetGreeting(), fmt.Sprintf("Hello, %s! (%d)", c.name, i))
					}
				}
				stream.CloseResponse()
			}()
			wg.Wait()
		})
	}
}

7.3 Bidirectional streaming RPCの実装を書く

先ほど紹介したCumSumについて、以下に着目しました。

  1. forを使って、値を受け取ると同時に値を渡す
  2. errors.Isを使ったエラーハンドリング(EOFがラップされていたりする)

これらの特徴から下記の実装しました。

handler.go
func (s *GreetServer) GreetBidiStream(
	ctx context.Context,
	stream *connect.BidiStream[greetv1.GreetBidiStreamRequest, greetv1.GreetBidiStreamResponse],
) error {
	for i := 0; ; i++ {
		msg, err := stream.Receive()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			connect.NewError(connect.CodeInternal, fmt.Errorf("failed to receive request: %w", err))
		}
		fmt.Println("Request message: ", msg)
		if err := stream.Send(&greetv1.GreetBidiStreamResponse{
			Greeting: fmt.Sprintf("Hello, %s! (%d)", msg.Name, i),
		}); err != nil {
			return connect.NewError(connect.CodeInternal, fmt.Errorf("failed to send response: %w", err))
		}
	}
}

これで無事にテストが通りました🎉

=== RUN   TestGreetBidiStreamHandler
=== PAUSE TestGreetBidiStreamHandler
=== CONT  TestGreetBidiStreamHandler
=== RUN   TestGreetBidiStreamHandler/Twitterのユーザー名
=== PAUSE TestGreetBidiStreamHandler/Twitterのユーザー名
=== RUN   TestGreetBidiStreamHandler/GitHubのユーザー名
=== PAUSE TestGreetBidiStreamHandler/GitHubのユーザー名
=== CONT  TestGreetBidiStreamHandler/Twitterのユーザー名
=== CONT  TestGreetBidiStreamHandler/GitHubのユーザー名
--- PASS: TestGreetBidiStreamHandler/Twitterのユーザー名 (0.01s)
--- PASS: TestGreetBidiStreamHandler/GitHubのユーザー名 (0.01s)
--- PASS: TestGreetBidiStreamHandler (0.00s)
PASS
ok      github.com/Hirochon/connect-go-test/server      0.284s

おわりに

公式のconnect-goで実行されているテスト例を熟読することによって、公式ドキュメントで説明されていないテストの手段を見つけ出すことができました。

また記事を作成している間にも、「ここなんかおかしいな」と感じる部分があり、コントリビュートしてみたところ、色々ありましたが結果的にマージされたので、この様に記事を書いてみるのも良い経験になったなと思いました!

Go以外でもconnectを試してみたい!!!

参考文献

https://connect.build/
https://connect.build/docs/go/getting-started
https://grpc.io/docs/what-is-grpc/core-concepts/
https://github.com/bufbuild/connect-go/tree/main
https://dragonquest.fandom.com/ja/wiki/呪文と特技リスト

Discussion