🛠️

Rustのsqlxを使ったリポジトリ層の設計パターン

に公開

sqlxはRustからデータベースを扱うためのライブラリです。Rust製のORMとしてはdieselなどの先発のライブラリがありますが、非同期処理に対応していることや、実装が簡単であるといった特長から、近年人気を集めています。

sqlxの基礎的な使い方に関する解説記事は、比較的多くある一方で、実際のアプリケーションに導入する際の知見をまとめた記事は、あまり見受けられませんでした。

そこで、本記事では、sqlxを実際のアプリケーションにおいて、リポジトリ層に導入する場合に、どのようなパターンを組むのが良いのかについて、考察してみたいと思います。

使用するバージョン

Config.toml
sqlx = { version = "0.8", features = ["runtime-tokio", "postgres"] }

今回はRDBMSとして、PostgreSQLを使用し、アプリケーション部分にはaxumを使用しますが、本記事で紹介する方法は、ほかのデータベースやWebフレームワークと組み合わせて使用することができるので、適宜読み替えていただけると幸いです。

パターン1 リポジトリ構造体にPoolを埋め込む

#[derive(sqlx::FromRow, Serialize)]
pub struct User {
    pub id: i32,
    pub name: String,
}

pub struct CreateUser {
    pub name: String,
}

/// Userリポジトリ
pub struct UserRepository {
    pool: PgPool,
}

impl UserRepository {
    pub fn new(pool: PgPool) -> Self {
        UserRepository { pool }
    }

    pub async fn create_user(&self, input: CreateUser) -> sqlx::Result<User> {
        sqlx::query_as("INSERT INTO users (name) VALUES ($1) RETURNING *")
            .bind(input.name)
            .fetch_one(&self.pool)
            .await
    }
}

1つ目は、リポジトリの構造体 (UserRepository) に、データベースのコネクションプール (PgPool) を直接埋め込むパターンです。Javaを経験したことがある方には、馴染み深いパターンだと思います。axumから使う場合には、Stateにコネクションプールを保持して、リポジトリを生成できるように FromRequestParts の設定をすると、見通しが良くなると思います。

#[derive(Clone)]
pub struct AppState {
    pub db: PgPool
}

impl FromRequestParts<AppState> for UserRepository {
    type Rejection = Infallible;

    async fn from_request_parts(
        _parts: &mut axum::http::request::Parts,
        state: &AppState,
    ) -> Result<Self, Self::Rejection> {
        UserRespository::new(state.db.clone())
    }
}

// HTTPハンドラの記述

#[derive(Deserialize)]
struct CreateUserInput {
    pub name: String,
}

async fn create_user(
    user_repository: UserRepository,
    Json(CreateUserInput { name }): Json<CreateUserInput>,
) -> AppResult<Json<User>> {
    Ok(Json(user_repository.create_user(CreateUser { name }).await?))
}

このパターンはシンプルで、HTTPハンドラ側の記述量も多くないため、一見良さそうに見えますが、実は致命的な欠点が1つあります。それは、トランザクションに対応できないという点です。コネクションプールを埋め込んでいる限り、複数のリポジトリからは別々のコネクションが取得されるので、構造上トランザクションを張ることが不可能です。

コネクションプールに加えて、トランザクションも保持できるようなenumを作ればいいのかと思いきや、それほど単純な話ではなく、sqlxのトランザクションが&mutを要求する関係で、ライフタイムや所有権の問題が出てくるため、現状では不可能であると思われます。

逆に、トランザクションの必要性がない場合には、このパターンもありだとは思います。

パターン2 impl Executor を引数として受ける

pub struct UserRepository;

impl UserRepository {
    pub async fn create_user(
        executor: impl PgExecutor<'_>,
        input: CreateUser,
    ) -> sqlx::Result<User> {
        sqlx::query_as("INSERT INTO users (name) VALUES ($1) RETURNING *")
            .bind(input.name)
            .fetch_one(executor)
            .await
    }
}

先ほどのパターンでは、リポジトリにコネクションプールを埋め込んでしまったため、トランザクションを処理できないという欠点がありました。そこで、埋め込みをやめて、引数として渡すようにしようというのが、このパターンの発想です。

sqlxには PgExecutor (PostgreSQLの場合) というトレイトがあり、クエリを実行できるインスタンス (&PgPool&mut PgTransaction&mut PgConnectionなど) に実装されています。これをジェネリクスで受けるようにすれば、トランザクションにも対応できるというわけです。

HTTPハンドラ側は次のような形になります。

async fn create_user(
    State(AppState { db }): State<AppState>,
    Json(CreateUserInput { name }): Json<CreateUserInput>,
) -> impl IntoResponse {
    let user = UserRepository::create_user(&db, CreateUser { name }).await?;

    // トランザクションありの場合
    let mut tx = db.begin().await?;
    let user = UserRepository::create_user(&mut tx, CreateUser { name }).await?;
    tx.commit().await?;

    Ok(Json(user))
}

このパターンには明確な欠点が2つあります。

  • Executorを毎回渡す必要があり、手間がかかる
  • データベースとリポジトリの関係性の情報が欠落している

1つ目は単純に、記述量が多くなるのと、そもそもリポジトリの中身が空なので、実質的にnamespaceと変わりないという点があります。

2つ目の方が深刻で、例えば2つ以上のデータベースに接続する場合に、どのリポジトリがどのデータベースから取得できるのかがわからないことが問題です。そのため、違うデータベースのコネクションプールを引数として与えても、トレイトの制約上は同じPgExecutor を満たしているので、コンパイルエラーにならないということが起きます。プロジェクトの規模がそこまで大きくない場合には、対応関係を把握できるかもしれませんが、あまりスマートなパターンとは思えないのが正直なところです。(厳格な型システムというRustの価値観的にも、微妙だと思います)

パターン3 newtypeパターンを用いて、リポジトリをトレイトとして実装する

お待たせしました。本命のパターンです。

#[allow(async_fn_in_trait)]
pub trait UserRepository<'a>: DBExecutor<'a> {
    async fn create_user(self, input: CreateUser) -> sqlx::Result<User> {
        sqlx::query_as("INSERT INTO users (name) VALUES ($1) RETURNING *")
            .bind(input.name)
            .fetch_one(self)
            .await
    }
}

/// DBExecutor を実装したすべての型に UserRepository を実装
impl<'a, T: DBExecutor<'a>> UserRepository<'a> for T {}

先ほどのパターン2では、データベースとリポジトリの関連付けがなされていないことが課題としてありました。この根底には、PgExecutor トレイトがsqlxで宣言されたもので、複数のデータベースを区別できないという原因があります。そこで、それぞれのデータベースに特化したトレイトを新たに作ることにより、データベースの区別を型ベースで行えるようにするのが、このパターンの特徴です。

上の例では、UserRepositoryDBExecutor というトレイトを継承しており、その後のジェネリックな実装によって、DBExecutor を実装したすべての型に UserRepository を実装しています。

DBExecutor は、もとのsqlxの PgExecutor を継承したトレイトです。このトレイトは、新たに作成する DBPoolDBTransaction に実装されます。

/// メインDBのExecutorトレイト
pub trait DBExecutor<'c>: PgExecutor<'c> + Sized {}

/// メインDBプール
#[derive(Debug, Clone)]
pub struct DBPool(PgPool);

impl DBPool {
    pub fn new(pool: PgPool) -> Self {
        DBPool(pool)
    }

    /// トランザクションを開始
    ///
    /// トランザクションを適用するには、操作の後に [`DBTransaction::commit`] を呼び出す必要があります。
    pub async fn begin(&self) -> sqlx::Result<DBTransaction> {
        Ok(DBTransaction(self.0.begin().await?))
    }
}

/// メインDBトランザクション
#[derive(Debug)]
pub struct DBTransaction(PgTransaction<'static>);

impl DBTransaction {
    /// トランザクションを確定
    pub async fn commit(self) -> sqlx::Result<()> {
        self.0.commit().await
    }

    /// トランザクションを中断
    pub async fn rollback(self) -> sqlx::Result<()> {
        self.0.rollback().await
    }
}

impl<'c> DBExecutor<'c> for &DBPool {}
impl<'c> DBExecutor<'c> for &'c mut DBTransaction {}

ここで、DBPoolDBTransaction は、それぞれ PgPoolPgTransaction のラッパーしただけの構造体です。このようなパターンはRustではよく見受けられ、newtypeパターンとして知られています。これにより、別のデータベースを使用する場合には、SecondDBExecutorSecondDBTransaction のように新たなトレイトや構造体を作成し、型ベースで区別できるようになります。

PgPoolPgTransaction は独自に作った型なので、PgExecutor を手動で実装する必要があります。PgExecutorExecutor<Database=Postgres> のエイリアスです。

impl<'p> Executor<'p> for &DBPool {
    type Database = Postgres;

    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<Either<PgQueryResult, PgRow>, Error>>
    where
        E: 'q + sqlx::Execute<'q, Self::Database>,
    {
        self.0.clone().fetch_many(query)
    }

    fn fetch_optional<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Option<PgRow>, Error>>
    where
        E: 'q + sqlx::Execute<'q, Self::Database>,
    {
        self.0.clone().fetch_optional(query)
    }

    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [PgTypeInfo],
    ) -> BoxFuture<'e, Result<PgStatement<'q>, Error>> {
        self.0.clone().prepare_with(sql, parameters)
    }

    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Self::Database>, Error>> {
        self.0.clone().describe(sql)
    }
}

impl<'c> Executor<'c> for &'c mut DBTransaction {
    type Database = Postgres;

    fn fetch_many<'e, 'q: 'e, E>(
        self,
        query: E,
    ) -> BoxStream<'e, Result<Either<PgQueryResult, PgRow>, Error>>
    where
        'c: 'e,
        E: 'q + sqlx::Execute<'q, Self::Database>,
    {
        self.0.fetch_many(query)
    }

    fn fetch_optional<'e, 'q: 'e, E>(self, query: E) -> BoxFuture<'e, Result<Option<PgRow>, Error>>
    where
        'c: 'e,
        E: 'q + sqlx::Execute<'q, Self::Database>,
    {
        self.0.fetch_optional(query)
    }

    fn prepare_with<'e, 'q: 'e>(
        self,
        sql: &'q str,
        parameters: &'e [PgTypeInfo],
    ) -> BoxFuture<'e, Result<PgStatement<'q>, Error>>
    where
        'c: 'e,
    {
        self.0.prepare_with(sql, parameters)
    }

    fn describe<'e, 'q: 'e>(
        self,
        sql: &'q str,
    ) -> BoxFuture<'e, Result<sqlx::Describe<Self::Database>, Error>>
    where
        'c: 'e,
    {
        self.0.describe(sql)
    }
}

やや記述量が多いですが、やっていることは単純で、内部のsqlxの型の実装を呼び出すだけです。実装を行う必要があるのは、&DBPool&mut DBTransaction の2つです。

このパターンを用いたHTTPハンドラは次のように書くことができます。

impl FromRequestParts<AppState> for DBPool {
    type Rejection = Infallible;

    async fn from_request_parts(
        _parts: &mut axum::http::request::Parts,
        state: &AppState,
    ) -> Result<Self, Self::Rejection> {
        Ok(state.db.clone())
    }
}

// HTTPハンドラの記述

async fn create_user(
    db: DBPool,
    Json(CreateUserInput { name }): Json<CreateUserInput>,
) -> impl IntoResponse {
    let user = db.create_user(CreateUser { name }).await?;

    // トランザクションありの場合
    let mut tx = db.begin().await?;
    let user = tx.create_user(CreateUser { name }).await?;
    tx.commit().await?;

    Ok(Json(user))
}

2つめのパターンと比較して非常にスマートに記述できることがわかります。

また、トランザクションについても対応しながら、データベースとリポジトリの関係を維持することに成功しています。

このパターンの欠点として、Executor の実装周りのボイラープレートがやや必要なことが挙げられますが、各データベースで1回だけ記述すればよく、リポジトリの追加時に記述が必要なわけではないので、それほど手間はかかりません。

まとめ

Rustのサーバーサイド開発における、sqlxを用いたリポジトリ層の実装パターンを検討しました。

個人的な結論として、現状では3つ目の newtypeパターンとトレイトを組み合わせた実装 が、多くのケースで適しているように感じます。小規模でデータベースを1つしか使用しないような場合では、型の区別は必要ないかもしれませんが、newtypeパータンの記述量はそれほど大きく増えるわけではなく、今後のスケーリングを考慮するのではあれば、はじめからカスタム型を作成する方針で進めるのがいいのではと思います。

他のアイデアを思い付いた方がいらっしゃれば、コメントなどで共有いただけると嬉しいです。

この記事がRustを用いたサーバーサイド開発の一助となれば幸いです。

Discussion