Redshift から Aurora へ大容量のデータを安定同期するためのINSERT術
こんにちは!
ourly株式会社 執行役員CTO(@tigers_loveng)の相澤です。
以前公開した以下の記事で、Redshift → S3 → バッチ → Aurora PostgreSQL というパイプラインの全体像は紹介したのですが、バッチ処理の具体の話までは触れていませんでした。
Aurora PostgreSQL(DB)にINSERTする対象のデータは、最も大きいファイルだと100MB近くあり、CSVの行数でいうと約30万行あります。
何も考えずに1行ずつINSERTしていくと30万回のINSERT処理が走ることになるため、DBへの負荷が心配ですし、すべてのデータを変数に展開してメモリに乗せてしまうとバッチサーバーへの負荷が心配です。
この記事では続編として、DBにINSERTするバッチ処理にはどのような工夫を施しているのかをいくつか紹介できればと思います。
ファイルはストリーミングではなくダウンロードして直接読み込む
懸念/課題
Redshift が S3 に吐き出した GZip をストリーミングしながら INSERT … VALUES …
を実行していると、ネットワークが途切れた際に例外が発生してトランザクションがロールバックされてしまいます。
ロールバック + 再実行すると、インサートするデータ量が多い分、かなり時間がかかるため、なるべく外部要因でのバッチ処理の失敗可能性を低くしておいた方が良いと考えました。
解決方法
以下の流れでインサートするデータのロードを行うことにしました。
- GZip を ローカルの
Tempfile
にダウンロード-
Tempfile
(https://docs.ruby-lang.org/ja/latest/class/Tempfile.html) で展開することで、不要なファイルが残らないようにできる
-
- 処理が完了するまでハンドルを保持
-
ensure
でclose
し、後始末を徹底 - 再実行はローカル I/O だけで済むため、速度も安定性も向上
begin
tmp_file = Tempfile.open('ファイル名')
# === S3から対象のファイルを上記 Tempfile にダウンロードする処理 ===
@opened_gz_reader = Zlib::GzipReader.new(tmp_file)
# === バルクインサート処理 ===
ensure
@opened_gz_reader.close
end
分割してバルクインサートを行う
懸念/課題
バルクインサートは効率が良い反面、一度に処理できるデータ量は無限ではない※ため、気を付ける必要があります。
※ SQLの実行サイズ制限について
あくまでPostgreSQLの話にはなりますが、SQLの実行サイズ制限については、シンプルなクエリとパラメータバインドが必要な場合で分かれますが、双方に上限があります。
- 簡易プロトコル(Simple Query)の場合
- PostgreSQL のメモリ割り当ては
MaxAllocSize = 1 GiB − 1 byte
で制限されるため、1 GiB を超える長さのクエリは送れません。
- PostgreSQL のメモリ割り当ては
- 拡張プロトコル(Bind メッセージ)
- Bind メッセージ内の「パラメータ値数」が
Int16
(2 byte)で定義されており、最大 65535 個までしか渡せません。 - つまり、
INSERT … VALUES …
においては 列数 × 行数で65535を超えるSQLは実行できないことになります。
- Bind メッセージ内の「パラメータ値数」が
- 参照リンク
インサートするデータが特定の行数を超えるとエラーになるというよりは、SQL文全体の長さ(データ量)に制限があるという感じです。
また、大量の行数をバルクインサートするためには、それ用のデータ配列を生成する必要があり、その配列要素数が多くなる程、メモリを圧迫することになります。
解決方法
今回は行数も多いですが列数も多かったため、メモリへの負荷を考慮し、ある程度絞った行数で分割してバルクインサートすることにしました。
また、ビジネス要件としてデータのリアルタイム性がそこまで求められなかったため、バッチ処理に多少時間がかかっても問題ないというのもポイントの一つです。
具体的には以下コードのような感じで500行ずつバルクインサートしています。
BULK_INSERT_BATCH_SIZE = 500
@stack = []
def stack_row_and_bulk_insert!(row)
@stack << row
return if @stack.size < BULK_INSERT_BATCH_SIZE
Model.insert_all(@stack)
@stack = []
end
バルクインサート後、配列要素を空にする
課題/懸念
分割バルクインサートと課題感は同じですが、不要になった配列は空にすることでメモリ(ヒープ)への負荷を抑えることができます。
解決方法
以下のように、バルクインサート後、空配列で定義しなおすことで、旧オブジェクトへの参照が断たれるため、GCのタイミングでメモリが解放されます。
Model.insert_all(@buf)
@buf = []
さらなる改善ポイント
この記事を書くにあたって、改めて課題/懸念と解決方法の正確性を調べている時に気づいたのですが、GCのタイミングでメモリ解放されるということは、つまりそのタイミングまではメモリは占有したままということになります。
なので、即座にメモリを解放するためには Array#clear
を利用し、オブジェクトは同じものを利用しつつ要素を全て削除するほうが望ましいなと気づきました。
まとめ
今回は、別環境にある大量データをDBに登録する際に、工夫した点をtipsとして以下3つ紹介させていただきました。
- ネットワーク切断の影響を排除するため、まず S3 のファイルをローカルにダウンロード
- データ量過多による制限回避のため、バルクインサートを分割して行う
- メモリ負荷をさげるため、バルクインサート後、配列要素を空にする
AIによって動くコードはすぐに生成できるようになった一方で、こういったソースコード以外の影響も考慮した品質の高いコードは一定人間の手も介在する必要がまだまだあると感じる場面も多いです。
引き続き自分自身の知識のアップデートも行っていかねばと思う日々です...!
それでは今回はこのへんで。最後まで読んでいただき、ありがとうございました!
Discussion