🍳
deadpool-postgresを用いたRustのPostgreSQL接続メモ
deadpool-postgresでコネクションプールとステートメントキャッシュを実現しながらPostgreSQLに接続する方法を調べた結果のメモを備忘のため残しておきます。[deadpool-postgres "0.13.0"]
なぜdeadpool-postgresか
- 要件は以下
- 必須
- PostgreSQLに接続しSQLを実行可能
- それなりに使用されている実績がある
- 非同期実行が可能
- ORMがない
- 任意
- 実行速度はできるだけ早い方が好ましい
- コネクションプール機能がある
- ステートメントキャッシュ機能がある
- 必須
- 残った候補
- sqlx
- deadpool-postgres
- sqlxにしなかった理由
- 他DBを使う予定はない
- 特化しているため特有機能にも対応していそう
- 特化しているためエラー遭遇率も低そう
- sqlxは現時点で実行速度が少しだけ遅いらしい? (diesel-rs/metrics)
- コンパイル時SQL検証に今はそこまで魅力を感じなかった
- 簡単に使えるようになってそうなので必要になったときに学習できそう
前提
Cargo.toml
[dependencies]
chrono = {version = "0.4.38", features = ["alloc"]}
deadpool-postgres = "0.13.0"
postgres-types = {version = "0.2.6", features = ["with-chrono-0_4","with-uuid-1","with-serde_json-1"]}
rust_decimal = {version = "1.35.0", features = ["db-tokio-postgres"]}
serde_json = "1.0.115"
uuid = "1.8.0"
- サンプルテーブル
- テーブル名: dev.sample_table
- 列名が各型を表現していることだけ認識すれば、サンプルコードはほぼ理解可能
- テーブル名: dev.sample_table
| 列名 | 型 | 行1 | 行2 |
|---|---|---|---|
| c_id | uuid | 8eb1f400-b636-4c68-9ee1-5d609f7e9a60 | a3148fb9-fdcb-45a0-a0a1-269af2d17c2d |
| c_bool | boolean | true | false |
| c_int | integer | 1234 | 5432 |
| c_bint | bigint | 1234567 | 8765432 |
| c_dbl | double precision | 12345.67 | 876543.2 |
| c_dec | numeric | 1.234567 | 87.65432 |
| c_vchr | varchar(20) | varchar1 | varchar2 |
| c_text | text | text1 | text2 |
| c_date | date | 2001-01-01 | 2002-04-04 |
| c_dt | timestamp | 2011-02-02 02:02:02 | 2012-05-05 05:05:05 |
| c_dttz | timestamptz | 2021-03-03 12:03:03+09 | 2022-06-06 15:06:06+09 |
| c_json | jsonb | {"ary":[1,true],"key1":"val1"} | {"ary":[2,false],"key2":"val2"} |
各操作のサンプル
DB接続
コード
use deadpool_postgres::{tokio_postgres::NoTls, Config, ManagerConfig, Object, Pool, PoolConfig};
#[tokio::main]
async fn main() {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
// construct managed pool
let pool_config: PoolConfig = PoolConfig::new(5); // max pool size
let mng_config: ManagerConfig = ManagerConfig::default();
let config: Config = Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
};
let pool: Pool = config.builder(NoTls).unwrap().build().unwrap();
// get connection from pool
let _client: Object = pool.get().await.unwrap();
}
メモ
- デフォルトでは接続状態を
tokio-postgresの管理に依存する- 回線状況等によって切断された場合、無効な接続をPoolから取得する場合がある
- 回避するためには
ManagerConfigに以下を用いるManagerConfig { recycling_method: RecyclingMethod::Verified }
-
Poolへの接続取得要求にタイムアウトを設定できる- 設定する場合は
PoolConfigに以下を用いるPoolConfig { max_size: 5, timeouts: Timeouts::wait_millis(5000), queue_mode: QueueMode::default(), }
- 設定する場合は
- 接続情報を個別に設定したい場合は
Configに個別に設定する-
Configはフィールドがかなりの数あるので公式参照 -
ConfigBuilderはない
-
- tlsを使って接続したい場合、
tokio-postgres-rustlsが使用できる- コネクションプールやステートメントキャッシュはない
- コネクションプールについては
deadpoolを使用できる
- コネクションプールについては
- コネクションプールやステートメントキャッシュはない
クエリ実行 (返り値:行)
queryが使用できる。
コード
Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{types::ToSql, NoTls, Row, Statement}, Pool};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let client = pool.get().await?;
// run query with statement caching
let query = "select * from dev.sample_table;";
let stmt: Statement = client.prepare_cached(query).await?;
let rows: Vec<Row> = client.query(&stmt, &[]).await?;
// cast Row to SampleRow using From trait
let _data: Vec<SampleRow> = rows.into_iter().map(|row| row.into()).collect();
// run query without caching, and use parameter binding
let query = "select (1 + $1 - $2);";
let params: Vec<&(dyn ToSql + Sync)> = vec![&7, &3];
client.query(query, ¶ms[..]).await?.into_iter().for_each(|row| {
assert_eq!(row.get::<usize, i32>(0), 5);
});
Ok(())
}
struct SampleRow {
c_id: uuid::Uuid,
c_bool: bool,
c_int: i32,
c_bint: i64,
c_dbl: f64,
c_dec: rust_decimal::Decimal,
c_vchr: String,
c_text: String,
c_date: chrono::NaiveDate,
c_dt: chrono::NaiveDateTime,
c_dttz: chrono::DateTime<chrono::Utc>,
c_json: serde_json::Value,
}
impl From<Row> for SampleRow {
fn from(row: Row) -> Self {
Self {
c_id: row.get("c_id"),
c_bool: row.get("c_bool"),
c_int: row.get("c_int"),
c_bint: row.get("c_bint"),
c_dbl: row.get("c_dbl"),
c_dec: row.get("c_dec"),
c_vchr: row.get("c_vchr"),
c_text: row.get("c_text"),
c_date: row.get("c_date"),
c_dt: row.get("c_dt"),
c_dttz: row.get("c_dttz"),
c_json: row.get("c_json")
}
}
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
-
prepare_cachedでステートメントキャッシュを利用できる- PostgreSQLの
PREPAREコマンドが使用できる以下SQLクエリでのみ使用可能-
SELECT,INSERT,UPDATE,DELETE,MERGE,VALUES
-
- PostgreSQLの
-
queryに渡すクエリはStatement,strのどちらでもOK - パラメータバインディングさせる値は
ToSqlトレイトを満たす必要がある- サンプルコードの
paramsの&i32がrust-analyzerでエラーとなるが、コンパイル可能 - Issueがあげられている
- サンプルコードの
-
Fromトレイトを実装すると行を自作構造体に変換できる- 面倒だが自由にマッピング可能な側面もある
- でも面倒なのでマクロ作成を考えた方がいいかもしれない
- postgres_from_row等のクレートもある
-
#[derive(FromSql)]attributeを使用すると簡単らしい- 私の調査ではシンプルな使用方法がわからなかった
-
SQLクエリに情報を付与して変換する方法は見つかった
- SQLは完全に別世界として隔離したいため採用しなかった
-
NULL値はOption::Noneが対応するため、Nullableな列はOption型で定義が必要- Rust軸で考えると列は基本
IS NOT NULLである方が簡単に取り扱える - もしくは
Fromトレイト実装内でOption型としてgetし、unwrap_or_default等する
- Rust軸で考えると列は基本
クエリ実行 (返り値:処理行数)
executeが使用できる。
コード
Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{NoTls, Statement}, Pool};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let client = pool.get().await?;
// prepare query with statement caching
let query = "update dev.sample_table set c_bool=$2 where c_text like $1;";
let stmt: Statement = client.prepare_cached(query).await?;
// run query without returning rows
let params = (&"text%", &false);
let row_count: u64 = client.execute(&stmt, &[params.0, params.1]).await?;
assert_eq!(row_count, 2);
let params = (&"text1", &true);
let row_count: u64 = client.execute(&stmt, &[params.0, params.1]).await?;
assert_eq!(row_count, 1);
Ok(())
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
- returningの必要ない
INSERT,UPDATE,DELETEに使用できる
クエリ実行 (返り値:なし)
batch_executeが使用できる。
コード
Sample code
use deadpool_postgres::tokio_postgres::{self, NoTls, Client};
#[tokio::main]
async fn main() {
let client = get_simple_client().await;
let query = "create table if not exists dev.sample_table(c_id uuid);
alter table dev.sample_table add column if not exists c_id uuid;";
if let Err(e) = client.batch_execute(query).await {
eprintln!("{}", e);
}
}
async fn get_simple_client() -> Client {
let conn_str = "postgres://user:pass@localhost:5432/dbname";
let (client, connection) = tokio_postgres::connect(conn_str, NoTls).await.unwrap();
tokio::spawn(async move {
if let Err(e) = connection.await {
eprintln!("connection error: {}", e);
}
});
client
}
メモ
- シンプルなSQLを実行する場合に使用できる
- 失敗した場合は
Errが返るが、どこで失敗したかはわからない - トランザクションでもないため、成功した場所までは実行されている
- 全て成功した場合の返り値は
Ok(()) - SQLインジェクションに対して無防備なメソッド
- 失敗した場合は
- コネクションプールもステートメントキャッシュも不要な場合は上記方法で接続できる
-
deadpool-postgresの機能を一切使用していないため、tokio-postgresで実行可能
-
行を順次読込するクエリ実行
query_rawが使用できる。
コード
Sample code
Cargo.toml
[dependencies]
futures = "0.3.30"
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::{types::ToSql, NoTls, RowStream}, Pool};
use futures::{pin_mut, TryStreamExt};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let client = pool.get().await?;
let query = "select * from dev.sample_table;";
let params: Vec<&(dyn ToSql + Sync)> = Vec::new(); // dummy params with type annotation
// get RowStream without statement cache
let stream: RowStream = client.query_raw(query, params.into_iter()).await?;
pin_mut!(stream);
// fetch each row from stream
while let Some(row) = stream.try_next().await? {
println!("{}", row.get::<&str, uuid::Uuid>("c_id"));
// a3148fb9-fdcb-45a0-a0a1-269af2d17c2d
// 8eb1f400-b636-4c68-9ee1-5d609f7e9a60
}
Ok(())
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
-
futuresクレートを追加する必要がある - パラメータ指定引数の値は空であっても型注釈付き初期化が必要
- 型注釈が無いと
dyn ToSqlとdyn ToSql + Syncで特定できない模様
- 型注釈が無いと
-
RowStreamのメモリアドレスをpin_mut!で固定する必要がある- 詳細な理由は把握できていない (今の私では知識不足)
-
pin_mut!の代わりにstd::pin::pinも使用可能- だが
try_nextのためにfuturesは必要なのでpin_mut!でいい気がする
- だが
トランザクションでのクエリ実行
transactionが使用できる。
コード
Sample code
use anyhow::{Result, Context};
use deadpool_postgres::{tokio_postgres::NoTls, Pool, Transaction};
#[tokio::main]
async fn main() -> Result<()> {
let pool = get_conn_pool()?;
let mut client = pool.get().await?;
// begin transaction
let tx: Transaction = client.transaction().await?;
// run insert query with returning rows
let query = "insert into dev.sample_table (c_id,c_bool,c_int,c_bint,c_dbl,c_dec,c_vchr,c_text,c_date,c_dt,c_dttz,c_json)
values(gen_random_uuid(),null,777,null,null,null,null,null,null,null,null,null)
returning c_id,c_int;";
let rows = tx.query(query, &[]).await?; // using tx instead of client
let id: uuid::Uuid = rows.into_iter().next().unwrap().get("c_id");
let query = "delete from dev.sample_table where c_id=$1;";
tx.execute(query, &[&id]).await?; // using tx instead of client
// if Transaction is dropped without commit, rollback automatically
tx.commit().await?;
Ok(())
}
fn get_conn_pool() -> Result<Pool> {
let conn_str = "postgres://user:pass@localhost:5432/dbname".to_string();
let pool_config = deadpool_postgres::PoolConfig::new(5);
let mng_config = deadpool_postgres::ManagerConfig::default();
deadpool_postgres::Config {
url: Some(conn_str),
manager: Some(mng_config),
pool: Some(pool_config),
..Default::default()
}.builder(NoTls)?.build().context("get pool error")
}
メモ
- clientの
build_transactionからTransactionの設定を行うこともできる-
build_transactionの返り値はTransactionBuilder
-
- トランザクション内でトランザクションを開始する場合は
savepointを使用する - 明示的に
rollbackせずともドロップ時に自動でrollbackされる- ただし明示的に
rollbackすることでrollbackエラーをハンドルできるようになる
- ただし明示的に
感想
それなりに簡単にDB接続等できてよかったです。これまで他のいくつかのクレート含め調査等してきましたが、ようやくRustを使ったアプリケーション開発のスタートラインに立った気がします。
Discussion