Cloud Firestoreで多量のデータをバッチ処理するときのスマートな書き方
バッチ処理
FirebaseのCloud Functions、或いはローカルに書いたスクリプトを使って多量のデータをバッチ処理でCloud FirestoreのDBに書き込みたいケースが有るかと思います。
ただ、バッチ処理の制約上、一度に500件ずつしか書き込むことができないため、500件以上処理する場合には500件ずつ区切ってバッチ処理をするように考慮する必要があります。
例えば、以下の例はusers
コレクションを全て取得し、新たにフィールドを追加してマイグレーションする処理になります。
import * as adimn from 'firebase-admin'
const db = admin.firestore()
const users = await db.collection('users').get())
const batch = db.batch()
users.docs.forEach(user => batch.update(user.ref, { ... })
await batch.commit()
この例の場合、500件以下であれば問題ありませんが、500件を超えてしまうと書き込み時にエラーになります。エラーになった場合、全てのドキュメントに対しての書き込みが失敗することになります。
これを解決するために、データの配列をfor
文を使って処理することを試みます。
for
文を使った普通の書き方
for文を使い、1つずつbatchに詰め込みつつカウントしていき、500に到達したときに書き込みを行い、batchとカウントをリセットして...を繰り返してみます。
import * as adimn from 'firebase-admin'
const db = admin.firestore()
const users = await db.collection('users').get())
const limit = 500
let count = 0
let batch = db.batch()
for (const user of users.docs) {
batch.update(user.ref, { ... })
count += 1
if (count === limit) {
await batch.commit()
batch = db.batch()
count = 0
}
}
if (count > 0) {
await batch.commit()
}
ちなみにforEatch
を使ってしまうと、forEachの内部では非同期処理をawait
することができないため、使いません。
一見良さそうに見えますが、あまり筋の良くない書き方に見えます...。
詰め込んだ数をカウントしないといけないことと、for
文を抜けたあともbatch
に詰め込んだものが残っている可能性があるのでそれを確認して必要があれば書き込む処理を書かないといけません。
もう少し綺麗にスマートに書きたい、ということで本題に移ります。
配列をchunkする
多量のデータを持つ配列を、一定数ごとに区切る(chunk)し、それを順々に処理する、というアプローチを試みます。
例えば、ある数字の配列を3つずつchunkしていくと次のようになります。
const list = [1, 2, 3, 4, 5, 6, 7]
const chunkedList = [
[1,2,3],
[4,5,6],
[7]
]
つまり、一定数ごとに配列をchunkし、2次元配列を作り、その配列をfor文で回して処理していくというアプローチになります。
これにより予め詰め込むべき適切な数で要素を分割しているので、いくつデータをbatch
に詰めたかの関心を排除することができます。
chunk
関数
キーとなるchunk関数ですが、lodash
パッケージに関数が導入されているため、それを活用するととても手っ取り早いです。
import * as _ from 'lodash'
const list = [1, 2, 3, 4, 5, 6, 7]
const chunkedList = _.chunk(list, 3)
console.log(chunkedList)
// Output
//[
// [1,2,3],
// [4,5,6],
// [7]
//]
自作する場合
lodashに依存したくない...という場合は以下の関数を任意の場所に定義します。
export const chunk = <T extends any[]>(array: T, size: number): T[] =>
array.reduce(
(newarr, _, i) =>
i % size ? newarr : [ ...newarr, array.slice(i, i + size)],
[]
)
全体像
このchunk
関数を使って、先程の処理を書き換えてみます。
import * as adimn from 'firebase-admin'
const db = admin.firestore()
const users = await db.collection('users').get())
const limit = 500
for(const chunkedUsers of chunk(users.docs, limit)) {
const batch = db.batch()
chunkedUsers.forEach(user => batch.update(user.ref, { ... })
await batch.commit()
}
先に紹介した方法より、よりスマートにかけたと思います。
ぜひ活用してください 🐈
注意点
バッチ処理に関してですが、基本的には一度に500件まで書き込めますが、FieldValue.serverTimestamp()
,FieldValue.increment()
,FieldValue.arrayUnion()
が含まれる場合は、その数だけ書き込みできる件数が減少するので注意ください。
つまりこれらのフィールド変換系の値が含まれる場合は、追加オペレーションとみなされ、相対的に書き込みできるドキュメントの数が減少します。
その場合は以下のようにchunkする数を調整しましょう。
const limit = 250
for(const chunkedUsers of chunk(users.docs, limit)) {
const batch = db.batch()
chunkedUsers.forEach(user => batch.update(user.ref, {
foo: FieldValue.increment(1)
})
await batch.commit()
}
詳しくは以下のドキュメントに記載があります。
最後に
もし記事が役に立った!参考になった!という場合はLikeしてもらえると嬉しいです 🙌
今後の技術ネタの投稿のモチベーションになります。
サポートもお待ちしています 🐈
元ネタ
少し前に投稿したツイートを記事化しました。
Discussion