Cloud Firestoreで多量のデータをバッチ処理するときのスマートな書き方

3 min読了の目安(約3500字TECH技術記事
Likes18

バッチ処理

FirebaseのCloud Functions、或いはローカルに書いたスクリプトを使って多量のデータをバッチ処理でCloud FirestoreのDBに書き込みたいケースが有るかと思います。

ただ、バッチ処理の制約上、一度に500件ずつしか書き込むことができないため、500件以上処理する場合には500件ずつ区切ってバッチ処理をするように考慮する必要があります。

例えば、以下の例はusersコレクションを全て取得し、新たにフィールドを追加してマイグレーションする処理になります。

import * as adimn from 'firebase-admin'

const db = admin.firestore()
const users = await db.collection('users').get())

const batch = db.batch()
users.docs.forEach(user => batch.update(user.ref, { ... })
await batch.commit()

この例の場合、500件以下であれば問題ありませんが、500件を超えてしまうと書き込み時にエラーになります。エラーになった場合、全てのドキュメントに対しての書き込みが失敗することになります。

これを解決するために、データの配列をfor文を使って処理することを試みます。

for文を使った普通の書き方

for文を使い、1つずつbatchに詰め込みつつカウントしていき、500に到達したときに書き込みを行い、batchとカウントをリセットして...を繰り返してみます。

import * as adimn from 'firebase-admin'

const db = admin.firestore()
const users = await db.collection('users').get())

const limit = 500
let count = 0
let batch = db.batch()

for (const user of users.docs) {
  batch.update(user.ref, { ... })
  count += 1
  
  if (count === limit) {
    await batch.commit()
    batch = db.batch()
    count = 0
  }
}

if (count > 0) {
  await batch.commit()
}

ちなみにforEatchを使ってしまうと、forEachの内部では非同期処理をawaitすることができないため、使いません。

一見良さそうに見えますが、あまり筋の良くない書き方に見えます...。
詰め込んだ数をカウントしないといけないことと、for文を抜けたあともbatchに詰め込んだものが残っている可能性があるのでそれを確認して必要があれば書き込む処理を書かないといけません。

もう少し綺麗にスマートに書きたい、ということで本題に移ります。

配列をchunkする

多量のデータを持つ配列を、一定数ごとに区切る(chunk)し、それを順々に処理する、というアプローチを試みます。

例えば、ある数字の配列を3つずつchunkしていくと次のようになります。

const list = [1, 2, 3, 4, 5, 6, 7]
const chunkedList = [
  [1,2,3], 
  [4,5,6], 
  [7]
]

つまり、一定数ごとに配列をchunkし、2次元配列を作り、その配列をfor文で回して処理していくというアプローチになります。
これにより予め詰め込むべき適切な数で要素を分割しているので、いくつデータをbatchに詰めたかの関心を排除することができます。

chunk関数

キーとなるchunk関数ですが、lodashパッケージに関数が導入されているため、それを活用するととても手っ取り早いです。

import * as _ from 'lodash'

const list = [1, 2, 3, 4, 5, 6, 7]
const chunkedList = _.chunk(list, 3)
console.log(chunkedList) 
// Output
//[
//  [1,2,3], 
//  [4,5,6], 
//  [7]
//]

自作する場合

lodashに依存したくない...という場合は以下の関数を任意の場所に定義します。

export const chunk = <T extends any[]>(array: T, size: number): T[] =>
  array.reduce(
    (newarr, _, i) =>
      i % size ? newarr : [ ...newarr, array.slice(i, i + size)],
    []
  )

全体像

このchunk関数を使って、先程の処理を書き換えてみます。

import * as adimn from 'firebase-admin'

const db = admin.firestore()
const users = await db.collection('users').get())

const limit = 500

for(const chunkedUsers of chunk(users.docs, limit)) {
  const batch = db.batch()
  chunkedUsers.forEach(user => batch.update(user.ref, { ... })
  await batch.commit()
}

先に紹介した方法より、よりスマートにかけたと思います。
ぜひ活用してください 🐈

注意点

バッチ処理に関してですが、基本的には一度に500件まで書き込めますが、FieldValue.serverTimestamp(),FieldValue.increment(),FieldValue.arrayUnion()が含まれる場合は、その数だけ書き込みできる件数が減少するので注意ください。
つまりこれらのフィールド変換系の値が含まれる場合は、追加オペレーションとみなされ、相対的に書き込みできるドキュメントの数が減少します。
その場合は以下のようにchunkする数を調整しましょう。

const limit = 250

for(const chunkedUsers of chunk(users.docs, limit)) {
  const batch = db.batch()
  chunkedUsers.forEach(user => batch.update(user.ref, { 
    foo: FieldValue.increment(1) 
  })
  await batch.commit()
}

詳しくは以下のドキュメントに記載があります。

最後に

もし記事が役に立った!参考になった!という場合はLikeしてもらえると嬉しいです 🙌
今後の技術ネタの投稿のモチベーションになります。
サポートもお待ちしています 🐈

元ネタ

少し前に投稿したツイートを記事化しました。