🦀
axumでredis-rsのpubsubを使う時の知見
最近axumの上でredisを使おうと思って実装しています。
ここでは知見になりそうなtipsだったりを紹介します。
library
axumとredis-rsのバージョンは以下の指定をしています。
redis = { version = "0.21.5", features = ["tokio-comp"] }
axum = { version = "0.4.0", features = ["headers"] }
subscribeの方法
axumのExtensionに以下のようなstateを入れます。
pub struct AppState {
rc: redis::Client,
}
s_taskとrc_taskの2つのJoinHandle<()>
を作り、axumサーバとredisのsubscriberをtokio::select!
で同時に実行させます。
subscribe_conn.psubscribe("*")
の部分は通常subscribe
関数でsubscribeしますがpsubscribe
でパターンマッチを使うことができます。
let mut s_task = {
let app = app(app_state.clone());
tokio::spawn(async move {
if let Err(e) = axum::Server::bind(&addr)
.serve(app.into_make_service())
.await
{
panic!("{:?}", e)
}
})
};
let mut rc_task = {
tokio::spawn(async move {
let mut subscribe_conn = match app_state.rc.get_async_connection().await {
Ok(sub) => sub.into_pubsub(),
Err(e) => panic!("{:?}", e),
};
let mut subscribe_stream = match subscribe_conn.psubscribe("*").await {
Ok(_) => subscribe_conn.on_message(),
Err(e) => panic!("{:?}", e),
};
while let Some(sub) = subscribe_stream.next().await {
let mut chennel_list = app_state.chennel_list.lock().await;
let converted: String = match sub.get_payload::<String>() {
Ok(msg) => msg,
Err(e) => e.to_string(),
};
println!("{:?}", converted)
}
})
};
tokio::select! {
_ = (&mut rc_task) => s_task.abort(),
_ = (&mut s_task) => rc_task.abort(),
}
publishの型周りの扱い
以下のコードの場合、error[E0698]: type inside `async fn` body must be known in this context
というエラーでコンパイルエラーになります。
このためコンパイル時に型を明示する必要があります。
use std::sync::Arc;
use axum::{extract::Extension, http::StatusCode, response::IntoResponse, Json};
use serde_json::json;
use redis::{AsyncCommands, RedisResult, Value, aio::Connection};
#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct PostHogeRequest {
#[serde(rename = "channelId")]
pub channel_id: String,
#[serde(rename = "payload")]
pub payload: ::std::collections::HashMap<String, serde_json::Value>,
}
pub async fn post_hoge(
Json(params): Json<PostHogeRequest>,
Extension(state): Extension<Arc<super::super::AppState>>,
) -> impl IntoResponse {
let mut publish_conn:Connection = match state.rc.get_async_connection().await {
Ok(c) => c,
Err(e) => panic!("{:?}", e),
};
let result = publish_conn.publish(¶ms.channel_id, json!(params.payload).to_string()).await;
match result {
Ok(a) => StatusCode::NO_CONTENT.into_response(),
Err(e) => (StatusCode::GONE, format!("{:?}", e)).into_response(),
}
}
以下のようにresult
変数の型をRedisResult<Value>
、publish
関数の引数の型を<String, String, Value>
で明示するとコンパイルできるようになります。
RedisResult<Value>
の中身はpub type RedisResult<T> = Result<T, RedisError>
で定義されていて、publish
が返すResult<T, RedisError>
のTの中身はi64
で実体はValue
というenumの一部に定義されています。
let result: RedisResult<Value> = publish_conn.publish::<String, String, Value>(params.channel_id, json!(params.payload).to_string()).await;
Discussion