✨
Go言語で書くgRPCテクニック:中級編[Protocol Buffers最適化など]
はじめに
gRPCは、高速・軽量なRPCフレームワークで、Go言語との相性が良く、マイクロサービス間の通信やAPIサーバーの実装に最適です。
前回の内容より、もう少し深い内容になります。
対象読者
- gRPCの基本を理解し、さらに高度な機能を使いたい方
- ストリーミングRPCやインターセプターを使った効率的な通信を実現したい方
目次
- ストリーミングRPCの実装
- Server Streaming
- Client Streaming
- Bidirectional Streaming
- Protocol Buffersの最適化
- 定義の工夫
- フィールド番号の最適化
- オプションの使用
- gRPCインターセプターによるミドルウェアの実装
- ロギング
- 認証
1. ストリーミング RPC の実装
1.1 Server Streaming
Server Streaming は、クライアントから 1 回のリクエストを受け取り、複数回のレスポンスを返す 通信方式です。
1.1.1 .proto ファイルの定義
syntax = "proto3";
package example;
service StreamService {
rpc ListMessages (MessageRequest) returns (stream MessageResponse) {}
}
message MessageRequest {
string user = 1;
}
message MessageResponse {
string text = 1;
}
1.1.2 Server の実装
server.go
package main
import (
"fmt"
"log"
"net"
"time"
"google.golang.org/grpc"
pb "example"
)
type server struct {
pb.UnimplementedStreamServiceServer
}
func (s *server) ListMessages(req *pb.MessageRequest, stream pb.StreamService_ListMessagesServer) error {
messages := []string{"Hello", "How are you?", "Goodbye"}
for _, msg := range messages {
res := &pb.MessageResponse{Text: msg}
if err := stream.Send(res); err != nil {
return err
}
time.Sleep(time.Second) // 擬似的な遅延
}
return nil
}
func main() {
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterStreamServiceServer(s, &server{})
fmt.Println("Server is running on port 50051")
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
-
stream.Send()
を使って、複数回のレスポンスをクライアントに送信 - サーバーは1度のリクエストに対して複数のメッセージを返す
1.1.3 Client の実装
client.go
package main
import (
"context"
"fmt"
"log"
"time"
"google.golang.org/grpc"
pb "example"
)
func main() {
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("did not connect: %v", err)
}
defer conn.Close()
c := pb.NewStreamServiceClient(conn)
req := &pb.MessageRequest{User: "Gopher"}
stream, err := c.ListMessages(context.Background(), req)
if err != nil {
log.Fatalf("could not list messages: %v", err)
}
for {
res, err := stream.Recv()
if err != nil {
break
}
fmt.Println("Received: ", res.Text)
}
}
-
stream.Recv()
を使って、複数のレスポンスを受け取る - サーバーがレスポンスを送信し続ける限り、クライアントはループして受信
2. Protocol Buffers の最適化
2.1 フィールド番号の最適化
- Protocol Buffersでは、フィールド番号が小さいほどエンコードサイズが小さくなります。
- 頻繁に使用するフィールドには1 から順に小さい番号を割り当てる。
message OptimizedMessage {
string content = 1; // 頻繁に使うフィールド
int32 id = 2;
string optional_field = 3; // 使用頻度が低いフィールド
}
2.2 オプションの使用
-
optional
キーワードを使うことで、フィールドが存在しない場合のデータ量を削減。
message UserProfile {
string name = 1;
optional string nickname = 2; // あってもなくても良い
}
3. gRPCインターセプターによるミドルウェアの実装
3.1 ロギングインターセプター
- 全てのRPCリクエストに対してログを記録する。
func loggingInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Printf("Request - Method:%s; %v", info.FullMethod, req)
res, err := handler(ctx, req)
if err != nil {
log.Printf("Error - Method:%s; %v", info.FullMethod, err)
}
return res, err
}
func main() {
s := grpc.NewServer(grpc.UnaryInterceptor(loggingInterceptor))
}
3.2 認証インターセプター
- メタデータを確認して、認証を行う。
func authInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
md, ok := metadata.FromIncomingContext(ctx)
if !ok || len(md["authorization"]) == 0 {
return nil, status.Errorf(codes.Unauthenticated, "missing authorization token")
}
return handler(ctx, req)
}
まとめ
項目 | 説明 |
---|---|
ストリーミング RPC | 複数回のレスポンスを返す効率的な通信方式 |
Protocol Buffers の最適化 | フィールド番号の最適化やオプションの使用でデータサイズ削減 |
gRPC インターセプター | ロギング、認証などの共通処理をミドルウェア化 |
gRPCのストリーミングRPCやProtocol Buffersの最適化、インターセプターの実装など、より高度な機能を紹介しました。
次は上級編として、gRPC Gateway や ロードバランシング、認証と認可 についてまとめようと思います。
Discussion