Open6

Supabaseで無料でレプリケーションしたい【スクレイピングデータの同期目的】

masa5714masa5714

収集したデータをいい感じに処理してブラウザで表示するサイトを開発中。
スクレイピングでデータ収集しているのだが、処理前のデータをDBに保存している。

本番DBにそのままスクレイピング結果を入れてしまっても良いのだが、
現実的には I/O が頻発するため、余計な負荷をかけてしまう。
ユーザーに直接関係ない処理は裏側でひっそりと行いたい。

そこで、本番DBの他に "スクレイピング用のDB" を用意することにした。
本番サイトで利用するデータだけを格納したテーブルを対象にレプリケーションすれば、本番DBに余計な負荷をかけなくて済むだろう。

ということで、

  • スクレイピング用DBをプライマリサーバーとし、
  • 本番DBを受信側(サブスクリプション)としたい。
masa5714masa5714

レプリケーション?パブリケーション?

ちなみに、SupabaseはPostgreSQLが使われているが、PostgreSQLではレプリケーションではなく「パブリケーション」という呼び方をするらしい。

masa5714masa5714

Supabaseでは CREATE SUBSCRIPTIONが使えない

Supabaseではセキュリティの観点から superuser での実行が許可されていない。
そのため、 CREATE SUBSCRIPTION が使えない。つまり、パブリケーションはプライマリサーバーとしての動きのみ可能で、受信側をすることはできない模様。

WALファイル?とかいうやつも取得できないらしい。

▼参考
https://supabase.com/docs/guides/database/postgres/roles-superuser

ということで、他の方法を考えてみることにする。

masa5714masa5714

Node.jsでLISTENして擬似的にレプリケーションを実現する

恐らくこの方法が現実的な方法だと思われる。
擬似的なレプリケーションのため、データの抜け落ちの懸念があるが、スクレイピングでデータ更新をする程度の利用を想定しているので大きな問題は無いと判断した。

処理の流れとしては以下の通りだ。

  1. Node.jsでプライマリDBをLISTENしておく。
  2. プライマリDB側にデータを挿入するなどする。その際に replicate_states というカラムをtext形で用意し、pendingのときはレプリケーション未実行という扱いにする。削除するときは delete という文字列にしておき、delete文の実行はこのタイミングでは行わない。
  3. NOTIFY hoge_channel, 'レプリケーション依頼'; 等としてNode.jsに変更があったことを通知する。
  4. replicate_states が false のデータをSELECTで取得し、セカンダリDBへとまとめてupsertする。その後、 replicate_states を done に変更する。deleteのものは削除を実行し、プライマリDB側からも対象を削除しておく。
  5. 完了。

これで理論上は実現できるはずだ。
公式的な論理レプリケーションじゃないけど、論理レプリケーションとも言えるかもしれない。

言ってしまえばどんな環境にも設置できるWebhookみたいなものだと考えるとイメージしやすいかと。

masa5714masa5714

準備する

LISTEN や NOTIFY を使えるようにするため、SupabaseのPostgreSQLの拡張モジュール「tcn」を有効化する。

masa5714masa5714

Node.jsの記述を書いていこう

npm install pg knex

pgknex というnpmパッケージをインストールしておく。

knexは supabase-js と同じような書き心地でSQLが書けてすごく便利です。
生のSQL文を書かずに気軽に実行できるのでオススメ。

import knex from "knex";
import { Notification } from "pg";

// ここに格納されたデータをコピーする
const primaryClient = knex({
  client: "pg",
  connection: {
    host: "【ここにホスト名】",
    database: "【ここにデータベース名】",
    port: 5432, // notificationが有効なポート番号
    user: "【ここにDBユーザー名】",
    password: "【ここにDBパスワード】",
  },
});

// このDBにデータをコピーする想定
const productionClient = knex({
  client: "pg",
  connection: {
      host: "【ここにホスト名】",
      database: "【ここにデータベース名】",
      port: 6543, // 普通のSQLクエリを実行する際のポート番号
      user: "【ここにDBユーザー名】",
      password: "【ここにDBパスワード】",
  },
});

(async () => {
  const connection = await primaryClient.client.acquireConnection();
  connection.query("LISTEN hoge_channel");
  connection.on("notification", async (msg: Notification) => {
    const originalData: any = await primaryClient("products").returning("*").where("replicate_status", "pending").update({
      replicate_status: "running",
    });

    if (originalData.length > 0) {
      // 挿入先では「replicate_status」カラムが不要なので除外する
      const filteredRows = originalData.map((row: any) => {
        const { replicate_status, ...rest } = row;
        return rest;
      });

      // 実行中の行番号を配列形式で取得
      const targetRowIds = originalData.map((row: any) => {
        const { id } = row;
        return id;
      });

      // upsert的な処理をしている
      await productionClient("products").insert(filteredRows).onConflict("id").merge();

      // runningステータスをdoneに変更して完了
      await primaryClient("products").whereIn("id", targetRowIds).update({
        replicate_status: "done",
      });
    } else {
      console.log("対象データが存在しませんでした。");
    }
  });
})();

とするだけで使える。
NOTIFYを受け取るには「5432」ポートで接続する必要があるので注意すること。

このコードを実行した状態でPostgreSQL側で NOTIFY hoge_channel, 'はろーわーるど!'; を実行すると「はろーわーるど」という文字列が payload の中身として渡ってくれる。今回は文字列自体は重要ではないが、何らかのコントロールをする際にも活用できるので、一応文字列を渡せることも知っておくと良いだろう。

とりあえず上記のコードでは NOTIFYがLISTEN中のチャンネル(今回はhoge_channel)に飛んでくると、 notification の中身が実行されるということ。