Rust sea ormでusecase単位でのtransaction処理
こんにちは
最近rustを触り始めました。
個人的にsea-ormの使い心地が良かったので普段やっているusecaseでのtransaction管理についてやっていこうと思い備忘録がてら記事にします
今回の記事ではレイヤードアーキテクチャの構成のusecase単位でのtransactionの処理について書いていきます
一連の操作がすべて成功した時のみ結果を反映したい,汎用的に使えるような構成にしたい場合に有効かなと思います
この記事ではclean architectureに関しての説明は行いません
個人的に好きなのは
ですディレクトリ設計は以下の感じでやっていきます
.
├── Cargo.lock
├── Cargo.toml
└── src
├── infrastructure
│ ├── datastore
│ │ ├── db_client.rs
│ │ └── transaction_manager_impl.rs
│ ├── repository_impl
│ │ └── post_repository_impl.rs
│ ├── migrations
│ │ ├── Cargo.toml
│ │ ├── README.md
│ │ └── src
│ │ ├── lib.rs
│ │ ├── m20220101_000001_create_table.rs
│ │ └── main.rs
│ ├── di.rs
│ └── server.rs
├── domain
│ ├── repository
│ │ ├── post_repository.rs
│ │ └── entity
├── application
│ ├── usecase
│ │ ├── create_post_usecase.rs
│ │ └── get_post_usecase.rs
│ └── transaction_manager.rs
├── adapter
│ ├── controller
│ │ └── post_controller.rs
│ └── handler
│ └── post_handler.rs
└── main.rs
今回actix_webを使用するのですがこちらの記事を参考にさせていただきました
環境構築
cargoは入っている前提で
sea-orm-cli install
cargo install sea-orm-cli
migration 初期設定
sea-orm-cli migrate init -d ./src/infrastructure/migration
このようなものが生成されます
// main.go
use sea_orm_migration::prelude::*;
#[async_std::main]
async fn main() {
cli::run_cli(migration::Migrator).await;
}
//m20220101_000001_create_table.rs
use sea_orm_migration::prelude::*;
#[derive(DeriveMigrationName)]
pub struct Migration;
#[async_trait::async_trait]
impl MigrationTrait for Migration {
async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.create_table(
Table::create()
.table(Post::Table)
.if_not_exists()
.col(
ColumnDef::new(Post::Id)
.integer()
.not_null()
.auto_increment()
.primary_key(),
)
.col(ColumnDef::new(Post::Title).string().not_null())
.col(ColumnDef::new(Post::Text).string().not_null())
.to_owned(),
)
.await
}
async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
manager
.drop_table(Table::drop().table(Post::Table).to_owned())
.await
}
}
#[derive(DeriveIden)]
enum Post {
Table,
Id,
Title,
Text,
}
こちらのfileを編集してtableを作っていきます
ここでproject直下のCargo.tomlに
[workspace]
members = [
"src/infrastructure/migration",
]
を追加してください
dockerにpostgres環境を構築
# docker-compose.yml
version: '3.8'
services:
sea-orm-example-db:
image: postgres:13
container_name: sea-orm-example-db
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: sea_orm_example
TZ: Asia/Tokyo
ports:
- "5432:5432"
#保存する場合
# volumes:
# - postgres_data:/var/lib/postgresql/data
postgres 立ち上げ
docker-compose up -d sea-orm-example-db
migrationの適応
# dbを設定
export DATABASE_URL=postgres://user:password@localhost:5432/sea-orm-example
sea-orm-cli migrate -d ./src/infrastructure//migration
sea-ormではmigration file -> ormを生成の順で作成していきます
sea-orm-cli generate entity -o ./src/infrastructure/tables --with-serde both --with-copy-enums
コマンドの説明
sea-orm-cli generate entity
- dbのtableからentityを生成するコマンド
オプションの説明
-o ./src/infrastructure/tables
生成先を指定する
sea-orm-cli generate entity -o ./src/infrastructure/tables
serdeのサポートを指定する。bothを設定することでSerialize,Deserializeをサポート
--with-copy-enums
生成されるエンティティの列挙型に Copy トレイトを実装する
実装
Cargo.tomlは以下のようになります
std::error::Error型を用いるとSendの扱いが面倒になるためanyhowを用いています
[package]
name = "sea-orm-example"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
sea-orm = { version = "0.12.15", features = [
"sqlx-postgres",
"runtime-tokio-rustls",
"macros",
"debug-print"
] }
async-trait = "0.1.79"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
actix-web = "4.0"
serde_json = "1.0"
dotenv = "0.15.0"
env_logger = "0.11.3"
log = "0.4.21"
anyhow = "1.0.86"
[workspace]
members = [
"src/infrastructure/migration",
]
db_clientの定義
infrastructure層でdb_clientを定義し,infrastructure層のrepository_implに渡すためのconnectionを渡す関数を持たせます
// infrastructure/datastore/db_client.rs
#[async_trait]
pub trait DBClient {
fn get_connection(&self) -> Arc<DatabaseConnection>;
}
pub struct DBClientImpl {
con: Arc<DatabaseConnection>,
}
impl DBClientImpl {
pub async fn new() -> Result<Box<Self>> {
let db_name = env::var("DATABASE_NAME").expect("DATABASE_NAME must be set");
let db_user = env::var("DATABASE_USER").expect("DATABASE_USER must be set");
let db_pass = env::var("DATABASE_PASSWORD").expect("DATABASE_PASS must be set");
let db_port = env::var("DATABASE_PORT").expect("DATABASE_PORT must be set");
let db_host = env::var("DATABASE_HOST").expect("DATABASE_HOST must be set");
let db_url = format!(
"postgresql://{}:{}@{}:{}/{}",
db_user, db_pass, db_host, db_port, db_name
);
let max_connections: u32 = env::var("DB_MAX_ACTIVE")
.unwrap_or_else(|_| "10".to_string())
.parse()
.expect("DB_MAX_ACTIVE must be an unsigned integer");
let min_connections: u32 = env::var("DB_MAX_IDLE")
.unwrap_or_else(|_| "0".to_string())
.parse()
.expect("DB_MAX_IDLE must be an unsigned integer");
let idle_timeout_secs: u64 = env::var("DB_IDLE_TIMEOUT")
.unwrap_or_else(|_| "10".to_string())
.parse()
.expect("DB_IDLE_TIMEOUT must be an unsigned integer");
let is_logging_enabled: bool = env::var("DB_LOGGING")
.unwrap_or_else(|_| "false".to_string())
.parse()
.expect("DB_LOGGING must be a boolean");
let mut opt = ConnectOptions::new(db_url);
opt.max_connections(max_connections)
.min_connections(min_connections)
.idle_timeout(Duration::from_secs(idle_timeout_secs))
.sqlx_logging(is_logging_enabled);
let con = match Database::connect(opt).await {
Ok(con) => con,
Err(e) => {
error!("Failed to connect to database: {}", e.to_string());
return Err(anyhow::anyhow!("Failed to connect to database"));
}
};
Ok(Box::new(DBClientImpl { con: Arc::new(con) }))
}
}
#[async_trait]
impl DBClient for DBClientImpl {
fn get_connection(&self) -> Arc<DatabaseConnection> {
self.con.clone()
}
}
今回clean architectureを満たすためにtransaction_managerのtraitをapplication層で宣言し,infrastructure層のdb_clientで実装を行っています
transactionは必要に応じてusecase層で発行するような形です
// application/transaction_manager.rs
use anyhow::Result;
use async_trait::async_trait;
#[async_trait]
pub trait TransactionManager {
async fn begin(&mut self) -> Result<()>;
async fn commit(&mut self) -> Result<()>;
async fn rollback(&mut self) -> Result<()>;
}
transaction_managerの実装
// infrastructure/datastore/transaction_manager_impl.rs
pub struct TransactionManagerImpl {
con: Arc<DatabaseConnection>,
transaction: Mutex<Option<DatabaseTransaction>>,
}
impl TransactionManagerImpl {
pub fn new(connection: Arc<DatabaseConnection>) -> Self {
Self {
con: connection,
transaction: Mutex::new(None),
}
}
}
#[async_trait]
impl TransactionManager for TransactionManagerImpl {
async fn begin(&mut self) -> Result<()> {
let mut tx_guard = self.transaction.lock().await;
if tx_guard.is_none() {
let transaction = self.con.begin().await.map_err(|e| {
error!("Failed to start transaction: {}", e.to_string());
e
})?;
info!("Transaction started");
*tx_guard = Some(transaction);
Ok(())
} else {
Err(anyhow::anyhow!("Transaction already exists"))
}
}
async fn commit(&mut self) -> Result<()> {
let mut tx_guard = self.transaction.lock().await;
if let Some(transaction) = tx_guard.take() {
transaction.commit().await.map_err(|e| {
error!("Failed to commit transaction: {}", e.to_string());
e
})?;
info!("Transaction committed");
Ok(())
} else {
Err(anyhow::anyhow!("Transaction already exists"))
}
}
async fn rollback(&mut self) -> Result<()> {
let mut tx_guard = self.transaction.lock().await;
if let Some(transaction) = tx_guard.take() {
transaction.rollback().await.map_err(|e| {
error!("Failed to rollback transaction: {}", e.to_string());
e
})?;
info!("Transaction rolled back");
Ok(())
} else {
Err(anyhow::anyhow!("Transaction already exists"))
}
}
}
usecase実装
#[async_trait]
pub trait CreatePostUsecase {
async fn execute(&self, post: NewPost) -> Result<Post>;
}
pub struct CreatePostUsecaseImpl<P, TM>
where
P: PostRepository,
TM: TransactionManager,
{
post_repository: Arc<P>,
transaction_manager: Arc<Mutex<TM>>,
}
impl<P, TM> CreatePostUsecaseImpl<P, TM>
where
P: PostRepository,
TM: TransactionManager,
{
pub fn new(post_repository: Arc<P>, transaction_manager: Arc<Mutex<TM>>) -> Self {
Self {
post_repository,
transaction_manager,
}
}
}
#[async_trait]
impl<P, TM> CreatePostUsecase for CreatePostUsecaseImpl<P, TM>
where
P: PostRepository + Send + Sync,
TM: TransactionManager + Send + Sync,
{
async fn execute(&self, post: NewPost) -> Result<Post> {
self.transaction_manager.lock().await.begin().await?;
let res = match self.post_repository.create(post).await {
Ok(post) => post,
Err(e) => {
self.transaction_manager.lock().await.rollback().await?;
return Err(anyhow::anyhow!(e));
}
};
self.transaction_manager.lock().await.commit().await?;
Ok(res)
}
}
transaction_managerを注入し,そちらを介してtransactionの管理を行います。
このようにすればtransactionが必要なケースの時のみtransaction_managerを注入していけば済むようにできます
DIについてはrustでのデファクトスタンダードが見つけられず,いい感じのライブラリも見つけられなかったため独自で書いています
#[async_trait]
pub trait DIContainer {
async fn post_container(&self) -> Arc<dyn PostController>;
}
pub struct DIContainerImpl<D>
where
D: DBClient,
{
db_client: D,
}
impl<D> DIContainerImpl<D>
where
D: DBClient,
{
pub fn new(db_client: D) -> Self {
Self { db_client }
}
}
#[async_trait]
impl<D> DIContainer for DIContainerImpl<D>
where
D: DBClient + Send + Sync,
{
async fn post_container(&self) -> Arc<dyn PostController> {
let post_repository = Arc::new(PostRepositoryImpl::new(Arc::clone(
&self.db_client.get_connection(),
)));
let get_post_usecase = GetPostUsecaseImpl::new(Arc::clone(&post_repository));
let transaction_manager = Arc::new(Mutex::new(TransactionManagerImpl::new(Arc::clone(
&self.db_client.get_connection(),
))));
let create_post_usecase =
CreatePostUsecaseImpl::new(Arc::clone(&post_repository), transaction_manager);
let post_controller = PostControllerImpl::new(get_post_usecase, create_post_usecase);
Arc::new(post_controller)
}
}
これであっているのだろうか🤔
shakuは触ったことがあるのですが更新が止まっていて不安が残るので採用に躊躇しています
誰かいいライブラリを知っていたら教えてください🙇🙇
logの確認
~/RustProject/sea-orm-example $ RUST_LOG=debug cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.13s
Running `target/debug/sea-orm-example`
[2024-07-07T08:36:20Z INFO sea_orm_example] Starting server...
[2024-07-07T08:36:20Z INFO actix_server::builder] starting 4 workers
[2024-07-07T08:36:20Z INFO actix_server::server] Actix runtime found; starting in Actix runtime
[2024-07-07T08:36:27Z INFO sea_orm_example::infrastructure::datastore::transaction_manager_impl] Transaction started
[2024-07-07T08:36:27Z DEBUG sea_orm::driver::sqlx_postgres] INSERT INTO "post" ("title", "text") VALUES ('test2', 'test2test2') RETURNING "id", "title", "text"
[2024-07-07T08:36:27Z INFO sqlx::query] summary="INSERT INTO \"post\" (\"title\", …" db.statement="\n\nINSERT INTO\n \"post\" (\"title\", \"text\")\nVALUES\n ($1, $2) RETURNING \"id\",\n \"title\",\n \"text\"\n" rows_affected=0 rows_returned=1 elapsed=8.50375ms elapsed_secs=0.00850375
[2024-07-07T08:36:27Z INFO sqlx::query] summary="COMMIT" db.statement="" rows_affected=0 rows_returned=0 elapsed=1.004916ms elapsed_secs=0.001004916
[2024-07-07T08:36:27Z INFO sea_orm_example::infrastructure::datastore::transaction_manager_impl] Transaction committed
sea-ormではtransactionのスタートのログは出ないようになっているんでしょうか?
設定で出す方法が見当たらなかったので勝手にlogを出すようにしています
まとめ
この記事では、Rustで非同期トランザクション管理を実装する方法について説明しました。まず、必要なクレートを設定し、トランザクション管理トレイトを定義しました。その後、データベース接続を管理するDBClientを実装し、トランザクションを開始、コミット、およびロールバックするTransactionManagerを実装しました。最後に、具体的なユースケースとして投稿を作成する機能を実装しました。
今回rustで汎用的なtransactionを試してみて何とかなりそうな雰囲気を感じられてとても良かったです
githubは、 https://github.com/tomika7060/example-sea-orm です
参考
Discussion