[embulk]PostgreSQL→BigQueryに大量のデータを移行する
PostgreSQL→BigQueryに大量のデータを移行する内容の覚書きです
概要
大量のデータを一気にデータ移行すると、処理が途中で落ちてしまうことがあるため、
BigQuery上に一時テーブルを作成し、一定数ずつデータ移行した後、
一時テーブルからの移行の宛先テーブルに正規の移行先のテーブルを指定する方法で実施します
環境
- 移行元: Amazon Aurora(PostgreSQL互換)
→ VPC(AWS)のプライベートサブネットにある想定 - 移行先: BigQuery
- embulk: バージョン0.9.24
→ 移行元のAmazon Aurora(PostgreSQL互換)と同じVPC内のパブリックサブネットにEC2をたて、そこにインストールして使います(以下、embulkサーバと呼びます)
今回は、Amazon Aurora(PostgreSQL互換)→BigQueryへのデータ移行になりますが、embulkの設定ファイルを対象環境用の記述に合わせれば実行できるはず
前提条件
embulkサーバにて、embulk実行専用のembulkユーザーを作成し、
- embulkとプラグインembulk-input-postgresql, embulk-output-bigqueryをインストール済
- Cloud SDK のインストールし、Cloud SDK ツールの承認を実施済
であること
イメージ図
雑多な感じですが、イメージ図は以下の通り
ソースファイル
移行対象のテーブルは、
[embulk]データ移行をクーロン実行する
に記載のmembersテーブルであるものとします
また、一時テーブルであるmembers_tmpもmembersと同じものとします
設定ファイル2ファイルと共通関数用のファイル, 実行用のシェルファイルの計4ファイルを用います
(1) 設定ファイル(データ件数カウント用)
移行対象のデータ件数をカウントするための設定ファイルです
in:
type: postgresql
host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
password: zzzzzzz # 上記ユーザーのパスワードを入力
database: sample1
query: |
select
count(1)
from
members
out:
type: stdout
ポイント
- 結果を標準出力することで、後続のシェルファイルで変数に入れる → 移行対象の件数分移行できるようにしています
(2) 設定ファイル(データ移行用)
Amazon Aurora(PostgreSQL互換)上のmembersテーブル → BigQuery上のmembers_tmpにデータを移行する設定ファイルです
今回の根幹ともいえる存在
in:
type: postgresql
host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
password: zzzzzzz # 上記ユーザーのパスワードを入力
database: sample1
query: |
select
*
from
members
order by
date, content_id, user_id
limit {{ env.LIMIT }} offset {{ env.OFFSET }}
column_options:
regist_date: {type: string, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
birthday: {type: string, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
out:
type: bigquery
auth_method: json_key
json_keyfile: /sample1/aaaaa.json # /sample1 ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
path_prefix: /sample1/tmp/embulk/ # 一時ディレクトリとして/sample1/tmp/embulkを作成
file_ext: .csv.gz
source_format: CSV
project: sample # プロジェクト名を入力
dataset: sample1
auto_create_table: true
table: members_tmp
column_options:
- {name: member_id, mode: REQUIRED} # 必須項目
- {name: name, mode: REQUIRED} # 必須項目
- {name: regist_date, type: date, mode: required} # 必須項目, 日付をdate型に指定
- {name: birthday, type: date} # 日付をdate型に指定
formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
encoders:
- {type: gzip}
ポイント
- データを必ず同じ順で取得できるようソート
- この後のシェルファイルで行う環境変数設定によって、一定件数ずつ移行できるようにしています
(3) シェルファイル(共通関数用)
他の処理でも共通して使えそうな内容を関数化したシェルファイルです
#!bin/bash
#
# 共通ファイル
# 各シェルファイルで使う共通変数、関数記載用
#
# 共通で使用する変数を定義
# 処理開始日時をUnixTimeに変換
startTimeUt=`date +%s`
# embulkでの処理のリトライ回数上限
retry=3
# リトライ時の待機時間
sleepTime='60s'
#
# テーブルの最終更新日時によって、データ取り込み時のエラー発生有無を判定
#
# $1 テーブル名
# $2 処理開始日時
#
function checkLastModifiedTime(){
sql='select format_timestamp("%F %T", timestamp_millis(last_modified_time), "Asia/Tokyo") as last_modified_time
from `sample1.__TABLES__` where table_id = "'$1'"'
bqResult=`bq query --nouse_legacy_sql $sql`
lastModifiedTime=`echo $bqResult | sed -e "s/+---------------------+//g" -e "s/ | //g" -e "s/last_modified_time//g"`
lastModifiedTimeUt=$(date -d "${lastModifiedTime}" +%s)
# テスト内容をより明確にするために一時的にechoする処理を入れておく
echo "処理開始日時のタイムスタンプ:"$2
echo "テーブル最終更新日時のタイムスタンプ:"$lastModifiedTimeUt
# 指定日時より先の場合は成功(真)、以前の場合は失敗(偽)
if [[ $lastModifiedTimeUt -gt $2 ]]; then
return 0
else
return 1
fi
}
#
# エラーメールの送信
#
# $1 エラーメッセージ
#
function sendErrorMail(){
HOSTNAME=$(hostname)
echo -e $1
echo -e $1 | mail -s "$2@${HOSTNAME}" -r hoge@example.com fuga@example.com
}
ポイント
- embulkの性質(トランザクション制御でall or nothing)を使用し、処理の成否を判定している
- データの洗い替えやエラー発生時の終了コードが1にならないケースにも対応できます
(4) シェルファイル(処理実行用)
#!bin/bash
# シェルファイルのパスに移動
cd $(dirname $0)
# 共通ファイルの読み込み
. ./common.sh
# 1回あたりの処理件数を設定: 今回は、10万件
LIMIT=100000 && export LIMIT
# リトライ処理の回数: 今回は、3回
retry=3
# リトライ処理時の待機時間: 今回は、60秒
sleepTime=60
# 異常終了メールのタイトル
title="membersテーブル更新処理失敗"
echo '処理開始'
# 移行対象テーブルの件数確認
retryCount=$retry # リトライ回数初期化
while :
do
result=`embulk run ./members_count.yml.liquid --log-level error`
count=`echo "$result" | head -n 2 | tail -f -n 1`
expr "$count" + 1 >&/dev/null
isNum=$?
if [ $isNum -ge 2 && $retryCount -gt 0 ]; then
# 実行結果が不正な(数字でない)場合は2以上になる
echo "membersテーブルの件数取得に失敗しました"
sleep $sleepTime
$((retryCount--))
echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
else
if [ $isNum -lt 2 ]; then
echo "移行対象は"$count"件です"
break
else
# 一定回数以上失敗した場合は処理を中断: membersテーブル更新後の検知だと時間差が大きすぎるため、ここでいったんストップ
message="membersテーブルの件数取得に失敗しました\n"
message+="詳細はログをご確認ください\n"
sendErrorMail $message $title
exit
fi
fi
done
# 一時テーブルのデータを全件削除
bq query \
--use_legacy_sql=false \
"truncate table
sample1.members_tmp"
# 1回あたりの処理件数ずつデータ移行
for((i=0 ; i < $count ; i=$i+$LIMIT))
do
OFFSET=$i && export OFFSET
retryCount=$retry
table='members_tmp'
while :
do
echo "$OFFSET件目から$LIMIT件移行します"
# 成否判定のため、処理開始日時を前の処理よりもあとのタイムスタンプに更新
startTimeUt=`date +%s`
embulk run ./$table.yml.liquid --log-level warn
checkLastModifiedTime $table $startTimeUt
result=$?
if [[ $result -ne 0 && $retryCount -gt 0 ]]; then
echo "$tableの更新に失敗しました"
sleep $sleepTime
$((retryCount--))
echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
else
if [[ $result -eq 0 ]]; then
echo "$OFFSET件目から$LIMIT件の移行に成功しました"
break
else
# 一定回数以上失敗した場合は処理を中断
message="$tableの更新に失敗しました\n"
message+="詳細はログをご確認ください\n"
sendErrorMail $message $title
exit
fi
fi
done
done
# 一時テーブル → 本テーブルへ移行
retryCount=$retry
table='members'
tableOrigin='members_tmp'
while :
do
bq query \
--destination_table sample1.$table \
--replace \
--use_legacy_sql=false \
"select
*
from
sample1.$tableOrigin"
checkLastModifiedTime $table $startTimeUt
result=$?
# テーブルの最終更新日時が処理開始日時より前の場合は、処理失敗→一定回数リトライ
if [ $result -ne 0 && $retryCount -gt 0 ]; then
echo "$tableの更新に失敗しました"
sleep $sleepTime
$((retryCount--))
echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
else
if [[ $result -eq 0 ]]; then
echo "$tableの更新に成功しました"
break
else
message="$tableの更新に失敗しました\n"
message+="詳細はログをご確認ください\n"
sendErrorMail $message $title
exit
fi
fi
done
# BigQuery移行用の一時ファイルを削除
rm -f ./tmp/.*.csv.gz
echo '処理終了'
ポイント
- embulkによる処理はそれなりに落ちる可能性があるので、リトライ処理をいれています
終わりに
サーバのスペックの関係もあると思いますが、、大量のデータを一気に移行しようとしてembulkの処理が落ちてしまったので、分割して移行することにしました
一時テーブルに少しずつデータを移行した後、BigQuery側で本テーブルに一気に移行することで、大量データの洗い替える場合でも不具合なく移行することができます
移行件数については、embulkサーバのスペックと要相談(本記事では10万件ずつですが、実際は2,000万件単位で行っています → EC2はt3.medium)
Discussion