🦀

axumでredis-rsのpubsubを使う時の知見

2022/09/04に公開

最近axumの上でredisを使おうと思って実装しています。
ここでは知見になりそうなtipsだったりを紹介します。

library

axumとredis-rsのバージョンは以下の指定をしています。

redis = { version = "0.21.5", features = ["tokio-comp"] }
axum = { version = "0.4.0", features = ["headers"] }

subscribeの方法

axumのExtensionに以下のようなstateを入れます。

pub struct AppState {
    rc: redis::Client,
}

s_taskとrc_taskの2つのJoinHandle<()>を作り、axumサーバとredisのsubscriberをtokio::select!で同時に実行させます。
subscribe_conn.psubscribe("*")の部分は通常subscribe関数でsubscribeしますがpsubscribeパターンマッチを使うことができます。

    let mut s_task = {
        let app = app(app_state.clone());
        tokio::spawn(async move {
            if let Err(e) = axum::Server::bind(&addr)
                .serve(app.into_make_service())
                .await
            {
                panic!("{:?}", e)
            }
        })
    };
    let mut rc_task = {
        tokio::spawn(async move {
            let mut subscribe_conn = match app_state.rc.get_async_connection().await {
                Ok(sub) => sub.into_pubsub(),
                Err(e) => panic!("{:?}", e),
            };
            let mut subscribe_stream = match subscribe_conn.psubscribe("*").await {
                Ok(_) => subscribe_conn.on_message(),
                Err(e) => panic!("{:?}", e),
            };
    
            while let Some(sub) = subscribe_stream.next().await {
                let mut chennel_list = app_state.chennel_list.lock().await;
                let converted: String = match sub.get_payload::<String>() {
                    Ok(msg) => msg,
                    Err(e) => e.to_string(),
                };
                println!("{:?}", converted)
            }
        })
    };

    tokio::select! {
        _ = (&mut rc_task) => s_task.abort(),
        _ = (&mut s_task) => rc_task.abort(),
    }

publishの型周りの扱い

以下のコードの場合、error[E0698]: type inside `async fn` body must be known in this contextというエラーでコンパイルエラーになります。
このためコンパイル時に型を明示する必要があります。

use std::sync::Arc;

use axum::{extract::Extension, http::StatusCode, response::IntoResponse, Json};
use serde_json::json;
use redis::{AsyncCommands, RedisResult, Value, aio::Connection};

#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)]
pub struct PostHogeRequest {
    #[serde(rename = "channelId")]
    pub channel_id: String,
    #[serde(rename = "payload")]
    pub payload: ::std::collections::HashMap<String, serde_json::Value>,
}

pub async fn post_hoge(
    Json(params): Json<PostHogeRequest>,
    Extension(state): Extension<Arc<super::super::AppState>>,
) -> impl IntoResponse {
    let mut publish_conn:Connection = match state.rc.get_async_connection().await {
        Ok(c) => c,
        Err(e) => panic!("{:?}", e),
    };
    let result = publish_conn.publish(&params.channel_id, json!(params.payload).to_string()).await;

    match result {
        Ok(a) => StatusCode::NO_CONTENT.into_response(),
        Err(e) => (StatusCode::GONE, format!("{:?}", e)).into_response(),
    }
}

以下のようにresult変数の型をRedisResult<Value>publish関数の引数の型を<String, String, Value>で明示するとコンパイルできるようになります。
RedisResult<Value>の中身はpub type RedisResult<T> = Result<T, RedisError>で定義されていて、publishが返すResult<T, RedisError>のTの中身はi64で実体はValueというenumの一部に定義されています。

let result: RedisResult<Value> = publish_conn.publish::<String, String, Value>(params.channel_id, json!(params.payload).to_string()).await;
GitHubで編集を提案

Discussion