🔥

Firebaseを使った大量ドキュメントの処理実現について

2021/12/18に公開

この記事は Goodpatch Advent Calendar の18日目の記事です。

はじめに

Goodpatch で自社プロダクトを開発している石井です。
今回は Goodpatch が提供しているオンラインホワイトボード「Strap」のボード複製機能について、 Firebase を利用してどのように実現したのか記載しています。
StrapのバックエンドにFirebaseを選定した理由に関しては、弊チームのバックエンドエンジニアである、やっはーさんの記事を御覧ください。
https://goodpatch-tech.hatenablog.com/entry/2021/12/13/101720

ボード複製機能の要件と実現方法

Strap では無限に広がるボードにエレメントと呼ばれる図形やテキスト、画像などを配置・編集する使い方が基本的です。そのため Firestore 上ではboards の subcollection に elements がくる構造をしています。

ボード複製の機能要件としては下記がありました。

  1. 大量のデータを処理する
  2. 複製中にユーザーの操作を阻害しないようにする
  3. 複製の状態を適切にユーザーに通知する

上記の要件をどのように実現したのか、またそこで出てきた技術的な課題をどのように対応したのかについてお伝えします。

大量のデータを処理する

Strap では大きめの処理やバックエンドで処理したほうが良いものに関しては、CloudFunctions を利用しています。
今回のボード複製機能に関しては、認可やリクエスト検証と実際の複製処理との責務を分離するために1つのエンドポイントで実装せず、複製処理自体はPub/Sub トリガを使用して実現する設計にしました。
https://cloud.google.com/functions/docs/calling/pubsub
そのような構成により、複製処理自体をサブスクライバで受けれるようになるので関数から関数へ処理を移譲することができます。

バックエンド側の構成図

また関数内部の処理に関しては、 Firebase から大量のドキュメントを処理するために提供されている bulkWriter を使用しています。
https://googleapis.dev/nodejs/firestore/latest/BulkWriter.html
Batch 処理だと同時に処理できるドキュメント数の上限が 500 件ですが、bulkWriter の場合は上限がないため大量のドキュメントを処理する場合に重宝しています。

コードで表現すると下記のようになります

functions
import * as functions from 'firebase-functions';
import { EventContext } from 'firebase-functions';
import { Message } from 'firebase-functions/lib/providers/pubsub';
import { FUNCTION_REGION } from '../../utils/commons';
import { DUPLICATE_BOARD_TOPIC } from '../../constants/pubsubTopics';
import type { DuplicateBoardMessageData } from '../../types';

// Pub/Sub Trigger
export const PubSubDuplicateBoard = functions
  .region(FUNCTION_REGION)
  .runWith(options)
  .pubsub.topic(DUPLICATE_BOARD_TOPIC) // トピックはパブリッシュ側と共通のものを使用する
  .onPublish(async (message: Message, context: EventContext): Promise<void> => {
    // パブリッシュ側から送信されたメッセージデータをJSONに変換
    const messageDataBuffer = Buffer.from(message.data, 'base64').toString();
    const messageData: DuplicateBoardMessageData =
      JSON.parse(messageDataBuffer);
      
    const bulkWriter = admin.firestore().bulkWriter();
    
    // bulkWriterを使用して実際の処理をする
    const elementsSnapshot = await firestore
      .collection('boards')
      .doc(boardId)
      .collection('elements')
      .get();

    for (const doc of elementsSnapshot.docs) {
      // 実際の複製処理
      bulkWriter.set(data);
    }

    // このタイミングでbulkの処理が実行される
    await bulkWriter.close();
  });

Pub/Sub トリガ を使用することにより、複製処理自体をバックグラウンド関数内で完結させることができるようになりました。

CloudFunctions のランタイムのタイムアウトに対する対応について

Pub/Subトリガを使用した構成で進めていく上で、ランタイムのタイムアウト問題の課題があります。CloudFunctions を使用する際の制約として、一つの関数を動かし続ける時間の上限があります。現時点での CloudFunctions の関数の最大実行時間は約 9 分です。
https://cloud.google.com/functions/quotas#time_limits
ボード複製機能に関しては対象のデータ数が処理時間に直接影響するため、CloudFunctions 側のタイムアウト時間を最大に伸ばしています。

設定自体は runWith メソッドで指定できます。
functions
import { RuntimeOptions } from 'firebase-functions';

const options: RuntimeOptions = {
  timeoutSeconds: 540,
};

export const SampleEndpoint = functions.runWith(options).pubsub.topic(...).onPublish()

また関数の最大実行時間を超過すると関数自体が強制終了してしまうため、それ以上の処理が継続できません。
そのためアプリケーション側の対応として、内部処理で擬似的にタイムアウト状態を作り出してエラー処理をする対応をしています。

functions
const DUPLICATION_TIMEOUT = 480000;

export const PubSubDuplicateBoard = functions
  .region(FUNCTION_REGION)
  .runWith(options)
  .pubsub.topic(DUPLICATE_BOARD_TOPIC)
  .onPublish(async (message: Message, context: EventContext): Promise<void> => {
    // 内部処理で擬似的にタイムアウトエラーを作り出す
    const timerId = setTimeout(() => {
      // 失敗状態のデータ更新
      throw Error('duplication timeout');
    }, DUPLICATION_TIMEOUT);

    try {
      // 実際の複製処理
    } catch (e) {
      // エラーハンドリング
    } finally {
      clearTimeout(timerId);
    }
  });

このような対応により、アプリケーション側で失敗状態を制御できるようになりました。

複製中にユーザーの操作を阻害しないようにする

ボード複製機能要件と実際の複製処理と認可やリクエストの検証の責務を分離する目的を達成するために、HTTPS Callable Functions からPub/Subトリガにトピックをパブリッシュする構成にしました。
https://firebase.google.com/docs/functions/callable?hl=ja

コードとしては、HTTPS Callable Functions からこのようにパブリッシュしています。

functions
// Callable Functions
export const duplicateBoard = functions
  .region(FUNCTION_REGION)
  .https.onCall(
    async (
      data: DuplicateBoardRequest,
      context: CallableContext,
    ): Promise<DuplicateBoardResponse> => {
      // 実行者の認証や認可
      // リクエストの検証

      // Pub/Sub Topicをパブリッシュする
      const pubSubClient = new PubSub();
      // Pub/Sub Triggerの関数に渡したいデータをオブジェクトの形で作成
      const publishData: DuplicateBoardMessageData = {
        // ボードの情報やユーザー情報など
      };

      try {
        const messageId = await pubSubClient
          .topic(DUPLICATE_BOARD_TOPIC) // サブスクライバーで受けるトピックと共通のものを使用
          .publishJSON(publishData);
        console.log(`Message ${messageId} published.`);
      } catch (e) {
        // エラー処理
      }
    },
  );

// Pub/Sub Trigger
export const PubSubDuplicateBoard = functions
  .region(FUNCTION_REGION)
  .runWith(options)
  .pubsub.topic(DUPLICATE_BOARD_TOPIC)
  .onPublish(async (message: Message, context: EventContext): Promise<void> => {
    // Callableから送信したデータをこちらでJSONに変換
    const messageDataBuffer = Buffer.from(message.data, 'base64').toString();
    const messageData: DuplicateBoardMessageData =
      JSON.parse(messageDataBuffer);

    // 実際の複製処理
  });

そうすることにより、複製を実行する認可とリクエストの正当性が担保されれば、ユーザーにレスポンスを早めに返却し、実処理でユーザーをブロッキングしないように機能を実装できます。

複製の状態を適切にユーザーに通知する

複製処理自体をバックグラウンド関数で実行する構成の課題として、バックグラウンド関数で動いている処理の状態をクライアント側が知る術がないというものがあります。
今回は「複製の完了をユーザーに通知する」という機能要件だけでなく、複製の失敗状態の定義、複製状態に応じたクライアント側でのインタラクションも実現する必要があったため、対策を講じる必要がありました。

今回は、board ドキュメントに複製の状態をもたせる方針で実現しました。

ドキュメントの状態なども含めた構成図

型としては下記のようになっています。

type Board = {
  name: string;
  // こちらで複製状態を定義
  duplicateStatus: 'completed' | 'failed' | 'processing';
  duplicateFrom: string;
  ...
}

これを定義することにより失敗状態の定義ができ、処理系以外の要因で処理が終了し、duplicateStatus が変わらなかった場合でもクライアント側で複製に失敗したボードをハンドリングできるようになりました。
またこのような失敗状態にあるボードはScheduled Functionsを用いて定期的に削除をしています。

クライアント側では、複製の通知用にリアルタイムリスナを作成し通知表現を実現しています。

frontend
syncOnce = async (boardId: Board['id']): Promise<void> => {
  const workspaceRef = this.db.collection('workspaces').doc(workspaceId);
  this.unsubscribers[boardId] = workspaceRef
    .collection('boards')
    .doc(boardId)
    .onSnapshot((snapshot) => {
      const data = snapshot.data() as BoardData;
      const { duplicateStatus } = data;

      // 複製状態自体がなければunsubscribeする
      if (!duplicateStatus) {
        this.unsync(snapshot.id);
        return;
      }

      // ステータスに更新があった場合に通知をする
      if (duplicateStatus !== DUPLICATE_STATUS.PROCESSING) {
        this.notifyDuplicationStatus(snapshot.id, data);
        this.unsync(snapshot.id);
      }
    });
};

unsync = (boardId: Board['id']) => {
  const unsubscribe = this.unsubscribers[boardId];
  if (!unsubscribe) return;

  unsubscribe();
};

この実装方針により、複製状態を正確にユーザーに通知できるだけでなく、複製に失敗したボードをユーザーに表示しないようにするための制御もすることができました。

まとめ

  • Firestoreを使用して大量のドキュメントを処理するためにPub/Subトリガで処理をする構成にし、内部処理では bulkWriter を使用した
  • ユーザーからのリクエストは HTTPS Callable Functionsを呼び出し、実際の処理はバックグラウンドで行う構成にした
  • 複製の状態をFirestoreのドキュメントのプロパティで表現することにより、バックグランド関数の更新がクライアント側でも把握できるようになった

おわりに

Strapの中でPub/Sub トリガ を使用して実現した機能が初めてだったので、このような構成の土台を作ることができ、未来につながる実装だったのではないかと思っています。
また処理に時間の掛かる可能性があるからこそ、ユーザーへの通知は丁寧に行いたかったので、デザイナーと共創して通知機能の表現や複製中の状態表現などを議論しながら進められて個人的にもよい経験になりました。

Strapを開発/運用するGoodpatchのStrapチームでは世界を前進させるプロダクトを一緒に創っていける仲間を募集しています!

Goodpatchでもデザイン好きなエンジニアの仲間を募集しています。

少しでもご興味を持たれた方は、ぜひ一度カジュアルにお話ししましょう!

Discussion