Prisma で作る ActiveRecord ライクな in_batches: 大量データを効率的に処理しよう
Ruby on Rails の ORM である ActiveRecord には、in_batches
という便利なメソッドがあります。
これは大量のレコードを一度にメモリにロードすることなく、 ほどよい塊 で少しずつ処理するためのメソッドです。
// 1000件ずつ読み込んで処理
User.in_batches(of: 1000) do |batch|
batch.each do |user|
# 各ユーザーに対する処理をここに記述
end
end
もし User.all.each
のように書いてしまうと、大量のレコードを読み込んだ場合にメモリを無駄に消費してしまい、パフォーマンスに影響を与えかねません。
in_batches はこうした問題を解決することができる一つの手段です。
では Node.js において人気のある Prisma で同じ体験を得るにはどうすればよいでしょうか?
Prisma には ActiveRecord の in_batches に相当する組み込みのメソッドは残念ながら無いようですが、ストリームAPIを使って大量レコードを処理する方法が提案されていました。
やっぱり課題に感じたことは誰かが先に実現していますね!
・・・
で終わってしまうにはもったいないので、今回は別のアプローチ(ジェネレーター)を活用して直感的に使える ActiveRecord ライクな inBatches
を作ってみたいと思います。
✨ ジェネレーターとは
ジェネレーター関数は通常の関数とは少し異なり、処理を途中で一時停止・あとから再開できる関数です。
なお、Ruby では Enumerator が同様の役割を果たします。
特徴
- 関数は
function*
で定義します(非同期ならasync function*
) -
yield
キーワードを使い、その都度値を返します(遅延評価)。 - 呼び出し側は
next()
メソッドで次の値を取得できます。
簡単な例を見てみましょう。
function* counter() {
let i = 0
while (true) {
yield i
i++
}
}
const it = counter()
it.next().value // 0
it.next().value // 1
it.next().value // 2
counter
を呼ぶと一見無限ループしそそうですが、ここではジェネレーターオブジェクトが返却されます。
next()
が呼ばれるたびに yield
まで処理が進み、値を返して一旦停止します。
この仕組みにより、「必要なときに必要な分だけ」値を取得できるわけです。
今回はジェネレーターの非同期版を使って、データベースから少しずつレコードを取得する inBatches
を実装します。
なお、for of ループで回すことで next()
を呼び出すことができますが、上記の例で使うと無限ループしてしまうので注意してください。
🎯 今回のゴール
簡易版 となる inBatches
メソッドを Prisma Client に実装します。
- 主キーは id (int型) とし、id の昇順で行ったうえでページングする
-
prisma.user.inBatches
のようにどのモデルからでも呼び出せるようにする -
for await...of
でイテレートできる -
inBatches
が返すオブジェクトは ActiveRecord::Relation のようなオブジェクトではなく、シンプルな配列(User[]
のような)とする
🔧 Prisma Client を拡張して実装
Prisma には defineExtension という仕組みがあり、これを使うことでモデルに独自のメソッドを追加できます。
// findMany の引数・結果の型を導出するためのユーティリティ
type FindManyArgs<T> = Prisma.Args<T, 'findMany'>
type FindManyResult<T, A extends FindManyArgs<T>> = Prisma.Result<T, A, 'findMany'>
const inBatchesExtension = Prisma.defineExtension({
model: {
// すべてのモデルに inBatches メソッドを追加する
$allModels: {
async *inBatches<T>(
this: T,
args: FindManyArgs<T>,
): AsyncGenerator<FindManyResult<T, typeof args>> {
const batchSize = args.take ?? 1000
let cursorId: number | undefined = undefined
while (true) {
// Prisma の cursor でページングする
const records = await (this as any).findMany({
...args,
take: batchSize,
...(cursorId ? { cursor: { id: cursorId }, skip: cursorId ? 1 : 0 } : {}),
orderBy: { id: 'asc' },
} as FindManyArgs<T>) as unknown as FindManyResult<T, typeof args>
if (records.length === 0) {
break
}
// ここで一時停止し、呼び出し元に records を返す
yield records
if (records.length < batchSize) {
break
}
const lastRecord = records[records.length - 1]
if (lastRecord && 'id' in lastRecord && typeof lastRecord.id === 'number') {
cursorId = lastRecord.id
} else {
throw new Error('不正なデータです')
}
}
},
},
},
})
const prismaClient = new PrismaClient({
// 接続先設定やロギングの設定など
})
export const prisma = prismaClient.$extends(inBatchesExtension)
$allModels
を使うことで、すべてのモデルに対して inBatches
が利用可能になります。
簡略版ではあるものの、ActiveRecord の in_batches の処理と基本的な流れは同じになっています。
また、このメソッドを利用する場合は findMany と同じ引数を渡せるようになっているのもポイントです(もうちょっと上手い型付けができそうですが 😅)。
使い方
import { prisma } from './prisma'
async function main() {
for await (const userBatch of prisma.user.inBatches({
// 条件は既存の findMany と同じように書ける
where: { createdAt: { gte: new Date('2025-01-01') } },
take: 10,
})) {
console.log(userBatch.length)
}
}
main()
batchSize ごとに分割されたレコードを順次処理できるようになり、効率的に大量のレコードを扱うことができます。
また、主キーである id を基準にページングしているため、安定して動作します。
(他のカラムでも良いですが一意なキーの方が望ましいです)
今回はジェネレーター関数を使いましたが、コールバック関数を受け取る形でも実装できます。
コールバックで書く場合はネストが深くなってしまうので、個人的には今回の形の方が好みです。
🔭 今後に向けて
簡易版として ActiveRecord の in_batches
を再現できました。
id でしかソートできない、ソート順も昇順のみという制約はありますが、今回のコードを改善すればより汎用的に使えるようになるでしょう。
また、 ActiveRecord には find_each という1レコードずつ処理するメソッドも用意されています。
こちらも inBatches を内部で使うことで簡単に実装できそうですね。
スペースマーケットでは NestJS + Prisma を採用してバックエンド開発を行っていますので、
今後は NestJS に組み込める形で extension を提供し、バッチ処理を書いていく予定です。
では、よき Prisma ライフを! ✋️

スペースを簡単に貸し借りできるサービス「スペースマーケット」のエンジニアによる公式ブログです。 弊社採用技術スタックはこちら -> whatweuse.dev/company/spacemarket
Discussion