📝
RustでPostgreSQLのデータをキャッシュする
目的
AWS Lambdaなどで大量にアクセスされる時にPostgreSQLにあるマスターデータを都度アクセスするのは非効率です。なので一度アクセスしたらキャッシュしておきたいです。
ただしマスターデータは追加や更新されていくので、適当な時間で再度取り直したいですし、なかった場合はそと値だけ取得してキャッシュに保存します。
コード
PostgreSQL
アカウント情報をマスターデータとします。
CREATE TABLE IF NOT EXISTS public.accounts (
uuid UUID NOT NULL DEFAULT gen_random_uuid()
,content TEXT NOT NULL DEFAULT ''
,PRIMARY KEY(uuid)
);
INSERT INTO accounts(
uuid
,content
) VALUES
('00000000-0000-0000-0000-000000000001','1'),
('00000000-0000-0000-0000-000000000002','2'),
('00000000-0000-0000-0000-000000000003','3'),
('00000000-0000-0000-0000-000000000004','4'),
('00000000-0000-0000-0000-000000000005','5');
コネクションプール
DBアクセスまわり、コネクションプールを取得しています。
pg.rs
use std::time::Duration;
pub type PgPool = deadpool_postgres::Pool;
pub type PgClient = deadpool_postgres::Client;
pub fn get_postgres_pool(url: &str) -> anyhow::Result<PgPool> {
let pg_url = url::Url::parse(url)?;
let dbname = match pg_url.path_segments() {
Some(mut res) => res.next(),
None => Some("web"),
};
let pool_config = deadpool_postgres::PoolConfig {
max_size: 2,
timeouts: deadpool_postgres::Timeouts {
wait: Some(Duration::from_secs(2)),
..Default::default()
},
..Default::default()
};
let cfg = deadpool_postgres::Config {
user: Some(pg_url.username().to_string()),
password: pg_url.password().map(|password| password.to_string()),
dbname: dbname.map(|dbname| dbname.to_string()),
host: pg_url.host_str().map(|host| host.to_string()),
pool: Some(pool_config),
..Default::default()
};
Ok(cfg.create_pool(
Some(deadpool_postgres::Runtime::Tokio1),
deadpool_postgres::tokio_postgres::NoTls,
)?)
}
モデル
アカウントデータクラスです。
初回や破棄時間が過ぎた時に全件取得するget_allとデータがなかった時に取得するget_oneを用意します。
account.rs
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::pg::PgClient;
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
pub struct Account {
pub uuid: Uuid,
pub content: String,
}
const ALL_SQL: &str = r#"
SELECT
to_json(t1.*)
FROM
public.accounts AS t1
WHERE
t1.uuid <> '00000000-0000-0000-0000-000000000005'
"#;
const ONE_SQL: &str = r#"
SELECT
to_json(t1.*)
FROM
public.accounts AS t1
WHERE
t1.uuid = $1
"#;
impl Account {
pub async fn get_all(
pg_client: &PgClient,
) -> Result<HashMap<Uuid, Account>, resident_utils::postgres::holder::Error> {
let accounts: Vec<Account> = pg_client
.query(ALL_SQL, &[])
.await?
.iter()
.map(|row| {
let json: serde_json::Value = row.get(0);
serde_json::from_value(json).unwrap()
})
.collect();
let mut map = HashMap::new();
for account in accounts {
map.insert(account.uuid, account);
}
Ok(map)
}
pub async fn get_one(
pg_client: &PgClient,
uuid: &Uuid,
) -> Result<Option<Account>, resident_utils::postgres::holder::Error> {
let accounts: Vec<Account> = pg_client
.query(ONE_SQL, &[&uuid])
.await?
.iter()
.map(|row| {
let json: serde_json::Value = row.get(0);
serde_json::from_value(json).unwrap()
})
.collect();
Ok(accounts.first().map(|a| a.clone()))
}
}
データーホルダー
データ管理するクラスです。内部にハッシュを持っていてgetで値を返します。
データの破棄時間が過ぎていた場合は全件取得しなおします。
またデータがなかった場合はそのデータを取得して、キャッシュに追加します。
holder.rs
use std::{collections::HashMap, future::Future, hash::Hash, time::Duration};
use chrono::prelude::*;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("PostgresPool {0}")]
PostgresPool(#[from] deadpool_postgres::PoolError),
#[error("Postgres {0}")]
Postgres(#[from] deadpool_postgres::tokio_postgres::Error),
}
pub struct HolderMap<K, V> {
map: HashMap<K, V>,
expire_interval: Duration,
expire_at: DateTime<Utc>,
pg_pool: deadpool_postgres::Pool,
}
impl<K, V> HolderMap<K, V>
where
K: PartialEq + Eq + Hash + Clone,
V: Clone,
{
pub fn new(
pg_pool: deadpool_postgres::Pool,
expire_interval: Duration,
now: Option<DateTime<Utc>>,
) -> Self {
Self {
map: HashMap::new(),
expire_interval,
expire_at: now.unwrap_or(Utc::now()),
pg_pool,
}
}
pub async fn get<FutOne, FutAll>(
&mut self,
key: &K,
now: Option<DateTime<Utc>>,
f: impl FnOnce(deadpool_postgres::Client, K) -> FutOne,
g: impl FnOnce(deadpool_postgres::Client) -> FutAll,
) -> Result<Option<V>, Error>
where
FutOne: Future<Output = Result<Option<V>, Error>>,
FutAll: Future<Output = Result<HashMap<K, V>, Error>>,
{
let now = now.unwrap_or(Utc::now());
if now >= self.expire_at {
let pg_client = self.pg_pool.get().await?;
self.map = g(pg_client).await?;
self.expire_at = now + self.expire_interval;
}
if let Some(value) = self.map.get(key) {
return Ok(Some(value.clone()));
}
let pg_client = self.pg_pool.get().await?;
let value = f(pg_client, key.clone()).await?;
Ok(if let Some(value) = value {
self.map.insert(key.clone(), value.clone());
Some(value)
} else {
None
})
}
}
メイン
メインではstaticとしてHOLDERを定義しています。
これはMutexにして各スレッドで書き込み可能な状態で共有します。
スレッドが始まる前にpg_poolが取得できた時に初期化します。
main.rs
use std::{str::FromStr, sync::OnceLock, time::Duration};
use account::Account;
use pg::get_postgres_pool;
use resident_utils::postgres::holder::HolderMap;
use tokio::{sync::Mutex, time::sleep};
use uuid::Uuid;
pub mod account;
pub mod pg;
static HOLDER: OnceLock<Mutex<HolderMap<Uuid, Account>>> = OnceLock::new();
async fn get_account(uuid: &Uuid) -> anyhow::Result<Option<Account>> {
let mut holder = HOLDER.get().unwrap().lock().await;
let account = holder
.get(
uuid,
None,
|pg_client, uuid| async move {
println!("one");
Account::get_one(&pg_client, &uuid).await
},
|pg_client| async move {
println!("all");
Account::get_all(&pg_client).await
},
)
.await
.unwrap();
Ok(account)
}
async fn get_accounts() {
let account = get_account(&Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap())
.await
.unwrap();
println!("{:?}", account);
let account = get_account(&Uuid::from_str("00000000-0000-0000-0000-000000000002").unwrap())
.await
.unwrap();
println!("{:?}", account);
let account = get_account(&Uuid::from_str("00000000-0000-0000-0000-000000000003").unwrap())
.await
.unwrap();
println!("{:?}", account);
let account = get_account(&Uuid::from_str("00000000-0000-0000-0000-000000000004").unwrap())
.await
.unwrap();
println!("{:?}", account);
let account = get_account(&Uuid::from_str("00000000-0000-0000-0000-000000000005").unwrap())
.await
.unwrap();
println!("{:?}", account);
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let pg_url =
std::env::var("PG_URL").unwrap_or("postgres://user:pass@localhost:5432/web".to_owned());
let pg_pool = get_postgres_pool(&pg_url)?;
// スレッドが始まる前に初期化する
let _ =
HOLDER.get_or_init(|| Mutex::new(HolderMap::new(pg_pool, Duration::from_secs(3), None)));
let thread1 = tokio::spawn(async move {
get_accounts().await;
sleep(Duration::from_secs(4)).await;
get_accounts().await;
});
let thread2 = tokio::spawn(async move {
get_accounts().await;
});
thread1.await?;
thread2.await?;
Ok(())
}
まとめ
今回の機能はresident-utilsに組み込みました。
コードはここにあります。
Discussion