GlueでRedshiftからPostgresへデータを日次で差分更新する話

はじめに
データアナリティクスチームのre:Inventの朝で寝不足なゲインです。
今回はデータアナリティクスに入る前にやっていた仕事の話をします。
やりたいこと
外部からRedshift経由で共有されている数億レコードあるようなデータをPostgreSQLに連携したいという要件です。
- 一覧画面でインクリメンタルサーチで絞り込んだ結果だけを表示したい
- その画面ではユーザー入力によるデータと外部提供のデータをJOINして表示したい
- 更新頻度は1日1回程度で深夜に実行して日本でのビジネスタイムまでに間に合えば良い
というよくあるデータ連携の深夜バッチです。
アーキテクチャ案
(ボツ) 1. GlueでRedshiftからデータを吸い出して、直接PostgreSQLの対象テーブルにInsertする
最初に試したのはそのままデータをそのまま対象のテーブルに突っ込むという方法です。
しかし、PostgreSQL側のテーブル定義にユニークIndex等があった場合にGlueがいい感じにUpsertしたりすることができないため、エラーで処理が止まるためそもそも要件が満たせずボツとなりました。
(ボツ) 2. GlueでRedshiftからデータを吸い出して、tmpテーブルにInsertする. Insert後に対象テーブルとtmpテーブルをmergeする
一旦PostgreSQLのtmpテーブルにInsertしてからUpsertとDeleteを実現する方法です。
この方法では処理自体は可能なものの、1テーブルの連携のみでもCPU使用率が80%を超えたり、クエリに10分以上かかったりしてしまいました。
これはお金を払えば解決するという世界にまでは持っていけたのですが、どう考えてもサービス提供しないほうが利益を守れるような金額になりそうだったのでボツとなりました。
(採用) 3. GlueでRedshiftからデータを吸い出す。 別途S3にキャッシュとして前回のデータを持っておいて、それとの差分をGlueで計算後、差分のみPostgreSQLのtmpテーブルに書き込んでMergeする
以下の図のようなアーキテクチャを構築しました。

まずGlue Jobを起動し、S3から前回の実行時のキャッシュとDataFrameで取得します。
このキャッシュされたDataFrameと現在Redshiftから取得したDataFrameから新規に追加されたレコード、更新されたレコード、削除されたレコードを取得します。
これらをPostgreSQL側のUpsert用、Delete用のtmpテーブルに正常に書き込めた後に今回取得したRedshiftの内容をS3にキャッシュとして保存します。
これが成功した後には、Fargate上のpsql経由でMerge文を実行し、実際に利用しているテーブルへ更新をかけます。
発生した問題と対策
問題1: データの更新差分が検知できない
案1,2にあるように初めは全量をRedshiftからPostgreSQLにコピーするような検証をしていたのですが、あまりにデータ量が多すぎてビジネスタイムに間に合うようにするにはインスタンスサイズをあげるなどの対策が必要になってしまいました。
そのため差分更新を実現することになったのですが、外部のデータソースにはCreatedAtやUpdatedAtが提供されておらず暗黙のうちに更新が行われるため差分のみをPostgreSQLに連携するということがRedshiftのみでは実現できません。
対策1: S3にキャッシュ
一度Redshiftから取得したデータをS3にキャッシュし、毎回そこから差分を計算することにしました。
これによって重い処理をバッチ側に委譲することができ、実データが入っているPostgreSQL側のインスタンスサイズをむやみにスケールアップせずに済みました。
これによって冪等なデータロードが実現できました。
ただし初回ロードに関してはどうしても重いので、稼働しているサービスであれば少しずつ連携を行うなどの処置が必要になるでしょう。
問題2: Executorの No space left on device
Sparkの性質上巨大なデータに対してシャッフルが発生するとストレージ容量を多く利用します。
GlueでExecutorのストレージ容量を指定して増やすということができないため、今回のデータ量では No space left on device が発生してしまいました。
対策2: S3にシャッフルのデータを一時保存
GlueではCloud Shuffle Storage Plugin for Apache Spark を利用することでS3にシャッフルのデータを保存することができます。
こちらを利用することで No space left on device のエラーを発生せずに実行を完了することができました
スキーマの変更があった際にはS3に保存されているカラムと一致しなくなるため一度S3のキャッシュを削除して再度1から入れ直す必要があります。
現状では運用としては発生していませんが今後開発が活発になるのであればここを治す必要があります
例えばカラム数や項目が一致しなかったら自動で全量をSyncし直すなどの対策は考えられるでしょう。
まとめと感想
RedshiftのデータをPostgreSQLに日次連携するという普通とは逆のシステムをご紹介しました。
おかげで数億レコードあるデータを30分程度で毎日Syncすることが出来ております。
今回は検証する時間もなくPostgreSQLへの連携を行うというのを固定で考えましたが、現在だとIceberg + Parquetな組み合わせをアプリケーション側から使っちゃう手もあり得るかもしれませんね。
それでは良いAWS日和を!
Discussion