😁

Fivetranを用いたPostgreSQL RDSの差分転送と削除データ同期のハマりポイントやツラミについて

2023/07/12に公開

SHE株式会社でエンジニアをやっております。さくです。

記事の概要

この記事では、プロジェクトにおいて、ユーザー情報から適性を絞り込みマッチングさせるための基盤として、会員情報の検索基盤を構築しました。

検索機能の要件を満たしていたため、データの統合場所としてBigQueryを用いることになり、RDSからBigQueryへのETLとしてFivetranを使用しました。

Fivetranとは、コンソールでのポチポチでRDSからBigQueryなどさまざまなDBやデータハウス、データレイクなどにELTしてくれるサービスです。概要はこちらの記事がおすすめです。

他チームで既に RDS→Fivetran→BigQuery の部分は整備されていたので、そこをトレースする形にすることで、リードタイムを小さくする狙いがありました。

データの信用性という観点で、Fivetranからの差分転送において、削除データを同期する必要がありました。この記事では、その過程で遭遇したハマりポイントについて共有したいと思います。

Prismaを用いたスキーマ設定とmigrationの生成での注意点についても詳しく書いています。最終的にはハッキーな方法を採用する必要があったため、類似のアーキテクチャでの実装を考えている方などの参考になればと思います。

アーキテクチャ全体の紹介

現状、アプリケーションはバックエンドにNode.js、フロントエンドにNext.jsを使用しており、DBとしてはAWSのRDS(PostgreSQL)を採用しています。

具体的なアーキテクチャは、以下のようになります。

となります。今回は、黄色く囲んだRDS→Fivetran→BigQueryへの転送部分について書いていきます。

Fivetranの差分転送における削除データの同期について

FivetranではPostgreSQL RDSに対する転送方式として、インスタンスタイプなどにはよりますが(参考)、XMIN、WAL、TeleportSyncから転送方式を選ぶことができます(転送方式の詳細については、ドキュメントを見ていただければと思います)。

今回は差分転送においてレコードの削除が同期されるか、という点が重要でした。データ活用という文脈が大きかったので、データの信用性を担保する必要がありました。

各転送方式の詳細と採用にいたった経緯、ハマりポイントなど記載していきます。

XMINについて

まず、XMINの方式では削除された行を検知ができないとのことで、選択肢から外れました。XMINとは、行の変更ごとに一意なIDのようですので、削除されたデータは検知できません。

TeleportSyncについて

こちらは、削除されたデータも同期できる方式で、コンソールでのポチポチのみで設定できるので、リードタイムを考えると、こちらの方式がベストだったのですが、結論としてTeleportSyncの方も僕らの環境では使用しなかったです。というのも、Fivetran側でサポートしているプライマリーキーの型が以下になるのですが、

Constrained VARCHAR
UUID
BIGINT
SMALLINT
INT
Constrained NUMERIC
DATE

現状使用してるDBでは多くのテーブルがTEXT型のプライマリーキーを保持していたためです。
現状、Node.jsを採用しているバックエンドでは、ORMとしてPrismaを使用しています。PrismaはDBのスキーマ定義とmigrationの生成 / 実行も行うことができ、TypeScriptに似た文法でスキーマを定義していくことができます(差分検知してmigrationも自動で作ってくれたりします)。

僕らのプロジェクトではプライマリーキーにCUIDを用いており、レコードの型は以下のようにStringとしています。

model City {
  id           String        @id @unique
  name         String
}

こうすると、デフォルトではPostgreSQL側ではTEXT型になるようです(PostgreSQL側の型を指定する方法も真下にかいてあります)。

既存のテーブルは全てプライマリーキーとしてCUIDを用いてString型で設計していたため、こちらを全て変更するコストが大きいと予想されるのと、よりアプリケーションに近い部分がETLという文脈でFivetranに依存するのは避けたいと考え、WALを採用するに至りました。

WALについて

といっても、こちらはRDSにおいてパラメータグループを使用して、logical_replicationを1に設定して再起動する必要などがあったのですが、よりアプリケーションから遠いのと、設定自体はコストが低いと判断しました。
また、その他にPublicationとReplication Slotの設定をクエリにて設定する必要がありますが、こちらも一度行えばアプリケーションから意識する必要は全くないです。

ただこちらの設定作業を実行するにあたって気づいたハマりポイントがいくつかありまして、そちらを紹介していきます。

FivetranとPrismaの組み合わせで起きた問題について

主に、ORMと使用しているPrismaとの相性のところでいくつか問題が出てきましたので、紹介したいと思います。

複合主キーが作られていないテーブルがあった

WALの利用には主キーが必要になり、これには複合主キーも使用できるのですが、Prismaでのスキーマ設定の認識を間違っており、一部複合主キーが作られていないテーブルがありました。

Prismaのスキーマにおいて、複合ユニーク制約の設定は以下のようにできるのですが、

model EntryQuestion {
    entryId        String
    questionId     String
    @@unique([entryId, questionId])
}

これだけだと、複合主キーは作られないようでした。複合主キーを設定するには、以下のようにする必要があります。

model EntryQuestionAnswer {
    entryId        String
    questionId     String
    @@id([entryId, questionId]) ## ここが必要
}

暗黙的に作られる中間テーブルには明示的にスキーマによる複合主キーの設定ができない

先程の例のように、中間テーブルなどを明示的に設定している場合は@@idを用いて複合主キーの設定ができるのですが、Prismaでは暗黙的に中間テーブルを作ってくれる機能もあり、そちらを使用すると複合主キーをスキーマに設定することができないようでした。

具体的には以下のようにスキーマを設定すると暗黙的な中間テーブルを作ることができます。

model Article {
  id   String @id @default(cuid())
  text String
  tags Tag[]  @relation("TagToArticle")
}

model Tag {
  id       String    @id @default(cuid())
  name     String
  articles Article[] @relation("TagToArticle")
}

上記のように設定すると、暗黙的にTagToArticleというテーブルが作られます。こちらには複合ユニーク制約が作られるのですが、複合主キーは作られません。

ということで、migrationにSQLを書いて実行することで以下のように複合主キーを追加しました。(暗黙的に作られるテーブルには、自動的にAとBという名前のカラムが設定されます。)

ALTER TABLE "_TagToArticle" ADD CONSTRAINT "_TagToArticle_pkey" PRIMARY KEY  ("A", "B");

この設定により、無事WALによる削除されたデータの検知を含む差分転送に成功しました。

しかし、migration側に新たな問題が発生しました。

Prismaが暗黙的に作られる中間テーブルの差分を検知して、毎回複合主キーを落とそうとしてしまう

今回、migrationを手動で作成し、複合主キーを暗黙的に作られる中間テーブルに追加しましたが、Prismaのスキーマ設定にはその記載がないため、migrationを新たに作成する際にPrismaがその差分を検知して、複合主キーを落とそうとしていまうことが発覚しました。

デプロイ時のmigrationの実行では、npx prisma migrate deploy というコマンドを使用しています。こちらのコマンドでは、ドキュメントにもある通りスキーマの変更やDBとの差分を検知しないため、デプロイ時には問題になりません。

ただ、開発時にはスキーマ変更からのmigrationの生成が非常に便利なので、こちらをprisma migrate devを使用しており、こちらが毎度差分を検知して複合主キーを落とすためのmigrationを生成してしまい、開発体験が非常に悪い状態となってしまいました。

これには、非常にハッキーな対策を強いられてしまいました。具体的には、migrateの実行に使用していた、package.jsonのscriptのpre機能を用いて、migrationの実行の直前に、用意しておいた複合主キーを落とすSQLの入ったmigrationをコピーしてmigrationの中に含めておくというものでした。

ローカルのDBにおいては、Fivetranによる差分転送は行わないので複合主キーの存在は影響しないのでこの方法を取りました。具体的には以下のようになります。

用意しておくmigrationファイル

ALTER TABLE "_TagToArticle" DROP CONSTRAINT "_TagToArticle";

package.jsonにおけるscriptの設定

{
  "scripts": {
    "premigrate": "npm run migrate:apply-local-patch",
    "migrate": "npx prisma migrate dev",
    "migrate:apply-local-patch": "shx cp -r ./scripts/db/local_migration_patch/* ./prisma/migrations/",
}

  • local_migration_patchは.gitignoreに設定を追加して、リモートリポジトリにはpushされないように設定してあります。

このように非常にハッキーな方法を取ることになってしまいましたが、なんとかFivetranにおける差分転送と、削除データの同期の実装を完了することができました。

終わりに

Fivetranにおける差分転送と、削除データの同期の実装にはいくつかのハードルがありましたが、それらをクリアして最終的に成功したことは、チームにとっては大きな成果でした。

この経験は、それぞれのツールの特性や制約を理解し、それを設計や実装に反映することの重要性を改めて認識させてくれました。

本記事が、同じような課題に直面している他の開発者の方々にとって、何かしらの参考になれば幸いです。

SHE Tech Blog

Discussion