🚀

RustでsqlxでPostgreSQLのLISTENとNOTIFYをしてみる

2024/12/19に公開

目的

ユニークビジョン株式会社 Advent Calendar 2024のシリーズ2、12/12の記事です。

PostgreSQLにはLISTENとNOTIFYという非同期で通信する仕組みが存在します。
これをRustに組み込んでみます。

説明

pg_notify

PostgreSQLの関数であるpg_notifyは、チャンネル名とペイロードの2つの引数を取ります。

SELECT pg_notify('channel_name', 'payload');

チャンネル名をLISTENしている対象に通知が送られます。

コード

コードは以下のようになります。
channel_msgとchannel_termの2つのチャンネルをLISTENして、channel_termが来たら停止するようになっています。
LISTENはstreamに変換できるので利用が便利です。

use std::time::Duration;

use futures_util::TryStreamExt;
use sqlx::{postgres::PgListener, PgPool};

const CHANNEL_MSG: &str = "channel_msg";
const CHANNEL_TERM: &str = "channel_term";

pub async fn execute(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut listener = PgListener::connect_with(&pool).await?;

    let notify_pool = pool.clone();
    let _t = tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = notfiy(&notify_pool, CHANNEL_MSG, "hello").await;
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = notfiy(&notify_pool, CHANNEL_MSG, "world").await;
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = notfiy(&notify_pool, CHANNEL_TERM, "goodbye").await;
    });

    listener.listen_all(vec![CHANNEL_MSG, CHANNEL_TERM]).await?;
    let mut stream = listener.into_stream();
    loop {
        match stream.try_next().await {
            Ok(Some(notification)) => {
                println!("[from stream]: {notification:?}");
                if notification.channel() == CHANNEL_TERM {
                    break;
                }
            }
            Ok(None) => {
                println!("Stream closed.");
                break;
            }
            Err(e) => {
                eprintln!("Error: {e}");
                break;
            }
        }
    }
    Ok(())
}

async fn notfiy(pool: &PgPool, channel: &str, payload: &str) -> Result<(), sqlx::Error> {
    let sql = r#"SELECT pg_notify($1, $2)"#;
     sqlx::query(sql).bind(channel).bind(payload).execute(pool).await?;
     Ok(())
} 

結果

実行結果は以下のようになります。

[from stream]: PgNotification { process_id: 131, channel: "channel_msg", payload: "hello" }
[from stream]: PgNotification { process_id: 93, channel: "channel_msg", payload: "予定表~①💖ハンカクだ" }
[from stream]: PgNotification { process_id: 131, channel: "channel_msg", payload: "world" }
[from stream]: PgNotification { process_id: 131, channel: "channel_term", payload: "goodbye" }

起動中にpsqlで手動で実行しました。

web=# select pg_notify('channel_msg', '予定表~①💖ハンカクだ');
 pg_notify 
-----------
 
(1 row)

まとめ

今回は標準出力に出す程度の例になりますが、Slackなどと組み合わせればDBが任意のタイミングでSlack通知を行えるなどの応用ができます。

様々なプロセスが独立して動いている時DBを通じて、通信できる仕組みになるので、色々できそうで夢が広がります。

Discussion