🧱

Prisma で作る ActiveRecord ライクな in_batches: 大量データを効率的に処理しよう

に公開

Ruby on Rails の ORM である ActiveRecord には、in_batches という便利なメソッドがあります。
これは大量のレコードを一度にメモリにロードすることなく、 ほどよい塊 で少しずつ処理するためのメソッドです。

// 1000件ずつ読み込んで処理
User.in_batches(of: 1000) do |batch|
  batch.each do |user|
    # 各ユーザーに対する処理をここに記述
  end
end

もし User.all.each のように書いてしまうと、大量のレコードを読み込んだ場合にメモリを無駄に消費してしまい、パフォーマンスに影響を与えかねません。
in_batches はこうした問題を解決することができる一つの手段です。

では Node.js において人気のある Prisma で同じ体験を得るにはどうすればよいでしょうか?

Prisma には ActiveRecord の in_batches に相当する組み込みのメソッドは残念ながら無いようですが、ストリームAPIを使って大量レコードを処理する方法が提案されていました。
https://github.com/prisma/prisma/issues/5055

やっぱり課題に感じたことは誰かが先に実現していますね!

・・・

で終わってしまうにはもったいないので、今回は別のアプローチ(ジェネレーター)を活用して直感的に使える ActiveRecord ライクな inBatches を作ってみたいと思います。

✨ ジェネレーターとは

ジェネレーター関数は通常の関数とは少し異なり、処理を途中で一時停止・あとから再開できる関数です。
なお、Ruby では Enumerator が同様の役割を果たします。

特徴

  • 関数は function* で定義します(非同期なら async function*
  • yield キーワードを使い、その都度値を返します(遅延評価)。
  • 呼び出し側は next() メソッドで次の値を取得できます。

簡単な例を見てみましょう。

function* counter() {
  let i = 0
  while (true) {
    yield i
    
    i++
  }
}


const it = counter()
it.next().value // 0
it.next().value // 1
it.next().value // 2

counter を呼ぶと一見無限ループしそそうですが、ここではジェネレーターオブジェクトが返却されます。
next() が呼ばれるたびに yield まで処理が進み、値を返して一旦停止します。
この仕組みにより、「必要なときに必要な分だけ」値を取得できるわけです。

今回はジェネレーターの非同期版を使って、データベースから少しずつレコードを取得する inBatches を実装します。
なお、for of ループで回すことで next() を呼び出すことができますが、上記の例で使うと無限ループしてしまうので注意してください。

🎯 今回のゴール

簡易版 となる inBatches メソッドを Prisma Client に実装します。

  • 主キーは id (int型) とし、id の昇順で行ったうえでページングする
  • prisma.user.inBatches のようにどのモデルからでも呼び出せるようにする
  • for await...of でイテレートできる
  • inBatches が返すオブジェクトは ActiveRecord::Relation のようなオブジェクトではなく、シンプルな配列(User[]のような)とする

🔧 Prisma Client を拡張して実装

Prisma には defineExtension という仕組みがあり、これを使うことでモデルに独自のメソッドを追加できます。

// findMany の引数・結果の型を導出するためのユーティリティ
type FindManyArgs<T> = Prisma.Args<T, 'findMany'>
type FindManyResult<T, A extends FindManyArgs<T>> = Prisma.Result<T, A, 'findMany'>

const inBatchesExtension = Prisma.defineExtension({
  model: {
    // すべてのモデルに inBatches メソッドを追加する
    $allModels: {
      async *inBatches<T>(
        this: T,
        args: FindManyArgs<T>,
      ): AsyncGenerator<FindManyResult<T, typeof args>> {
        const batchSize = args.take ?? 1000
        let cursorId: number | undefined = undefined

        while (true) {
          // Prisma の cursor でページングする
          const records = await (this as any).findMany({
            ...args,
            take: batchSize,
            ...(cursorId ? { cursor: { id: cursorId }, skip: cursorId ? 1 : 0 } : {}),
            orderBy: { id: 'asc' },
          } as FindManyArgs<T>) as unknown as FindManyResult<T, typeof args>

          if (records.length === 0) {
            break
          }

          // ここで一時停止し、呼び出し元に records を返す
          yield records

          if (records.length < batchSize) {
            break
          }

          const lastRecord = records[records.length - 1]

          if (lastRecord && 'id' in lastRecord && typeof lastRecord.id === 'number') {
            cursorId = lastRecord.id
          } else {
            throw new Error('不正なデータです')
          }
        }
      },
    },
  },
})

const prismaClient = new PrismaClient({
  // 接続先設定やロギングの設定など
})

export const prisma = prismaClient.$extends(inBatchesExtension)

$allModels を使うことで、すべてのモデルに対して inBatches が利用可能になります。
簡略版ではあるものの、ActiveRecord の in_batches の処理と基本的な流れは同じになっています。

また、このメソッドを利用する場合は findMany と同じ引数を渡せるようになっているのもポイントです(もうちょっと上手い型付けができそうですが 😅)。

使い方

import { prisma } from './prisma'

async function main() {
  for await (const userBatch of prisma.user.inBatches({
    // 条件は既存の findMany と同じように書ける
    where: { createdAt: { gte: new Date('2025-01-01') } },
    take: 10,
  })) {
    console.log(userBatch.length)
  }
}

main()

batchSize ごとに分割されたレコードを順次処理できるようになり、効率的に大量のレコードを扱うことができます。
また、主キーである id を基準にページングしているため、安定して動作します。
(他のカラムでも良いですが一意なキーの方が望ましいです)

今回はジェネレーター関数を使いましたが、コールバック関数を受け取る形でも実装できます。
コールバックで書く場合はネストが深くなってしまうので、個人的には今回の形の方が好みです。

🔭 今後に向けて

簡易版として ActiveRecord の in_batches を再現できました。
id でしかソートできない、ソート順も昇順のみという制約はありますが、今回のコードを改善すればより汎用的に使えるようになるでしょう。

また、 ActiveRecord には find_each という1レコードずつ処理するメソッドも用意されています。
こちらも inBatches を内部で使うことで簡単に実装できそうですね。

スペースマーケットでは NestJS + Prisma を採用してバックエンド開発を行っていますので、
今後は NestJS に組み込める形で extension を提供し、バッチ処理を書いていく予定です。

では、よき Prisma ライフを! ✋️

スペースマーケット Engineer Blog

Discussion