🥊

僕「gRPC streaming RPC って何?」

2023/05/16に公開
1

TL;DR

gRPC streaming RPC についての理解がふんわりすぎたため、調べたことをまとめました。

ついでに、簡単なチャットサーバーを gRPC streaming RPC を用いて実装してみました。[1]

対象読者

  • gRPC streaming RPC 使ってるけど、ふんわりな理解だわって人
  • gRPC streaming RPC 気になってる人
  • gRPC の理解もふわふわしてる人

そもそも gRPC とは

gRPC公式HP

gRPC streaming RPC の前に、そもそも gRPC とはなんでしょうか?

gRPCは、Googleが開発したオープンソースのRPC(Remote Procedure Call)フレームワークです。

gRPCは、異なるシステム間で高効率な通信を提供することを目的に設計されており、以下のような特徴があります。

  1. 言語とプラットフォームの独立性: gRPCは多くのプログラミング言語とプラットフォームでサポートされており、異なる言語や環境で書かれたシステム間でも簡単に通信ができます。
  2. Protocol Buffers: gRPCは、デフォルトでは Protocol Buffersを用いてデータを効率的にシリアル化・デシリアル化し、小さなメッセージサイズと高速な処理を実現しています。Protocol Buffersは、バイナリ形式でデータを表現し、IDL(Interface Definition Language)を使用してデータ構造とサービスを定義します。
  3. HTTP/2: gRPCは、通信プロトコルとしてHTTP/2を使用しており、HTTP/2の特性を活用して高速で効率的な通信が可能です。HTTP/2の特性には、 多重化(複数のリクエストとレスポンスの同時送受信) 、ヘッダー圧縮(HTTPヘッダー情報の圧縮によるデータ転送量の削減)、サーバープッシュ(予測的なリソース送信)などがあります。

1と2は関係していて、 例えばバックエンドとフロントエンドの言語が異なっていても、共通のAPI定義(protoファイル)を元に、それぞれの言語におけるAPI定義を自動生成し、それに基づいてgRPC通信を行うことができます。

HTTP/2の強み:多重化

HTTP/1.x のコネクション管理(mdn web docs)

gRPC通信で使われているHTTP/2は 多重化(複数のリクエストとレスポンスの同時送受信) をサポートしているという特徴があります。

すごいざっくりいうと、 古い HTTP 通信が手紙のように相手の返事が返ってくるまで待つ必要があるものである一方で、 HTTP/2 は電話のように、相手の声が聞こえながらも自分も自由に声を出せるというような通信方式です。

多重化で嬉しいこと

そもそも、HTTPは、TCPコネクションを繋いだ後、その上で通信を行うプロトコルです。
多重化では、1つのTCPコネクションで複数のリクエスト・レスポンスをやり取りできます。

この多重化が実現できることで、嬉しいことはいっぱいあります。

  1. 効率的な通信: 多重化により、単一のTCPコネクションで複数のリクエストとレスポンスを同時にやり取りできるため、通信の効率が大幅に向上します。これにより、Webページのロード時間が短縮され、ユーザー体験が向上します。

  2. ヘッド・オブ・ライン・ブロッキング(HOL)の緩和: HTTP/2の多重化では、1つのリクエストの遅延が他のリクエストに影響を与えにくくなります。これにより、ヘッド・オブ・ライン・ブロッキングが大幅に緩和され、通信の遅延や待機時間が減少します。

  3. TCPコネクションの効率的な利用: 多重化により、複数のリクエストとレスポンスが同時に単一のTCPコネクションで処理されるため、TCPコネクションのオーバーヘッドが減少します。これにより、通信の効率が向上し、リソースの消費が抑えられます。

  4. 帯域幅の最適化: HTTP/2の多重化では、帯域幅をより効果的に利用できます。複数のリクエストとレスポンスが同時に送受信されるため、データ転送の隙間が少なくなり、帯域幅の使用率が向上します。

さっきの電話の例で言うと、多重化によって、サーバーとクライアントの能力が許す限り二人が同時に喋ることができるわけです(人間だとお行儀悪いですが)。
片方の人が喋っている内容をもう片方が聞きながら、同時にその返答をできるようになります。

話終わってからの通訳同時通訳 の違いに似てますね。

  • 話終わってからの通訳: 英語話者が話し終わった後、日本語に訳した内容を伝える
  • 同時通訳: 英語話者が話している途中で、同時に日本語に訳して伝える

つまり、多重化ができるおかげで、待ち時間が少なくできるので、単位時間あたりやり取り可能な情報量も増えるわけです。

↓多重化に至る歴史を調べていますが、やや本筋からずれるので折りたたんでいます。

「1リクエスト/1TCPコネクション」時代〜多重化までの歴史

「1リクエスト/1TCPコネクション」時代〜多重化までの歴史

従来のHTTP/1.x通信で難しかった理由を知るために、簡単に多重化に至る歴史を振り返ってみましょう。

一応、HTTP/1.x(HTTP/1.0およびHTTP/1.1)では、TCPコネクションを複数のリクエスト・レスポンスで使い回す方法が考えられてきましたが、なかなか難しい歴史を歩んでいたようです。

HTTP/1.0: 1リクエスト/1TCPコネクション

HTTP/1.0では、デフォルトでTCPコネクションがリクエストごとに開かれて閉じられるという挙動がありました。これは、接続のオーバーヘッドが大きく、パフォーマンスに悪影響を与えることがありました。

HTTP/1.1: 複数リクエスト/1TCPコネクション→ただし問題が...

HTTP/1.1では、これを改善するために「Keep-Alive」という機能が導入されました。「Keep-Alive」により、複数のリクエスト・レスポンスを単一のTCPコネクションで処理することが可能になりました。しかし、HTTP/1.1は基本的に順序付きであり、1つのリクエストが完了するまで、次のリクエストが待機状態になるという「ヘッド・オブ・ライン・ブロッキング」(Head of Line Blocking; HOL)という問題がありました[2]
このため、HTTP/1.1でも複数のリソースを同時に効率的にロードすることは難しかったのです。

HTTP pipelining(en.wikipedia)などの手法でこの問題を解決しようとしましたがHTTP/1の仕様上の問題など色々あって普及には至りませんでした。

HTTP/2: 「多重化」導入で抜本的に解決

なんやかんやで、HTTP/2では、この問題を解決するために 「多重化」(Multiplexing) という機能が導入されました。多重化により、HTTP/2では複数のリクエスト・レスポンスを同時に単一のTCPコネクションで処理することができ、ヘッド・オブ・ライン・ブロッキングの問題を軽減して効率的な通信を実現しています。

つまりどういう?

gRPC は、HTTP/2 を使ったフレームワークです。

HTTP/2 を使っているため、「1つのTCPコネクションで複数のリクエスト/レスポンスのやり取りをできる」という多重化を(必要ならば)使うことができます。

gRPC の RPC 四天王

A basic tutorial introduction to gRPC in Go.

さて、 gRPC streaming RPC についてみてみましょう。

gRPC RPC は、クライアントーサーバー間の通信パターンに対して4つの異なる種類のRPCを提供しています。

  1. 単一リクエストー単一レスポンスRPC(Unary RPC)
  2. サーバーストリーミングRPC(Server streaming RPC)
  3. クライアントストリーミングRPC(Client streaming RPC)
  4. 双方向ストリーミングRPC(Bidirectional streaming RPC)

単一リクエストー単一レスポンスRPC(Unary RPC) は、一つのリクエストに対して一つのレスポンスという普通の RPC です。
サーバーストリーミングRPC(Server streaming RPC) は、クライアントは一つのリクエストを送り、サーバーが複数のレスポンスを返します。クライアントはレスポンスが返り終わるまで待機します。
クライアントストリーミングRPC(Client streaming RPC) は、クライアントは複数のリクエストを送り、サーバーはリクエストを全て受け取るまで待機し、すべて受け取ると一つのレスポンスを返します。
双方向ストリーミングRPC(Bidirectional streaming RPC) は、クライアントとサーバーが互いに独立してメッセージを読み書きすることができます。つまり、両方のストリームは独立して操作され、クライアントとサーバーはそれぞれのペースで読み書きすることができます。

実際に gRPC streaming RPC を使ってチャットを作ってみる

gRPC streaming RPC を使って簡単なチャットサーバー・クライアントを作ってみましょう。
せっかくなので、双方向ストリーミングRPC(Bidirectional streaming RPC)を使ってみます。

↓完成品はGitHubリポジトリにアップロードしています

https://github.com/zawakin/simple-grpc-stream

簡単のため、以下のシンプルな要件とします。

要件

チャットシステム

  • ユーザーは名前を入力できる
  • ユーザーはメッセージを送ると、他のユーザーはそのメッセージを名前付きで見ることができる
  • チャットルームはグローバルの一つだけ

特にWebフロントを用意せず、複数ターミナルでチャットするという簡単なものにしましょう。

設計

言語

Go(サーバー/クライアント)

proto

APIを定義する proto ファイルです。これをコンパイルし、サーバー/クライアントで用いるGoファイルを自動生成を行います。

gRPCサーバー

  • クライアントと接続を開始すると、ストリームを map として保持する。
  • クライアントからメッセージを受け取ると、それ以外のクライアントにそのメッセージを送信する

gRPCクライアント

  • 起動時
    • サーバーと gRPC コネクションを張る
    • サーバーに ChatStream リクエストを送信し、ストリームを開始する
    • サーバーからのストリームレスポンスを受け取ると標準出力する goroutine を立てる
    • ユーザーの名前を標準入力で受け取る
  • チャット利用時
    • 標準入力で受け取ったメッセージをストリームでサーバーに送信する

フロー図

手順

  1. gRPC stream API を定義する proto ファイルを作成
  2. proto から .pb.go を自動生成
  3. gRPC stream API サーバーを実装
  4. gRPC stream API クライアントを実装

ファイル

.
├── README.md
├── api
│   └── chat_service.pb.go
├── chat_service.proto
├── client
│   └── main.go
├── go.mod
├── go.sum
└── server
    └── main.go

3 directories, 7 files

1. protoファイル作成&コンパイル

みんな大好き(?) proto ファイルをつくりましょう。
rpc のリクエストかレスポンスの前に stream キーワードをつけると、ストリーミング通信が実現できます。簡単ですね。

今回はあえて双方向にするために、 stream をリクエストとレスポンスどちらにもつけています。

chat_service.proto
syntax = "proto3";

package api;

service ChatService {
  rpc ChatStream(stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string user = 1;
  string message = 2;
  uint64 timestamp = 3;
}

ChatStream という streaming RPC を定義しています。
リクエストとレスポンスにそれぞれ stream をつけているので、双方向ストリーミングになります。

作成した chat_service.proto をコンパイルして Go ファイルを出力します。

protoc --go_out=plugins=grpc:./api ./*.proto

api/chat_service.pb.go が出力されました。
今回は、サーバー/クライアントどちらもGoなので、この api パッケージを使いまわしましょう。

2. サーバーを実装する

ChatStream RPC のサーバーを実装しましょう。

まず最初に、コード全体を載せます。

server/main.go
package main

// (略)

type ClientID uuid.UUID

type ClientStream struct {
	clientID ClientID
	stream   api.ChatService_ChatStreamServer
}

type Server struct {
	clients map[ClientID]*ClientStream
	mu      sync.Mutex
}

func (s *Server) ChatStream(stream api.ChatService_ChatStreamServer) error {
	clientStream := &ClientStream{
		clientID: NewClientID(),
		stream:   stream,
	}

	s.addClient(clientStream)
	defer s.removeClient(clientStream)

	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("Client %s disconnected", clientStream.clientID)
				return nil
			}
			log.Printf("Failed to receive message: %v", err)
			return err
		}

		log.Printf("%s: %s", msg.GetUser(), msg.GetMessage())

		err = s.broadcast(clientStream.clientID, msg)
		if err != nil {
			log.Printf("Failed to broadcast message: %v", err)
			return err
		}
	}
}

func (s *Server) addClient(clientStream *ClientStream) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.clients[clientStream.clientID] = clientStream
}

func (s *Server) removeClient(clientStream *ClientStream) {
	s.mu.Lock()
	defer s.mu.Unlock()

	delete(s.clients, clientStream.clientID)
}

func (s *Server) broadcast(clientID ClientID, msg *api.ChatMessage) error {
	// (略)
}

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("Failed to listen: %v", err)
	}

	// (略)

	s := grpc.NewServer(
		// (略)
	)
	api.RegisterChatServiceServer(s, NewServer())
	if err := s.Serve(lis); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

見た目は長いですが、重要なのは func (s *Server) ChatStream(stream api.ChatService_ChatStreamServer) error の実装部分です。
これを理解するには、 stream 変数のインターフェイス api.ChatService_ChatStreamServer について知る必要があります。

api.ChatService_ChatStreamServer はステップ1で自動生成されたサーバー側からのストリームに対する操作をまとめたインターフェイスです。

api/chat_service.pb.go
type ChatService_ChatStreamServer interface {
	Send(*ChatMessage) error
	Recv() (*ChatMessage, error)
	grpc.ServerStream
}

Send(*ChatMessage) error でサーバー→クライアントへのストリーミングを行います。
Recv() (*ChatMessage, error) でクライアントから来たメッセージを受け取ることができます。

ChatStream の実装をもう一度抜き出します。

func (s *Server) ChatStream(stream api.ChatService_ChatStreamServer) error {
	// 略

	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("Client %s disconnected", clientStream.clientID)
				return nil
			}
			log.Printf("Failed to receive message: %v", err)
			return err
		}

		log.Printf("%s: %s", msg.GetUser(), msg.GetMessage())

		err = s.broadcast(clientStream.clientID, msg)
		if err != nil {
			log.Printf("Failed to broadcast message: %v", err)
			return err
		}
	}
}

つまり、 ChatStream の for ループの中で行う処理は、

  1. stream からメッセージを受信するまでブロックする
  2. メッセージを受信すると *ChatMessage 構造体(msg)に変換
  3. msg をそのまま他のクライアントにブロードキャストする

broadcast は、元のメッセージを送信したクライアント以外のクライアントに向けて各クライアントの stream を使って stream.Send(msg) でメッセージを送信することができます。

func (s *Server) broadcast(clientID ClientID, msg *api.ChatMessage) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	for _, client := range s.clients {
		if client.clientID == clientID {
			continue
		}
		err := client.stream.Send(msg)
		if err != nil {
			return err
		}
	}

	return nil
}

脱線: RecvSend の実装

github.com/grpc/grpc-go パッケージには、 RecvSend の実装が書かれています。ここでは、これらの実装を細かくみませんが、 個人的には勉強になったので少しだけまとめます。

Recv: ストリーム受信

Recv では大体以下の流れでリクエストの型に変換します。

  1. バイト列を受信する
  2. データが圧縮されていれば、解凍する
  3. バイト列をリクエストの構造体(今は *ChatMessage) にデシリアライズする

Send: ストリーム送信

  1. リクエストの構造体(今は *ChatMessage) をバイト列にシリアライズする
  2. 必要に応じて圧縮する
  3. バイト列を送信する

gRPC の gzip パッケージ で確認できるように、 gzip による圧縮をおこなっています。
(圧縮・解凍という操作は、 トランスポート層で行われており、 ServerStream 構造体に含まれる *transport.Stream 構造体に Compressor がパラメータとして含まれているようです)

3. クライアントを実装する

client/main.go
package main

// (略)

func main() {
	conn, err := grpc.Dial(":50051",
		// (略)
	)
	// (略)
	
	client := api.NewChatServiceClient(conn)
	ctx := context.Background()

	stream, err := client.ChatStream(ctx)
	if err != nil {
		log.Fatalf("Failed to open chat stream: %v", err)
	}

	go receiveMessage(stream)

	reader := bufio.NewReader(os.Stdin)
	readLine := func() string {
		// (略: 標準入力から一行読み取る関数)
	}

	fmt.Print("Enter your username: ")
	username := readLine()

	fmt.Println("Start chatting! Type your messages and press Enter to send.")
	for {
		text := readLine()

		msg := &api.ChatMessage{
			User:      username,
			Message:   text,
			Timestamp: uint64(time.Now().Unix()),
		}

		err = stream.Send(msg)
		if err != nil {
			log.Printf("Failed to send message: %v", err)
			return
		}
	}
}

func receiveMessage(stream api.ChatService_ChatStreamClient) {
	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("Server closed the connection")
				return
			}
			log.Printf("Failed to receive message: %v", err)
			return
		}
		fmt.Printf("%s: %s\n", msg.GetUser(), msg.GetMessage())
	}
}

main の中の for ループの中で、 stream.Send(msg) でサーバーに向けてストリーム送信を行います。

for {
	text := readLine()

	msg := &api.ChatMessage{
		User:      username,
		Message:   text,
		Timestamp: uint64(time.Now().Unix()),
	}

	err = stream.Send(msg)
	if err != nil {
		log.Printf("Failed to send message: %v", err)
		return
	}
}

一方、前述したように、サーバーはチャットを受け取ると、送信者以外にブロードキャストします。
他のクライアントがチャットメッセージを送るタイミングはわからないため、他のクライアントのメッセージがいつブロードキャストされサーバーから送られてくるかはクライアントからはわかりません。

クライアントがサーバーから送られる他のクライアントのメッセージをキャッチするため、サーバーからのストリーミングレスポンスを受け取る用の goroutine を起動します。

go receiveMessage(stream)

receiveMessage の実装は以下の通りです。

func receiveMessage(stream api.ChatService_ChatStreamClient) {
	for {
		msg, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				log.Printf("Server closed the connection")
				return
			}
			log.Printf("Failed to receive message: %v", err)
			return
		}
		fmt.Printf("%s: %s\n", msg.GetUser(), msg.GetMessage())
	}
}

サーバーからデータを受け取る stream.Recv() はブロッキング処理なので、 main で呼んでしまうと、今度はクライアントからサーバーへのチャット送信ロジックが実行できなくなってしまいますね。
goroutine のおかげで並行処理されるため、 stream.Recv() によるブロッキングの影響はこの goroutine に限定されるため、 main goroutine ではブロッキングが起きません。

ここでもサーバーと同様に、以下の流れでストリームから *ChatMessage を受け取ることができます。

  1. stream からメッセージを受信するまでブロックする
  2. メッセージを受信すると *ChatMessage 構造体(msg)に変換
  3. msg を標準出力する

実行結果

まず、サーバーを起動します。

サーバー
$ go run server/main.go

続いて、別ターミナルでクライアントを起動します。

クライアント1(ざわきん)
$ go run client/main.go
Enter your username: ざわきん
Start chatting! Type your messages and press Enter to send.

さらに別ターミナルで別のクライアントを起動します。

クライアント2(てぃあきん)
$ go run client/main.go
Enter your username: てぃあきん
Start chatting! Type your messages and press Enter to send.

クライアント2 で こんにちは と打ってみます。

クライアント2(てぃあきん)
こんにちは

すると、クライアント2のメッセージが、クライアント1に送信されました。

クライアント1(ざわきん)
てぃあきん: こんにちは

逆側もやってみます。

クライアント1(ざわきん)
どうも、ざわきんといいます

とチャットを入力すると、

クライアント2(てぃあきん)
ざわきん: どうも、ざわきんといいます

とクライアント1から送信したメッセージが表示されました。

サーバーのログも確認してみましょう。

サーバー
てぃあきん: こんにちは
ざわきん: どうも、ざわきんといいます

うまくいってそうですね。

まとめ

ここまで長々と書いてしまいましたが、最後まで読んでいただきありがとうございました!

いかがでしたでしょうか?

gRPC ってなんだか難しいことやってるイメージありますが、実際に例を作ってみると理解が深まりますね。

普段、 Unary RPC に慣れた身だとなかなか直観と異なる実装になるため、実際に作ってみてわかることもたくさんありました。

今回作成した簡単なチャットシステムだと、双方向のうまみを味わうこともなかったので、もっとリアルタイム性の高いものを作ってみるのも面白そうですね。

僕は 株式会社ナレッジワーク でソフトウェアエンジニアをやっています。
実は、Bidirectional streaming RPC を用いたゴリゴリのシステムもあったり、なかなか勉強になる毎日です。

興味ある方はぜひ DM などからメッセージいただけると嬉しいです!

では、良い gRPC ライフを 👊

もっと雑な感想

今まで Qiita ではこんな感じの長めの記事を書いていたのですが、 Zenn だと本とかの方が読みやすいのかな?(この辺り意見ある人はコメントなどで教えてください!)

GitHubにコード上げてます

今回のチャットシステムの全体のコードは GitHubリポジトリにアップロードしています。スターがあるととても喜びます!

https://github.com/zawakin/simple-grpc-stream

脚注
  1. (ChatGPTと会話しながら) ↩︎

  2. https://en.wikipedia.org/wiki/Head-of-line_blocking ↩︎

Discussion

anon_devanon_dev

chat_service.protoをコンパイルする際の、以下のコマンドについて質問です。

protoc --go_out=plugins=grpc:./api ./*.proto

api/chat_service.pb.goファイルが生成されないのですが、何か別のコマンドはありませんでしょうか。