🦀
Rust(axum)+NATSでスケーラブルなWebSocketサーバーを実装した
はじめに
RustでaxumというWebフレームワークが公開されたので何か作ろうと思い、WebSocketサーバーを作ることにしました。
単純なエコーサーバだと味気ないのでNATSというメッセージングシステムを使ってスケーラブルなWebSocketサーバーかつグループ単位で分けられるタイプの仕様とします。
axum
axum自体の解説はこのzennが詳しく書いてあります。
NATS
NATSは以下の記事が詳しく書いてあります。
NATSはSubjectをベースとしたメッセージングシステムで、PublisherがSubjectに対してメッセージを送信し、Subjectと紐づくSubscriberへメッセージを転送します。
構成
- WebSocketクライアントが特定のグループのユーザーとして接続する
- クライアントからメッセージを送信すると特定のグループをトピックとしてNATSのメッセージがPublishされる
- PublishされたメッセージをWebSokcetサーバーが受け取る
- 特定のグループに所属したユーザーにWebSocketのメッセージをして送信される
この構成により、WebSocketサーバー自体が増えてもグループ間のメッセージングが成立できます。
実際のコード
#[tokio::main]
async fn main() {
env_logger::init();
let nats_host = env::var("NATS_HOST").expect("NATS_HOST is not defined");
let nc = match nats::asynk::connect(&nats_host).await {
Ok(nc) => nc,
Err(e) => panic!("{:?}", e),
};
let group_list = Mutex::new(HashMap::new());
let app_state = Arc::new(AppState { group_list, nc });
let addr = SocketAddr::from(([0, 0, 0, 0], 8088));
let mut s_task = gen_server_task(app_state.clone(), addr);
let mut nc_task = gen_nc_task(app_state.clone());
tracing::info!("listening on {}", addr);
tokio::select! {
_ = (&mut nc_task) => s_task.abort(),
_ = (&mut s_task) => nc_task.abort(),
}
}
app_stateで状態の取り回しをしています。
中身はグループのリストとNATSへのコネクションです。
gen_server_taskとgen_nc_taskでそれぞれNATSのサブスクライバー、サーバーの起動のtokio::task::JoinHandle<()>を生成しています。
テストについて
#[cfg(test)]
mod tests {
use super::*;
use futures::SinkExt;
use futures::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::test]
async fn connect_websocket() {
let nats_host = env::var("NATS_HOST").expect("NATS_HOST is not defined");
let nc = match nats::asynk::connect(&nats_host).await {
Ok(nc) => nc,
Err(e) => panic!("{:?}", e),
};
let group_list = Mutex::new(HashMap::new());
let app_state = Arc::new(AppState { group_list, nc });
let addr = SocketAddr::from(([0, 0, 0, 0], 8088));
gen_server_task(app_state.clone(), addr);
gen_nc_task(app_state.clone());
let url =
url::Url::parse("ws://localhost:8088/websocket/group1/user1").expect("Can't parse url");
let (ws_stream, _) = connect_async(url).await.expect("Failed to connect");
let (mut write, mut read) = ws_stream.split();
write
.send(Message::Text(format!("test")))
.await
.expect("Failed to send message");
if let Some(Ok(message)) = read.next().await {
assert_eq!(message, Message::Text(format!("user1: test")));
}
}
}
テストはmainと同じように
gen_server_task(app_state.clone(), addr);
gen_nc_task(app_state.clone());
を生成し、その後tokio_tungstenite::connect_asyncで立ち上げたサーバーに接続しメッセージの送受信をE2Eでテストしています。
まとめ
- axumを使ったそれっぽいWebSocketサーバーの実装ができた
- テストが書けるような実装ができた
Discussion