🧭

[embulk]PostgreSQL→BigQueryのデータ移行をクーロン実行する

2022/07/23に公開

embulkを用いたPostgreSQL→BigQueryへのデータ移行をクーロン実行(バッチ実行)するためのサンプルコード(覚え書き)

環境

  • 移行元: Amazon Aurora(PostgreSQL互換)
    → VPC(AWS)のプライベートサブネットにある想定
  • 移行先: BigQuery
  • embulk: バージョン0.9.24
    → 移行元のAmazon Aurora(PostgreSQL互換)と同じVPC内のパブリックサブネットにEC2をたて、そこにインストールして使います(以下、embulkサーバと呼びます)

今回は、Amazon Aurora(PostgreSQL互換)→BigQueryへのデータ移行になりますが、embulkの設定ファイルを対象環境用の記述に合わせれば実行できるはず

前提条件

embulkサーバにて、embulk実行専用のembulkユーザーを作成し、

であること

ソースファイル群の構成

ソースファイル群の構成は以下の通り
embulk実行専用のembulkユーザーを作成し、そのhomeディレクトリ下に諸々のファイル群を配置しています

複数テーブルを同じ移行元から移行先へ移管するため、接続情報など共通の内容については、外部ファイル(共通ファイル)に記載しています。

/home/embulk/
  | --- auth-- BigQuery接続用秘密鍵格納ディレクトリ
  |     |--- xxxxxx.json
  |
  |
  | --- hogehoge
	|
	|--- log -- ログ出力ディレクトリ
	|     |--- xxxxxx.log
	|
	|--- tmp 
        |     |--- BigQuery移行用の一時ファイル格納先
        |     |--- .yyyyy.zzzz.csv.gz
	|
	|--- setVariable.sh -- 環境変数設定用
	|--- common.sh -- 共通関数記述用
	|--- convertTable.sh -- 処理実行用
	|
	|--- _in_postgresql.yml.liquid -- PostgreSQL接続共通部
	|--- _out_bigquery_header.yml.liquid -- BigQuery接続共通部(ヘッダ部分)
	|--- _out_bigquery_footer.yml.liquid -- BigQuery接続共通部(フッタ部分)
	|--- shops.yml -- 移行対象1 shopsテーブル用
        |--- members_tmp.yml -- 移行対象2 membersテーブル用
	|--- daily_logs.yml -- 移行対象3 daily_logs用

1. テーブル定義

(1) 移行元(PostgreSQL)

移行元のPostgreSQLにおけるテーブル定義です

①shops.ddl

店舗情報のテーブル
簡易的なインターネットサイトの店舗を想定してます

shops.ddl
create table shops (
  shop_id bigint not null
  , shop_name character varying(200) not null
  , category_id smallint not null
);

②members.ddl

会員情報のテーブル

members.ddl
create table members (
  member_id character varying(25) not null
  , name character varying(200) not null
  , regist_date date not null
  , sex character(1)
  , birthday date
  , latitude double precision
  , longitude double precision
  , location geometry(POINT,4301)
  , towncode character(11)
  , citycode character(5)
);

③daily_logs.ddl

会員の店舗訪問履歴を記録するテーブルです
DDL自体には記載してませんが、月単位でパーティションが切られているものとします

daily_logs.ddl
create table daily_logs (
  date timestamp(6) without time zone not null
  , shop_id int64 not null
  , member_id character varying(25) not null
);

(2) 移行先(BigQuery)

続いて、移行元であるPostgreSQLのテーブル定義をもとに、BigQueryにおけるテーブル定義(DDL)を作成します
移行先のデータセットは、test_bqというネーミングであるものとします

BigQueryは分析用だからか、RDBにおけるデータ型よりもザックリとした型の持ち方をしてますね

①shops.ddl

shops.ddl
create table `test_bq.shops` (
  shop_id int64 not null
  , shop_name string not null
  , category_id int64 not null
);

②-1: members_tmp.ddl

後述もしますが、embulkでは地理データ型(geometry型やgeography型)の取り扱いがないため、いったん対象カラムをテキスト値として投入するための一時テーブルを作成します

members_tmp.ddl
create table `test_bq.members_tmp` (
  member_id string not null
  , name string not null
  , regist_date date not null
  , sex string
  , birthday date
  , latitude float64
  , longitude float64
  , location string --元の型は、geometry(POINT,4301)→ WKTに変換してstring型で保管(embulkのサポートする型の関係)
  , towncode string
  , citycode string
);

②-2: members.ddl

BigQueryで使用する本テーブルです
こちらも後述しますが、members_tmpにstring型で取り込んだlocationカラムの値をBigQueryにおける地理データ型であるgeography型に変換して取り込みます
→ BigQueryでは、geometry型の取り扱いがないため、やむを得ずといった感じ。この値を用いて計算など行う場合は、結果にずれが出ますので、その点はご注意を。

members.ddl
create table `test_bq.members` (
  member_id string not null
  , name string not null
  , regist_date date not null
  , sex string
  , birthday date
  , latitude float64
  , longitude float64
  , location geography -- BigQueryではgeometry型の取り扱いがないため、geography型にする
  , town_code string
  , citycode string
);

③daily_logs.ddl

データ量が大量になるため、月単位でパーティションを切ります
今回は最大半年の使用を想定し、それ以上のデータは削除されるよう期限を設定しました

daily_logs.ddl
create table `test_bq.daily_logs` (
  date datetime not null
  , shop_id int64 not null
  , member_id string
)
partition by
 datetime_trunc(date, month)
options(
 partition_expiration_days=190 -- パーティションへのデータの保持期限を半年強に設定
 , require_partition_filter = true
);

2. 設定ファイル

ソースファイル群の構成にも記載しましたが、外部ファイルを用いるため、呼び出し元(個々のテーブルデータ移行用の設定ファイル)、呼び出し先(外部ファイル)とも、liquidテンプレートを使用します(末尾に「.liquid」をつける)
詳しくは、Embulk configuration file formatの「Including files」の箇所に記載されてます

(1) 共通ファイル

PostgreSQL, BigQuery接続部など、複数ファイルで共通の箇所については外部ファイルに記載しておくと何かと便利です

① _in_postgresql.yml.liquid

移行元のPostgreSQL接続部です
「env.XXXX」(XXXXは環境変数名)とすることで、環境変数に設定した値を用いることができます

_in_postgresql.yml.liquid
in:
  type: postgresql
  host: {{ env.HOST }} # 接続DBのホスト名
  user: {{ env.USER }} # DBのユーザー名
  password: {{ env.PASSWORD }} # 接続DBのパスワード
  database: {{ env.DATABASE }} # 接続DBのデータベース(スキーマ)

② _out_bigquery_header.yml.liquid

移行先のPostgreSQL接続部です

_out_bigquery_header.yml.liquid
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /home/embulk/auth/{{ env.GCP_KEY }} # BigQueryアクセス用の秘密鍵
  path_prefix: /home/embulk/hoge/tmp/
  file_ext: .csv.gz
  source_format: CSV
  project: {{ env.PROJECT }} # 対象プロジェクト
  dataset: fuga
  auto_create_table: true

③ _out_bigquery_footer.yml.liquid

BigQuery接続のための共通部のうち、フォーマットに関する部分です

_out_bigquery_header.yml.liquid
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
    - {type: gzip}

(2) 各テーブル移行用設定ファイル

各テーブル移行用のembulkの設定ファイルです
ポイントを各ファイル下に記載します(内容として重複するものは省いてます)

①shops.yml.liquid

shopsテーブルのデータ移行用設定ファイル

shops.yml.liquid
{% include 'in_postgresql' %}
  table: shops
{% include 'out_bigquery_header' %}
  table: shops
  column_options:
    - {name: shop_id, mode: required} # 必須項目
    - {name: shop_name, mode: required} # 必須項目
    - {name: category_id, mode: required} # 必須項目
  mode: replace # データを洗い替え
{% include 'out_bigquery_footer' %}

■ ポイント

移行先BigQuery側の設定として、

  • カラム定義の必須項目(not null)の箇所をcolumn_optionsでカラム名(name)を指定し、mode: requiredとしている
    → この記述がないとembulkコマンド実行時にエラーで落ちる
  • mode: replaceとすることで、追加(デフォルト)ではなくデータの洗い替えをしている

こと

members_tmp.yml.liquid
{% include 'in_postgresql' %}
  query: | # geometry型のlocationをいったんWKTに変換するため、SQLを記述(ST_AsTextで変換)
    select
      member_id, name, regist_date, sex, birthday, latitude, longitude, ST_AsText(location) as location, towncode, citycode
    from
      members 
{% include 'out_bigquery_header' %}
  table: members_tmp
  column_options:
    - {name: member_id, mode: required} # 必須項目
    - {name: name, mode: required} # 必須項目
    - {name: regist_date, type: date, mode: required} # 必須項目, 日付をdate型に指定
    - {name: birthday, type: date} # 日付をdate型に指定
  mode: replace # データを洗い替え
{% include 'out_bigquery_footer' %}

■ポイント

移行元のPostgreSQL側の設定として、

  • embulkのpostgresqlプラグインがgeometry型に対応していないため、テーブル名の指定ではなく、queryでクエリを書いていったんWKT(テキスト値)に変換している
    → embulkのbigqueryプラグインもBigQueryにおける地理データ型geography型に対応していないため、一度tmpテーブルにstring型で取り込んだ後に変換し、本テーブルに取り込むというスタイルをとってます(後続のシェルスクリプトで、bqコマンドを使って実施)

こと

移行先のBigQuery側の設定として、

  • 移行元でdate型だったregist_dateはembulk上でstring型に変換されるため、date型を指定して取り込んでいる

こと

daily_logs.yml.liquid
{% include 'in_postgresql' %}
  query: | # 日付範囲指定のためSQLを記述し、環境変数を用いる
    select
     *
    from
     daily_logs
    where
      '{{ env.START_DATE }}' <= date
      and date < '{{ env.END_DATE }}'
  column_options:
    date: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo"}
{% include 'out_bigquery_header' %}
  table: daily_logs${{ env.TARGET_MONTH }} # 取り込み先のパーティションを指定
  column_options:
    - {name: date, type: datetime, mode: REQUIRED} # 必須項目
    - {name: shop_id, mode: REQUIRED} # 必須項目
    - {name: member_id, type: REQUIRED} # 必須項目
{% include 'out_bigquery_footer' %}
  time_partitioning: # パーティションテーブルのため必要な記述を書く
    type: MONTH
    field: date

■ ポイント

移行元のPostgreSQL側の設定として、

  • 取り込み対象の日付範囲を指定するため、テーブル名の指定ではなく、queryでクエリを書いて取り込み範囲を指定していること
    → 環境変数(env.xxxxx : xxxxxは変数名 で指定)を使ってシェルファイルにて、動的に指定できるようにしました
  • timestamp型のdateの定義がtimestamp without time zone(PostgreSQL自体の設定としてはAsia/Tokyo)なので、タイムゾーンをAsia/Tokyoと明記している

こと

移行先のBigQuery側の設定として、

  • 対象テーブルがパーティションテーブル(月単位)のため、
    • 取り込みパーティション(年月)を指定している
    • time_partitioning欄にパーティションのタイプ(MONTH)とカラム名(date)を記載している
  • modeは設定してないので、デフォルトのappend(追加)になる

こと

3. シェルファイル

① setVariable.sh

環境変数設定用のシェルファイルです
ここで設定した内容を、上述の通り、設定ファイルで使用します

本番環境、検証環境とも同じファイルで実行できるよう、ホスト名で条件分岐してみました
※本番環境には、「-prod-」という文字列が入っているものとします

setVariable.sh
#!bin/bash

#
# 各種変数の設定用
#

# 稼働環境(本番 or STG)に応じた変数の設定(※)
hostname=`hostname`
if [[ "$hostname" == *-prod-* ]]; then
  HOST='本番DBのホスト名'
  USER='本番DBのユーザー名'
  PASSWORD='本番DBのパスワード'
  DATABASE='本番DBの接続先DB'
  PROJECT='本番DBからの移行先BigQueryプロジェクト'
  GCP_KEY='xxxxx.json'
else
  HOST='検証DBのホスト名'
  USER='検証DBのユーザー名'
  PASSWORD='検証DBのパスワード'
  DATABASE='検証DBの接続先DB'
  PROJECT='検証DBからの移行先BigQueryプロジェクト'
  GCP_KEY='xxxxx.json'
fi

# 環境変数の設定
export HOST
export USER
export PASSWORD
export DATABASE
export PROJECT
export GCP_KEY

② common.sh

共通関数記述用のファイルです
今回、処理実行用のファイルは、convertTable.shのみですが、他にも実行ファイルが存在する場合、共通で実行する処理を共通関数として、外部ファイルに記述しておくと便利

common.sh
#!bin/bash

#
# 共通ファイル
# 各シェルファイルで使う共通変数、関数記載用
#

# 共通で使用する変数を定義
# 処理開始日時をUnixTimeに変換
startTimeUt=`date +%s`
# embulkでの処理のリトライ回数上限
retry=3
# リトライ時の待機時間
sleepTime='60s'

#
# テーブルの最終更新日時によって、データ取り込み時のエラー発生有無を判定
# sql内のtest_bqは接続先データセット(適宜変更すること)
#
# $1 テーブル名
# $2 処理開始日時
#
function checkLastModifiedTime(){
  sql='select format_timestamp("%F %T", timestamp_millis(last_modified_time), "Asia/Tokyo") as last_modified_time
       from `test_bq.__TABLES__` where table_id = "'$1'"'
  bqResult=`bq query --nouse_legacy_sql $sql`
  lastModifiedTime=`echo $bqResult | sed -e "s/+---------------------+//g" -e "s/ | //g" -e "s/last_modified_time//g"`
  lastModifiedTimeUt=$(date -d "${lastModifiedTime}" +%s)
  # 指定日時より先の場合は成功(真)、以前の場合は失敗(偽)
  if [[ $lastModifiedTimeUt -gt $2 ]]; then
    return 0
  else
    return 1
  fi
}

#
# エラーメールの送信
# 送信元: hoge@example.com 
# 送信先: fuga@example.com
#
# $1 エラーメッセージ
#
function sendErrorMail(){
  HOSTNAME=$(hostname)
  echo -e $1
  echo -e $1 | mail -s "テーブル更新処理失敗@${HOSTNAME}" -r hoge@example.com fuga@example.com
}

■ポイント

  • embulkのトランザクション制御がall or nothingを保障していること利用し、テーブルの最終更新日時(last_modified_time)で、データ移行の成否を判定しているところ
    → bqコマンドの実行結果そのままだと、余分な文字列が入るため、sedコマンドを用いて日時以外の部分を削除しています

コマンド実行後の終了コードで判定すればいいのでは?と思われる方もいるかもですが、
Embulkのエラー処理について調べてみる | 今日もプログラミング
によれば、エラーが発生しても終了コードが0で帰ってくるパターンがあるらしく、これでは正確な判定はできないな、と思い最終更新日時を用いました

③ convertTable.sh

処理実行コマンドを記載したファイル
複数のテーブルに対し、embulkコマンドを実行し、データ移行を実施します

convertTable.sh
#!bin/bash

#
# テーブルデータ取り込み用
#

# シェルファイルのパスに移動
cd $(dirname $0)

# 共通関数ファイルの読み込み
. ./common.sh
# 各種変数の設定
source ./setVariable.sh

# daily_logsは、前日分のみ処理対象
START_DATE=`date -d "1 day ago" +'%Y-%m-%d'` && export START_DATE
END_DATE_NEXT=`date +'%Y-%m-%d'` && export END_DATE_NEXT
TARGET_MONTH=`date -d "1 day ago" +'%Y%m'` && export TARGET_MONTH

# 処理対象テーブル
tables=('shops' 'members_tmp' 'daily_logs')
# 取り込みエラーの発生したテーブル
errorTables=''

echo 'テーブルデータ更新処理開始'

for table in ${tables[@]}
do
  retryCount=$retry
  while :
  do
    echo -e "\n$tableの更新を開始します\n"
    embulk run ./$table.yml.liquid
    # 最終更新日時チェックで成否を判定
    checkLastModifiedTime $table $startTimeUt
    result=$?
    # テーブルの最終更新日時が処理開始日時より前の場合は、処理失敗→一定回数リトライ
    if [[ $result -ne 0  && $retryCount -gt 0 ]]; then
      echo "$tableの更新に失敗しました"
      sleep $sleepTime
      $((retryCount--))
      echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
    else
      if [[ $result -eq 0 ]]; then
        echo "$tableの更新に成功しました"
      else
        echo "$tableの更新に失敗しました"
        $errorTables+=$table','
      fi
      break
    fi
  done
done

if [[ "$errorTables" != *members_tmp* ]]; then
  # 本来地理データ型のデータの箇所をstring → geographyに変換し、membersテーブルに投入
  retryCount=$retry
  table='members'
  while :
  do
    bq query \
    --destination_table test_bq.members \
    --replace \
    --use_legacy_sql=false \
        "select member_id, name, shop_name, regist_date, sex, birthday, latitude, longitude, ST_GeogFromText(location, planar=>TRUE, make_valid =>TRUE) as location, town_code, city_code from test_bq.members_tmp
    where location is null
     or ((-180 <= cast(replace(split(location, ' ')[offset(0)], 'POINT(','') as float64) and cast(replace(split(location, ' ')[offset(0)], 'POINT(','') as float64) <= 180)
     and (-90 <= cast(replace(split(location, ' ')[offset(1)], ')','') as float64) and cast(replace(split(location, ' ')[offset(1)], ')','') as float64) <= 90))"
    checkLastModifiedTime 'shops' $startTimeUt
    $result=$?
    # テーブルの最終更新日時が処理開始日時より前の場合は、処理失敗→一定回数リトライ
    if [[ $result -ne 0  && $retryCount -gt 0 ]]; then
      echo "$tableの更新に失敗しました"
      sleep $sleepTime
      $((retryCount--))
      echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
    else
      if [[ $result -eq 0 ]]; then
        echo "$tableの更新に成功しました"
      else
        echo "$tableの更新に失敗しました"
        errorTables+=$table','
      fi
      break
    fi
  done
fi

if [[ -n "$errorTables" ]]; then
  message="${errorTables/%?/}の更新処理に失敗しました\n"
  message+="詳細はログをご確認ください\n"
  sendErrorMail $message
else
  echo '全てのテーブルの更新に成功しました'
fi

# BigQuery移行用の一時ファイルを削除
rm -f ./tmp/.*.csv.gz

echo 'テーブルデータ更新処理終了'

■ポイント

  • daily_logsに関し、前日分のデータを取り込めるよう、環境変数設定をしている

  • PostgreSQL → BigQuery移行対象のテーブルを配列に入れることで処理をスッキリさせている

  • embulkコマンド実行時にリトライ処理を入れている
    → プラグインのロードエラーなどもそこそこ起こるっぽい
    [参考]
    https://twitter.com/hiroysato/status/1237745198337150976?s=20

  • 一度地理データ型のカラムlocationをstring型として、tmpテーブル(members_tmp)に取り込んだものを地理関数(ST_GeogFromText)で変換して本テーブル(members)に取り込んでいる

  • テキスト値(WKT)→ geography型への変換時に緯度経度に不正な値が入っていた場合に省く条件を入れている(今回locationのWKTが、 POINT(経度 緯度) 例: POINT(139.57785717404542 35.64436768705323) となっているため上述の書き方)
    → PostgreSQL(PostGIS)のgeometry不正な値でも投入されてしまうようですが、BigQueryだとエラーで落ちる

  • 処理の最後にBigQuery移行用に作成された一時ファイルを削除する処理を入れている
    Class: Tempfile | ruby-doc.prgによれば、通常は自動で削除されるが、削除までにかなりの期間がかかる場合があるので、明示的に削除したほうがよいとのこと
    →→ このいわゆるゴミが原因でディスクの空き容量が不足し処理が落ちたことが何度もありました(途中で処理が落ちた時なども、念のため、tmpディレクトリ下にゴミが残っていないか確認するようにするとよさそう)
    →→ 複数処理ファイルを作成する場合はサーバのスペックと相談の上、並列処理を許可するか決定、許可する場合は、tmpディレクトリ下に処理ファイル別のディレクトリをきり、_out_bigquery_header.yml.liquidに記載のpath_prefixを個別ファイル単位での記述に変更するようにしましょう(別の処理による削除を防ぐため)。許可しない場合はREADMEなどにその旨を書いておくのがベター

こと

4. クーロン設定

最後にクーロン設定です
毎日6:00に、convertTable.shを実行し、ログの記録もする、という内容にしてます

00 06 * * * PATH=$PATH:/home/embulk/google-cloud-sdk/bin:/home/embulk/.embulk/bin; bash /home/embulk/hogehoge/convertTable.sh >> /home/embulk/hogehoge/log/postgres_to_bigquery_.$(date +'\%Y\%m\%d').log 2>&1

■ポイント

  • 処理実行前にCloud SDK, embulkへのパスを通している
    → 今回、作成したembulkユーザーにてCloud SDK, embulkをインストールしているので、そのままではクーロン実行時に使えないため(/etc/crontabに追記する、でもよいでしょう)
    →→ パスがわからない場合は、whichコマンドで確認するようにしましょう

こと

終わりに

環境変数を使えば、定期的に日時範囲指定をしてのデータ移行もできるので便利!

クーロン実行関係ない部分が多くなってしまった点、一部補足がやたらと長くなってしまった点ご容赦を。。

Glue を用いて Aurora MySQL から BigQuery にデータを Export する | Qiita
を見ていると、Amazon Aurora → BigQueryの移行には、Amazon Glueを使う手もあるのかな

こちらも試してみたいです

参考

PostgreSQL

BigQuery

embulk

クーロン

Discussion