DMSを使ってRDSのデータをSnowflakeにリアルタイム同期している場合はリカバリ用のSQLを用意しておくと捗るという話
前置き
こんにちは。株式会社GENDAのデータエンジニアのこみぃです。
本日のお話は以下の記事の続きのようなお話です。
DMSを使ったRDSとSnowflakeのリアルタイム同期
DMSを使ってRDSとSnowflakeのデータをリアルタイム同期するのは↑の記事でも触れている通りで可能です。
構造を簡単に説明すると、DMSを使ってS3にレコードの変更履歴を都度送信し、それをsnowpipeでSnowflakeに取り込みます。
Snowflakeでは変更履歴を統合し、現状のRDSの状態を再現します。
例としてはこんな感じです。
select * from sample_tables order by updated_at desc;
+--------+-------+-------------------------+-------+
| ID | VALUE | UPDATED_AT | _TYPE |
|--------+-------+-------------------------+-------|
| 100001 | 20 | 2023-01-11 11:54:20.917 | U |
| 100001 | 69 | 2023-01-11 11:49:16.329 | I |
| 100002 | 1 | 2023-01-11 11:49:16.330 | I |
+--------+-------+-------------------------+-------+
上記のテーブルはID=100001,100002というレコードが2023-01-11 11:49:16に作られ、2023-01-11 11:54:20にID=100001のレコードがアップデートされたという履歴が積み上がっています。
このレコードから、現在のRDSのデータの状況は再現できます。以下のような感じですね。
select * from sample_tables_origin order by id;
+--------+-------+-------------------------+
| ID | VALUE | UPDATED_AT |
|--------+-------+-------------------------+
| 100001 | 20 | 2023-01-11 11:54:20.917 |
| 100002 | 1 | 2023-01-11 11:49:16.330 |
+--------+-------+-------------------------+
DMSを使ったRDSとSnowflakeの同期はこのような構造で行われています。データを完全に同期しているわけではないが再現できるというのが面白いですね。
実はこの構造はちょっと前までのGCPの思想と近いものがあったりします。というのも、
データ基盤のSQLにUPDATEは不要である。UPDATEしたという履歴をINSERTすれば良い
というのがGCPが長らくBigQueryにupdateが存在しなかった理由でした。
GCPが推奨するログ基盤を構築すると、こういう形でマスターデータを再現したものが出来上がります。
データ基盤がリアルタイム性を持つことがどのくらい重要かはサービス内容などによりけりで、多くの場合は実際にはそんなにリアルタイム性は要りません。
しかし、リアルタイムであることで開ける道もあります。
特に時差のある複数の拠点でサービスを展開している場合などは、更新を日次にするとどちらかの拠点で更新時間が不自然になってしまいます。
なので、出来るならリアルタイムにしたいという需要もあります。
DMSがよく止まるっていうお話
さて、すばらしいシステムを構築したように見えるのですが、実はここに結構落とし穴が。
というのも、 このDMSが結構な頻度でエラーで止まります。
前回の記事で書いたように、追従できないALTERを発行されたら一撃で止まりますし、RDSの再起動などでも止まるケースがあります。
特に定期的なメンテナンスのたびに止まってしまうような状況だと、まあ辛い。
リカバリ用のSQLを用意しておこう
これに対しての対策なのですが、現状色々頑張ってみたのですが DMS自体を止まらなくするのはかなり難しい です。
なので苦肉の策ではありますが、DMSのRestart後に使うリカバリ用のSQLを用意しておくことをおすすめいたします。
上記のテーブルであれば、こんなセットを用意しておきましょう。
-- truncate table
truncate sample_tables;
-- load initial data
COPY INTO sample_tables
FROM (SELECT $1:id, $1:value, $1:updated_at, 'I' FROM @sample_tables/LOAD0000)
FILE_FORMAT = (TYPE = PARQUET COMPRESSION = AUTO)
;
-- load non-initial data
COPY INTO sample_tables
FROM (SELECT $1:id, $1:values, $1:updated_at, $1:Op FROM @sample_tables)
pattern = '^(?!.*LOAD.*\.parquet).*$'
FILE_FORMAT = (TYPE = PARQUET COMPRESSION = AUTO)
;
DMSはRestartするとまずすべてのデータを LOAD00XXXXXX.parquet というファイル名で書き出し、その後差分を {datetime}.parquet のようなファイル名で書き出すので、それぞれ読み込んでいるのがミソです。
これをテーブルの分用意しておいて、DMSが止まった場合の復旧を簡単にしておく、というのが現状でなんとかできる妥協点です。
本日の結論
本日の結論は簡単ですね。
DMSによるレプリケーションは定期的に止まるものと思え
結びの言葉
データパイプラインは一度構築したら終わりではないので、異常が起きたときの復旧手順は用意しておくに越したことはないですね。
願わくはDMSがもう少し安定したサービスになってくれると嬉しいんですが、そもそも異種のDB間でレプリケーションできている時点ですごいことなので、多くを望みすぎるのも良くないのかもしれません。
最後に一つ宣伝を。
私が所属する株式会社GENDAでは一緒に働く仲間をすごくすごい真剣に求めています。
興味がありましたらぜひお気軽にお声おかけください。
本日はこのあたりで。
それじゃあ、バイバイ!
Discussion