🙆‍♀️

Rust sea ormでusecase単位でのtransaction処理

2024/07/07に公開

こんにちは

最近rustを触り始めました。
個人的にsea-ormの使い心地が良かったので普段やっているusecaseでのtransaction管理についてやっていこうと思い備忘録がてら記事にします

https://github.com/SeaQL/sea-orm

今回の記事ではレイヤードアーキテクチャの構成のusecase単位でのtransactionの処理について書いていきます
一連の操作がすべて成功した時のみ結果を反映したい,汎用的に使えるような構成にしたい場合に有効かなと思います

この記事ではclean architectureに関しての説明は行いません

個人的に好きなのは
https://zenn.dev/tacoms/articles/29eedf32d8d18b
https://zenn.dev/tomoya_pama/articles/98a2c822d2a37b
https://zenn.dev/maru44/articles/b9e07e91a0ea77
です

ディレクトリ設計は以下の感じでやっていきます

.
├── Cargo.lock
├── Cargo.toml
└── src
├── infrastructure
│ ├── datastore
│ │ ├── db_client.rs
│ │ └── transaction_manager_impl.rs
│ ├── repository_impl
│ │ └── post_repository_impl.rs
│ ├── migrations
│ │ ├── Cargo.toml
│ │ ├── README.md
│ │ └── src
│ │ ├── lib.rs
│ │ ├── m20220101_000001_create_table.rs
│ │ └── main.rs
│ ├── di.rs
│ └── server.rs
├── domain
│ ├── repository
│ │ ├── post_repository.rs
│ │ └── entity
├── application
│ ├── usecase
│ │ ├── create_post_usecase.rs
│ │ └── get_post_usecase.rs
│ └── transaction_manager.rs
├── adapter
│ ├── controller
│ │ └── post_controller.rs
│ └── handler
│ └── post_handler.rs
└── main.rs

今回actix_webを使用するのですがこちらの記事を参考にさせていただきました

環境構築

cargoは入っている前提で

sea-orm-cli install

cargo install sea-orm-cli

migration 初期設定

sea-orm-cli migrate init -d ./src/infrastructure/migration

このようなものが生成されます

// main.go
use sea_orm_migration::prelude::*;

#[async_std::main]
async fn main() {
    cli::run_cli(migration::Migrator).await;
}
//m20220101_000001_create_table.rs
use sea_orm_migration::prelude::*;

#[derive(DeriveMigrationName)]
pub struct Migration;

#[async_trait::async_trait]
impl MigrationTrait for Migration {
    async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> {

        manager
            .create_table(
                Table::create()
                    .table(Post::Table)
                    .if_not_exists()
                    .col(
                        ColumnDef::new(Post::Id)
                            .integer()
                            .not_null()
                            .auto_increment()
                            .primary_key(),
                    )
                    .col(ColumnDef::new(Post::Title).string().not_null())
                    .col(ColumnDef::new(Post::Text).string().not_null())
                    .to_owned(),
            )
            .await
    }

    async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> {
        manager
            .drop_table(Table::drop().table(Post::Table).to_owned())
            .await
    }
}

#[derive(DeriveIden)]
enum Post {
    Table,
    Id,
    Title,
    Text,
}

こちらのfileを編集してtableを作っていきます

https://www.sea-ql.org/SeaORM/docs/next/migration/writing-migration/

ここでproject直下のCargo.tomlに

[workspace]
members = [
    "src/infrastructure/migration",
]

を追加してください

dockerにpostgres環境を構築

# docker-compose.yml
version: '3.8'
services:
  sea-orm-example-db:
    image: postgres:13
    container_name: sea-orm-example-db
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password
      POSTGRES_DB: sea_orm_example
      TZ: Asia/Tokyo
    ports:
      - "5432:5432"
    #保存する場合
    # volumes:
    #   - postgres_data:/var/lib/postgresql/data

postgres 立ち上げ

docker-compose  up -d sea-orm-example-db

migrationの適応

# dbを設定
export DATABASE_URL=postgres://user:password@localhost:5432/sea-orm-example  
sea-orm-cli migrate -d ./src/infrastructure//migration  

sea-ormではmigration file -> ormを生成の順で作成していきます

sea-orm-cli generate entity -o ./src/infrastructure/tables --with-serde both --with-copy-enums

コマンドの説明

sea-orm-cli generate entity

  • dbのtableからentityを生成するコマンド

オプションの説明

-o ./src/infrastructure/tables
生成先を指定する
sea-orm-cli generate entity -o ./src/infrastructure/tables
serdeのサポートを指定する。bothを設定することでSerialize,Deserializeをサポート
--with-copy-enums
生成されるエンティティの列挙型に Copy トレイトを実装する

実装

Cargo.tomlは以下のようになります
std::error::Error型を用いるとSendの扱いが面倒になるためanyhowを用いています

[package]
name = "sea-orm-example"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
sea-orm = { version = "0.12.15", features = [
    "sqlx-postgres",
    "runtime-tokio-rustls",
    "macros",
    "debug-print"
] }
async-trait = "0.1.79"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
actix-web = "4.0"
serde_json = "1.0"
dotenv = "0.15.0"
env_logger = "0.11.3"
log = "0.4.21"
anyhow = "1.0.86"

[workspace]
members = [
    "src/infrastructure/migration",
]

db_clientの定義

infrastructure層でdb_clientを定義し,infrastructure層のrepository_implに渡すためのconnectionを渡す関数を持たせます

// infrastructure/datastore/db_client.rs
#[async_trait]
pub trait DBClient {
    fn get_connection(&self) -> Arc<DatabaseConnection>;
}

pub struct DBClientImpl {
    con: Arc<DatabaseConnection>,
}

impl DBClientImpl {
    pub async fn new() -> Result<Box<Self>> {
        let db_name = env::var("DATABASE_NAME").expect("DATABASE_NAME must be set");
        let db_user = env::var("DATABASE_USER").expect("DATABASE_USER must be set");
        let db_pass = env::var("DATABASE_PASSWORD").expect("DATABASE_PASS must be set");
        let db_port = env::var("DATABASE_PORT").expect("DATABASE_PORT must be set");
        let db_host = env::var("DATABASE_HOST").expect("DATABASE_HOST must be set");
        let db_url = format!(
            "postgresql://{}:{}@{}:{}/{}",
            db_user, db_pass, db_host, db_port, db_name
        );

        let max_connections: u32 = env::var("DB_MAX_ACTIVE")
            .unwrap_or_else(|_| "10".to_string())
            .parse()
            .expect("DB_MAX_ACTIVE must be an unsigned integer");
        let min_connections: u32 = env::var("DB_MAX_IDLE")
            .unwrap_or_else(|_| "0".to_string())
            .parse()
            .expect("DB_MAX_IDLE must be an unsigned integer");
        let idle_timeout_secs: u64 = env::var("DB_IDLE_TIMEOUT")
            .unwrap_or_else(|_| "10".to_string())
            .parse()
            .expect("DB_IDLE_TIMEOUT must be an unsigned integer");
        let is_logging_enabled: bool = env::var("DB_LOGGING")
            .unwrap_or_else(|_| "false".to_string())
            .parse()
            .expect("DB_LOGGING must be a boolean");

        let mut opt = ConnectOptions::new(db_url);
        opt.max_connections(max_connections)
            .min_connections(min_connections)
            .idle_timeout(Duration::from_secs(idle_timeout_secs))
            .sqlx_logging(is_logging_enabled);

        let con = match Database::connect(opt).await {
            Ok(con) => con,
            Err(e) => {
                error!("Failed to connect to database: {}", e.to_string());
                return Err(anyhow::anyhow!("Failed to connect to database"));
            }
        };

        Ok(Box::new(DBClientImpl { con: Arc::new(con) }))
    }
}

#[async_trait]
impl DBClient for DBClientImpl {
    fn get_connection(&self) -> Arc<DatabaseConnection> {
        self.con.clone()
    }
}

今回clean architectureを満たすためにtransaction_managerのtraitをapplication層で宣言し,infrastructure層のdb_clientで実装を行っています

transactionは必要に応じてusecase層で発行するような形です

// application/transaction_manager.rs
use anyhow::Result;
use async_trait::async_trait;

#[async_trait]
pub trait TransactionManager {
    async fn begin(&mut self) -> Result<()>;
    async fn commit(&mut self) -> Result<()>;
    async fn rollback(&mut self) -> Result<()>;
}

transaction_managerの実装

// infrastructure/datastore/transaction_manager_impl.rs
pub struct TransactionManagerImpl {
    con: Arc<DatabaseConnection>,
    transaction: Mutex<Option<DatabaseTransaction>>,
}

impl TransactionManagerImpl {
    pub fn new(connection: Arc<DatabaseConnection>) -> Self {
        Self {
            con: connection,
            transaction: Mutex::new(None),
        }
    }
}

#[async_trait]
impl TransactionManager for TransactionManagerImpl {
    async fn begin(&mut self) -> Result<()> {
        let mut tx_guard = self.transaction.lock().await;
        if tx_guard.is_none() {
            let transaction = self.con.begin().await.map_err(|e| {
                error!("Failed to start transaction: {}", e.to_string());
                e
            })?;

            info!("Transaction started");
            *tx_guard = Some(transaction);
            Ok(())
        } else {
            Err(anyhow::anyhow!("Transaction already exists"))
        }
    }

    async fn commit(&mut self) -> Result<()> {
        let mut tx_guard = self.transaction.lock().await;
        if let Some(transaction) = tx_guard.take() {
            transaction.commit().await.map_err(|e| {
                error!("Failed to commit transaction: {}", e.to_string());
                e
            })?;
            info!("Transaction committed");
            Ok(())
        } else {
            Err(anyhow::anyhow!("Transaction already exists"))
        }
    }

    async fn rollback(&mut self) -> Result<()> {
        let mut tx_guard = self.transaction.lock().await;
        if let Some(transaction) = tx_guard.take() {
            transaction.rollback().await.map_err(|e| {
                error!("Failed to rollback transaction: {}", e.to_string());
                e
            })?;
            info!("Transaction rolled back");
            Ok(())
        } else {
            Err(anyhow::anyhow!("Transaction already exists"))
        }
    }
}

usecase実装

#[async_trait]
pub trait CreatePostUsecase {
    async fn execute(&self, post: NewPost) -> Result<Post>;
}

pub struct CreatePostUsecaseImpl<P, TM>
where
    P: PostRepository,
    TM: TransactionManager,
{
    post_repository: Arc<P>,
    transaction_manager: Arc<Mutex<TM>>,
}

impl<P, TM> CreatePostUsecaseImpl<P, TM>
where
    P: PostRepository,
    TM: TransactionManager,
{
    pub fn new(post_repository: Arc<P>, transaction_manager: Arc<Mutex<TM>>) -> Self {
        Self {
            post_repository,
            transaction_manager,
        }
    }
}

#[async_trait]
impl<P, TM> CreatePostUsecase for CreatePostUsecaseImpl<P, TM>
where
    P: PostRepository + Send + Sync,
    TM: TransactionManager + Send + Sync,
{
    async fn execute(&self, post: NewPost) -> Result<Post> {
        self.transaction_manager.lock().await.begin().await?;

        let res = match self.post_repository.create(post).await {
            Ok(post) => post,
            Err(e) => {
                self.transaction_manager.lock().await.rollback().await?;
                return Err(anyhow::anyhow!(e));
            }
        };

        self.transaction_manager.lock().await.commit().await?;

        Ok(res)
    }
}

transaction_managerを注入し,そちらを介してtransactionの管理を行います。

このようにすればtransactionが必要なケースの時のみtransaction_managerを注入していけば済むようにできます

DIについてはrustでのデファクトスタンダードが見つけられず,いい感じのライブラリも見つけられなかったため独自で書いています

#[async_trait]
pub trait DIContainer {
    async fn post_container(&self) -> Arc<dyn PostController>;
}

pub struct DIContainerImpl<D>
where
    D: DBClient,
{
    db_client: D,
}

impl<D> DIContainerImpl<D>
where
    D: DBClient,
{
    pub fn new(db_client: D) -> Self {
        Self { db_client }
    }
}

#[async_trait]
impl<D> DIContainer for DIContainerImpl<D>
where
    D: DBClient + Send + Sync,
{
    async fn post_container(&self) -> Arc<dyn PostController> {
        let post_repository = Arc::new(PostRepositoryImpl::new(Arc::clone(
            &self.db_client.get_connection(),
        )));

        let get_post_usecase = GetPostUsecaseImpl::new(Arc::clone(&post_repository));

        let transaction_manager = Arc::new(Mutex::new(TransactionManagerImpl::new(Arc::clone(
            &self.db_client.get_connection(),
        ))));

        let create_post_usecase =
            CreatePostUsecaseImpl::new(Arc::clone(&post_repository), transaction_manager);

        let post_controller = PostControllerImpl::new(get_post_usecase, create_post_usecase);
        Arc::new(post_controller)
    }
}

これであっているのだろうか🤔
shakuは触ったことがあるのですが更新が止まっていて不安が残るので採用に躊躇しています
誰かいいライブラリを知っていたら教えてください🙇🙇

logの確認

~/RustProject/sea-orm-example $ RUST_LOG=debug cargo run                     
    Finished dev [unoptimized + debuginfo] target(s) in 0.13s
     Running `target/debug/sea-orm-example`
[2024-07-07T08:36:20Z INFO  sea_orm_example] Starting server...
[2024-07-07T08:36:20Z INFO  actix_server::builder] starting 4 workers
[2024-07-07T08:36:20Z INFO  actix_server::server] Actix runtime found; starting in Actix runtime
[2024-07-07T08:36:27Z INFO  sea_orm_example::infrastructure::datastore::transaction_manager_impl] Transaction started
[2024-07-07T08:36:27Z DEBUG sea_orm::driver::sqlx_postgres] INSERT INTO "post" ("title", "text") VALUES ('test2', 'test2test2') RETURNING "id", "title", "text"
[2024-07-07T08:36:27Z INFO  sqlx::query] summary="INSERT INTO \"post\" (\"title\", …" db.statement="\n\nINSERT INTO\n  \"post\" (\"title\", \"text\")\nVALUES\n  ($1, $2) RETURNING \"id\",\n  \"title\",\n  \"text\"\n" rows_affected=0 rows_returned=1 elapsed=8.50375ms elapsed_secs=0.00850375
[2024-07-07T08:36:27Z INFO  sqlx::query] summary="COMMIT" db.statement="" rows_affected=0 rows_returned=0 elapsed=1.004916ms elapsed_secs=0.001004916
[2024-07-07T08:36:27Z INFO  sea_orm_example::infrastructure::datastore::transaction_manager_impl] Transaction committed

sea-ormではtransactionのスタートのログは出ないようになっているんでしょうか?
設定で出す方法が見当たらなかったので勝手にlogを出すようにしています

まとめ

この記事では、Rustで非同期トランザクション管理を実装する方法について説明しました。まず、必要なクレートを設定し、トランザクション管理トレイトを定義しました。その後、データベース接続を管理するDBClientを実装し、トランザクションを開始、コミット、およびロールバックするTransactionManagerを実装しました。最後に、具体的なユースケースとして投稿を作成する機能を実装しました。

今回rustで汎用的なtransactionを試してみて何とかなりそうな雰囲気を感じられてとても良かったです

githubは、 https://github.com/tomika7060/example-sea-orm です

参考

https://zenn.dev/yoshiyoshifujii/articles/ecc1d9806811a8
https://qiita.com/tsuchinoko0402/items/dda60c43dbe4e83e729d
https://rs.nkmk.me/rust-anyhow-basic/
https://zenn.dev/collabostyle/articles/0641d73f776d80

Discussion