🙆

pg _floでPostgreSQLのCDCを検証する

2024/12/18に公開

こんちには。
この記事は モニクル Advent Calendar 2024 18日目の記事です。
17日目は Cursor ComposerとCopilot Editsのよしなに力の比較 でした。そちらもどうぞ!
昔の自分の記事を見たところ、アドベントカレンダーで記事を書くのが10年ぶりでした。

pg_floというPostgreSQLのCDC(Change Data Capture)を行うツールを見つけたので検証してみ ました。
https://github.com/pgflo/pg_flo
pg_floはPostgreSQL上で行われたデータの更新をレプリケーションしたり、JSON形式で出力できるツールです。

サービスを運営していると、RDBを分割だったり一部データの移行をしたいことが稀によくあります。そういうケースで新しいデータベースへの差分更新の反映や、RDBへの書き込みを他のシステムに伝搬させたい時にCDCが活用できます。

なお、PostgreSQLについてはほぼ初心者なので、何かおかしいこと書いてたら教えてください。

動かしてみる

何ができるかは見た方が早いので、pg_floを動かしてみます。
pg_floを動かすにはPostgreSQLデータベース、 NATS Server、 Replicator、 Workerが必要になります。
それぞれの役割などは後述します。

以下の構成のテーブルでCDCをしてみます。

Column Type Nullable
id integer not null
email character varying(255) not null
passwrod character varying(100) not null
name character varying(255) nullable
created_at timestamp without time zone not null

pg_floはキャプチャしたデータの出力先として標準出力、Webhook、PostgreSQL、ファイルが選べます。
今回は出力内容の確認のために標準出力と、異常時の挙動を確認したいのでWebhookを試してみます。

今回は検証なのでdocker composeを使っており、pg_floのconfigファイルは以下のようにしています。
ほとんどの設定値は環境変数でも指定できるようなので、データベースのパスワードなどは基本的に環境変数で指定することになるでしょう。

config.yaml
# Replicator settings
host: "db"
port: 5432
dbname: "test"
user: "postgres"
password: "postgres"
group: "pg_flo"
schema: "app"
tables:
  - "users"
status-dir: "/etc/pg_flo/status"

# webhook sink
webhook-url: "http://webhook:5000"

# Common settings
nats-url: "nats://nats:4222"

pg_floを扱うためにはNATS Serverを起動する必要があります。NATS Serverの概要については後述しますが、私も今回の検証で初めて知りました。
pg_floのドキュメントには特にNATS Serverの設定について記述が見つからなかったので、ChatGPTに作ってもらったら以下のようになりました。

nats-server.conf
# NATSサーバーの基本設定
listen: 0.0.0.0:4222
jetstream: true

# JetStreamストレージ設定
store_dir: "/var/lib/nats/jetstream"

# ログ設定
logtime: true
debug: true
trace: true

標準出力

pg_flo を以下のオプションで実行すると標準出力へ出力するWorkerが起動します。

pg_flo worker stdout --config/etc/pg_flo/config.yaml

それぞれのインスタンスを起動してPostgreSQLの指定したテーブルのデータを更新すると、WorkerがINSERT, UPDATE, DELETEでそれぞれ以下の内容を出力します。自動でJSONにしてくれるので加工がしやすいですね。

INSERT
 {
  "EmittedAt": "2024-12-12T02:06:20.930011268Z",
  "LSN": "0/19A3F70",
  "NewTuple": {
    "id": 1,
    "name": "customer1",
    "email": "foo1@test.com",
    "password": "foobar",
    "created_at": "2024-12-09T08:16:31.923737Z"
  },
  "ReplicationKey": {
    "Type": "PRIMARY KEY",
    "Columns": [
      "id"
    ]
  },
  "Schema": "app",
  "Table": "users",
  "Type": "INSERT"
}

UPDATEは更新したカラム以外もすべて返ってきます。

UPDATE
{
  "EmittedAt": "2024-12-12T02:06:26.988633704Z",
  "LSN": "0/19A72C0",
  "NewTuple": {
    "created_at": "2024-12-09T08:16:31.923737Z",
    "email": "foo8@test.com",
    "id": 8,
    "name": "foo8",
    "password": "updated"
  },
  "ReplicationKey": {
    "Type": "PRIMARY KEY",
    "Columns": [
      "id"
    ]
  },
  "Schema": "app",
  "Table": "users",
  "Type": "UPDATE"
}

DELETEはPKだけ値があって、ほかのカラムはすべてnullになるようです。

DELETE
{
  "EmittedAt": "2024-12-12T02:06:59.525625752Z",
  "LSN": "0/19A74E0",
  "OldTuple": {
    "created_at": null,
    "email": null,
    "id": 11,
    "name": null,
    "password": null,
    "phone_number": null,
    "post_code": null,
    "updated_at": null
  },
  "ReplicationKey": {
    "Type": "PRIMARY KEY",
    "Columns": [
      "id"
    ]
  },
  "Schema": "app",
  "Table": "users",
  "Type": "DELETE"
}

Webhook

生成されるデータの形式が分かったので、Webhookも試してみます。
WebhookのときはリクエストしたいURLを設定する必要があります。上述の設定ファイルにURLを記述しているので、そのまま利用できます。

pg_flo を以下のオプションで実行するとWebhookに出力するWorkerが起動します。

pg_flo worker webhook --config/etc/pg_flo/config.yaml

なお、今回のWebhookのレシーバーはリクエスト内容をconsole.logに出力するだけのNodeJSサーバーにしています。

Workerを起動してテーブルのデータを更新すると、Webhookレシーバーに以下のヘッダーでPOSTされます。データの更新内容がPOSTのBodyとしてリクエストされますが、内容は標準出力のものと同じなので割愛します。

webhook header
{
  host: 'webhook:5000',
  'user-agent': 'Go-http-client/1.1',
  'content-length': '371',
  'content-type': 'application/json',
  'accept-encoding': 'gzip'
}

テーブルの複数件のレコードを更新すると、更新内容がまとまってではなくレコード数分のリクエストが送信されます。
大量にレコードを更新するとその分のHTTPリクエストが直列に送信されるため、データの反映が遅くなります。更新頻度が多いテーブルを対象にする場合は注意が必要です。

レシーバーでエラーが起きて2xx系以外のHTTPステータスを返すと自動的にリトライする機能もあります。
一応、デフォルトではリトライ回数が3回のようですが(というかまだ変更できなさそう)、試したところではエラーが解消されるまでリトライされていました。ここは是非ともエクスポーネンシャルバックオフしたいところ。
ちなみにWebhookレシーバーが500ステータスを返すと、Workerで以下のようなログが出力されます。

worker-1 | WRN Received non-2xx status code, retrying... attempt=1 statusCode=500
worker-1 | WRN Received non-2xx status code, retrying... attempt=2 statusCode=500
worker-1 | ERR Failed to write batch to sink error="webhook request failed with status code: 500 after 3 attempts" component=worker
worker-1 | ERR Failed to flush buffer on interval error="webhook request failed with status code: 500 after 3 attempts" component=worker

ルーティング、ルール

pg_floには特定のカラムをレプリケーションの対象外にしたり値をマスクする機能があります。
この機能を使うことで移行元と移行先のデータ構造を合わせる必要が無くなり、自由にデータ設計を行えるのですごく魅力的なんですが、試したところうまく動かすことができませんでした。
以前別のツールを検証していたとき、テーブル構造を合わせないといけない、という制約にガッカリしたので、導入するならこの機能はちゃんと動かしたいです。

どう動いているか

ここまででpg_floで何ができるか見てきました。ではpg_floがどのようにリトライ機能などを実現しているか見ていきます。

pg_floはNATS Serverに依存しています。
NATS ServerとはCNCF Incubatingプロジェクトに登録されているメッセージブローカーツールです。
https://nats.io/

pg_floのコンポーネントにReplicator、 Workerがありますが、これらがそれぞれNATS ServerのProducer、 Consumerになっています。

ReplicatorがPostgreSQLのWALを監視して、データの更新があったらNATS Serverにメッセージを書き込みます。

WorkerがNATS Serverからメッセージを取得して、JSONに変換して出力します。

メッセージの処理がどこまで進んでいるか、正常に終了したかどうかの管理はNATSが行っており、pg_floはNATSとPostgreSQL、出力先の紐づけを行ってくれているわけですね。

なお、同じgroupを参照したWorkerを複数台立ち上げると、どれか1つのWorkerインスタンスでメッセージを取得するようになります。メッセージの取得はWorkerからNATSに対してのポーリングなので、早いもの勝ちで取ってきていそうです。
ですが、1Groupあたり1Workerの設定が推奨されているようなので、スケールアウトしたければgroupを分けてWorkerインスタンスを並べる方が良さそうです。

NATSのことは今回初めて知りましたが、重複処理防止がどのように実現されているか、メッセージの配信がAtLeaseOnceなのか、ExactlyOnceにできるのかなども気になるので、もうちょっと調べてみたいところです。

まとめ

RDBからの移行先のデータベース選定に自由度を持つために、JSONなど扱いやすいデータフォーマットを使えると便利だなと思っていたところで良さそうなツールを見つけられました。
WALをJSONに変換してくれるツールは他にもありますが、Webhookの機能が内包されていたり、処理済み位置を記録してレプリケーター自体や後続処理が失敗した際にもその時点から自動でリトライしてくれるので運用しやすそうです。

現時点でのバージョンがまだ0.0.12で、各種設定値もデフォルト値がハードコードされている状態なのでProduction Readyかというと難しそうですが、期待できそうなツールでした。

株式会社モニクル

Discussion