Supabaseで無料でレプリケーションしたい【スクレイピングデータの同期目的】
収集したデータをいい感じに処理してブラウザで表示するサイトを開発中。
スクレイピングでデータ収集しているのだが、処理前のデータをDBに保存している。
本番DBにそのままスクレイピング結果を入れてしまっても良いのだが、
現実的には I/O が頻発するため、余計な負荷をかけてしまう。
ユーザーに直接関係ない処理は裏側でひっそりと行いたい。
そこで、本番DBの他に "スクレイピング用のDB" を用意することにした。
本番サイトで利用するデータだけを格納したテーブルを対象にレプリケーションすれば、本番DBに余計な負荷をかけなくて済むだろう。
ということで、
- スクレイピング用DBをプライマリサーバーとし、
- 本番DBを受信側(サブスクリプション)としたい。
レプリケーション?パブリケーション?
ちなみに、SupabaseはPostgreSQLが使われているが、PostgreSQLではレプリケーションではなく「パブリケーション」という呼び方をするらしい。
CREATE SUBSCRIPTION
が使えない
Supabaseでは Supabaseではセキュリティの観点から superuser
での実行が許可されていない。
そのため、 CREATE SUBSCRIPTION
が使えない。つまり、パブリケーションはプライマリサーバーとしての動きのみ可能で、受信側をすることはできない模様。
WALファイル?とかいうやつも取得できないらしい。
▼参考
ということで、他の方法を考えてみることにする。
Node.jsでLISTENして擬似的にレプリケーションを実現する
恐らくこの方法が現実的な方法だと思われる。
擬似的なレプリケーションのため、データの抜け落ちの懸念があるが、スクレイピングでデータ更新をする程度の利用を想定しているので大きな問題は無いと判断した。
処理の流れとしては以下の通りだ。
- Node.jsでプライマリDBをLISTENしておく。
- プライマリDB側にデータを挿入するなどする。その際に
replicate_states
というカラムをtext形で用意し、pendingのときはレプリケーション未実行という扱いにする。削除するときは delete という文字列にしておき、delete文の実行はこのタイミングでは行わない。 -
NOTIFY hoge_channel, 'レプリケーション依頼';
等としてNode.jsに変更があったことを通知する。 -
replicate_states
が false のデータをSELECTで取得し、セカンダリDBへとまとめてupsertする。その後、replicate_states
を done に変更する。deleteのものは削除を実行し、プライマリDB側からも対象を削除しておく。 - 完了。
これで理論上は実現できるはずだ。
公式的な論理レプリケーションじゃないけど、論理レプリケーションとも言えるかもしれない。
言ってしまえばどんな環境にも設置できるWebhookみたいなものだと考えるとイメージしやすいかと。
準備する
LISTEN や NOTIFY を使えるようにするため、SupabaseのPostgreSQLの拡張モジュール「tcn」を有効化する。
Node.jsの記述を書いていこう
npm install pg knex
pg
と knex
というnpmパッケージをインストールしておく。
knexは supabase-js と同じような書き心地でSQLが書けてすごく便利です。
生のSQL文を書かずに気軽に実行できるのでオススメ。
import knex from "knex";
import { Notification } from "pg";
// ここに格納されたデータをコピーする
const primaryClient = knex({
client: "pg",
connection: {
host: "【ここにホスト名】",
database: "【ここにデータベース名】",
port: 5432, // notificationが有効なポート番号
user: "【ここにDBユーザー名】",
password: "【ここにDBパスワード】",
},
});
// このDBにデータをコピーする想定
const productionClient = knex({
client: "pg",
connection: {
host: "【ここにホスト名】",
database: "【ここにデータベース名】",
port: 6543, // 普通のSQLクエリを実行する際のポート番号
user: "【ここにDBユーザー名】",
password: "【ここにDBパスワード】",
},
});
(async () => {
const connection = await primaryClient.client.acquireConnection();
connection.query("LISTEN hoge_channel");
connection.on("notification", async (msg: Notification) => {
const originalData: any = await primaryClient("products").returning("*").where("replicate_status", "pending").update({
replicate_status: "running",
});
if (originalData.length > 0) {
// 挿入先では「replicate_status」カラムが不要なので除外する
const filteredRows = originalData.map((row: any) => {
const { replicate_status, ...rest } = row;
return rest;
});
// 実行中の行番号を配列形式で取得
const targetRowIds = originalData.map((row: any) => {
const { id } = row;
return id;
});
// upsert的な処理をしている
await productionClient("products").insert(filteredRows).onConflict("id").merge();
// runningステータスをdoneに変更して完了
await primaryClient("products").whereIn("id", targetRowIds).update({
replicate_status: "done",
});
} else {
console.log("対象データが存在しませんでした。");
}
});
})();
とするだけで使える。
NOTIFYを受け取るには「5432」ポートで接続する必要があるので注意すること。
このコードを実行した状態でPostgreSQL側で NOTIFY hoge_channel, 'はろーわーるど!';
を実行すると「はろーわーるど」という文字列が payload の中身として渡ってくれる。今回は文字列自体は重要ではないが、何らかのコントロールをする際にも活用できるので、一応文字列を渡せることも知っておくと良いだろう。
とりあえず上記のコードでは NOTIFYがLISTEN中のチャンネル(今回はhoge_channel)に飛んでくると、 notification
の中身が実行されるということ。