⏱️

Datastoreで、スケーラブルかつクエリ可能かつ即時反映されるカウンタを実装する

2022/04/20に公開

DatastoreモードのFirestoreでは、1秒に1回以上エンティティの更新が起きないよう設計することが推奨されています。公式ドキュメント

これにより、Datastoreではカウンタを持ったエンティティを設計するのが難しくなっています。
例えば、mishでは動画の再生数、コメント数、ページビュー数などのカウンタが実装されていますが、かなり設計難易度が高かったので、どのように実装すべきかをまとめます。

オープンソースとして再利用可能にしてありますので、良ければ使ってみてください。リポジトリ

分散カウンタ

上記制約を避けてカウンタを実装するパターンとして、分散カウンタという設計手法があります。

const datastore = new Datastore();

const increment = async (id: string) => {
  const key = datastore.key(["Counter", `${id}-${Math.floor(Math.random() * 100)}`]);

  const transaction = datastore.transaction();
  const [entity] = await transaction.get(key);
  datastore.update({
    key,
    data: { id, count: (entity?.count ?? 0) + 1 },
  });
  await transaction.commit();
};

const getCount = async (id: string) => {
  const [distributedCounters] = await datastore.createQuery("Counter").filter("id", "=", id).run();

  return distributedCounters.reduce<number>((previous, entity) => previous + entity.count, 0);
};

この例では、一つのidに対して100個のエンティティを用意しています。これにより更新処理が分散し、1秒間に100回程度の更新に耐えることができるようになりました。

課題

この分散カウンタで充分なケースもありますが、カウンタの値でクエリしたい場合には不十分です。

const getRanking = async () => {
  const [entities] = await datastore.createQuery("Counter").order("count", { descending: true }).run();

  return entities;
};

このようなコードを書いたとしても、分散したカウンタに対してクエリを投げても無茶苦茶な値が返ってくるだけです。

分散カウンタを集計する

分散カウンタの課題を解決するためには、カウンタを集計して別のエンティティに入れる必要があります。
集計方法はいくつか考えられます。

全てのカウンタをデイリーで集計する

すぐに思いつくのが、一日の終わりにカウンタを全てクエリして集計する方法ですが問題点もすぐに思いつきます。

  • 集計の反映に最悪一日待つ必要がある
  • 集計に一日以上かかるようになると詰む

集計するIDにフラグを立てて一定時間おきに集計する

更新があったIDに対して、Memcached等のKVSを利用してフラグを立てておき、数分おきにフラグが立っているIDに対してのみ集計を回していく方法が考えられますが、こちらにも問題があります。

  • フラグが立つと同時に集計が走ってしまった場合、Datastoreの更新結果がクエリに反映されない可能性がある
    • トランザクションを使えばある程度回避できるが、トランザクションの衝突が多発する
  • スケーラビリティが売りのDatastoreを使っているのに、KVSのスケーラビリティを気にする必要が出てくる
  • 反映に数分待つ必要があり、即時反映とは言えない

ではどうするか

これらの課題を解決するために作ったのがこちらです。
このリポジトリでは、以下のような動きを実装しています。

  1. カウンタをincrementする際に、同時にCloudTasks上にタスクを作る
  2. CloudTasksが数秒後に集計するためのサービスを呼び出す
  3. 呼び出されたサービスが、CloudTasksから受け取ったkeyを元に集計を行う

以上の実装で、数秒の遅延で反映される集計できるようになっています。
これではincrementするたびにタスクが作られてしまいそうですが、1の際に実行時間付きでDatastore内にフラグを立てており、タスクが実行されるまでの間はタスクを作らないようになっています。この作らないようにする時間は、集計がスキップされてしまわないようにマージンをとっています。

また、CloudTasksは1秒あたり500回までしかタスクを作れませんが、余計なタスクを作っていないので1秒あたり2500個のIDに対してincrementが行えます。また、これで足りなくなってしまった場合にもキューを増やして分散させることでスケールアウトすることが可能です。

備考

  • このライブラリはmishのプロダクションで運用されており、今の所問題なく動いています。PR大歓迎です。
  • Googleがスタートアップ向けに開催してくれているテクニカルセッションで相談してみましたが、やはり他にあまり適切な方法はなさそうとのことでした。
  • 現状、nodeのDatastoreクライアントライブラリではexcludeFromIndexesを取ることができません。このためバージョン0.0.6では集計が走ると全てのプロパティがインデックスされてしまうという不具合がありました。バージョン0.0.7では、クライアントライブラリをforkし、独自実装をすることでなんとかしています。Issueを立てましたが反応がありません…。 もし良ければ僕のコメントに🚀でも押しておいてください。
  • Datastoreは裏で動いているインスタンスを意識しなくてよいレベルのスケーラビリティで、スケールしないことはできないようになっているという設計がとても気に入っているのですが、この記事の通り更新頻度の高いエンティティの実装が大変すぎるのはネックです。せめて公式でベストプラクティスを用意してくれないものか。

Discussion