[embulk]PostgreSQL→BigQueryのデータ移行をクーロン実行する
embulkを用いたPostgreSQL→BigQueryへのデータ移行をクーロン実行(バッチ実行)するためのサンプルコード(覚え書き)
環境
- 移行元: Amazon Aurora(PostgreSQL互換)
→ VPC(AWS)のプライベートサブネットにある想定 - 移行先: BigQuery
- embulk: バージョン0.9.24
→ 移行元のAmazon Aurora(PostgreSQL互換)と同じVPC内のパブリックサブネットにEC2をたて、そこにインストールして使います(以下、embulkサーバと呼びます)
今回は、Amazon Aurora(PostgreSQL互換)→BigQueryへのデータ移行になりますが、embulkの設定ファイルを対象環境用の記述に合わせれば実行できるはず
前提条件
embulkサーバにて、embulk実行専用のembulkユーザーを作成し、
- embulkとプラグインembulk-input-postgresql, embulk-output-bigqueryをインストール済
- Cloud SDK のインストールし、Cloud SDK ツールの承認を実施済
であること
ソースファイル群の構成
ソースファイル群の構成は以下の通り
embulk実行専用のembulkユーザーを作成し、そのhomeディレクトリ下に諸々のファイル群を配置しています
複数テーブルを同じ移行元から移行先へ移管するため、接続情報など共通の内容については、外部ファイル(共通ファイル)に記載しています。
/home/embulk/
| --- auth-- BigQuery接続用秘密鍵格納ディレクトリ
| |--- xxxxxx.json
|
|
| --- hogehoge
|
|--- log -- ログ出力ディレクトリ
| |--- xxxxxx.log
|
|--- tmp
| |--- BigQuery移行用の一時ファイル格納先
| |--- .yyyyy.zzzz.csv.gz
|
|--- setVariable.sh -- 環境変数設定用
|--- common.sh -- 共通関数記述用
|--- convertTable.sh -- 処理実行用
|
|--- _in_postgresql.yml.liquid -- PostgreSQL接続共通部
|--- _out_bigquery_header.yml.liquid -- BigQuery接続共通部(ヘッダ部分)
|--- _out_bigquery_footer.yml.liquid -- BigQuery接続共通部(フッタ部分)
|--- shops.yml -- 移行対象1 shopsテーブル用
|--- members_tmp.yml -- 移行対象2 membersテーブル用
|--- daily_logs.yml -- 移行対象3 daily_logs用
1. テーブル定義
(1) 移行元(PostgreSQL)
移行元のPostgreSQLにおけるテーブル定義です
①shops.ddl
店舗情報のテーブル
簡易的なインターネットサイトの店舗を想定してます
create table shops (
shop_id bigint not null
, shop_name character varying(200) not null
, category_id smallint not null
);
②members.ddl
会員情報のテーブル
create table members (
member_id character varying(25) not null
, name character varying(200) not null
, regist_date date not null
, sex character(1)
, birthday date
, latitude double precision
, longitude double precision
, location geometry(POINT,4301)
, towncode character(11)
, citycode character(5)
);
③daily_logs.ddl
会員の店舗訪問履歴を記録するテーブルです
DDL自体には記載してませんが、月単位でパーティションが切られているものとします
create table daily_logs (
date timestamp(6) without time zone not null
, shop_id int64 not null
, member_id character varying(25) not null
);
(2) 移行先(BigQuery)
続いて、移行元であるPostgreSQLのテーブル定義をもとに、BigQueryにおけるテーブル定義(DDL)を作成します
移行先のデータセットは、test_bqというネーミングであるものとします
BigQueryは分析用だからか、RDBにおけるデータ型よりもザックリとした型の持ち方をしてますね
①shops.ddl
create table `test_bq.shops` (
shop_id int64 not null
, shop_name string not null
, category_id int64 not null
);
②-1: members_tmp.ddl
後述もしますが、embulkでは地理データ型(geometry型やgeography型)の取り扱いがないため、いったん対象カラムをテキスト値として投入するための一時テーブルを作成します
create table `test_bq.members_tmp` (
member_id string not null
, name string not null
, regist_date date not null
, sex string
, birthday date
, latitude float64
, longitude float64
, location string --元の型は、geometry(POINT,4301)→ WKTに変換してstring型で保管(embulkのサポートする型の関係)
, towncode string
, citycode string
);
②-2: members.ddl
BigQueryで使用する本テーブルです
こちらも後述しますが、members_tmpにstring型で取り込んだlocationカラムの値をBigQueryにおける地理データ型であるgeography型に変換して取り込みます
→ BigQueryでは、geometry型の取り扱いがないため、やむを得ずといった感じ。この値を用いて計算など行う場合は、結果にずれが出ますので、その点はご注意を。
create table `test_bq.members` (
member_id string not null
, name string not null
, regist_date date not null
, sex string
, birthday date
, latitude float64
, longitude float64
, location geography -- BigQueryではgeometry型の取り扱いがないため、geography型にする
, town_code string
, citycode string
);
③daily_logs.ddl
データ量が大量になるため、月単位でパーティションを切ります
今回は最大半年の使用を想定し、それ以上のデータは削除されるよう期限を設定しました
create table `test_bq.daily_logs` (
date datetime not null
, shop_id int64 not null
, member_id string
)
partition by
datetime_trunc(date, month)
options(
partition_expiration_days=190 -- パーティションへのデータの保持期限を半年強に設定
, require_partition_filter = true
);
2. 設定ファイル
ソースファイル群の構成にも記載しましたが、外部ファイルを用いるため、呼び出し元(個々のテーブルデータ移行用の設定ファイル)、呼び出し先(外部ファイル)とも、liquidテンプレートを使用します(末尾に「.liquid」をつける)
詳しくは、Embulk configuration file formatの「Including files」の箇所に記載されてます
(1) 共通ファイル
PostgreSQL, BigQuery接続部など、複数ファイルで共通の箇所については外部ファイルに記載しておくと何かと便利です
① _in_postgresql.yml.liquid
移行元のPostgreSQL接続部です
「env.XXXX」(XXXXは環境変数名)とすることで、環境変数に設定した値を用いることができます
in:
type: postgresql
host: {{ env.HOST }} # 接続DBのホスト名
user: {{ env.USER }} # DBのユーザー名
password: {{ env.PASSWORD }} # 接続DBのパスワード
database: {{ env.DATABASE }} # 接続DBのデータベース(スキーマ)
② _out_bigquery_header.yml.liquid
移行先のPostgreSQL接続部です
out:
type: bigquery
auth_method: json_key
json_keyfile: /home/embulk/auth/{{ env.GCP_KEY }} # BigQueryアクセス用の秘密鍵
path_prefix: /home/embulk/hoge/tmp/
file_ext: .csv.gz
source_format: CSV
project: {{ env.PROJECT }} # 対象プロジェクト
dataset: fuga
auto_create_table: true
③ _out_bigquery_footer.yml.liquid
BigQuery接続のための共通部のうち、フォーマットに関する部分です
formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
encoders:
- {type: gzip}
(2) 各テーブル移行用設定ファイル
各テーブル移行用のembulkの設定ファイルです
ポイントを各ファイル下に記載します(内容として重複するものは省いてます)
①shops.yml.liquid
shopsテーブルのデータ移行用設定ファイル
{% include 'in_postgresql' %}
table: shops
{% include 'out_bigquery_header' %}
table: shops
column_options:
- {name: shop_id, mode: required} # 必須項目
- {name: shop_name, mode: required} # 必須項目
- {name: category_id, mode: required} # 必須項目
mode: replace # データを洗い替え
{% include 'out_bigquery_footer' %}
■ ポイント
移行先BigQuery側の設定として、
- カラム定義の必須項目(not null)の箇所をcolumn_optionsでカラム名(name)を指定し、mode: requiredとしている
→ この記述がないとembulkコマンド実行時にエラーで落ちる - mode: replaceとすることで、追加(デフォルト)ではなくデータの洗い替えをしている
こと
{% include 'in_postgresql' %}
query: | # geometry型のlocationをいったんWKTに変換するため、SQLを記述(ST_AsTextで変換)
select
member_id, name, regist_date, sex, birthday, latitude, longitude, ST_AsText(location) as location, towncode, citycode
from
members
{% include 'out_bigquery_header' %}
table: members_tmp
column_options:
- {name: member_id, mode: required} # 必須項目
- {name: name, mode: required} # 必須項目
- {name: regist_date, type: date, mode: required} # 必須項目, 日付をdate型に指定
- {name: birthday, type: date} # 日付をdate型に指定
mode: replace # データを洗い替え
{% include 'out_bigquery_footer' %}
■ポイント
移行元のPostgreSQL側の設定として、
- embulkのpostgresqlプラグインがgeometry型に対応していないため、テーブル名の指定ではなく、queryでクエリを書いていったんWKT(テキスト値)に変換している
→ embulkのbigqueryプラグインもBigQueryにおける地理データ型geography型に対応していないため、一度tmpテーブルにstring型で取り込んだ後に変換し、本テーブルに取り込むというスタイルをとってます(後続のシェルスクリプトで、bqコマンドを使って実施)
こと
移行先のBigQuery側の設定として、
- 移行元でdate型だったregist_dateはembulk上でstring型に変換されるため、date型を指定して取り込んでいる
こと
{% include 'in_postgresql' %}
query: | # 日付範囲指定のためSQLを記述し、環境変数を用いる
select
*
from
daily_logs
where
'{{ env.START_DATE }}' <= date
and date < '{{ env.END_DATE }}'
column_options:
date: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo"}
{% include 'out_bigquery_header' %}
table: daily_logs${{ env.TARGET_MONTH }} # 取り込み先のパーティションを指定
column_options:
- {name: date, type: datetime, mode: REQUIRED} # 必須項目
- {name: shop_id, mode: REQUIRED} # 必須項目
- {name: member_id, type: REQUIRED} # 必須項目
{% include 'out_bigquery_footer' %}
time_partitioning: # パーティションテーブルのため必要な記述を書く
type: MONTH
field: date
■ ポイント
移行元のPostgreSQL側の設定として、
- 取り込み対象の日付範囲を指定するため、テーブル名の指定ではなく、queryでクエリを書いて取り込み範囲を指定していること
→ 環境変数(env.xxxxx : xxxxxは変数名 で指定)を使ってシェルファイルにて、動的に指定できるようにしました - timestamp型のdateの定義がtimestamp without time zone(PostgreSQL自体の設定としてはAsia/Tokyo)なので、タイムゾーンをAsia/Tokyoと明記している
こと
移行先のBigQuery側の設定として、
- 対象テーブルがパーティションテーブル(月単位)のため、
- 取り込みパーティション(年月)を指定している
- time_partitioning欄にパーティションのタイプ(MONTH)とカラム名(date)を記載している
- modeは設定してないので、デフォルトのappend(追加)になる
こと
3. シェルファイル
① setVariable.sh
環境変数設定用のシェルファイルです
ここで設定した内容を、上述の通り、設定ファイルで使用します
本番環境、検証環境とも同じファイルで実行できるよう、ホスト名で条件分岐してみました
※本番環境には、「-prod-」という文字列が入っているものとします
#!bin/bash
#
# 各種変数の設定用
#
# 稼働環境(本番 or STG)に応じた変数の設定(※)
hostname=`hostname`
if [[ "$hostname" == *-prod-* ]]; then
HOST='本番DBのホスト名'
USER='本番DBのユーザー名'
PASSWORD='本番DBのパスワード'
DATABASE='本番DBの接続先DB'
PROJECT='本番DBからの移行先BigQueryプロジェクト'
GCP_KEY='xxxxx.json'
else
HOST='検証DBのホスト名'
USER='検証DBのユーザー名'
PASSWORD='検証DBのパスワード'
DATABASE='検証DBの接続先DB'
PROJECT='検証DBからの移行先BigQueryプロジェクト'
GCP_KEY='xxxxx.json'
fi
# 環境変数の設定
export HOST
export USER
export PASSWORD
export DATABASE
export PROJECT
export GCP_KEY
② common.sh
共通関数記述用のファイルです
今回、処理実行用のファイルは、convertTable.shのみですが、他にも実行ファイルが存在する場合、共通で実行する処理を共通関数として、外部ファイルに記述しておくと便利
#!bin/bash
#
# 共通ファイル
# 各シェルファイルで使う共通変数、関数記載用
#
# 共通で使用する変数を定義
# 処理開始日時をUnixTimeに変換
startTimeUt=`date +%s`
# embulkでの処理のリトライ回数上限
retry=3
# リトライ時の待機時間
sleepTime='60s'
#
# テーブルの最終更新日時によって、データ取り込み時のエラー発生有無を判定
# sql内のtest_bqは接続先データセット(適宜変更すること)
#
# $1 テーブル名
# $2 処理開始日時
#
function checkLastModifiedTime(){
sql='select format_timestamp("%F %T", timestamp_millis(last_modified_time), "Asia/Tokyo") as last_modified_time
from `test_bq.__TABLES__` where table_id = "'$1'"'
bqResult=`bq query --nouse_legacy_sql $sql`
lastModifiedTime=`echo $bqResult | sed -e "s/+---------------------+//g" -e "s/ | //g" -e "s/last_modified_time//g"`
lastModifiedTimeUt=$(date -d "${lastModifiedTime}" +%s)
# 指定日時より先の場合は成功(真)、以前の場合は失敗(偽)
if [[ $lastModifiedTimeUt -gt $2 ]]; then
return 0
else
return 1
fi
}
#
# エラーメールの送信
# 送信元: hoge@example.com
# 送信先: fuga@example.com
#
# $1 エラーメッセージ
#
function sendErrorMail(){
HOSTNAME=$(hostname)
echo -e $1
echo -e $1 | mail -s "テーブル更新処理失敗@${HOSTNAME}" -r hoge@example.com fuga@example.com
}
■ポイント
- embulkのトランザクション制御がall or nothingを保障していること利用し、テーブルの最終更新日時(last_modified_time)で、データ移行の成否を判定しているところ
→ bqコマンドの実行結果そのままだと、余分な文字列が入るため、sedコマンドを用いて日時以外の部分を削除しています
コマンド実行後の終了コードで判定すればいいのでは?と思われる方もいるかもですが、
Embulkのエラー処理について調べてみる | 今日もプログラミング
によれば、エラーが発生しても終了コードが0で帰ってくるパターンがあるらしく、これでは正確な判定はできないな、と思い最終更新日時を用いました
③ convertTable.sh
処理実行コマンドを記載したファイル
複数のテーブルに対し、embulkコマンドを実行し、データ移行を実施します
#!bin/bash
#
# テーブルデータ取り込み用
#
# シェルファイルのパスに移動
cd $(dirname $0)
# 共通関数ファイルの読み込み
. ./common.sh
# 各種変数の設定
source ./setVariable.sh
# daily_logsは、前日分のみ処理対象
START_DATE=`date -d "1 day ago" +'%Y-%m-%d'` && export START_DATE
END_DATE_NEXT=`date +'%Y-%m-%d'` && export END_DATE_NEXT
TARGET_MONTH=`date -d "1 day ago" +'%Y%m'` && export TARGET_MONTH
# 処理対象テーブル
tables=('shops' 'members_tmp' 'daily_logs')
# 取り込みエラーの発生したテーブル
errorTables=''
echo 'テーブルデータ更新処理開始'
for table in ${tables[@]}
do
retryCount=$retry
while :
do
echo -e "\n$tableの更新を開始します\n"
embulk run ./$table.yml.liquid
# 最終更新日時チェックで成否を判定
checkLastModifiedTime $table $startTimeUt
result=$?
# テーブルの最終更新日時が処理開始日時より前の場合は、処理失敗→一定回数リトライ
if [[ $result -ne 0 && $retryCount -gt 0 ]]; then
echo "$tableの更新に失敗しました"
sleep $sleepTime
$((retryCount--))
echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
else
if [[ $result -eq 0 ]]; then
echo "$tableの更新に成功しました"
else
echo "$tableの更新に失敗しました"
$errorTables+=$table','
fi
break
fi
done
done
if [[ "$errorTables" != *members_tmp* ]]; then
# 本来地理データ型のデータの箇所をstring → geographyに変換し、membersテーブルに投入
retryCount=$retry
table='members'
while :
do
bq query \
--destination_table test_bq.members \
--replace \
--use_legacy_sql=false \
"select member_id, name, shop_name, regist_date, sex, birthday, latitude, longitude, ST_GeogFromText(location, planar=>TRUE, make_valid =>TRUE) as location, town_code, city_code from test_bq.members_tmp
where location is null
or ((-180 <= cast(replace(split(location, ' ')[offset(0)], 'POINT(','') as float64) and cast(replace(split(location, ' ')[offset(0)], 'POINT(','') as float64) <= 180)
and (-90 <= cast(replace(split(location, ' ')[offset(1)], ')','') as float64) and cast(replace(split(location, ' ')[offset(1)], ')','') as float64) <= 90))"
checkLastModifiedTime 'shops' $startTimeUt
$result=$?
# テーブルの最終更新日時が処理開始日時より前の場合は、処理失敗→一定回数リトライ
if [[ $result -ne 0 && $retryCount -gt 0 ]]; then
echo "$tableの更新に失敗しました"
sleep $sleepTime
$((retryCount--))
echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
else
if [[ $result -eq 0 ]]; then
echo "$tableの更新に成功しました"
else
echo "$tableの更新に失敗しました"
errorTables+=$table','
fi
break
fi
done
fi
if [[ -n "$errorTables" ]]; then
message="${errorTables/%?/}の更新処理に失敗しました\n"
message+="詳細はログをご確認ください\n"
sendErrorMail $message
else
echo '全てのテーブルの更新に成功しました'
fi
# BigQuery移行用の一時ファイルを削除
rm -f ./tmp/.*.csv.gz
echo 'テーブルデータ更新処理終了'
■ポイント
-
daily_logsに関し、前日分のデータを取り込めるよう、環境変数設定をしている
-
PostgreSQL → BigQuery移行対象のテーブルを配列に入れることで処理をスッキリさせている
-
embulkコマンド実行時にリトライ処理を入れている
→ プラグインのロードエラーなどもそこそこ起こるっぽい
[参考]
https://twitter.com/hiroysato/status/1237745198337150976?s=20 -
一度地理データ型のカラムlocationをstring型として、tmpテーブル(members_tmp)に取り込んだものを地理関数(ST_GeogFromText)で変換して本テーブル(members)に取り込んでいる
-
テキスト値(WKT)→ geography型への変換時に緯度経度に不正な値が入っていた場合に省く条件を入れている(今回locationのWKTが、 POINT(経度 緯度) 例: POINT(139.57785717404542 35.64436768705323) となっているため上述の書き方)
→ PostgreSQL(PostGIS)のgeometry不正な値でも投入されてしまうようですが、BigQueryだとエラーで落ちる -
処理の最後にBigQuery移行用に作成された一時ファイルを削除する処理を入れている
→ Class: Tempfile | ruby-doc.prgによれば、通常は自動で削除されるが、削除までにかなりの期間がかかる場合があるので、明示的に削除したほうがよいとのこと
→→ このいわゆるゴミが原因でディスクの空き容量が不足し処理が落ちたことが何度もありました(途中で処理が落ちた時なども、念のため、tmpディレクトリ下にゴミが残っていないか確認するようにするとよさそう)
→→ 複数処理ファイルを作成する場合はサーバのスペックと相談の上、並列処理を許可するか決定、許可する場合は、tmpディレクトリ下に処理ファイル別のディレクトリをきり、_out_bigquery_header.yml.liquidに記載のpath_prefixを個別ファイル単位での記述に変更するようにしましょう(別の処理による削除を防ぐため)。許可しない場合はREADMEなどにその旨を書いておくのがベター
こと
4. クーロン設定
最後にクーロン設定です
毎日6:00に、convertTable.shを実行し、ログの記録もする、という内容にしてます
00 06 * * * PATH=$PATH:/home/embulk/google-cloud-sdk/bin:/home/embulk/.embulk/bin; bash /home/embulk/hogehoge/convertTable.sh >> /home/embulk/hogehoge/log/postgres_to_bigquery_.$(date +'\%Y\%m\%d').log 2>&1
■ポイント
- 処理実行前にCloud SDK, embulkへのパスを通している
→ 今回、作成したembulkユーザーにてCloud SDK, embulkをインストールしているので、そのままではクーロン実行時に使えないため(/etc/crontabに追記する、でもよいでしょう)
→→ パスがわからない場合は、whichコマンドで確認するようにしましょう
こと
終わりに
環境変数を使えば、定期的に日時範囲指定をしてのデータ移行もできるので便利!
クーロン実行関係ない部分が多くなってしまった点、一部補足がやたらと長くなってしまった点ご容赦を。。
Glue を用いて Aurora MySQL から BigQuery にデータを Export する | Qiita
を見ていると、Amazon Aurora → BigQueryの移行には、Amazon Glueを使う手もあるのかな
こちらも試してみたいです
参考
PostgreSQL
BigQuery
- 国土交通省のGISデータをBigQueryにポリゴンとしてデータ投入する | Qiita
- 地理空間データの操作 | BigQuery | Google Cloud
- 地理関数 | BigQuery | Google Cloud
- 分割テーブルの作成 | BigQuery | Google Cloud
- String functions | BigQuery - Google Cloud
- bq コマンドライン ツールの使用 | Google Cloud
Discussion