React + grpc-web + Goでリアルタイムお絵描きブロードキャスティング
の記事です!
概要
今まで、ReactでWebSocket等の双方向通信を扱った実装をしたことがなかったのでこの機会にやろうと思います。
今年、ちょうど仕事でgrpc(とgrpc-webとGo)を使い始めたので組み合わせてみます。
grpc-webは癖が強いと思いがちなので(個人的に思っているだけかも。。)、ちゃんと向き合っていこうと思います。
以前に、grpc-webで1:1のチャットアプリを作っている記事を読んで面白そうだったので、それの拡張として今回は通信量の多いデータのブロードキャストができるのかを検証したいと思います。
サーバ->クライアントの通信にはServer stream RPCという通信方式をおそらく使うことになると思うので、その実装を通してgrpc-webとgrpc-webクライアントを使ったReactの実装の理解を深めようと思います。
ざっくりした目的
- grpc-webでWebクライアントへのブロードキャストができるか検証する
- 双方向通信がある場合のhooks設計を考える
- 同時に実プロダクトとしての可能性についても考える
試しに作ったもの
ざっくり説明
簡単に言えばオンラインホワイトボードのようなものです。
複数ユーザーがブラウザでホワイトボードを共有でき、それぞれが描いた文字や絵がユーザー間で共有できます。
gifの流れは
userID:green(左)がサーバへ接続
→greenが書き込む
→接続されているgreenの画面に描画
→userID:red(中)がサーバへ接続
→redが書き込む
→接続されているgreen, redの画面に描画
→greenが書き込む
→接続されているgreen, redの画面に描画
→userID:yellow(右)がサーバへ接続
→yellowが書き込む
→接続されているgreen, red, yellowの画面に描画
→greenが接続解除
→redが書き込む
→接続されているred, yellowの画面に描画
となっています。
仕様
以下の仕様になっています。
- 各クライアント(ブラウザのwindow単位)からサーバに接続できる
- 各クライアントから手描きデータをサーバに送信する
- サーバは受け取った手描きデータを接続されている全てのクライアントに(ほぼ)同時送信する
- 各クライアントはサーバから受け取った手書きデータをページ上に反映する
設計
grpc-web, grpcサーバ間のServer streaming RPC, Unary RPCによってデータの送受信を行なっています。
接続情報はサーバ内部で管理していて、それぞれの接続streamを通じてブロードキャストする仕組みです。
接続状態の管理
Web側でgrpc-webのクライアントを生成し、grpcサーバにServer streaming RPCで接続すると
サーバ内部でstreamが登録されます。
streamの実体はgrpc.ServerStreamという構造体で接続クライアント数分のをsync.Mutex構造体で管理しています。(後述しますが、メモリ上で管理するしかないと思っています)
手描きデータのクライアントからサーバへの送信
Webでgrpc-webのクライアントを生成し、Unary RPC接続(Restみたいな単方向通信)でサーバへ手描きデータを送信します。
手描きデータのサーバからクライアントへの送信
Server streaming RPCで接続されているクライアントに対し、サーバ内で管理しているそれぞれのクライアントに対応するstreamを使ってデータを送信します。
具体的なコード
protobufによるインターフェイスの定義
通信データのインターフェイス定義はprotobufで行います。
今回は
- サーバとの接続リクエスト(Server stream RPC)
- 接続解除リクエスト(Unary RPC)
- 手描きデータ送信リクエスト(Unary RPC)
を定義します。
syntax = "proto3";
option go_package = "grpc-example";
package drawing_share;
service DrawingShare {
rpc Connect (ConnectRequest) returns (stream ConnectResponse); // このstreamがServer stream RPCの定義
rpc SendDrawing (SendDrawingRequest) returns (SendDrawingResponse);
rpc DisConnect (DisConnectRequest) returns (DisConnectResponse);
}
message ConnectRequest {
string user = 1;
}
// このインターフェイスで描画データをやりとりする
message ConnectResponse {
string from = 1; // userID
DotData data = 2; // 描画している点の座標(x, y)
}
message DisConnectRequest {
string user = 1;
}
message DisConnectResponse {
string status = 1;
}
message SendDrawingRequest {
DotData data = 1;
string from = 2;
}
message DotData {
uint64 x = 1;
uint64 y = 2;
}
message SendDrawingResponse {
string status = 1;
}
ポイントはrpc Connectの返り値がstreamであることです。
Server stream RPCの書き方になります。
Connectリクエストに対しての分割レスポンスを利用してDotDataを都度都度通知しているところが本質です。
protobufのコンパイル
※本質ではないので飛ばしていただいても大丈夫です。
protobufコンパイルに必要なDocker Image、処理コマンド等
定義したprotoファイルからサーバ、クライアント用のインターフェイスを表現したコード(今回の場合、TypeScript用, Go用)を自動生成します。
インストールすべきものや、実際の運用の詳細は他記事に任せます。
ちなみに、職場ではgithub, CircleCIと連携して管理・運用を行っています。
今回はprotocによるコンパイルコマンドを定義したコンテナの実行によりコンパイルします。
Dockerfile, 実行コマンドを示します。
FROM golang:1.19
RUN apt-get update
RUN apt-get install sudo
RUN apt-get install -y protobuf-compiler
RUN curl -sL https://deb.nodesource.com/setup_16.x | sudo -E bash - && sudo apt install -y nodejs
RUN npm -g install protoc-gen-grpc-web
RUN npm -g install protoc-gen-ts
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest \
&& go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
COPY ./run.sh /var/work/
COPY ./protobuf /var/work/protobuf
WORKDIR /var/work/
CMD ["bash", "run.sh"]
protoc \
--go_out=./protobuf/server --go_opt=paths=source_relative \
--go-grpc_out=./protobuf/server --go-grpc_opt=paths=source_relative \
protobuf/*.proto
protoc \
--js_out=import_style=commonjs,binary:./protobuf/web \
--grpc-web_out=import_style=typescript,mode=grpcwebtext:./protobuf/web \
protobuf/*.proto
docker build -t protobuf-builder .
docker run --name protobuf-builder -v `pwd`:/var/work protobuf-builder
によってprotoファイルのコンパイルを行い、tsとgoのコードを自動生成します。
envoyをdockerで立てる
※本質ではないので飛ばしていただいても大丈夫です。
envoy.yaml, envoyを立てているdocker-composeの設定等
grpc-webクライアント(port:3005)<-->envoy(port:9000)<-->grpcサーバ(port:50051)
の通信を実現すべく、envoyを立てます。
ほぼexampleのコピペです。port:9000->port:50051へのproxyになります。
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 9000 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.config.filter.network.http_connection_manager.v2.HttpConnectionManager
codec_type: auto
stat_prefix: ingress_http
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route:
cluster: local_service
max_grpc_timeout: 0s
cors:
allow_origin_string_match:
- prefix: "*"
allow_methods: GET, PUT, DELETE, POST, OPTIONS
allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout
http_filters:
- name: envoy.filters.http.grpc_web
- name: envoy.filters.http.cors
- name: envoy.filters.http.router
clusters:
- name: local_service
connect_timeout: 0.25s
type: logical_dns
http2_protocol_options: {}
lb_policy: round_robin
dns_lookup_family: V4_ONLY
upstream_connection_options:
tcp_keepalive:
keepalive_time: 300
load_assignment:
cluster_name: cluster_0
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal
port_value: 50051
version: '3'
services:
envoy:
build:
context: .
container_name: envoy-grpc-proxy
ports:
- "9000:9000"
FROM envoyproxy/envoy:v1.15.0
COPY ./envoy.yaml /etc/envoy/envoy.yaml
CMD /usr/local/bin/envoy -c /etc/envoy/envoy.yaml
docker-compose up
でenvoyを起動します。
クライアント
Reactのコードを紹介します。
今回はNextJSで実装しましたが、Nextである必要はありません。
手描きエリアはcanvas要素を使います。
全体像(View層+custom hook)
View層は以下です。
データ管理やデータ処理メソッドはuseDrawという名前のhooksで切り出しています。
inline styleなのは許してください。
import {useDraw, userList} from "../../hooks/useDraw"
function DemoStreamDetail() {
const {
stream: currentStream,
connectStream,
disconnectStream,
cRef,
startDrawing,
stopDrawing,
draw,
onChangeUser,
onAutoDraw,
user,
} = useDraw()
return (
<div style={{ marginLeft: 10 }}>
<h2>
クライアント{user}
</h2>
<button onClick={connectStream} style={{ width: 100, padding: 10 }}>
接続
</button>
<button onClick={disconnectStream} style={{ width: 100, padding: 10, marginLeft: 10 }}>
接続終了
</button>
<button onClick={onAutoDraw} style={{ width: 100, padding: 10, marginLeft: 10 }}>
自動描画
</button>
<p>grpcサーバと接続しているか: {String(currentStream !== null)}</p>
<p style={{ marginTop: 40 }}>UserID(色名=UserID)</p>
<select
onChange={onChangeUser}
defaultValue={userList[0]}
style={{ width: 100, height: 40 }}
>
{userList.map((color: string) => {
return (
<option key={color} value={color}>
{color}
</option>
);
})}
</select>
<div style={{ marginTop: 20 }}>
<canvas
style={{ backgroundColor: "white" }}
ref={cRef}
width="400"
height="400"
onMouseMove={draw}
onMouseDown={startDrawing}
onMouseUp={stopDrawing}
/>
</div>
</div>
);
}
export default DemoStreamDetail;
定義したCustom hookの中身は以下です。
import {useCallback, useRef, useState, MouseEvent, ChangeEvent} from "react"
import {ClientReadableStream} from "grpc-web"
import {DrawingShareClient} from "../../protobuf/Drawing_shareServiceClientPb"
import {
ConnectRequest,
SendDrawingRequest,
DotData,
ConnectResponse,
DisConnectRequest
} from "../../protobuf/drawing_share_pb"
// grpc-web clientを生成
const client = new DrawingShareClient("http://localhost:9000")
const wait = (ms: number) => new Promise((res) => {
setTimeout(() => res(true), ms)
})
export const userList = [
'green',
'red',
'yellow',
'blue',
'black',
'orange',
]
export const useDraw = () => {
const [stream, setStream] = useState<ClientReadableStream<ConnectResponse> | null>(null)
// セレクトボックスで選択しているuserID(green, red, yellow)
const [user, setUser] = useState<string>(userList[0])
// マウスカーソルが押し込み状態の時trueになる(離した時falseになる)
const [isDrawing, setIsDrawing] = useState<boolean>(false)
const cRef = useRef<HTMLCanvasElement | null>(null) // canvas要素が参照するRef
const startDrawing = useCallback(() => {
setIsDrawing(true)
}, [])
const stopDrawing = useCallback(() => {
setIsDrawing(false)
}, [])
// Server stream RPCのハンドリング
const connectStream = () => {
const req = new ConnectRequest()
req.setUser(user)
if (stream !== null) {
return
}
// rpc Connectで定義した接続リクエスト
const connection = client.connect(req)
// streamを通じてがdataが入ってきた時"data"というeventTypeで受け取れる
connection.on("data", (m: ConnectResponse) => {
const data = m.getData()
const x = data?.getX()
const y = data?.getY()
if (!x || !y) return
_draw(x, y, m.getFrom())
})
// streamがcloseした時"end"というeventTypeで受け取れる
connection.on("end", () => {
connection.cancel()
setStream(null)
})
setStream(connection)
}
// canvasの参照Refに描画点を反映する
const _draw = (x: number, y: number, user: string) => {
const canvas = cRef.current
const ctx = canvas!.getContext('2d')
if (ctx == null) {
return
}
// 点の座標データ(x, y)を受け取った時、(x, y)を中心に半径10のfilled円を描きます
ctx.beginPath()
ctx.fillStyle = user // userIDはgreen, yellow, redとかでそのままcolor名で使う
ctx.arc(x, y, 10, 0, 2 * Math.PI)
ctx.fill()
ctx.closePath()
}
const onChangeUser = useCallback((e: ChangeEvent<HTMLSelectElement>) => {
setUser(e.target.value);
}, [setUser])
const disconnectStream = () => {
if (stream) {
const req = new DisConnectRequest()
req.setUser(user)
// 接続解除リクエスト
stream.cancel()
client.disConnect(req, null)
setStream(null)
}
}
const draw = (e: MouseEvent) => {
if (!isDrawing) return
const {
nativeEvent: {
offsetX: x,
offsetY: y,
},
} = e;
return _sendData(x, y, user)
}
const _sendData = async (x: number, y: number, user: string) => {
const req = new SendDrawingRequest()
const d = new DotData()
d.setX(x)
d.setY(y)
req.setData(d)
req.setFrom(user)
// Unary RPC (rpc SendDrawing)で定義したリクエストで描画点の情報をサーバへ送信する
return client.sendDrawing(req, null)
}
// 自動描画用 後述します。
const onAutoDraw = useCallback(async () => {
let x = 0;
let y = 10;
for (let i = 0; i < 1000; i++) {
await wait(20)
const rx = Math.floor(Math.random() * 20) - 8
const ry = Math.floor(Math.random() * 5)
x = Math.max((x + rx) % 400, 0)
y = Math.max((y + ry) % 400, 0)
await _sendData(x, y, user)
}
}, [user]);
return {
stream,
connectStream,
disconnectStream,
cRef,
user,
startDrawing,
stopDrawing,
isDrawing,
draw,
onChangeUser,
onAutoDraw,
};
}
connnection
hooks内のstateでconnection情報(ClientReadableStream)を管理しています。
このconnectionはprotoファイルから自動生成されたClientReadableStreamというクラスのインスタンスで、onメソッドでeventTypeを指定することで、サーバ側からstream RPCからの通知をハンドリングすることができます。
詳細はdocumentに書いてあります。
今回の描画データ(座標データ)はstream RPCに乗ってきてeventType: "data"でsubscribeすることができます。
接続解除時は
connection.cancel()
によってconnectionで動いているすべてのhandlerを停止させます。
後述しますが、このcancel処理はあくまでクライアント側でsubscribeを停止するのみで、サーバ側のstreamはcloseされません。
そのため、サーバ側のstreamをcloseするためのリクエストを
client.disConnect(req, null)
で行なっています。
canvasの管理
手描きエリアはcanvasで実装しています。
const cRef = useRef<HTMLCanvasElement | null>(null)
で、canvasが参照するrefを管理しています。
connection.on("data")
でサーバ側から取得したデータをこのrefに以下のコードで反映しています。
ctx.beginPath()
ctx.fillStyle = user // userIDはgreen, yellow, redとかでそのままcolor名で使う
ctx.arc(x, y, 10, 0, 2 * Math.PI)
ctx.fill()
ctx.closePath()
やりとりしているデータはただの(x, y)座標なので、半径10の塗りつぶされた円を描くことによって、可視化しています。
refを使うことで直接DOMを操作するので、この点データのViewへの反映でcomponentがrenderingされることはありません。
サーバ
全体像
全体像を先に示します。
エントリーは以下のファイルです。
package main
import (
pb "github.com/grpc-example/protobuf/server/protobuf" // 自動生成されたgrpcインターフェイス
"github.com/grpc-example/server/service"
"google.golang.org/grpc"
"log"
"net"
)
func main() {
port := 50051
listener, err := net.Listen("tcp", "127.0.0.1:50051")
if err != nil {
log.Fatal(err)
}
s := grpc.NewServer()
cs, err := service.NewDrawingSharingServer()
if err != nil {
log.Fatal(err)
}
pb.RegisterDrawingShareServer(s, cs)
log.Printf("start gRPC server port: %v", port)
s.Serve(listener)
}
これはdocumentに忠実に*grpc.Serverを生成しserveしています
pb.RegisterDrawingShareServer(s, cs)
によってgrpc serverにdrawingShareServerを登録しています。
これによって、接続、接続解除、手描きデータ送信のリクエストをさばけるようになります。
以下で、具体的にリクエストを受け付ける部分を示します。
package service
import (
"context"
pb "github.com/grpc-example/protobuf/server/protobuf"
"log"
"sync"
)
type drawingSharingServer struct {
pb.UnimplementedDrawingShareServer
// userId, streamのmap
sci *safeConnectionInfo
}
type safeConnectionInfo struct {
mu sync.Mutex
v map[string]pb.DrawingShare_ConnectServer
}
// 接続情報pb.DrawingShare_ConnectServerをsync.Mutexで管理するsafeConnectionInfoに登録する
func (sc *safeConnectionInfo) connect(uid string, conn pb.DrawingShare_ConnectServer) {
sc.mu.Lock()
sc.v[uid] = conn
sc.mu.Unlock()
}
func (sc *safeConnectionInfo) disconnect(uid string) {
sc.mu.Lock()
delete(sc.v, uid)
sc.mu.Unlock()
}
func (sc *safeConnectionInfo) value(uid string) pb.DrawingShare_ConnectServer {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.v[uid]
}
func (sc *safeConnectionInfo) allConnections() map[string]pb.DrawingShare_ConnectServer {
sc.mu.Lock()
defer sc.mu.Unlock()
return sc.v
}
func NewDrawingSharingServer() (*drawingSharingServer, error) {
return &drawingSharingServer{
sci: &safeConnectionInfo{
v: make(map[string]pb.DrawingShare_ConnectServer),
},
}, nil
}
// 接続stream
func (cs *drawingSharingServer) Connect(req *pb.ConnectRequest, server pb.DrawingShare_ConnectServer) error {
u := req.GetUser()
cs.sci.connect(u, server)
defer cs.sci.disconnect(u)
// streamをopenのまま維持する
for {
// 管理するstreamが削除されていれば(=クライアントから削除していれば)streamを閉じる
if cs.sci.value(u) == nil {
return nil
}
}
return nil
}
func (cs *drawingSharingServer) DisConnect(ctx context.Context, req *pb.DisConnectRequest) (*pb.DisConnectResponse, error) {
u := req.GetUser()
// sync.Mutexで管理するstreamから削除する
cs.sci.disconnect(u)
return &pb.DisConnectResponse{
Status: "OK",
}, nil
}
// clientから手描きデータを受け取る
func (cs *drawingSharingServer) SendDrawing(ctx context.Context, req *pb.SendDrawingRequest) (*pb.SendDrawingResponse, error) {
data := req.GetData()
x := data.GetX()
y := data.GetY()
from := req.GetFrom()
// sync.Mutexで管理するすべてのstreamに対して(すべての接続中のclientに対して)stream.Sendして通知する
for _, stream := range cs.sci.allConnections() {
err := stream.Send(&pb.ConnectResponse{
From: from,
Data: &pb.DotData{
X: x,
Y: y,
},
})
if err != nil {
log.Printf("%+v", err)
continue
}
}
return &pb.SendDrawingResponse{
Status: "OK",
}, nil
}
streamingの接続受付
Server stream RPCによるリクエストはConnectメソッドに入って来ます。
この中で、safeConnectionInfoで管理する接続streamが存在する限りfor文で処理を続けることでstreamをopenにしておきます。
接続情報の管理
type safeConnectionInfo struct {
mu sync.Mutex
v map[string]pb.DrawingShare_ConnectServer
}
で管理しています。
複数のgoroutineからアクセスされるためsync.Mutexで管理します。
接続解除
Unary rpcで受け付けたリクエストはDisConnectメソッドに入り、接続情報からstreamを削除します。
それによって、間接的にstreamを閉じます。
手描きデータの受付からstreamingを使った各クライアントへの転送
Unary rpcで受け付けたリクエストはSendDrawingメソッドに入ります。
safeConnectionInfoに登録されているstream(つまり接続中のすべてのclient)に対して
stream.Send()
によってデータをブロードキャスト的に送信します。
※チャットアプリを実装した他記事ではデータの方をmap変数で管理して、streaming受付のgoroutine(上のコードではConnectメソッドにあたる)で変更検知し、返すことをしていました。しかし今回はブロードキャストしたいので、接続userごとにデータ管理する必要が出てきて接続情報を管理しunaryで受け付けた時に送信する設計にしました。
複数クライアントの同時描画を試す
PC1台のlocal環境だと同時にマウス操作することができなかったので自動描画モードで試してみました。
自動描画モード
先ほどのReactのコードに
const onAutoDraw = useCallback(async () => {
let x = 0;
let y = 0;
for (let i = 0; i < 2000; i++) {
await wait(30)
// 適当な揺らぎを与える
const rx = Math.floor(Math.random() * 20) - 8
const ry = Math.floor(Math.random() * 10)
// 適当な(x, y)座標を作る
x = Math.max((x + rx) % 400, 0)
y = Math.max((y + ry) % 400, 0)
// grpcサーバにリクエストする
await _sendData(x, y, user)
}
}, [user]);
という関数を定義しました。
中身で適当な(x, y)座標を生成して、grpcサーバに30ミリ秒ごとに送ります。
gifで示したようにクライアントgreen, クライアントred, クライアントyellowが同時に絵を描く場合でも、すべてのデータがブロードキャストされていることが確認できました。
実プロダクトで運用する場合の懸念点
コネクションが切れたときのハンドリングがサーバ側でできない
grpcにはBidirectional streaming rpcという双方向のstreaming接続の方式があります。
しかし残念ながらgrpc-webではServer stream RPCしかサポートしていません。そのため今回はServer stream RPCで実装しましたが、Reactのコード紹介時にも述べた通り、実はこの方式だとクライアント側からstreamのcloseができません。
今回は無理やり別のUnary RPCのリクエスト(Disconnect)によってサーバに通知していますが、これは完璧ではなく、例えばユーザーがブラウザを閉じた場合等には対応できません。
そのため、timeoutを設定するなどして空回りしている処理を止める必要があります。
この点はもし実プロダクトに取り入れようとすると、無視できない問題になりそうです。
もちろんWebSocketなどの双方向通信ならばクライアントから接続を閉じることもできます。
例えば、AWSのAPI GatewayのWebSocket APIではdefaultで接続切断時に設定できるルートがあります。
(接続切断時に任意のLambda関数を動かせたりします)
接続情報の管理が難しい
Goのコードで示したように、サーバ内で接続情報は
type safeConnectionInfo struct {
mu sync.Mutex
v map[string]pb.DrawingShare_ConnectServer
}
で管理していました。
pb.DrawingShare_ConnectServer
はgrpc.ServerStream
interfaceが埋め込まれているinterfaceで、このinterfaceを満たす構造体がメモリ上に乗っていることになります。
しかし、この構造体の情報をDynamodb等のデータストアに永続化するのは厳しいです。
(できなくはないかもしれないですが、手間のかかる変換をかける必要がありますし、そもそも永続化することは想定された設計にはなっていません)。
実プロダクトで例えばサーバが複数コンテナで動いている状況を考えてみると、何かしらのデータストアで接続情報を管理する必要が出てくると思いますがこれについてはServer stream RPCでは実現するのが難しそうです。
この問題について、API GatewayのWebSocket APIでは内部的に接続情報を管理してくれていて接続ごとの一意なconnectionIDを発行してくれるため、例えばこのconnectionIDをDynamodbに入れておくなどの手法によってブロードキャストは簡単に実装できます。
まとめ
- grpc-webからServer stream RPCを使って複数クライアントへのブロードキャストを実現できた
- Reactのcustom hookの中で、grpc-webクライアントでstreamを通じてデータを取得することを確認できた
- (当初の目的にはなかったが)canvasへの理解も多少進んだ
- 各クライアントとの接続streamの管理をGoで頑張ることでサーバ->クライアントの通知を行えることが確認できた
- grpc-webの理解が(ちょっとだけ)進んだ
検証したかったことができたのでよかったです。
今回はServer stream RPCを使うことで双方向通信を(無理矢理)やってみましたが結構考えることが多くて意外と大変でした。
Custom hookの設計については、サーバからの受信するという点ではgrpc-webクライアントは比較的使いやすかったです。
(薄々気づいていましたが、)ブロードキャストを目的とした実プロダクションでの選択肢には流石に今のところは難しそうだということも感じることができました。
同時にAPI GatewayのWebSocket APIなどのサーバレス構成がどれだけ楽なのかということを再認識できたこともよかったです。
Discussion