Rustで複数の通信プロトコルに対応したリアルタイム通信サーバを実装する
この記事は Rust Advent Calendar 2022 その2 の19日目の記事です。
はじめに
リアルタイムな通信を行うサーバは、ゲームのマルチプレイやチャット、ビデオ通話など様々な場面で必要になります。
このようなサーバは、よくあるREST APIを提供するHTTPサーバとは異なり、必ずしもリクエスト/レスポンスベースの通信であるとは限らず、双方向的な通信を行います。
さらに1クライアントあたり、秒間数〜数十以上のメッセージを送受信します。
リアルタイムな通信を実現できる通信プロトコルは多数あり、WebSocket、gRPCの双方向ストリームやQUICなど、あるいは生のTCPストリーム上に独自のプロトコルを構築したり、UDPを利用することもあります。
どの通信プロトコルを利用するかは、それぞれのメリット・デメリット、実行環境の制約によって利用できるかどうかなどの違いを吟味した上で適切に選定する必要があります。
例えば一般的なWebブラウザ上では、Socket APIを直接触れないために生のTCPストリーム上に独自のプロトコルを構築することはできず、gRPCも対応していないため利用できません。
WebSocketなどブラウザ上で利用可能な通信プロトコルを利用する必要があります。
前置きが長くなりましたが、こういった事情から複数のプラットフォーム、あるいはアプリケーションごとに最適な通信プロトコルを使いたいが、内部的な処理は共通で良いようなケースでは、簡単に通信プロトコルを置き換えたり複数対応できる状態にしておきたいのではないかと思います。
今回はこれをRustで実現する簡単な例を紹介します。
対象読者
以下を前提としています
- リアルタイム通信に興味がある
- Rustでプログラムをある程度書いたことがある
- tokio ランタイムを触ったことがある
- ざっくりとしたL4,L7のネットワークの知識がある
- ひとまず理論的なことは置いといて何かしらの実装例を見たい
大まかな設計
設計といっても大したことはなく、通信プロトコルのレイヤとそれ以外のロジックを分離しますというだけの話です。
今回は数人程度で1つのルームに集まって協力を行うようなマルチプレイゲームを想定してみます。
サーバ側でゲームに関するロジックは一切持たず、単純にメッセージをルーム内の他クライアントにリレーするだけのものがあれば十分とします。
ルーム内の処理は、話を単純にするために1プロセス内で完結するものとします。
つまり同じルームに入るクライアントは同じプロセスのサーバに接続するものとします。
(このようなワークロードをk8s上にデプロイしていい感じにスケールさせるためのOSSとして Agones というものがあります。)
APIとしては、ひとまず以下があれば十分とします。
- ログイン
- LoginRequest
- クライアントからサーバへのログインリクエスト
- LoginResponse
- LoginRequestに対するサーバからのレスポンス
- LoginRequest
- ルームへの入室
- JoinRequest
- クライアントからサーバへのルーム参加リクエスト
- JoinResponse
- JoinRequestに対するサーバからのレスポンス
- JoinNotification
- 既にルームに参加しているクライアントに対して、サーバから新規ルーム参加者を通知する
- JoinRequest
- ルームからの退室
- LeaveRequest
- クライアントからサーバへのルーム退室リクエスト
- LeaveResponse
- LeaveRequestに対するサーバからのレスポンス
- LeaveNotification
- ルームに参加しているクライアントに対して、サーバからルーム退室者を通知する
- LeaveRequest
- ルーム内のクライアントへのメッセージ
- SendMessage
- クライアントからサーバに対して、ルーム内クライアントにメッセージを送信する処理を要求する
- 今回はこれに対応するレスポンスは用意せず、ベストエフォートとしておく。
- MessageNotification
- ルームに参加しているクライアントに対して、SendMessageで来たメッセージをサーバからクライアントに通知する
- SendMessage
大まかな実装イメージは以下の通りです。
このひどく雑な図だけだと何がなんだかさっぱりだと思うので補足します。
- 図の一番右の通信プロトコルの部分と、内部のロジックの部分を分離するというよくある構造で複数のプロトコルに対応したリアルタイム通信サーバを実装する
- 今回はひとまず、TCP(+独自プロトコル)、WebSocket、gRPCの双方向ストリーミングの3つをサポートするサーバアプリケーションを実装
- 非同期ランタイムはtokio前提
-
tokio::sync::mpsc::channel
を利用して各プロトコル実装部分と内部ロジック間でメッセージをやり取りし、Actorモデルっぽい感じで実装する - メッセージの中身自体はどの通信プロトコルでも共通にする。今回はProtocol Buffersにシリアライズしたバイナリメッセージを送る
- Protocol Buffersでクライアントから呼び出すAPIをそれぞれmessageとして構造化して定義する
簡単な実装例に過ぎないので色々雑ですがひとまずこれで複数のプロトコルに対応したリアルタイム通信サーバが実装できそうです。
APIの定義
前述の通り、各通信プロトコル上にProtocol Buffersにシリアライズしたメッセージを流します。
Protocol BuffersのメッセージでAPIの種類と中身を定義します。
gRPCの双方向ストリームを使うことを想定すると、クライアントからのメッセージとサーバからのメッセージをそれぞれひとまとめにし、各種APIで渡される中身のメッセージを oneof
で表現するのが良さそうです。
WebSocketや生のTCP等であれば、あえて oneof
で定義しなくても良いですが、何がクライアントから送信されるもので、何がサーバから送信されるものか明確になるのでまあ悪くはないでしょう。
Protocol Buffersの中身は以下です。
実装
リポジトリはこちらです。
実装の全体像は以下のようなイメージです。
大きく entity
, actor
, network_protocol
の3つのモジュールで構成されており、前述の図の左側から順に対応しています。
src/
├── actor
│ ├── event.rs
│ ├── player.rs
│ └── room.rs
├── actor.rs
├── config.rs
├── entity
│ ├── player.rs
│ └── room.rs
├── entity.rs
├── lib.rs
├── main.rs
├── network_protocol
│ ├── grpc.rs
│ ├── server.rs
│ ├── tcp.rs
│ └── websocket.rs
├── network_protocol.rs
└── protobuf.rs
図の左側から順に説明します。
entity
ここでは扱う登場人物がどいういうデータを持っていてどういう振る舞いをするか簡単に実装しておきます。
今回はシンプルに特定のルーム内のプレイヤー間でメッセージを送受信し合うだけなので、 Player
と Room
という登場人物だけあれば良さそうです。
entity::Player
Player
はシンプルに自分のIdと、ルーム内のメッセージを受け取って送信するための tokio::sync::mpsc::UnboundedSender
を持っておきます。
ここでは送信するメッセージの中身の詳細は知らなくても良いので、ジェネリックな型パラメータ OutputMessageT
にしてしまいましょう。
この OutputMessageT
型のメッセージをプレイヤーに対して送信することができます。
送信したメッセージは、チャンネルの受信側が処理し、クライアントに通知を行う想定とします。
entity::Room
細かい部分の説明は省略しますが、 Room
内には複数の Player
が入退室することができ、Room内のプレイヤーに対してメッセージを送信します。
前述の通り、プレイヤーに送信するメッセージの中身は何でも良いため、同様に型パラメータ OutputMessageT
としておきます。
actor
actor
モジュールでは、主に内部のメッセージのやり取りを tokio::sync::mpsc::channel
を介して行います。
entity
同様に、 actor::Player
と actor::Room
を実装しています。
Input/OutputEvent
actor
モジュール内で送信し合うメッセージは、ここでイベントとして定義しておきます。
外から来たメッセージは actor::Player
を介して actor::Room
に入力として渡るということで InputEvent
, 逆に actor::Room
内の処理(Roomへの参加処理やメッセージのブロードキャスト)によって actor::Player
を介して外向き(最終的には通信クライアント側)に渡されるものは OutputEvent
とします。
actor::Player
actor::Player
は、後述の通信プロトコルレイヤである network_protocol
モジュールとProtocol Buffersでメッセージを送受信します。
Protocol Buffersから内部で利用するデータ構造への変換は、別の層でやったほうが良さそうですが、今回は面倒なのでこの中でディスパッチと一緒にやってしまいます。
actor::Player
は通信プロトコルレイヤとやり取りするために、 tokio::sync::mpsc::channel
の送受信用チャンネルを持っておきます。
実際の actor::Player
の処理は主に2つです。
-
input_rx
から受け取ったProtocol BuffersのメッセージをInputEvent
に変換してactor::Room
に受け渡す -
actor::Room
から来たイベントをoutput_tx
に送信する
1, 2の処理
actor::Room
actor::Room
は actor::Player
からチャンネル経由で InputEvent
を受信し、イベントの内容に従って entity::Room
のメソッドを呼び出したりしてルーム内の処理を行います。
例えば、 InputEvent::Message
のイベントが来て送信対象が全員の場合、 entity::Room::broadcast
でルーム内のプレイヤー全員にメッセージを送信します。
すると、 entity::Room
内の entity::Player
が持っているチャンネルにイベントが伝搬し、そのチャンネルへのイベントを待機している actor::Player
までイベントが渡り、最終的にクライアントに通知されるという流れになります。
network_protocol
network_protocol
モジュール内に通信プロトコル層の実装を行います。
ここで複数の通信プロトコルに対応した実装を行い、内部のロジックは前述の actor
モジュールにメッセージを受け渡すだけで実行されるようにします。
このモジュールで行うことは主に以下の通りです。
- 通信プロトコルごとのサーバ実装
- 例えばgRPCであれば、 tonic 等のライブラリを利用してgRPCサーバの処理を書きます
- 通信プロトコルの上のメッセージをProtocol Buffersにシリアライズ/デシリアライズする
-
actor
モジュールとProtocol Buffersでメッセージのやり取りをする
通信プロトコルごとのサーバ実装は、使うライブラリ等によってインターフェースが異なるので、共通化できるように trait
にしておきます。
今回は run
メソッドに起動アドレスと設定、「完了するとシャットダウンのシグナルが送られた」とみなすための Future
を渡すと、その通りにサーバが起動するような Server
トレイトを定義してみます。
各プロトコルのサーバはこのトレイト境界を満たすように実装し、クライアントから受信したメッセージを actor
モジュールに渡し、逆に actor
モジュールからきたメッセージをクライアントに送信するだけです。
そしてこの型パラメータ S
に Server
を実装した型を渡せばサーバを起動できるイメージです。
また、上記の F: Future<Output = ()> + Send + 'static
に渡せる関数としては、例えば以下のようなシグナルを受け取ったらシャットダウンするようなものを想定しています。
今回は上記のトレイトをWebSocket, 生のTCP, gRPCで実装しました。
WebSocket
今回WebSocketのサーバは warp を利用しました。
前述の通り、単純にサーバを起動して、クライアントが接続したら actor::Player
をインスタンス化して actor::Player
に対してメッセージ送受信をするだけです。
クライアントからメッセージを受信したら、バイナリをデシリアライズして actor::Player
の send
メソッドで渡し、
逆に actor::Player
の recv
メソッドで受信したメッセージはProtocol Buffersのバイナリにシリアライズしてクライアントにそのまま送信します。
生のTCP
今回は簡易的で良いため、 LengthDelimitedCodec を使用してフレーミングを行いました。
これはシンプルに長さ区切りでメッセージをエンコードするものです。
Protocol Buffersでシリアライズした1メッセージとその長さを1つのフレームとしてエンコードしています。
それ以外の処理はほとんどWebSocketと同じです。
gRPC
gRPCは tonic を利用しました。
gRPCでは、1本の双方向ストリーミング上で通信を行うようにしており、他の通信プロトコルと概ね同じような実装になります。
gRPCのAPI定義は、他のプロトコルでも使うProtobufスキーマに含めてしまっていますが、以下のように定義しています。
service App {
rpc Start(stream ClientMessage) returns (stream ServerMessage) {}
}
gRPCの場合、自前でProtocol Buffersにシリアライズ/デシリアライズする必要が無い点は他のプロトコルと比べると楽ですね。(と言っても1行程度ですが)
複数の通信プロトコルに対応したリアルタイム通信サーバの完成
ここまででほぼ完成しています。
あとはmain関数から、 network_protocol
で実装したサーバを起動してやれば良いだけです。
少し不恰好ですが、型引数を一部だけ省略して server::run::<websocket::ServerImpl>()
とするのは出来ないようですね。
もっと良いやり方はあるかもしれませんがひとまず良しとします。
ということで、特別な工夫は無いですが、Rustで簡単に複数の通信プロトコルに対応したリアルタイム通信サーバを実装することができました。
E2Eテストのシナリオ共通化
クライアント・サーバ間で実際にAPI呼び出しを行うE2Eテストは、通信プロトコルが違っても同じシナリオでAPIを呼び出してテストを行いたいです。
愚直に書くと、それぞれのプロトコルのテストのために同じようなコードができてしまうので、共通化したいですね。
これも特にひねりがあるわけではなくよくある方法だと思いますが、各プロトコルの通信クライアントをインターフェース(Rustではtrait)化し、シナリオ部分の実装と切り離すことで簡単に実現できます。
シナリオ部分の実装から、各プロトコルの通信クライアントのtrait経由でProtocol Buffersのデータを受け渡し、通信クライアントの実装部分がメッセージをシリアライズしてサーバに送信するという構造にすれば良さそうです。
ということでE2EテストもWebSocket, TCP, gRPCで実装しました。
通信クライアント部分をtrait化
今回のE2Eテストでは、テストシナリオ(APIを呼び出す際のパラメタや順序を定義し、期待する結果かどうかをテストする)から、通信を行うクライアントを生成することを想定してみます。
テストシナリオ側は通信を行うクライアントがどういうプロトコルであるかを知る必要は無いようにしたいので、通信クライアントを trait
にします。
テストシナリオ側は、この generate
関数で通信クライアントを生成し、 send
メソッドでサーバへのメッセージ送信、 recv
メソッドでサーバからメッセージを受信し、期待した結果になるかどうかをテストします。
通信クライアントの実装例
例えばWebSocketだとこのようになります。
同様にTCP, gRPCも実装しました。
送受信メッセージの処理部分
テストシナリオ
既に説明したように、テストシナリオは通信プロトコル間で共通のものを使うので、 Client
traitに依存するようにしています。
2重ログインの場合にちゃんと期待したエラーになるかどうかをテストする例
あとは通信プロトコルの数だけテストを実行すればOKです!
終わり
Rustで簡単に複数の通信プロトコルに対応したリアルタイム通信サーバを実装する例でした。
コードを書いていて、問題になりそうなコードはほとんどビルド時に指摘されるので、実際に実行してみたらバグってたということが少なかったように思います。
バグってる時は大体 match
で _
を使った時や Result
をちゃんとハンドリングしなかった時です。
Rustはいいですね。
Discussion