🍳
deadpool-postgresを用いたRustのPostgreSQL接続メモ
deadpool-postgresでコネクションプールとステートメントキャッシュを実現しながらPostgreSQLに接続する方法を調べた結果のメモを備忘のため残しておきます。[deadpool-postgres "0.13.0"]
なぜdeadpool-postgresか
- 要件は以下
- 必須
- PostgreSQLに接続しSQLを実行可能
- それなりに使用されている実績がある
- 非同期実行が可能
- ORMがない
- 任意
- 実行速度はできるだけ早い方が好ましい
- コネクションプール機能がある
- ステートメントキャッシュ機能がある
- 必須
- 残った候補
- sqlx
- deadpool-postgres
- sqlxにしなかった理由
- 他DBを使う予定はない
- 特化しているため特有機能にも対応していそう
- 特化しているためエラー遭遇率も低そう
- sqlxは現時点で実行速度が少しだけ遅いらしい? (diesel-rs/metrics)
- コンパイル時SQL検証に今はそこまで魅力を感じなかった
- 簡単に使えるようになってそうなので必要になったときに学習できそう
前提
Cargo.toml
[dependencies]
chrono = {version = "0.4.38", features = ["alloc"]}
deadpool-postgres = "0.13.0"
postgres-types = {version = "0.2.6", features = ["with-chrono-0_4","with-uuid-1","with-serde_json-1"]}
rust_decimal = {version = "1.35.0", features = ["db-tokio-postgres"]}
serde_json = "1.0.115"
uuid = "1.8.0"
- サンプルテーブル
- テーブル名: dev.sample_table
- 列名が各型を表現していることだけ認識すれば、サンプルコードはほぼ理解可能
- テーブル名: dev.sample_table
列名 | 型 | 行1 | 行2 |
---|---|---|---|
c_id | uuid | 8eb1f400-b636-4c68-9ee1-5d609f7e9a60 | a3148fb9-fdcb-45a0-a0a1-269af2d17c2d |
c_bool | boolean | true | false |
c_int | integer | 1234 | 5432 |
c_bint | bigint | 1234567 | 8765432 |
c_dbl | double precision | 12345.67 | 876543.2 |
c_dec | numeric | 1.234567 | 87.65432 |
c_vchr | varchar(20) | varchar1 | varchar2 |
c_text | text | text1 | text2 |
c_date | date | 2001-01-01 | 2002-04-04 |
c_dt | timestamp | 2011-02-02 02:02:02 | 2012-05-05 05:05:05 |
c_dttz | timestamptz | 2021-03-03 12:03:03+09 | 2022-06-06 15:06:06+09 |
c_json | jsonb | {"ary":[1,true],"key1":"val1"} | {"ary":[2,false],"key2":"val2"} |
各操作のサンプル
DB接続
コード
use deadpool_postgres::{tokio_postgres::NoTls, Config, ManagerConfig, Object, Pool, PoolConfig};
#[tokio::main]
async fn main() {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
// construct managed pool
let pool_config: PoolConfig = PoolConfig::new(5); // max pool size
let mng_config: ManagerConfig = ManagerConfig::default();
let config: Config = Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
};
let pool: Pool = config.builder(NoTls).unwrap().build().unwrap();
// get connection from pool
let _client: Object = pool.get().await.unwrap();
}
メモ
- デフォルトでは接続状態を
tokio-postgres
の管理に依存する- 回線状況等によって切断された場合、無効な接続をPoolから取得する場合がある
- 回避するためには
ManagerConfig
に以下を用いるManagerConfig { recycling_method: RecyclingMethod::Verified }
-
Pool
への接続取得要求にタイムアウトを設定できる- 設定する場合は
PoolConfig
に以下を用いるPoolConfig { max_size: 5, timeouts: Timeouts::wait_millis(5000), queue_mode: QueueMode::default(), }
- 設定する場合は
- 接続情報を個別に設定したい場合は
Config
に個別に設定する-
Config
はフィールドがかなりの数あるので公式参照 -
ConfigBuilder
はない
-
- tlsを使って接続したい場合、
tokio-postgres-rustls
が使用できる- コネクションプールやステートメントキャッシュはない
- コネクションプールについては
deadpool
を使用できる
- コネクションプールについては
- コネクションプールやステートメントキャッシュはない
クエリ実行 (返り値:行)
query
が使用できる。
コード
Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{types::ToSql, NoTls, Row, Statement}, Pool};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let client = pool.get().await?;
// run query with statement caching
let query = "select * from dev.sample_table;";
let stmt: Statement = client.prepare_cached(query).await?;
let rows: Vec<Row> = client.query(&stmt, &[]).await?;
// cast Row to SampleRow using From trait
let _data: Vec<SampleRow> = rows.into_iter().map(|row| row.into()).collect();
// run query without caching, and use parameter binding
let query = "select (1 + $1 - $2);";
let params: Vec<&(dyn ToSql + Sync)> = vec![&7, &3];
client.query(query, ¶ms[..]).await?.into_iter().for_each(|row| {
assert_eq!(row.get::<usize, i32>(0), 5);
});
Ok(())
}
struct SampleRow {
c_id: uuid::Uuid,
c_bool: bool,
c_int: i32,
c_bint: i64,
c_dbl: f64,
c_dec: rust_decimal::Decimal,
c_vchr: String,
c_text: String,
c_date: chrono::NaiveDate,
c_dt: chrono::NaiveDateTime,
c_dttz: chrono::DateTime<chrono::Utc>,
c_json: serde_json::Value,
}
impl From<Row> for SampleRow {
fn from(row: Row) -> Self {
Self {
c_id: row.get("c_id"),
c_bool: row.get("c_bool"),
c_int: row.get("c_int"),
c_bint: row.get("c_bint"),
c_dbl: row.get("c_dbl"),
c_dec: row.get("c_dec"),
c_vchr: row.get("c_vchr"),
c_text: row.get("c_text"),
c_date: row.get("c_date"),
c_dt: row.get("c_dt"),
c_dttz: row.get("c_dttz"),
c_json: row.get("c_json")
}
}
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
-
prepare_cached
でステートメントキャッシュを利用できる- PostgreSQLの
PREPARE
コマンドが使用できる以下SQLクエリでのみ使用可能-
SELECT
,INSERT
,UPDATE
,DELETE
,MERGE
,VALUES
-
- PostgreSQLの
-
query
に渡すクエリはStatement
,str
のどちらでもOK - パラメータバインディングさせる値は
ToSql
トレイトを満たす必要がある- サンプルコードの
params
の&i32
がrust-analyzerでエラーとなるが、コンパイル可能 - Issueがあげられている
- サンプルコードの
-
From
トレイトを実装すると行を自作構造体に変換できる- 面倒だが自由にマッピング可能な側面もある
- でも面倒なのでマクロ作成を考えた方がいいかもしれない
- postgres_from_row等のクレートもある
-
#[derive(FromSql)]
attributeを使用すると簡単らしい- 私の調査ではシンプルな使用方法がわからなかった
-
SQLクエリに情報を付与して変換する方法は見つかった
- SQLは完全に別世界として隔離したいため採用しなかった
-
NULL
値はOption::None
が対応するため、Nullableな列はOption
型で定義が必要- Rust軸で考えると列は基本
IS NOT NULL
である方が簡単に取り扱える - もしくは
From
トレイト実装内でOption
型としてget
し、unwrap_or_default
等する
- Rust軸で考えると列は基本
クエリ実行 (返り値:処理行数)
execute
が使用できる。
コード
Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{NoTls, Statement}, Pool};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let client = pool.get().await?;
// prepare query with statement caching
let query = "update dev.sample_table set c_bool=$2 where c_text like $1;";
let stmt: Statement = client.prepare_cached(query).await?;
// run query without returning rows
let params = (&"text%", &false);
let row_count: u64 = client.execute(&stmt, &[params.0, params.1]).await?;
assert_eq!(row_count, 2);
let params = (&"text1", &true);
let row_count: u64 = client.execute(&stmt, &[params.0, params.1]).await?;
assert_eq!(row_count, 1);
Ok(())
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
- returningの必要ない
INSERT
,UPDATE
,DELETE
に使用できる
クエリ実行 (返り値:なし)
batch_execute
が使用できる。
コード
Sample code
use deadpool_postgres::tokio_postgres::{self, NoTls, Client};
#[tokio::main]
async fn main() {
let client = get_simple_client().await;
let query = "create table if not exists dev.sample_table(c_id uuid);
alter table dev.sample_table add column if not exists c_id uuid;";
if let Err(e) = client.batch_execute(query).await {
eprintln!("{}", e);
}
}
async fn get_simple_client() -> Client {
let conn_str = "postgres://user:pass@localhost:5432/dbname";
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
client
}
メモ
- シンプルなSQLを実行する場合に使用できる
- 失敗した場合は
Err
が返るが、どこで失敗したかはわからない - トランザクションでもないため、成功した場所までは実行されている
- 全て成功した場合の返り値は
Ok(())
- SQLインジェクションに対して無防備なメソッド
- 失敗した場合は
- コネクションプールもステートメントキャッシュも不要な場合は上記方法で接続できる
-
deadpool-postgres
の機能を一切使用していないため、tokio-postgres
で実行可能
-
行を順次読込するクエリ実行
query_raw
が使用できる。
コード
Sample code
Cargo.toml
[dependencies]
futures = "0.3.30"
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{types::ToSql, NoTls, RowStream}, Pool};
use futures::{pin_mut, TryStreamExt};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let client = pool.get().await?;
let query = "select * from dev.sample_table;";
let params: Vec<&(dyn ToSql + Sync)> = Vec::new(); // dummy params with type annotation
// get RowStream without statement cache
let stream: RowStream = client.query_raw(query, params.into_iter()).await?;
pin_mut!(stream);
// fetch each row from stream
while let Some(row) = stream.try_next().await? {
println!("{}", row.get::<&str, uuid::Uuid>("c_id"));
// a3148fb9-fdcb-45a0-a0a1-269af2d17c2d
// 8eb1f400-b636-4c68-9ee1-5d609f7e9a60
}
Ok(())
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
-
futures
クレートを追加する必要がある - パラメータ指定引数の値は空であっても型注釈付き初期化が必要
- 型注釈が無いと
dyn ToSql
とdyn ToSql + Sync
で特定できない模様
- 型注釈が無いと
-
RowStream
のメモリアドレスをpin_mut!
で固定する必要がある- 詳細な理由は把握できていない (今の私では知識不足)
-
pin_mut!
の代わりにstd::pin::pin
も使用可能- だが
try_next
のためにfutures
は必要なのでpin_mut!
でいい気がする
- だが
トランザクションでのクエリ実行
transaction
が使用できる。
コード
Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::NoTls, Pool, Transaction};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let mut client = pool.get().await?;
// begin transaction
let tx: Transaction = client.transaction().await?;
// run insert query with returning rows
let query = "insert into dev.sample_table (c_id,c_bool,c_int,c_bint,c_dbl,c_dec,c_vchr,c_text,c_date,c_dt,c_dttz,c_json)
values(gen_random_uuid(),null,777,null,null,null,null,null,null,null,null,null)
returning c_id,c_int;";
let rows = tx.query(query, &[]).await?; // using tx instead of client
let id: uuid::Uuid = rows.into_iter().next().unwrap().get("c_id");
let query = "delete from dev.sample_table where c_id=$1;";
tx.execute(query, &[&id]).await?; // using tx instead of client
// if Transaction is dropped without commit, rollback automatically
tx.commit().await?;
Ok(())
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
- clientの
build_transaction
からTransaction
の設定を行うこともできる-
build_transaction
の返り値はTransactionBuilder
-
- トランザクション内でトランザクションを開始する場合は
savepoint
を使用する - 明示的に
rollback
せずともドロップ時に自動でrollback
される- ただし明示的に
rollback
することでrollback
エラーをハンドルできるようになる
- ただし明示的に
感想
それなりに簡単にDB接続等できてよかったです。これまで他のいくつかのクレート含め調査等してきましたが、ようやくRustを使ったアプリケーション開発のスタートラインに立った気がします。
Discussion