🦄

PrismaのTransactionClientを引き回さない

2023/12/16に公開

はじめに

本投稿で使用している環境

  • Node.js: 20.5.1
  • @nestjs/core: 10.1.3 (platformにはfastifyを使用)
  • prisma: 5.7.0
  • @prisma/client: 5.7.0

Prisma Transactionの種類

PrismaでTransactionを実行する場合は大きく分けて以下の2つの書き方があります。
https://www.prisma.io/docs/orm/prisma-client/queries/transactions#interactive-transactions

1. Nested writes

1発のPrismaClient操作で、リレーションのレコードも含めて操作する書き方です。

const newUser: User = await prisma.user.create({
  data: {
    email: 'alice@prisma.io',
    posts: {
      create: [
        { title: 'Join the Prisma Slack on https://slack.prisma.io' },
        { title: 'Follow @prisma on Twitter' },
      ],
    },
  },
})

明示的なtransaction宣言はしていませんが、この書き方で発行されるSQLは以下のようにtransactionコンテキスト内でINSERTされます。

LOG [Prisma] Query:  BEGIN
LOG [Prisma] Query:  INSERT INTO "public"."User" ("id","companyId","email","name","createdAt","updatedAt") VALUES ($1,$2,$3,$4,$5,$6) RETURNING "public"."User"."id"
LOG [Prisma] Query:  INSERT INTO "public"."Post" ("id","userId","title","content") VALUES ($1,$2,$3,$4) RETURNING "public"."Post"."id"
LOG [Prisma] Query:  SELECT "public"."User"."id", "public"."User"."companyId", "public"."User"."email", "public"."User"."name", "public"."User"."createdAt", "public"."User"."updatedAt", FROM "public"."User" WHERE "public"."User"."id" = $1 LIMIT $2 OFFSET $3
LOG [Prisma] Query:  COMMIT

仮にリレーションのpostに対する操作に失敗した場合は、userのcreateが成功したとしてもuserの処理はRollbackされ、All or Nothingな挙動になっています。

2. The $transaction API

この$transaction APIはさらに以下の2つの書き方に分けられます。

2-1. Sequential Prisma Client operations

複数のPrismaClient処理をシーケンシャルに実行できるAPIです。
$transaction関数に対して配列で複数のPrismaClientの処理を渡します。
1つ目のPrismaClient処理で成功し、2つ目以降で失敗すると、1つ目の処理はRollbackされます。

const [posts, totalPosts] = await prisma.$transaction([
  prisma.post.findMany({ where: { title: { contains: 'prisma' } } }),
  prisma.post.count(),
])

2-2. Interactive transactions

2-1のSequential operationsでは、$transactionメソッドの引数にはprisma promiseを渡さなければならないため、例えばtransaction処理中に作成したデータを使って別のクエリを実行するなどの、柔軟な使い方ができませんでした。

この問題を解決するために新たに登場したのがInteractive transactionsで、以下のように$transactionAPIに関数を渡し、その関数の中でPrismaClientの操作以外の処理もすることが可能になりました。
https://github.com/prisma/prisma/issues/1844

こちらも同様に処理中に例外が発生すると書き込んだ内容がすべてRollbackされます。

return prisma.$transaction(async (tx) => {
    // 1. Decrement amount from the sender.
    const sender = await tx.account.update({
      data: {
        balance: {
          decrement: amount,
        },
      },
      where: {
        email: from,
      },
    })

    // 2. Verify that the sender's balance didn't go below zero.
    if (sender.balance < 0) {
      throw new Error(`${from} doesn't have enough to send ${amount}`)
    }

    // 3. Increment the recipient's balance by amount
    const recipient = await tx.account.update({
      data: {
        balance: {
          increment: amount,
        },
      },
      where: {
        email: to,
      },
    })

    return recipient
  })

このInteractive Transactionには引数に渡す関数に対してTransactionコンテキストを保持したPrismaClientが渡されるのですが、このclient経由で実行したSQLでなければtransaction中の処理として実行されないという制約があります。

const prisma = new PrismaClient();
await prisma.$transaction(async (prismaTx: Prisma.TransactionClient) => {
  const result = await prismaTx.something.create(...) // ✅ transaction内で処理される
  await this.prisma.something.findUnique({ // ❌ prismaTxを使わない場合はfindしても見つからないためnullになる
    where: {
      id: result.id
    }
  });
});

このInteractive Transactionの挙動に関しては、以下の課題がありました。

複数のmoduleにまたがって1つのtransactionを実行したい場合に、Prisma TransactionClientを引き回す必要がある

特にレイヤー構造的な作りでRepositoryパターンを使っている際や、Serviceクラスなどに共通ロジックを切り出している際にPrismaのTransactionClientを考慮したいとなると、冗長なコードを書く必要がありました。

以下はレイヤードなアーキテクチャにおいてprismaのtransaction処理が冗長になる1例です。(Nest.jsを用いた例のため依存しているクラスはDIしています)

// some-usecase.ts
@Injectable()
class SomeUseCase {
  constructor(private prisma: PrismaClient, private someService: ISomeService, ...) {}

  async someUseCase() {
    ...
    await this.prisma.$transaction(async (primsaTx) => {
      ...
      await this.someService.create(params, prismaTx);
    });
  }
}

// some-service.ts
class SomeService implements ISomeService {
  constructor(private someRepository: ISomeRepository, ...) {}
  
  async create(params, prismaTx) {
    ...
    await this.someRepository.createWithTx(params, prismaTx)
  }
}

// some-repository.ts
@Injectable()
class SomeRepository implements ISomeRepository {
  constructor(private prisma: PrismaClient) {}
  
  async create(params) {
    await this.prisma.something.create(...)
  }
  
  // 🔺 repositoryでは利用側でtransactionを実行しているかどうかは知り得ないため、transactionで使用したい場合はprismaTxを引き回す専用の関数を用意する必要がある
  async createWithTx(params, prismaTx) {
    await prismaTx.something.create(...)
  }
}

この場合にシンプルな集約をcreateする場合はデータアクセス層(prismaに依存している場所)でtransaction専用の関数を用意してあげても特に冗長にはならないでしょう。

しかし、transaction内で複数ドメイン(または複数のServiceクラス)にまたがる処理を1つのtransaction内で完結させたい場合などではコードの冗長性が無視できないレベルになってきます。

Serviceクラスなどに処理を切り出している場合は、findなどのprismaの読み取り系メソッドを多く呼び出すこともあるため、find系の関数もtransaction用とnot transaction用で2つ作成する必要が出始め、Repository層に類似の関数が多く定義されていきます。

※ 関数自体はそれぞれ1つにし、引数のprismaTxがundefinedかどうかでPrismaClientを条件分岐させる方法もあると思いますが、その方法でも根本的には冗長さは変わらないと思っています。

class SomeRepository {
  async findById() {}
  
  async findByIdWithTx() {}
  
  async create() {}
  
  async createWithTx() {}
  
  async update() {}
  
  async updateWithTx() {}
  
  ...
}

これがほぼすべてのドメインで使うService、Repositoryに渡って考えると、該当箇所のコード量が単純に倍近く増えていきます。

なぜ TransactionClientを引き回す仕組みになっているのか

PrismaClientのアーキテクチャ

前提として、PrismaClientは大きく分けると以下の3つのコンポーネントに分かれて構成されています。

Client Libraryは以下のようにNode.jsなどの実行環境内でORMのロジックを定義し、実行することができる上位レイヤーの仕組みのことです。これによってアプリケーションからPrismaClientを通じてクエリを発行することができます。

await this.prisma.posts.findMany(...);

Query EngineはRustで書かれており、バイナリファイルとしてPrismaClientに含められています。通常はnode_modules/@prisma/enginesにインストールされます。
Query EngineはDatabaseと直接接続されており、Client LibraryはこのQuery Engineに対して接続します。

Client Libraryによって実行されたthis.prisma.posts.findManyはQuery Engine内で解析され、実際にDBに対してSQLを発行します。

Query Engineの責務としては以下と記載されています。
https://www.prisma.io/docs/orm/more/under-the-hood/engines#responsibilities-of-the-query-engine

  • database connectionの管理
  • PrismaClient Library(Node.jsプロセス)からクエリを受け付ける
  • SQLクエリの生成
  • SQLクエリをdatabaseに対して実行
  • databaseからのresponseをPrisma Client Libraryに返却

なぜNode.jsプロセスで動くPrismaClientとバイナリのQuery Engineでアーキテクチャを分けたのか公式の情報を見つけることはできませんでしたが、拡張性・責務の最適化・パフォーマンスなどを考慮すると分割することが最適であったのだと予想します。

Interactive TransactionとQueryEngine

PrismaClientがInteractive Transactionを開始してQueryEngineに対してリクエストした時、QueryEngine内部ではTransaction ManagerがITX Serverを生成し、ITX Serverと接続するITX Clientをhash mapに保持します。
その際にユニークなtransaction idを生成し、ITX Clientの内部に持たせておきます。
https://github.com/prisma/prisma-engines/blob/79fb5193cf0a8fdbef536e4b4a159cad677ab1b9/query-engine/core/src/executor/interpreting_executor.rs#L157

そして、PrismaClientからtransaction内で新たなクエリリクエストが発生した場合に、Transaction Manager経由でidを使用して対象のITX Clientを特定し、ITX Serverに対してSQLを実行するように要求します。

ITX Serverはtimeoutの設定値を見て制限時間を超えていなければレスポンスを返し、超えていればtransactionをrollbackして終了します。

https://github.com/prisma/prisma-engines/pull/2619

つまり、Interactive Transactionを実行している間は、常にユニークなtransaction idを使ってITX Clientを特定する必要があるため、PrismaClientはそのtransaction idを保持しておく必要があります。

通常のWebサービスで長時間サーバーを起動させておくような使い方をする場合は、PrismaClientは1インスタンスを1アプリケーション(1プロセス)内で使い回すようなシングルトンパターンを推奨されています。
これはDB側のパフォーマンスを考慮するための措置であり、1クエリごとにDBと新規のコネクションを張っているとDB側のメモリ不足などに繋がるためです。

https://www.prisma.io/docs/orm/prisma-client/setup-and-configuration/databases-connections#prismaclient-in-long-running-applications

しかし、transactionを開始した場合はシングルトンのPrismaClientインスタンスを参照してしまうと、transactionのidが保持されていないため、上記で述べたITX Clientを特定することができません。
つまり、Interactive Transactionではtransaction idを常に保持しておく必要があるため、transaction専用のPrismaClientインスタンスを参照し続ける必要があり、ゆえにPrismaClientを引き回す必要があるのです。

await prisma.$transaction(async (prismaTransactionClient) => {
  await prismaTransactionClient.posts.findMany(...) // prismaTransactionClientはtransaction idを保持している
  await prisma.posts.findMany(...) // transaction idを保持していない
})

Prisma TransactionClientを引き回さない

ここでやっと本題なのですが、上記で述べてきた課題と引き回さざるを得ない理由から、Interactive Transaction内でtransactionとしてSQLを実行したければ、transaction idを保持しているPrismaClientを参照すればよいだけなので、そのPrismaClientをグローバルな場所に置き、そこを参照するような仕組みにすれば解決します。

単純にグローバル変数として保持してしまうと、複数リクエストで同じtransactionを利用しかねないため、リクエストごとに生成されるリクエストコンテキストを利用します。

今回は、Node.js APIのAsyncLocalStorageを用いてPrismaClientManagerというPrismaClientを管理するいたってシンプルな仕組みを作ることによって解決しました。

以下サンプルコードです。
https://github.com/uki1014/nestjs-prisma-client-manager

AsyncLocalStorageは、他言語におけるスレッドローカルストレージのようなもので、類似のライブラリだとRubyのsteveklabnik/request_storeなどがあります。

グローバル変数は、例えばリクエストごとにグローバル変数の値を書き換えると、後続のリクエストに影響が発生しますが、AsyncLocalStorageはそのリクエスト内でライフサイクルが完結しています。
つまり、リクエストが発生したときにストアが生成され、リクエストが終了するとストアが破棄されるような仕組みになっています。

この仕組みを用いて、あるリクエスト内においてPrismaでtransactionを発生させるとAsyncLocalStorageに対してPrismaのTransactionClientを格納し、その値を各所から参照させることでPrismaを利用するコード側ではtrasansactionであることを意識せずに済みます。
リクエストごとにストアが生成されるため、異なるリクエスト同士で間違ったPrismaClientが参照される心配もありません。

prismaClientManager.ts
import { Injectable } from '@nestjs/common';

import type { PrismaService } from './prisma-service';
import type { Prisma, PrismaClient } from '@prisma/client';
import type { ConfigService } from '~/config/config.service';
import type { RequestContext } from '~/infrastructure/framework/requestContext';

const PRISMA_TRANSACTION_CLIENT_KEY = 'prismaTransactionClient' as const;

@Injectable()
export class PrismaClientManager {
  constructor(
    private readonly prisma: PrismaClient,
    private readonly requestContext: RequestContext,
    private readonly configService: ConfigService
  ) {}

  async transaction<T>(
    fn: () => Promise<T>,
    options?: {
      maxWait?: number;
      timeout?: number;
      isolationLevel?: Prisma.TransactionIsolationLevel;
    }
  ): Promise<T> {
    try {
      const prismaTransactionClient = this.getPrismaTransactionClient();

      if (prismaTransactionClient) return await fn();

      return await this.prisma.$transaction(async (newPrismaTransactionClient) => {
        this.requestContext.set(PRISMA_TRANSACTION_CLIENT_KEY, newPrismaTransactionClient);

        const result = await fn();
        this.clearTransactionClient();
        return result;
      }, options);
    } catch (error) {
      this.clearTransactionClient();
      throw error;
    }
  }

  getClient(): Prisma.TransactionClient | PrismaService {
    const prismaTransactionClient = this.getPrismaTransactionClient();

    if (prismaTransactionClient) return prismaTransactionClient;

    return this.prisma;
  }

  private getPrismaTransactionClient() {
    return this.requestContext.get(PRISMA_TRANSACTION_CLIENT_KEY);
  }

  private clearTransactionClient() {
    this.requestContext.set(PRISMA_TRANSACTION_CLIENT_KEY, null);
  }
}
requestContext.module.ts
@Global()
@Module({
  imports: [
    ClsModule.forRoot({
      global: true,
      middleware: {
        mount: true,
        setup: (cls) => {
          cls.set('prismaTransactionClient', null);
        },
      },
    }),
  ],
  providers: [
    {
      provide: RequestContext,
      useExisting: ClsService,
    },
  ],
  exports: [RequestContext],
})
export class RequestContextModule {}

注意点

CLIから実行する場合

AsyncLocalStorageはHttp Requestに対して生成されるため、CLIや非同期Job内で利用するためには意図的にAsyncLocalStorageを生成する必要があります。

cli.ts
async main() {
  const prisma = prismaClientManager.getClient();
  await prisma.posts.findMany(...)
}

await cls.run(async () => {
  await main();
});

PrismaClient取得のタイミング

Nest.jsを用いている場合、DIによって別クラスへの依存度を減らせることがメリットではありますが、このPrismaClientManagerの仕組みを入れるとPrismaClientの取得順序に意味が発生してしまうため、利用側で気をつける必要が出てきます。
これは本来のInteractive Transactionでも同様のことが言えるため、大きな違いはありません。

const prisma = this.prismaClientManager.getClient();
const someThing = await prisma.someThing.findFirst();

await this.prismaClientManager.transaction(async () => {
  const prisma = this.prismaClientManager.getClient(); // transaction開始後はclientを取り直さなければいけない
  await prisma.hoge.findMany({
    where: {
      someThingId: someThing.id
    }
  })
})

constructor内でgetClientしてはいけない

主にNest.jsにおいての話ですが、関数が処理されるたびにAsyncLocalStorageにアクセスする必要があるため、constructorのような1度しか処理されない場所でgetClientしてしまうと、常に同じPrismaClientを参照することになってしまい、意味がなくなってしまいます。

class PostRepository {
  prisma: PrismaClient;
  
  constructor(private prismaClientManger: PrismaClientManager) {
    this.prisma = this.prismaClientManager.getClient(); // これはだめ
  }
}

jest-prismaとの統合

jestでテストを実行する際にtest caseごとにtransactionを生成し、自動でrollbackしてくれることでテストケースごとにDBが干渉しない仕組みを作ってくれるjest-prismaというライブラリがあります。

こちらは内部でPrismaClientを切り替えるロジックを保持しており、かつ常にtransactionが実行されているような仕組みになっているので、prismaClientManagerではtest環境では常にそのままPrismaClientを返すようなロジックにする必要があります。

https://github.com/Quramy/jest-prisma/blob/5ad12aca4011da5fe4d3b6e1b18fee6208700947/packages/jest-prisma-core/src/delegate.ts#L209-L220

prismaClientManager.ts
export class PrismaClientManager {

  async transaction<T>(
    fn: () => Promise<T>,
    options?: {
      maxWait?: number;
      timeout?: number;
      isolationLevel?: Prisma.TransactionIsolationLevel;
    }
  ): Promise<T> {
    // NOTE: テスト環境ではjest-prismaがPrismaClientを切り替えて提供するため、transaction clientに対する処理は加えない
    if (isTest) return await fn();

    try {
      const prismaTransactionClient = this.getPrismaTransactionClient();
    ...
  }

  getClient(): Prisma.TransactionClient | PrismaService {
    // NOTE: テスト環境ではjest-prismaがPrismaClientを切り替えて提供するため、そのまま返す
    if (isTest) return this.prisma;

    const prismaTransactionClient = this.getPrismaTransactionClient();
    ...
  }

Ref

Discussion