🍳

deadpool-postgresを用いたRustのPostgreSQL接続メモ

2024/04/22に公開


deadpool-postgresでコネクションプールとステートメントキャッシュを実現しながらPostgreSQLに接続する方法を調べた結果のメモを備忘のため残しておきます。[deadpool-postgres "0.13.0"]

なぜdeadpool-postgresか

  • 要件は以下
    • 必須
      • PostgreSQLに接続しSQLを実行可能
      • それなりに使用されている実績がある
      • 非同期実行が可能
      • ORMがない
    • 任意
      • 実行速度はできるだけ早い方が好ましい
      • コネクションプール機能がある
      • ステートメントキャッシュ機能がある
  • 残った候補
    • sqlx
    • deadpool-postgres
  • sqlxにしなかった理由
    • 他DBを使う予定はない
    • 特化しているため特有機能にも対応していそう
    • 特化しているためエラー遭遇率も低そう
    • sqlxは現時点で実行速度が少しだけ遅いらしい? (diesel-rs/metrics)
    • コンパイル時SQL検証に今はそこまで魅力を感じなかった
    • 簡単に使えるようになってそうなので必要になったときに学習できそう

前提

Cargo.toml
[dependencies]
chrono = {version = "0.4.38", features = ["alloc"]}
deadpool-postgres = "0.13.0"
postgres-types = {version = "0.2.6", features = ["with-chrono-0_4","with-uuid-1","with-serde_json-1"]}
rust_decimal = {version = "1.35.0", features = ["db-tokio-postgres"]}
serde_json = "1.0.115"
uuid = "1.8.0"
  • サンプルテーブル
    • テーブル名: dev.sample_table
      • 列名が各型を表現していることだけ認識すれば、サンプルコードはほぼ理解可能
列名 行1 行2
c_id uuid 8eb1f400-b636-4c68-9ee1-5d609f7e9a60 a3148fb9-fdcb-45a0-a0a1-269af2d17c2d
c_bool boolean true false
c_int integer 1234 5432
c_bint bigint 1234567 8765432
c_dbl double precision 12345.67 876543.2
c_dec numeric 1.234567 87.65432
c_vchr varchar(20) varchar1 varchar2
c_text text text1 text2
c_date date 2001-01-01 2002-04-04
c_dt timestamp 2011-02-02 02:02:02 2012-05-05 05:05:05
c_dttz timestamptz 2021-03-03 12:03:03+09 2022-06-06 15:06:06+09
c_json jsonb {"ary":[1,true],"key1":"val1"} {"ary":[2,false],"key2":"val2"}

各操作のサンプル

DB接続

コード

use deadpool_postgres::{tokio_postgres::NoTls, Config, ManagerConfig, Object, Pool, PoolConfig};

#[tokio::main]
async fn main() {
    let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();

    // construct managed pool
    let pool_config: PoolConfig = PoolConfig::new(5); // max pool size
    let mng_config: ManagerConfig = ManagerConfig::default();
    let config: Config = Config {
        url: Some(conn_str),
        manager: Some(mng_config),
        pool: Some(pool_config),
        ..Default::default()
    };
    let pool: Pool = config.builder(NoTls).unwrap().build().unwrap();

    // get connection from pool
    let _client: Object = pool.get().await.unwrap();
}

メモ

  • デフォルトでは接続状態をtokio-postgresの管理に依存する
    • 回線状況等によって切断された場合、無効な接続をPoolから取得する場合がある
    • 回避するためにはManagerConfigに以下を用いる
      • ManagerConfig { recycling_method: RecyclingMethod::Verified }
  • Poolへの接続取得要求にタイムアウトを設定できる
    • 設定する場合はPoolConfigに以下を用いる
      PoolConfig {
          max_size: 5,
          timeouts: Timeouts::wait_millis(5000),
          queue_mode: QueueMode::default(),
      }
      
  • 接続情報を個別に設定したい場合はConfigに個別に設定する
    • Configはフィールドがかなりの数あるので公式参照
    • ConfigBuilderはない
  • tlsを使って接続したい場合、tokio-postgres-rustlsが使用できる
    • コネクションプールやステートメントキャッシュはない
      • コネクションプールについてはdeadpoolを使用できる

クエリ実行 (返り値:行)

queryが使用できる。

コード

Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{types::ToSql, NoTls, Row, Statement}, Pool};

#[tokio::main]
async fn main() -> Result<()> {
    let pool = get_conn_pool()?;
    let client = pool.get().await?;

    // run query with statement caching
    let query = "select * from dev.sample_table;";
    let stmt: Statement = client.prepare_cached(query).await?;
    let rows: Vec<Row> = client.query(&stmt, &[]).await?;

    // cast Row to SampleRow using From trait
    let _data: Vec<SampleRow> = rows.into_iter().map(|row| row.into()).collect();

    // run query without caching, and use parameter binding
    let query = "select (1 + $1 - $2);";
    let params: Vec<&(dyn ToSql + Sync)> = vec![&7, &3];
    client.query(query, &params[..]).await?.into_iter().for_each(|row| {
        assert_eq!(row.get::<usize, i32>(0), 5);
    });

    Ok(())
}

struct SampleRow {
    c_id: uuid::Uuid,
    c_bool: bool,
    c_int: i32,
    c_bint: i64,
    c_dbl: f64,
    c_dec: rust_decimal::Decimal,
    c_vchr: String,
    c_text: String,
    c_date: chrono::NaiveDate,
    c_dt: chrono::NaiveDateTime,
    c_dttz: chrono::DateTime<chrono::Utc>,
    c_json: serde_json::Value,
}
impl From<Row> for SampleRow {
    fn from(row: Row) -> Self {
        Self {
            c_id: row.get("c_id"),
            c_bool: row.get("c_bool"),
            c_int: row.get("c_int"),
            c_bint: row.get("c_bint"),
            c_dbl: row.get("c_dbl"),
            c_dec: row.get("c_dec"),
            c_vchr: row.get("c_vchr"),
            c_text: row.get("c_text"),
            c_date: row.get("c_date"),
            c_dt: row.get("c_dt"),
            c_dttz: row.get("c_dttz"),
            c_json: row.get("c_json")
        }
    }
}

fn get_conn_pool() -> Result<Pool> {
    let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
    let pool_config = deadpool_postgres::PoolConfig::new(5);
    let mng_config = deadpool_postgres::ManagerConfig::default();
    deadpool_postgres::Config {
        url: Some(conn_str),
        manager: Some(mng_config),
        pool: Some(pool_config),
        ..Default::default()
    }.builder(NoTls)?.build().context("get pool error")
}

メモ

  • prepare_cachedでステートメントキャッシュを利用できる
    • PostgreSQLのPREPAREコマンドが使用できる以下SQLクエリでのみ使用可能
      • SELECT, INSERT, UPDATE, DELETE, MERGE, VALUES
  • queryに渡すクエリはStatement,strのどちらでもOK
  • パラメータバインディングさせる値はToSqlトレイトを満たす必要がある
    • サンプルコードのparams&i32がrust-analyzerでエラーとなるが、コンパイル可能
    • Issueがあげられている
  • Fromトレイトを実装すると行を自作構造体に変換できる
    • 面倒だが自由にマッピング可能な側面もある
    • でも面倒なのでマクロ作成を考えた方がいいかもしれない
  • #[derive(FromSql)]attributeを使用すると簡単らしい
  • NULL値はOption::Noneが対応するため、Nullableな列はOption型で定義が必要
    • Rust軸で考えると列は基本IS NOT NULLである方が簡単に取り扱える
    • もしくはFromトレイト実装内でOption型としてgetし、unwrap_or_default等する

クエリ実行 (返り値:処理行数)

executeが使用できる。

コード

Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{NoTls, Statement}, Pool};

#[tokio::main]
async fn main() -> Result<()> {
    let pool = get_conn_pool()?;
    let client = pool.get().await?;

    // prepare query with statement caching
    let query = "update dev.sample_table set c_bool=$2 where c_text like $1;";
    let stmt: Statement = client.prepare_cached(query).await?;

    // run query without returning rows
    let params = (&"text%", &false);
    let row_count: u64 = client.execute(&stmt, &[params.0, params.1]).await?;
    assert_eq!(row_count, 2);

    let params = (&"text1", &true);
    let row_count: u64 = client.execute(&stmt, &[params.0, params.1]).await?;
    assert_eq!(row_count, 1);

    Ok(())
}

fn get_conn_pool() -> Result<Pool> {
    let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
    let pool_config = deadpool_postgres::PoolConfig::new(5);
    let mng_config = deadpool_postgres::ManagerConfig::default();
    deadpool_postgres::Config {
        url: Some(conn_str),
        manager: Some(mng_config),
        pool: Some(pool_config),
        ..Default::default()
    }.builder(NoTls)?.build().context("get pool error")
}

メモ

  • returningの必要ないINSERT,UPDATE,DELETEに使用できる

クエリ実行 (返り値:なし)

batch_executeが使用できる。

コード

Sample code
use deadpool_postgres::tokio_postgres::{self, NoTls, Client};

#[tokio::main]
async fn main() {
    let client = get_simple_client().await;
    let query = "create table if not exists dev.sample_table(c_id uuid);
                    alter table dev.sample_table add column if not exists c_id uuid;";
    if let Err(e) = client.batch_execute(query).await {
        eprintln!("{}", e);
    }
}

async fn get_simple_client() -> Client {
    let conn_str = "postgres://user:pass@localhost:5432/dbname";
    let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!("connection error: {}", e);
        }
    });
    client
}

メモ

  • シンプルなSQLを実行する場合に使用できる
    • 失敗した場合はErrが返るが、どこで失敗したかはわからない
    • トランザクションでもないため、成功した場所までは実行されている
    • 全て成功した場合の返り値はOk(())
    • SQLインジェクションに対して無防備なメソッド
  • コネクションプールもステートメントキャッシュも不要な場合は上記方法で接続できる
    • deadpool-postgresの機能を一切使用していないため、tokio-postgresで実行可能

行を順次読込するクエリ実行

query_rawが使用できる。

コード

Sample code
Cargo.toml
[dependencies]
futures = "0.3.30"
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{types::ToSql, NoTls, RowStream}, Pool};
use futures::{pin_mut, TryStreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    let pool = get_conn_pool()?;
    let client = pool.get().await?;

    let query = "select * from dev.sample_table;";
    let params: Vec<&(dyn ToSql + Sync)> = Vec::new(); // dummy params with type annotation
    // get RowStream without statement cache
    let stream: RowStream = client.query_raw(query, params.into_iter()).await?;
    pin_mut!(stream);

    // fetch each row from stream
    while let Some(row) = stream.try_next().await? {
        println!("{}", row.get::<&str, uuid::Uuid>("c_id"));
        // a3148fb9-fdcb-45a0-a0a1-269af2d17c2d
        // 8eb1f400-b636-4c68-9ee1-5d609f7e9a60
    }

    Ok(())
}

fn get_conn_pool() -> Result<Pool> {
    let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
    let pool_config = deadpool_postgres::PoolConfig::new(5);
    let mng_config = deadpool_postgres::ManagerConfig::default();
    deadpool_postgres::Config {
        url: Some(conn_str),
        manager: Some(mng_config),
        pool: Some(pool_config),
        ..Default::default()
    }.builder(NoTls)?.build().context("get pool error")
}

メモ

  • futuresクレートを追加する必要がある
  • パラメータ指定引数の値は空であっても型注釈付き初期化が必要
    • 型注釈が無いとdyn ToSqldyn ToSql + Syncで特定できない模様
  • RowStreamのメモリアドレスをpin_mut!で固定する必要がある
    • 詳細な理由は把握できていない (今の私では知識不足)
    • pin_mut!の代わりにstd::pin::pinも使用可能
      • だがtry_nextのためにfuturesは必要なのでpin_mut!でいい気がする

トランザクションでのクエリ実行

transactionが使用できる。

コード

Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::NoTls, Pool, Transaction};

#[tokio::main]
async fn main() -> Result<()> {
    let pool = get_conn_pool()?;
    let mut client = pool.get().await?;

    // begin transaction
    let tx: Transaction = client.transaction().await?;

    // run insert query with returning rows
    let query = "insert into dev.sample_table (c_id,c_bool,c_int,c_bint,c_dbl,c_dec,c_vchr,c_text,c_date,c_dt,c_dttz,c_json)
                    values(gen_random_uuid(),null,777,null,null,null,null,null,null,null,null,null)
                    returning c_id,c_int;";
    let rows = tx.query(query, &[]).await?; // using tx instead of client
    let id: uuid::Uuid = rows.into_iter().next().unwrap().get("c_id");

    let query = "delete from dev.sample_table where c_id=$1;";
    tx.execute(query, &[&id]).await?; // using tx instead of client

    // if Transaction is dropped without commit, rollback automatically
    tx.commit().await?;

    Ok(())
}

fn get_conn_pool() -> Result<Pool> {
    let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
    let pool_config = deadpool_postgres::PoolConfig::new(5);
    let mng_config = deadpool_postgres::ManagerConfig::default();
    deadpool_postgres::Config {
        url: Some(conn_str),
        manager: Some(mng_config),
        pool: Some(pool_config),
        ..Default::default()
    }.builder(NoTls)?.build().context("get pool error")
}

メモ

  • clientのbuild_transactionからTransactionの設定を行うこともできる
    • build_transactionの返り値はTransactionBuilder
  • トランザクション内でトランザクションを開始する場合はsavepointを使用する
  • 明示的にrollbackせずともドロップ時に自動でrollbackされる
    • ただし明示的にrollbackすることでrollbackエラーをハンドルできるようになる

感想

それなりに簡単にDB接続等できてよかったです。これまで他のいくつかのクレート含め調査等してきましたが、ようやくRustを使ったアプリケーション開発のスタートラインに立った気がします。

参考文献

Discussion