🎮

Rustで複数の通信プロトコルに対応したリアルタイム通信サーバを実装する

2022/12/18に公開

この記事は Rust Advent Calendar 2022 その2 の19日目の記事です。

はじめに

リアルタイムな通信を行うサーバは、ゲームのマルチプレイやチャット、ビデオ通話など様々な場面で必要になります。

このようなサーバは、よくあるREST APIを提供するHTTPサーバとは異なり、必ずしもリクエスト/レスポンスベースの通信であるとは限らず、双方向的な通信を行います。
さらに1クライアントあたり、秒間数〜数十以上のメッセージを送受信します。
リアルタイムな通信を実現できる通信プロトコルは多数あり、WebSocket、gRPCの双方向ストリームやQUICなど、あるいは生のTCPストリーム上に独自のプロトコルを構築したり、UDPを利用することもあります。

どの通信プロトコルを利用するかは、それぞれのメリット・デメリット、実行環境の制約によって利用できるかどうかなどの違いを吟味した上で適切に選定する必要があります。
例えば一般的なWebブラウザ上では、Socket APIを直接触れないために生のTCPストリーム上に独自のプロトコルを構築することはできず、gRPCも対応していないため利用できません。
WebSocketなどブラウザ上で利用可能な通信プロトコルを利用する必要があります。

前置きが長くなりましたが、こういった事情から複数のプラットフォーム、あるいはアプリケーションごとに最適な通信プロトコルを使いたいが、内部的な処理は共通で良いようなケースでは、簡単に通信プロトコルを置き換えたり複数対応できる状態にしておきたいのではないかと思います。

今回はこれをRustで実現する簡単な例を紹介します。

対象読者

以下を前提としています

  • リアルタイム通信に興味がある
  • Rustでプログラムをある程度書いたことがある
  • tokio ランタイムを触ったことがある
  • ざっくりとしたL4,L7のネットワークの知識がある
  • ひとまず理論的なことは置いといて何かしらの実装例を見たい

大まかな設計

設計といっても大したことはなく、通信プロトコルのレイヤとそれ以外のロジックを分離しますというだけの話です。

今回は数人程度で1つのルームに集まって協力を行うようなマルチプレイゲームを想定してみます。
サーバ側でゲームに関するロジックは一切持たず、単純にメッセージをルーム内の他クライアントにリレーするだけのものがあれば十分とします。

ルーム内の処理は、話を単純にするために1プロセス内で完結するものとします。
つまり同じルームに入るクライアントは同じプロセスのサーバに接続するものとします。

(このようなワークロードをk8s上にデプロイしていい感じにスケールさせるためのOSSとして Agones というものがあります。)

APIとしては、ひとまず以下があれば十分とします。

  • ログイン
    • LoginRequest
      • クライアントからサーバへのログインリクエスト
    • LoginResponse
      • LoginRequestに対するサーバからのレスポンス
  • ルームへの入室
    • JoinRequest
      • クライアントからサーバへのルーム参加リクエスト
    • JoinResponse
      • JoinRequestに対するサーバからのレスポンス
    • JoinNotification
      • 既にルームに参加しているクライアントに対して、サーバから新規ルーム参加者を通知する
  • ルームからの退室
    • LeaveRequest
      • クライアントからサーバへのルーム退室リクエスト
    • LeaveResponse
      • LeaveRequestに対するサーバからのレスポンス
    • LeaveNotification
      • ルームに参加しているクライアントに対して、サーバからルーム退室者を通知する
  • ルーム内のクライアントへのメッセージ
    • SendMessage
      • クライアントからサーバに対して、ルーム内クライアントにメッセージを送信する処理を要求する
      • 今回はこれに対応するレスポンスは用意せず、ベストエフォートとしておく。
    • MessageNotification
      • ルームに参加しているクライアントに対して、SendMessageで来たメッセージをサーバからクライアントに通知する

大まかな実装イメージは以下の通りです。

このひどく雑な図だけだと何がなんだかさっぱりだと思うので補足します。

  • 図の一番右の通信プロトコルの部分と、内部のロジックの部分を分離するというよくある構造で複数のプロトコルに対応したリアルタイム通信サーバを実装する
  • 今回はひとまず、TCP(+独自プロトコル)、WebSocket、gRPCの双方向ストリーミングの3つをサポートするサーバアプリケーションを実装
  • 非同期ランタイムはtokio前提
  • tokio::sync::mpsc::channel を利用して各プロトコル実装部分と内部ロジック間でメッセージをやり取りし、Actorモデルっぽい感じで実装する
  • メッセージの中身自体はどの通信プロトコルでも共通にする。今回はProtocol Buffersにシリアライズしたバイナリメッセージを送る
    • Protocol Buffersでクライアントから呼び出すAPIをそれぞれmessageとして構造化して定義する

簡単な実装例に過ぎないので色々雑ですがひとまずこれで複数のプロトコルに対応したリアルタイム通信サーバが実装できそうです。

APIの定義

前述の通り、各通信プロトコル上にProtocol Buffersにシリアライズしたメッセージを流します。
Protocol BuffersのメッセージでAPIの種類と中身を定義します。
gRPCの双方向ストリームを使うことを想定すると、クライアントからのメッセージとサーバからのメッセージをそれぞれひとまとめにし、各種APIで渡される中身のメッセージを oneof で表現するのが良さそうです。
WebSocketや生のTCP等であれば、あえて oneof で定義しなくても良いですが、何がクライアントから送信されるもので、何がサーバから送信されるものか明確になるのでまあ悪くはないでしょう。

Protocol Buffersの中身は以下です。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/protobuf/app.proto

実装

リポジトリはこちらです。
https://github.com/yoshd/mini-realtime-server-rs

実装の全体像は以下のようなイメージです。

大きく entity , actor , network_protocol の3つのモジュールで構成されており、前述の図の左側から順に対応しています。

src/
├── actor
│   ├── event.rs
│   ├── player.rs
│   └── room.rs
├── actor.rs
├── config.rs
├── entity
│   ├── player.rs
│   └── room.rs
├── entity.rs
├── lib.rs
├── main.rs
├── network_protocol
│   ├── grpc.rs
│   ├── server.rs
│   ├── tcp.rs
│   └── websocket.rs
├── network_protocol.rs
└── protobuf.rs

図の左側から順に説明します。

entity

ここでは扱う登場人物がどいういうデータを持っていてどういう振る舞いをするか簡単に実装しておきます。
今回はシンプルに特定のルーム内のプレイヤー間でメッセージを送受信し合うだけなので、 PlayerRoom という登場人物だけあれば良さそうです。

entity::Player

Player はシンプルに自分のIdと、ルーム内のメッセージを受け取って送信するための tokio::sync::mpsc::UnboundedSender を持っておきます。
ここでは送信するメッセージの中身の詳細は知らなくても良いので、ジェネリックな型パラメータ OutputMessageT にしてしまいましょう。
この OutputMessageT 型のメッセージをプレイヤーに対して送信することができます。
送信したメッセージは、チャンネルの受信側が処理し、クライアントに通知を行う想定とします。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/entity/player.rs#L7-L24

entity::Room

細かい部分の説明は省略しますが、 Room 内には複数の Player が入退室することができ、Room内のプレイヤーに対してメッセージを送信します。
前述の通り、プレイヤーに送信するメッセージの中身は何でも良いため、同様に型パラメータ OutputMessageT としておきます。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/entity/room.rs#L35-L120

actor

actor モジュールでは、主に内部のメッセージのやり取りを tokio::sync::mpsc::channel を介して行います。
entity 同様に、 actor::Playeractor::Room を実装しています。

Input/OutputEvent

actor モジュール内で送信し合うメッセージは、ここでイベントとして定義しておきます。
外から来たメッセージは actor::Player を介して actor::Room に入力として渡るということで InputEvent , 逆に actor::Room 内の処理(Roomへの参加処理やメッセージのブロードキャスト)によって actor::Player を介して外向き(最終的には通信クライアント側)に渡されるものは OutputEvent とします。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/actor/event.rs#L9-L60

actor::Player

actor::Player は、後述の通信プロトコルレイヤである network_protocol モジュールとProtocol Buffersでメッセージを送受信します。
Protocol Buffersから内部で利用するデータ構造への変換は、別の層でやったほうが良さそうですが、今回は面倒なのでこの中でディスパッチと一緒にやってしまいます。
actor::Player は通信プロトコルレイヤとやり取りするために、 tokio::sync::mpsc::channel の送受信用チャンネルを持っておきます。

実際の actor::Player の処理は主に2つです。

  1. input_rx から受け取ったProtocol Buffersのメッセージを InputEvent に変換して actor::Room に受け渡す
  2. actor::Room から来たイベントを output_tx に送信する

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/actor/player.rs#L27-L30

1, 2の処理

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/actor/player.rs#L51-L60

actor::Room

actor::Roomactor::Player からチャンネル経由で InputEvent を受信し、イベントの内容に従って entity::Room のメソッドを呼び出したりしてルーム内の処理を行います。
例えば、 InputEvent::Message のイベントが来て送信対象が全員の場合、 entity::Room::broadcast でルーム内のプレイヤー全員にメッセージを送信します。
すると、 entity::Room 内の entity::Player が持っているチャンネルにイベントが伝搬し、そのチャンネルへのイベントを待機している actor::Player までイベントが渡り、最終的にクライアントに通知されるという流れになります。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/actor/room.rs#L48-L77

network_protocol

network_protocol モジュール内に通信プロトコル層の実装を行います。
ここで複数の通信プロトコルに対応した実装を行い、内部のロジックは前述の actor モジュールにメッセージを受け渡すだけで実行されるようにします。
このモジュールで行うことは主に以下の通りです。

  1. 通信プロトコルごとのサーバ実装
    • 例えばgRPCであれば、 tonic 等のライブラリを利用してgRPCサーバの処理を書きます
  2. 通信プロトコルの上のメッセージをProtocol Buffersにシリアライズ/デシリアライズする
  3. actor モジュールとProtocol Buffersでメッセージのやり取りをする

通信プロトコルごとのサーバ実装は、使うライブラリ等によってインターフェースが異なるので、共通化できるように trait にしておきます。
今回は run メソッドに起動アドレスと設定、「完了するとシャットダウンのシグナルが送られた」とみなすための Future を渡すと、その通りにサーバが起動するような Server トレイトを定義してみます。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/network_protocol/server.rs#L9-L16

各プロトコルのサーバはこのトレイト境界を満たすように実装し、クライアントから受信したメッセージを actor モジュールに渡し、逆に actor モジュールからきたメッセージをクライアントに送信するだけです。

そしてこの型パラメータ SServer を実装した型を渡せばサーバを起動できるイメージです。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/network_protocol/server.rs#L18-L26

また、上記の F: Future<Output = ()> + Send + 'static に渡せる関数としては、例えば以下のようなシグナルを受け取ったらシャットダウンするようなものを想定しています。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/network_protocol/server.rs#L28-L35

今回は上記のトレイトをWebSocket, 生のTCP, gRPCで実装しました。

WebSocket

今回WebSocketのサーバは warp を利用しました。

前述の通り、単純にサーバを起動して、クライアントが接続したら actor::Player をインスタンス化して actor::Player に対してメッセージ送受信をするだけです。
クライアントからメッセージを受信したら、バイナリをデシリアライズして actor::Playersend メソッドで渡し、
逆に actor::Playerrecv メソッドで受信したメッセージはProtocol Buffersのバイナリにシリアライズしてクライアントにそのまま送信します。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/network_protocol/websocket.rs#L20-L120

生のTCP

今回は簡易的で良いため、 LengthDelimitedCodec を使用してフレーミングを行いました。
これはシンプルに長さ区切りでメッセージをエンコードするものです。
Protocol Buffersでシリアライズした1メッセージとその長さを1つのフレームとしてエンコードしています。

それ以外の処理はほとんどWebSocketと同じです。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/network_protocol/tcp.rs#L25-L161

gRPC

gRPCは tonic を利用しました。
gRPCでは、1本の双方向ストリーミング上で通信を行うようにしており、他の通信プロトコルと概ね同じような実装になります。

gRPCのAPI定義は、他のプロトコルでも使うProtobufスキーマに含めてしまっていますが、以下のように定義しています。

service App {
    rpc Start(stream ClientMessage) returns (stream ServerMessage) {}
}

gRPCの場合、自前でProtocol Buffersにシリアライズ/デシリアライズする必要が無い点は他のプロトコルと比べると楽ですね。(と言っても1行程度ですが)

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/network_protocol/grpc.rs#L16-L106

複数の通信プロトコルに対応したリアルタイム通信サーバの完成

ここまででほぼ完成しています。
あとはmain関数から、 network_protocol で実装したサーバを起動してやれば良いだけです。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/src/main.rs#L39-L52

少し不恰好ですが、型引数を一部だけ省略して server::run::<websocket::ServerImpl>() とするのは出来ないようですね。
もっと良いやり方はあるかもしれませんがひとまず良しとします。

ということで、特別な工夫は無いですが、Rustで簡単に複数の通信プロトコルに対応したリアルタイム通信サーバを実装することができました。

E2Eテストのシナリオ共通化

クライアント・サーバ間で実際にAPI呼び出しを行うE2Eテストは、通信プロトコルが違っても同じシナリオでAPIを呼び出してテストを行いたいです。
愚直に書くと、それぞれのプロトコルのテストのために同じようなコードができてしまうので、共通化したいですね。
これも特にひねりがあるわけではなくよくある方法だと思いますが、各プロトコルの通信クライアントをインターフェース(Rustではtrait)化し、シナリオ部分の実装と切り離すことで簡単に実現できます。

シナリオ部分の実装から、各プロトコルの通信クライアントのtrait経由でProtocol Buffersのデータを受け渡し、通信クライアントの実装部分がメッセージをシリアライズしてサーバに送信するという構造にすれば良さそうです。

ということでE2EテストもWebSocket, TCP, gRPCで実装しました。

通信クライアント部分をtrait化

今回のE2Eテストでは、テストシナリオ(APIを呼び出す際のパラメタや順序を定義し、期待する結果かどうかをテストする)から、通信を行うクライアントを生成することを想定してみます。
テストシナリオ側は通信を行うクライアントがどういうプロトコルであるかを知る必要は無いようにしたいので、通信クライアントを trait にします。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/tests/common/mod.rs#L18-L28

テストシナリオ側は、この generate 関数で通信クライアントを生成し、 send メソッドでサーバへのメッセージ送信、 recv メソッドでサーバからメッセージを受信し、期待した結果になるかどうかをテストします。

通信クライアントの実装例

例えばWebSocketだとこのようになります。
同様にTCP, gRPCも実装しました。

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/tests/common/websocket.rs#L59-L75

送受信メッセージの処理部分

https://github.com/yoshd/mini-realtime-server-rs/blob/4bd4c8bd17b5ed3f393df5166fdf41d0425b7c3a/tests/common/websocket.rs#L25-L49

テストシナリオ

既に説明したように、テストシナリオは通信プロトコル間で共通のものを使うので、 Client traitに依存するようにしています。

2重ログインの場合にちゃんと期待したエラーになるかどうかをテストする例

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/tests/common/scenario.rs#L263-L313

あとは通信プロトコルの数だけテストを実行すればOKです!

https://github.com/yoshd/mini-realtime-server-rs/blob/6f9fadd0a1da68b73f7752df95a0f8beccd0cef4/tests/e2e.rs#L5-L45

終わり

Rustで簡単に複数の通信プロトコルに対応したリアルタイム通信サーバを実装する例でした。

コードを書いていて、問題になりそうなコードはほとんどビルド時に指摘されるので、実際に実行してみたらバグってたということが少なかったように思います。
バグってる時は大体 match_ を使った時や Result をちゃんとハンドリングしなかった時です。

Rustはいいですね。

GitHubで編集を提案

Discussion