🦀

Rust(axum)+NATSでスケーラブルなWebSocketサーバーを実装した

3 min read

はじめに

RustでaxumというWebフレームワークが公開されたので何か作ろうと思い、WebSocketサーバーを作ることにしました。
単純なエコーサーバだと味気ないのでNATSというメッセージングシステムを使ってスケーラブルなWebSocketサーバーかつグループ単位で分けられるタイプの仕様とします。

axum

axum自体の解説はこのzennが詳しく書いてあります。

https://zenn.dev/techno_tanoc/articles/99e54c82cb049f

NATS

NATSは以下の記事が詳しく書いてあります。

NATSはSubjectをベースとしたメッセージングシステムで、PublisherがSubjectに対してメッセージを送信し、Subjectと紐づくSubscriberへメッセージを転送します。

https://techstep.hatenablog.com/entry/2020/12/29/144659

構成

graph TB
    A[NATS] -->|SubScribe| B[WebSocketSender]-->D[WebSocketClient]
    D[WebSocketClient]-->C[WebSocketReciver] -->|Publish| A[NATS]
 subgraph WebSocketServer
B
C
end
  1. WebSocketクライアントが特定のグループのユーザーとして接続する
  2. クライアントからメッセージを送信すると特定のグループをトピックとしてNATSのメッセージがPublishされる
  3. PublishされたメッセージをWebSokcetサーバーが受け取る
  4. 特定のグループに所属したユーザーにWebSocketのメッセージをして送信される

この構成により、WebSocketサーバー自体が増えてもグループ間のメッセージングが成立できます。

実際のコード

https://github.com/Taillook/axum-grouping-ws-server
#[tokio::main]
async fn main() {
    env_logger::init();

    let nats_host = env::var("NATS_HOST").expect("NATS_HOST is not defined");
    let nc = match nats::asynk::connect(&nats_host).await {
        Ok(nc) => nc,
        Err(e) => panic!("{:?}", e),
    };

    let group_list = Mutex::new(HashMap::new());

    let app_state = Arc::new(AppState { group_list, nc });

    let addr = SocketAddr::from(([0, 0, 0, 0], 8088));

    let mut s_task = gen_server_task(app_state.clone(), addr);
    let mut nc_task = gen_nc_task(app_state.clone());
    tracing::info!("listening on {}", addr);
    tokio::select! {
        _ = (&mut nc_task) => s_task.abort(),
        _ = (&mut s_task) => nc_task.abort(),
    }
}

app_stateで状態の取り回しをしています。
中身はグループのリストとNATSへのコネクションです。
gen_server_taskとgen_nc_taskでそれぞれNATSのサブスクライバー、サーバーの起動のtokio::task::JoinHandle<()>を生成しています。

テストについて

#[cfg(test)]
mod tests {
    use super::*;
    use futures::SinkExt;
    use futures::StreamExt;
    use tokio_tungstenite::{connect_async, tungstenite::Message};

    #[tokio::test]
    async fn connect_websocket() {
        let nats_host = env::var("NATS_HOST").expect("NATS_HOST is not defined");
        let nc = match nats::asynk::connect(&nats_host).await {
            Ok(nc) => nc,
            Err(e) => panic!("{:?}", e),
        };
        let group_list = Mutex::new(HashMap::new());
        let app_state = Arc::new(AppState { group_list, nc });
        let addr = SocketAddr::from(([0, 0, 0, 0], 8088));

        gen_server_task(app_state.clone(), addr);
        gen_nc_task(app_state.clone());

        let url =
            url::Url::parse("ws://localhost:8088/websocket/group1/user1").expect("Can't parse url");
        let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
        let (mut write, mut read) = ws_stream.split();

        write
            .send(Message::Text(format!("test")))
            .await
            .expect("Failed to send message");
        if let Some(Ok(message)) = read.next().await {
            assert_eq!(message, Message::Text(format!("user1: test")));
        }
    }
}

テストはmainと同じように

gen_server_task(app_state.clone(), addr);
gen_nc_task(app_state.clone());

を生成し、その後tokio_tungstenite::connect_asyncで立ち上げたサーバーに接続しメッセージの送受信をE2Eでテストしています。

まとめ

  • axumを使ったそれっぽいWebSocketサーバーの実装ができた
  • テストが書けるような実装ができた

Discussion

ログインするとコメントできます