💊

【Rust】アプリケーションのDBトランザクション管理の方法を考える

2025/01/27に公開

こんにちは。
今回はRustで構築したWebアプリケーションにおいて、DBのトランザクション管理の手法を検討、実装しました。結論としては、完全に満足できる形では実装できていませんが、試行錯誤した結果をまとめます。改善可能な点があればぜひコメントにてご指摘ください。

前提

  • Rust 1.81
  • Actix Web 4.9.0
  • SeaORM 1.1.3
  • PostgreSQL 17.0

ORMの選択肢としてはDieselもありますが、アプリケーション全体をActix Webによって非同期ベースで構築していたため、同じように非同期で実装可能なSeaORMを選択しました。

https://www.sea-ql.org/SeaORM/docs/internal-design/diesel/

やりたいこと/課題

やりたいこと

  1. アプリケーションにおけるDBトランザクションを管理するクラスを作成し、全体でロジックを共通化しつつ、各ユースケースではその管理を特に意識せずに利用することができるようにする
  2. アプリケーションで提供するAPIのエンドポイントごとにトランザクションを利用するか否か、選択可能な状態にする
  3. アプリケーションはクリーンアーキテクチャを土台としているため、インフラ層に位置付けられるトランザクション管理を抽象化して、依存関係を適切に保つ

課題

  1. Rust特有の所有権やライフタイムの概念に対応し、トランザクションの提供や利用をする必要がある
  2. クリーンアーキテクチャの原則に沿って、依存関係の逆転を利用しながら、アプリケーションロジックにインフラ層の関心ごとが滲み出ないようにする

全体フロー

DBコネクションやトランザクションを管理するマネージャークラスを定義し、各エンドポイントで以下のように利用するようにしました。

リクエスト受信
      ↓
ミドルウェアでトランザクションマネージャー生成
      ↓
エンドポイントのコントローラ呼び出し
      ↓
コントローラでトランザクションマネージャー取得
      ↓
DI用モジュールにトランザクションマネージャーを渡す
      ↓
リポジトリでトランザクションまたはコネクションを利用
      ↓
エンドポイントの処理完了
      ↓
ミドルウェアでレスポンスステータスを確認
    ┌───────────────┐
    │               │
200系              400/500系
コミット             ロールバック
    └───────────────┘
      ↓
レスポンスを返却

以下、実装を提示しながらフローの中身を整理していきます。完全な実装も公開しています。

https://github.com/penysho/ec-extension

トランザクションマネージャーの実装

まずはトランザクションマネージャーのインターフェースを定義します。
ポイントとしては、実装で利用するSeaORMに依存しないようにジェネリクスを用いてコネクションやトランザクションの型を抽象化し、そのトランザクション自体の操作を直接利用側は行わず、トランザクションマネージャーを通じて行われるようにしている点です。

transaction_manager_interface.rs
/// Transaction manager interface.
/// Manage transactions used by the application and provide for each use case.
///
/// # Parameters
/// - `T`: Transaction type.
/// - `C`: Connection type.
#[async_trait]
pub trait TransactionManager<T, C>: Send + Sync {
    /// Start a transaction.
    async fn begin(&self) -> Result<(), DomainError>;

    /// Returns whether or not a Transaction has been initiated.
    async fn is_transaction_started(&self) -> bool;

    /// Get current transaction.
    async fn get_transaction(&self) -> Result<MutexGuard<'_, Option<T>>, DomainError>;

    /// Get connection.
    async fn get_connection(&self) -> Result<C, DomainError>;

    /// Commit transaction.
    async fn commit(&self) -> Result<(), DomainError>;

    /// Roll back a transaction.
    async fn rollback(&self) -> Result<(), DomainError>;
}

次にトランザクションマネージャーのSeaORMによる実装です。
フィールドにSeaORMのDatabaseConnectionDatabaseTransactionを保持し、get_xxx()で呼び出し元に引き渡します。また、トランザクションを操作するbegin()commit()rollback()は前述の通り、ミドルウェアで実行される想定です。

sea_orm_manager.rs
/// Transaction manager for SeaORM.
#[derive(Clone)]
pub struct SeaOrmTransactionManager {
    conn: Arc<DatabaseConnection>,
    tran: Arc<Mutex<Option<DatabaseTransaction>>>,
}

impl SeaOrmTransactionManager {
    /// Create a new transaction manager.
    pub async fn new(conn: Arc<DatabaseConnection>) -> Result<Self, DomainError> {
        Ok(Self {
            conn,
            tran: Arc::new(Mutex::new(None)),
        })
    }
}

#[async_trait]
impl TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>> for SeaOrmTransactionManager {
    async fn begin(&self) -> Result<(), DomainError> {
        let mut lock = self.tran.lock().await;
        if lock.is_none() {
            let tran = self.conn.begin().await.map_err(|e| {
                log::error!("Database transaction error: {}", e);
                InfrastructureErrorMapper::to_domain(InfrastructureError::DatabaseError(e))
            })?;
            *lock = Some(tran);
            Ok(())
        } else {
            log::error!("Database transaction error: transaction is already started");
            Err(DomainError::SystemError)
        }
    }

    async fn is_transaction_started(&self) -> bool {
        self.tran.lock().await.is_some()
    }

    async fn get_transaction(
        &self,
    ) -> Result<MutexGuard<'_, Option<DatabaseTransaction>>, DomainError> {
        let lock = self.tran.lock().await;
        if lock.is_some() {
            Ok(lock)
        } else {
            log::error!("Database transaction error: transaction is not started");
            Err(DomainError::SystemError)
        }
    }

    async fn get_connection(&self) -> Result<Arc<DatabaseConnection>, DomainError> {
        Ok(self.conn.clone())
    }

    async fn commit(&self) -> Result<(), DomainError> {
        let mut lock = self.tran.lock().await;
        if let Some(tran) = lock.take() {
            tran.commit().await.map_err(|e| {
                log::error!("Database commit error: {}", e);
                InfrastructureErrorMapper::to_domain(InfrastructureError::DatabaseError(e))
            })?;
            Ok(())
        } else {
            log::error!("Database commit error: transaction is not started");
            Err(DomainError::SystemError)
        }
    }

    async fn rollback(&self) -> Result<(), DomainError> {
        let mut lock = self.tran.lock().await;
        if let Some(tran) = lock.take() {
            tran.rollback().await.map_err(|e| {
                log::error!("Database rollback error: {}", e);
                InfrastructureErrorMapper::to_domain(InfrastructureError::DatabaseError(e))
            })?;
            Ok(())
        } else {
            log::error!("Database rollback error: transaction is not started");
            Err(DomainError::SystemError)
        }
    }
}

検討事項①: Mutexを利用したDatabaseTransactionの管理

今回の実装で最も苦労した点ですが、DatabaseTransactionMutexで包んでいるのは以下が理由です。

  • SeaORMのDatabaseTransaction型がCloneできない
  • Rustの所有権の概念によって、メソッドの戻り値を借用として定義すると実装が複雑化する(非同期では特に)

https://docs.rs/sea-orm/latest/sea_orm/struct.DatabaseTransaction.html

最初にパッと考えた対処策は、以下のようにget_transaction()DatabaseTransactionの参照を返却しようとした例です。
理解している人には当たり前かもしれませんが、lockはローカル変数として定義しているため、そのライフタイムはこのメソッドのスコープに限定されているため、その参照を呼び出し元に返そうとすると、コンパイルエラーになります。

これはコンパイルエラー
#[async_trait]
pub trait TransactionManager<T, C>: Send + Sync {
    /// Get current transaction.
    async fn get_transaction(&self) -> Result<Option<&T>, DomainError>;
}

#[async_trait]
impl TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>> for SeaOrmTransactionManager {
    async fn get_transaction(&self) -> Result<Option<&DatabaseTransaction>, DomainError> {
        let lock = self.tran.lock().await;
        if lock.is_some() {
            Ok(lock.as_ref())
        } else {
            log::error!("Database transaction error: transaction is not started");
            Err(DomainError::SystemError)
        }
    }
}

// コンパイルエラー
error[E0515]: cannot return value referencing local variable lock
   --> src/infrastructure/db/sea_orm/sea_orm_manager.rs:81:1
    |
81  | #[async_trait]
    | ^^^^^^^^^^^^^^ returns a value referencing data owned by the current function
...
105 |             Ok(lock.as_ref())
    |                ---- lock is borrowed here
    |
    = note: this error originates in the attribute macro async_trait (in Nightly builds, run with -Z macro-backtrace for more info)

したがって、今回はミドルウェアと各ユースケース(エンドポイントの処理)でトランザクションマネージャーを共有し、それぞれでトランザクションを利用、消費する必要があることから、Mutexでトランザクションを包むことで共有可能にしています。

検討事項②: トランザクションはMutexGuardとして提供

また、get_transaction()の戻り値がMutexGuardになっていますが、これは基本的には推奨されないアプローチと考えます。
MutexGuard<'_, T>はスコープを抜けると自動的に解放されるRAII型です。そのため、MutexGuardを返すと、呼び出し元でそのロックが解放されるまで、Mutexがロックされたままになります。
つまり、よくない点は具体的に以下の二つです。

  • 呼び元でガードが保持されたままだと、デッドロックが発生する可能性がある
  • 非同期処理ではロックの確保時間をより短くしていく必要があり、相性が悪い

しかしながら、今回はこの問題を許容できると考えました。

  • ミドルウェアでリクエストごとにトランザクションマネージャーは生成されるため、リクエストを跨いで共有されることがない
  • トランザクションという特性上、同期的に利用されることが想定される。
    • ミドルウェアにて各ユースケースの処理前に、begin()でトランザクションを開始
    • トランザクションの利用はget_transaction()を使用して各ユースケースで実施
    • ミドルウェアにて各ユースケースの処理後に、commit()rollback()を実行

ちなみに以下のように、MutexGuardを使用してトランザクションを提供せず、呼び元からクロージャを受け取り、トランザクションマネージャーの内部でそれを実行する手法も検討しましたが、結果的にはライフタイムの制約に引っかかり実装できませんでした。

これはコンパイルエラー
async fn execute_in_transaction<F, Fut, R>(&self, f: F) -> Result<R, DomainError>
where
    F: FnOnce(&DatabaseTransaction) -> Fut + Send,
    Fut: Future<Output = Result<R, DomainError>> + Send,
{
    let lock = self.tran.lock().await;
    if let Some(tran) = lock.as_ref() {
        f(tran).await
    } else {
        log::error!("Database transaction error: transaction is not started");
        Err(DomainError::SystemError)
    }
}

// 呼び出し元の例
self.transaction_manager
    .execute_in_transaction(|transaction| async {
        permission_query.all(transaction).await.map_err(|e| {
            log::error!(
                "Failed to get role resource permissions. user_id: {}, error: {:?}",
                user_id,
                e
            );
            InfrastructureErrorMapper::to_domain(InfrastructureError::DatabaseError(e))
        })
    })
    .await

// コンパイルエラー
error: lifetime may not live long enough
  --> src/infrastructure/auth/rbac/rbac_authorizer.rs:80:55
   |
80 |                   .execute_in_transaction(|transaction| async {
   |  __________________________________________------------_^
   | |                                          |          |
   | |                                          |          return type of closure {async block@src/infrastructure/auth/rbac/rbac_authorizer.rs:80:55: 80:60} contains a lifetime '2
   | |                                          has type &'1 DatabaseTransaction
81 | |                     permission_query.all(transaction).await.map_err(|e| {
82 | |                         log::error!(
83 | |                             "Failed to get role resource permissions. user_id: {}, error: {:?}",
...  |
88 | |                     })
89 | |                 })
   | |_________________^ returning this value requires that '1 must outlive '2

原因は、execute_in_transaction()に渡しているクロージャ内で使用している DatabaseTransactionのライフタイムが、非同期処理のライフタイムを満たせないからで、具体的には、DatabaseTransactionの参照がクロージャのスコープ外で使われる可能性があるとコンパイラが判断しているようです。
解決方法としては、参照ではなく所有権を保持できればいいのですが、前述の通り、DatabaseTransactionはCloneできません。

したがって、MutexGuardを返却する形となりました。ただし、やはりこのアプローチは完璧ではなく、利用する側では受け取ったガードからas_ref()によって参照を取得してもらうのが前提となってしまいます。(所有権を取得してしまうとトランザクションマネージャーがコミットやロールバックを実行できなくなる)
トランザクションマネージャーの内部の仕組みが利用側に滲み出てしまっている感じもしますが、これ以上ができなかった形です。

トランザクションミドルウェアの実装

以下が実装です。処理内容としては前述のとおりで、各ユースケースの処理の前段でリクエストごとにトランザクションマネージャーを生成し、ユースケースの処理の結果によって、コミットまたはロールバックします。

sea_orm_transaction_middleware.rs
// Fixed message is responded and no internal information is returned.
const TRANSACTION_ERROR_MESSAGE: &str = "System error";

/// Middleware for managing transactions with SeaORM.
pub async fn sea_orm_transaction_middleware(
    req: ServiceRequest,
    next: Next<impl MessageBody>,
) -> Result<ServiceResponse<impl MessageBody>, Error> {
    let connection_provider = req
        .app_data::<web::Data<SeaOrmConnectionProvider>>()
        .ok_or_else(|| {
            log::error!("Failed to get connection provider");
            error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE)
        })?;

    let transaction_manager =
        SeaOrmTransactionManager::new(Arc::clone(&connection_provider.get_connection()))
            .await
            .map_err(|e| {
                log::error!("Initialization of transaction manager failed: {}", e);
                error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE)
            })?;

    let transaction_started = req.request().method() != Method::GET;
    if transaction_started {
        transaction_manager
            .begin()
            .await
            .map_err(|_| error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE))?;
    }

    req.extensions_mut().insert(Arc::new(transaction_manager)
        as Arc<dyn TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>>>);

    let res = next.call(req).await;
    match res {
        Ok(response) => {
            // If a transaction is initiated by the endpoint processing, it should be committed there.
            if !transaction_started {
                return Ok(response);
            }

            if response.status().is_success() {
                if let Some(transaction_manager) = response
                    .request()
                    .extensions_mut()
                    .get::<Arc<dyn TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>>>>()
                {
                    transaction_manager.commit().await.map_err(|_| {
                        error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE)
                    })?;
                } else {
                    log::error!("Failed to get transaction manager");
                    return Err(error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE));
                }
            } else {
                if let Some(transaction_manager) = response
                    .request()
                    .extensions_mut()
                    .get::<Arc<dyn TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>>>>()
                {
                    transaction_manager.rollback().await.map_err(|_| {
                        error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE)
                    })?;
                } else {
                    log::error!("Failed to get transaction manager");
                    return Err(error::ErrorInternalServerError(TRANSACTION_ERROR_MESSAGE));
                }
            }
            Ok(response)
        }
        Err(err) => {
            // This branch assumes an error before the application logic is called, so there is no need to explicitly roll back
            log::error!("Transaction cannot be rolled back because a response cannot be obtained");
            Err(err)
        }
    }
}

要点は以下です。

  • GETの場合のみbegin()するようにする
    • すべての処理でトランザクションを使用する必要はないため、今回はリクエストがGET系の場合はコネクションをそのまま利用する
  • 生成したトランザクションマネージャーをリクエスト内で共有できるように、ActixのExtensionsに格納

Controllerでのトランザクションマネージャーの取得

ここからが各リクエストごとの処理になります。先ほどミドルウェアで生成したトランザクションマネージャーを取得しています。
また、前述の通りクリーンアーキテクチャで構築しており、Controllerのメソッドが各エンドポイントごとに用意されています。Usecase層の初期化とそれに伴う依存関係の解決は、InteractorProviderが行っています。

controller.rs
/// Controller receives data from outside and calls usecase.
pub struct Controller<I, T, C>
where
    I: InteractorProvider<T, C>,
    T: Send + Sync + 'static,
    C: Send + Sync + 'static,
{
    pub interactor_provider: I,
    _t_marker: PhantomData<T>,
    _c_marker: PhantomData<C>,
}

impl<I, T, C> Controller<I, T, C>
where
    I: InteractorProvider<T, C>,
    T: Send + Sync + 'static,
    C: Send + Sync + 'static,
{
    pub fn new(interactor_provider: I) -> Self {
        Controller {
            interactor_provider,
            _t_marker: PhantomData,
            _c_marker: PhantomData,
        }
    }

    /// Obtain the transaction manager from the actix request.
    pub fn get_transaction_manager(
        &self,
        request: &actix_web::HttpRequest,
    ) -> Result<Arc<dyn TransactionManager<T, C>>, DomainError> {
        let manager = request
            .extensions()
            .get::<Arc<dyn TransactionManager<T, C>>>()
            .cloned();

        match manager {
            Some(manager) => Ok(manager),
            None => {
                log::error!(target: "Controller::get_transaction_manager", "transaction_manager not found");
                Err(DomainError::SystemError)
            }
        }
    }
}

// ここから各エンドポイントの実装
impl<I, T, C> Controller<I, T, C>
where
    I: InteractorProvider<T, C>,
    T: Send + Sync + 'static,
    C: Send + Sync + 'static,
{
    /// Get a list of customers.
    pub async fn get_customers(
        &self,
        params: web::Query<GetCustomersQueryParams>,
        request: actix_web::HttpRequest,
    ) -> impl Responder {
        let presenter = CustomerPresenterImpl::new();

        let query = validate_query_params(&params)?;
        let user_id = self.get_user_id(&request)?;
        let transaction_manager = self.get_transaction_manager(&request)?;

        let interactor = self
            .interactor_provider
            .provide_customer_interactor(transaction_manager)
            .await;
        let results = interactor.get_customers(&user_id, &query).await;

        presenter.present_get_customers(results).await
    }
}

InteractorProviderは以下のように定義しています。
型パラメータによって実装が複雑化している点がなんとかならないかな、、と思っています。
関連型で定義できればもっとスッキリするのですが、単体テスト用に各トレイトにmockallを設定していて、関連型が定義されたトレイトにmockallを設定するには、別途具体的な型を与えてあげる必要があります。すんなりいかなかったので、今回は型パラメータで書いてしまっています。

interactor_provider_interface.rs
/// Factory interface providing Interactor.
#[automock]
#[async_trait]
pub trait InteractorProvider<T, C>: Send + Sync
where
    T: Send + Sync + 'static,
    C: Send + Sync + 'static,
{
    /// Provide Interactor for customer.
    async fn provide_customer_interactor(
        &self,
        transaction_manager: Arc<dyn TransactionManager<T, C>>,
    ) -> Box<dyn CustomerInteractor>;
}

また、実装はインフラ層に配置しています。
以下のように、RbacAuthorizerにトランザクションマネージャーを注入し、Interactorを生成しています。CustomerRepositoryImplは特に気にしなくて大丈夫です。

interactor_provider_impl.rs
/// Factory providing Interactor.
pub struct InteractorProviderImpl {
    shopify_config: ShopifyConfig,
}

impl InteractorProviderImpl {
    pub fn new(
        shopify_config: ShopifyConfig,
    ) -> Self {
        Self {
            shopify_config,
        }
    }
}

#[async_trait]
impl InteractorProvider<DatabaseTransaction, Arc<DatabaseConnection>> for InteractorProviderImpl {
    async fn provide_customer_interactor(
        &self,
        transaction_manager: Arc<
            dyn TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>>,
        >,
    ) -> Box<dyn CustomerInteractor> {
        Box::new(CustomerInteractorImpl::new(
            Box::new(CustomerRepositoryImpl::new(ShopifyGQLClient::new(
                self.shopify_config.clone(),
            ))),
            Arc::new(RbacAuthorizer::new(Arc::clone(&transaction_manager))),
        ))
    }
}

Usecaseでのロジック制御

Usecaseではインタフェース越しにアダプタやリポジトリを操作して、アプリケーションロジックを制御しているのみで、今回は詳細には取り上げません。
ちなみに、ここでは顧客情報の取得をしており、顧客情報に不特定多数がアクセスできては困るので、authorizerによって認可を行っている形です。先ほどのInteractorProviderの実装の通り、このauthorizerにトランザクションマネージャーが注入されています。

customer_impl.rs
/// Customer Interactor.
pub struct CustomerInteractorImpl {
    customer_repository: Box<dyn CustomerRepository>,
    authorizer: Arc<dyn Authorizer>,
}

impl CustomerInteractorImpl {
    pub fn new(
        customer_repository: Box<dyn CustomerRepository>,
        authorizer: Arc<dyn Authorizer>,
    ) -> Self {
        Self {
            customer_repository,
            authorizer,
        }
    }
}

#[async_trait]
impl CustomerInteractor for CustomerInteractorImpl {
    async fn get_customers(
        &self,
        user_id: &UserId,
        query: &GetCustomersQuery,
    ) -> Result<Vec<Customer>, DomainError> {
        self.authorizer
            .authorize(user_id, &Resource::Customer, &Action::Read)
            .await?;

        match query {
            GetCustomersQuery::Email(email) => {
                let customer = self
                    .customer_repository
                    .find_customer_by_email(email)
                    .await?;
                Ok(vec![customer])
            }
        }
    }
}

Infra層でのトランザクションの取得と利用

前述のauthorizerにおける認可の実装で、ここでやっとトランザクションマネージャーを利用し、DB操作を行っています。
ここではRBACによって実装していますが、その辺りのロジックは本筋ではないので、DB操作を行っている箇所に着目します。

rbac_authorizer.rs
/// Authorization by RBAC.
pub struct RbacAuthorizer {
    transaction_manager: Arc<dyn TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>>>,
}

impl RbacAuthorizer {
    /// Create a new instance.
    pub fn new(
        transaction_manager: Arc<
            dyn TransactionManager<DatabaseTransaction, Arc<DatabaseConnection>>,
        >,
    ) -> Self {
        Self {
            transaction_manager,
        }
    }
}

#[async_trait]
impl Authorizer for RbacAuthorizer {
    async fn authorize(
        &self,
        user_id: &str,
        resource: &Resource,
        action: &Action,
    ) -> Result<(), DomainError> {
        let role_query = UserRole::find().filter(user_role::Column::UserId.eq(user_id));

        // -------- ここに着目 --------
        let roles = if self.transaction_manager.is_transaction_started().await {
            role_query
                .all(
                    self.transaction_manager
                        .get_transaction()
                        .await?
                        .as_ref()
                        .unwrap(),
                )
                .await
        } else {
            role_query
                .all(self.transaction_manager.get_connection().await?.as_ref())
                .await
        }
        // ----------------
        .map_err(|e| {
            log::error!(
                "Failed to get user roles. user_id: {}, error: {:?}",
                user_id,
                e
            );
            InfrastructureErrorMapper::to_domain(InfrastructureError::DatabaseError(e))
        })?;

        if roles.is_empty() {
            log::error!("User has no role. user_id: {}", user_id);
            return Err(DomainError::SystemError);
        }
        let role_ids: Vec<i32> = roles.iter().map(|role| role.role_id).collect();

        let permission_query = RoleResoucePermission::find()
            .find_also_related(Permission)
            .filter(role_resouce_permission::Column::RoleId.is_in(role_ids));
        let role_resource_permission = if self.transaction_manager.is_transaction_started().await {
            permission_query
                .all(
                    self.transaction_manager
                        .get_transaction()
                        .await?
                        .as_ref()
                        .unwrap(),
                )
                .await
        } else {
            permission_query
                .all(self.transaction_manager.get_connection().await?.as_ref())
                .await
        }
        .map_err(|e| {
            log::error!(
                "Failed to get role resource permissions. user_id: {}, error: {:?}",
                user_id,
                e
            );
            InfrastructureErrorMapper::to_domain(InfrastructureError::DatabaseError(e))
        })?;

        if !role_resource_permission.iter().any(|permission| {
            let allow_resource = match Resource::try_from(permission.0.resource_id) {
                Ok(permission_resource) => permission_resource == *resource,
                Err(_) => false,
            };

            let allow_action = match permission.1.clone().unwrap().action.parse::<Action>() {
                Ok(permission_action) => {
                    permission_action == *action || permission_action == Action::All
                }
                Err(_) => false,
            };

            return allow_resource && allow_action;
        }) {
            log::error!(
                "User is not authorized. user_id: {}, resource: {}, action: {}",
                user_id,
                resource,
                action
            );
            return Err(DomainError::AuthorizationError);
        }

        Ok(())
    }
}

率直にいうと、ここは冗長な実装になってしまいました。
利用する側で、is_transaction_started()によってトランザクションが開始されているか判定して、get_transaction()によってトランザクションを利用したクエリを発行するかget_connection()によってコネクションを利用したクエリを発行するか、分岐ができてしまっています。
これは、SeaORMのall()にはConnectionTraitというトレイトの実装を渡す必要があり、ここでSeaORMのDatabaseTransactionDatabaseConnectionを明示しなくてはならないため、この形になってしまいました。

https://docs.rs/sea-orm/latest/sea_orm/trait.ConnectionTrait.html

対応策としては、ConnectionTraitをさらに抽象化するトレイトを自前で用意すると良さそうと考えています。ただし、そこまで抽象化を頑張る必要性が現状感じられなかったので、今回はこの実装となりました。

対して、RbacAuthorizerのフィールドに、トランザクションマネージャーの実装であるSeaOrmTransactionManegerではなく、TransactionManegerトレイトを保持している点はうまく実装できたと考えています。トランザクションマネージャーを利用する側としては、SeaORMへの依存を抑えつつ、他のORMを導入した場合にも対応可能な状態を目指せているかなと考えます。

動作確認

先ほどから例に挙げている顧客情報取得のエンドポイントはGETのため、検証用にGETでもトランザクションを開始するように設定した上でログを確認してみます。
正常レスポンスが返却された場合のログでは、sqlxのログよりトランザクションがコミットされていることが確認できます。

正常レスポンスのログ
[2025-01-26T16:01:18Z INFO  sqlx::query] summary="COMMIT" db.statement="" rows_affected=0 rows_returned=0 elapsed=2.2185ms elapsed_secs=0.0022185
[2025-01-26T16:01:18Z INFO  actix_web::middleware::logger] 172.19.0.1 "GET /ec-extension/customers?email=xxxxxx HTTP/1.1" 200 689 "-" "PostmanRuntime/7.43.0" 0.774352

以下がエラーレスポンスが返却された場合のログです。存在しない顧客情報を取得しようとして404が返却され、トランザクションもロールバックされていることが確認できました。

エラーレスポンスのログ
[2025-01-26T16:02:49Z ERROR backend::infrastructure::ec::shopify::repository::customer::customer_impl] No customer found for email: dummy@example.com
[2025-01-26T16:02:50Z INFO  sqlx::query] summary="ROLLBACK" db.statement="" rows_affected=0 rows_returned=0 elapsed=1.327792ms elapsed_secs=0.001327792
[2025-01-26T16:02:50Z DEBUG actix_web::middleware::logger] Error in response: NotFound { object_name: "Customer" }
[2025-01-26T16:02:50Z INFO  actix_web::middleware::logger] 172.19.0.1 "GET /ec-extension/customers?email=dummy@example.com HTTP/1.1" 404 65 "-" "PostmanRuntime/7.43.0" 0.830698

おわりに

いくつか課題はありますが、大枠としては依存関係を適切に保ちながら、RustでWebアプリケーションにおけるトランザクション管理を実装できたと思います。
Rust特有の概念に対応しつつ適切な抽象化を実現する必要があるという点が非常に難解でした。しかし、試行錯誤によって所有権やライフタイムについて理解が深まった点もあります。
引き続き理解を深めて、パフォーマンス観点も合わせてより適切なアプローチを検討していきたいと思います。

Discussion