Supabaseのレプリケーションを無料で実現する【スクレイピング用途向け】
擬似的なレプリケーションをNode.jsで実行する例を公開しました。
Supabaseを使った開発でも、スクレイピング等で収集したデータを加工して配信するという形のサービスを運営したい方も多いのではないでしょうか。本番DBにそのままスクレイピング結果を挿入しても良いのですが、 Input/Output が大量に発生してしまうので、余計な負荷がDBにかかってしまいます。
ユーザーに関係ない処理は裏側でひっそりと行いたいところです。かと言って、Cronなどでスケジューリングして実行するのもリアルタイム感が薄くて微妙です。鮮度の高い情報を配信したいですよね。
そこで、本番DBの他に「スクレイピング/処理用のDB」を用意することにしました。このDBにデータを溜め込んでいき、データ加工が完了した後に本番DBに丸々コピーすれば余計な負荷を減らすことができそうです。(Supabaseでは2つのインスタンスが無料!)
しかし、ここで問題になるのが Supabaseではレプリケーションが有料機能である という点です。個人開発者としては節約して実現したいですよね!実現してやりましょう!
【無料で実現するアイデア】擬似的なレプリケーション
そこで考えたのが「擬似的なレプリケーション」です。
下記のような流れで処理を行います。

- プライマリDBをNode.jsでLISTENしておく。
 - スクレイピング処理/データ加工をしてデータを挿入する。
 - 同期したいタイミングで 
NOTIFYクエリを実行する。 - Node.jsが自動的に反応してプライマリDBからデータ取得を開始。
 - コピー先であるセカンダリDBに複製データを挿入する。
 
という流れです。
※ここで出てくる「セカンダリDB」は本番DBを指します。
この方法の良い点としては、
- リアルタイム感を損なわずに任意のタイミングで同期を実行できる。
 - Webサーバー化してない自宅PCでも処理が可能になる。ネットに繋がっていればどんなマシンも同期サーバーとして稼働できる。
 - Webhookのように外部に公開しなくても動いてくれるので割と安全である。
 
感覚としてはWebhookに似てるが、Webhookよりも場所を選ばずにホストできるという感じですね!NOTIFYは鞭を叩く作業みたいなもんですね!
とりあえずやってみよう!
Node.jsでの疑似レプリケーション処理を作る
npm install knex pg @types/pg
knex と pg パッケージをインストールしておきましょう。
knexはsupabase-jsのような書き心地でSQLを実行できる便利なライブラリです。
import knex from "knex";
import { Notification } from "pg";
// ここに格納されたデータをコピーする
const primaryClient = knex({
  client: "pg",
  connection: {
    host: "【ここにホスト名】",
    database: "【ここにデータベース名】",
    port: 5432, // notificationが有効なポート番号
    user: "【ここにDBユーザー名】",
    password: "【ここにDBパスワード】",
  },
});
// このDBにデータをコピーする想定
const productionClient = knex({
  client: "pg",
  connection: {
      host: "【ここにホスト名】",
      database: "【ここにデータベース名】",
      port: 6543, // 普通のSQLクエリを実行する際のポート番号
      user: "【ここにDBユーザー名】",
      password: "【ここにDBパスワード】",
  },
});
(async () => {
  const connection = await primaryClient.client.acquireConnection();
  connection.query("LISTEN hoge_channel");
  connection.on("notification", async (msg: Notification) => {
    const { payload } = msg;
    if (!payload) return;
    console.log(payload);
    if (payload === "keep-alive") return;
    // + で繋ぐことで複数テーブルの同期を実行できます。
    const tableNames = payload.split("+");
    for (let key in tableNames) {
      await replicateDB(tableNames[key]);
    }
  });
  // LISTENが死んでしまうようなので3分毎に接続維持を実行する
  setInterval(async () => {
    connection.query("NOTIFY hoge_channel, 'keep-alive';");
  }, 3 * 60 * 1000);
})();
async function replicateDB(tableName: string) {
  await replicateUpsert(tableName);
  await replicateDelete(tableName);
}
async function replicateUpsert(tableName: string) {
  const originalData: any = await primaryClient(tableName).returning("*").where("replicate_status", "pending").update({
    replicate_status: "running",
  });
  if (originalData.length > 0) {
    // 同期先で replicate_status カラムが不要なので捨てる
    const filteredRows = originalData.map((row: any) => {
      const { replicate_status, ...rest } = row;
      return rest;
    });
    // replicate_status変更用にidを取り出す
    const targetRowIds = originalData.map((row: any) => {
      const { id } = row;
      return id;
    });
    await productionClient(tableName).insert(filteredRows).onConflict("id").merge();
    await primaryClient(tableName).whereIn("id", targetRowIds).update({
      replicate_status: "done",
    });
  }
}
async function replicateDelete(tableName: string) {
  const originalData: any = await primaryClient(tableName).returning("*").where("replicate_status", "delete").update({
    replicate_status: "running",
  });
  if (originalData.length > 0) {
    const targetRowIds = originalData.map((row: any) => {
      const { id } = row;
      return id;
    });
    await Promise.all([productionClient(tableName).whereIn("id", targetRowIds).delete(), primaryClient(tableName).whereIn("id", targetRowIds).delete()]);
  }
}
ポイント
- 
connection.query("LISTEN hoge_channel");でhone_channelをLISTENしています。hoge_channelにNOTIFYが飛んでくると、このNode.jsの「notification」の箇所が実行されるようになります。 - 
replicate_statusというカラムで同期状態を管理しています。pending(未実行)running(実行中)delete(削除)done(完了)の4種類で管理します。 - 
deleteは論理削除をするようにしましょう。NOTIFYのタイミングで削除を実行します。これでプライマリDBと本番DBどちらもデータが削除できます。 - 
setIntervalで3分ごとにLISTENの接続維持を行っています。 
PostgreSQLでテーブルを作ろう
今回は下記のテーブルを作ります。
create table
  public.products (
    id bigint generated by default as identity,
    public_id uuid not null default gen_random_uuid (),
    name text not null,
    created_at timestamp with time zone not null default now(),
    updated_at timestamp with time zone not null default now(),
    constraint products_pkey primary key (id),
    constraint products_public_id_key unique (public_id)
  ) tablespace pg_default;
これをプライマリDBとセカンダリDBに作ります。
プライマリDBの方だけはレプリケーションの状態を記録しておきたいので、 replicate_status カラムを text 型で用意しておきましょう。( replicate_status text not null default 'pending'::text )
プライマリDBの方にテストとして適当なデータを入れておいてください。replicate_statusはpendingとしておきましょう。
実際に同期されるか確認してみよう!
Node.jsで先程のコードを実行しておきます。
実行したら、プライマリDBのSQLエディタで NOTIFY hoge_channel, '同期します!'; クエリを実行してみましょう。
すると、Node.jsのコンソールには「NOTIFYされました!」が出力され、プライマリDBのデータがセカンダリDBに同じものが入っていることが確認できるはずです!
supabase-jsからNOTIFYを実行できるようにPostgreSQL関数を自作しよう
CREATE OR REPLACE FUNCTION public.replicate(names TEXT)
  RETURNS void
  LANGUAGE plpgsql
  SECURITY DEFINER
  SET search_path TO 'public'
AS $function$
BEGIN
  PERFORM pg_notify('hoge_channel', $1);
END
$function$;
これがPostgreSQL自作関数です。
names引数に実行したいテーブル名を指定するだけです。
.rpc("replicate", { names: "products" }) とすれば products テーブルの同期が実行されます。
外部キー制約が使われているデータは、NOTIFYの順番を意識する必要があります。 例えばproductsとproduct_variationsテーブルがあるとして、product_variationsの中にproductsのidが絡む場合は、 .rpc("replicate", { names: "products+product_variations" }) とすると、 productsテーブルの同期処理が終わってから product_variations テーブルの同期処理を行ってくれます。
さいごに
使用用途は限られるものの、リアルタイム機能のちょっとおもしろい活用方法をご紹介しました。意地でも無料で使い倒したるという強い意思でこんな方法を考えてみました笑
ちなみにトリガーでNOTIFYを自動実行するという方法もありますが、大量データを挿入すると、大量に実行されて遅くなる可能性があるのでオススメできません。(500行扱うと500回実行される。)なので、まとめて実行できる本記事のように実行したいタイミングでNOTIFYするべきでしょう。
MySQLをデータ保管先としても使える!
この疑似レプリケーションの良いところは、MySQLをデータ保管先としても利用できる点です。knex経由でレプリケーションするため、MySQLかPostgreSQLかそれほど意識する必要がありません。安価なDBを見つけて保管・処理用DBとして酷使するのもアリでしょう!(年間で40GBぐらいデータ溜まりそうなシステムを超安価に作れそうか個人的に検討しています。)
Discussion