🚀
RustでsqlxでPostgreSQLのLISTENとNOTIFYをしてみる
目的
ユニークビジョン株式会社 Advent Calendar 2024のシリーズ2、12/12の記事です。
PostgreSQLにはLISTENとNOTIFYという非同期で通信する仕組みが存在します。
これをRustに組み込んでみます。
説明
pg_notify
PostgreSQLの関数であるpg_notifyは、チャンネル名とペイロードの2つの引数を取ります。
SELECT pg_notify('channel_name', 'payload');
チャンネル名をLISTENしている対象に通知が送られます。
コード
コードは以下のようになります。
channel_msgとchannel_termの2つのチャンネルをLISTENして、channel_termが来たら停止するようになっています。
LISTENはstreamに変換できるので利用が便利です。
use std::time::Duration;
use futures_util::TryStreamExt;
use sqlx::{postgres::PgListener, PgPool};
const CHANNEL_MSG: &str = "channel_msg";
const CHANNEL_TERM: &str = "channel_term";
pub async fn execute(pool: &PgPool) -> Result<(), sqlx::Error> {
let mut listener = PgListener::connect_with(&pool).await?;
let notify_pool = pool.clone();
let _t = tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(2)).await;
let _ = notfiy(¬ify_pool, CHANNEL_MSG, "hello").await;
tokio::time::sleep(Duration::from_secs(2)).await;
let _ = notfiy(¬ify_pool, CHANNEL_MSG, "world").await;
tokio::time::sleep(Duration::from_secs(2)).await;
let _ = notfiy(¬ify_pool, CHANNEL_TERM, "goodbye").await;
});
listener.listen_all(vec![CHANNEL_MSG, CHANNEL_TERM]).await?;
let mut stream = listener.into_stream();
loop {
match stream.try_next().await {
Ok(Some(notification)) => {
println!("[from stream]: {notification:?}");
if notification.channel() == CHANNEL_TERM {
break;
}
}
Ok(None) => {
println!("Stream closed.");
break;
}
Err(e) => {
eprintln!("Error: {e}");
break;
}
}
}
Ok(())
}
async fn notfiy(pool: &PgPool, channel: &str, payload: &str) -> Result<(), sqlx::Error> {
let sql = r#"SELECT pg_notify($1, $2)"#;
sqlx::query(sql).bind(channel).bind(payload).execute(pool).await?;
Ok(())
}
結果
実行結果は以下のようになります。
[from stream]: PgNotification { process_id: 131, channel: "channel_msg", payload: "hello" }
[from stream]: PgNotification { process_id: 93, channel: "channel_msg", payload: "予定表~①💖ハンカクだ" }
[from stream]: PgNotification { process_id: 131, channel: "channel_msg", payload: "world" }
[from stream]: PgNotification { process_id: 131, channel: "channel_term", payload: "goodbye" }
起動中にpsqlで手動で実行しました。
web=# select pg_notify('channel_msg', '予定表~①💖ハンカクだ');
pg_notify
-----------
(1 row)
まとめ
今回は標準出力に出す程度の例になりますが、Slackなどと組み合わせればDBが任意のタイミングでSlack通知を行えるなどの応用ができます。
様々なプロセスが独立して動いている時DBを通じて、通信できる仕組みになるので、色々できそうで夢が広がります。
Discussion