🧭

[embulk]PostgreSQL→BigQueryに大量のデータを移行する

2022/08/24に公開

PostgreSQL→BigQueryに大量のデータを移行する内容の覚書きです

概要

大量のデータを一気にデータ移行すると、処理が途中で落ちてしまうことがあるため、
BigQuery上に一時テーブルを作成し、一定数ずつデータ移行した後、
一時テーブルからの移行の宛先テーブルに正規の移行先のテーブルを指定する方法で実施します

環境

  • 移行元: Amazon Aurora(PostgreSQL互換)
    → VPC(AWS)のプライベートサブネットにある想定
  • 移行先: BigQuery
  • embulk: バージョン0.9.24
    → 移行元のAmazon Aurora(PostgreSQL互換)と同じVPC内のパブリックサブネットにEC2をたて、そこにインストールして使います(以下、embulkサーバと呼びます)

今回は、Amazon Aurora(PostgreSQL互換)→BigQueryへのデータ移行になりますが、embulkの設定ファイルを対象環境用の記述に合わせれば実行できるはず

前提条件

embulkサーバにて、embulk実行専用のembulkユーザーを作成し、

であること

イメージ図

雑多な感じですが、イメージ図は以下の通り

ソースファイル

移行対象のテーブルは、
[embulk]データ移行をクーロン実行する
に記載のmembersテーブルであるものとします
また、一時テーブルであるmembers_tmpもmembersと同じものとします

設定ファイル2ファイルと共通関数用のファイル, 実行用のシェルファイルの計4ファイルを用います

(1) 設定ファイル(データ件数カウント用)

移行対象のデータ件数をカウントするための設定ファイルです

members_count.yml.liquid
in:
  type: postgresql
  host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
  user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
  password: zzzzzzz # 上記ユーザーのパスワードを入力
  database: sample1
  query: |
    select
      count(1)
    from
      members
out:
  type: stdout

ポイント

  • 結果を標準出力することで、後続のシェルファイルで変数に入れる → 移行対象の件数分移行できるようにしています

(2) 設定ファイル(データ移行用)

Amazon Aurora(PostgreSQL互換)上のmembersテーブル → BigQuery上のmembers_tmpにデータを移行する設定ファイルです
今回の根幹ともいえる存在

members_tmp.yml.liquid
in:
  type: postgresql
  host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
  user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
  password: zzzzzzz # 上記ユーザーのパスワードを入力
  database: sample1
  query: |
    select
      *
    from
      members
    order by
      date, content_id, user_id
    limit {{ env.LIMIT }} offset {{ env.OFFSET }}
  column_options:
    regist_date: {type: string, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
    birthday: {type: string, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /sample1/aaaaa.json # /sample1 ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
  path_prefix: /sample1/tmp/embulk/ # 一時ディレクトリとして/sample1/tmp/embulkを作成
  file_ext: .csv.gz
  source_format: CSV
  project: sample # プロジェクト名を入力
  dataset: sample1
  auto_create_table: true
  table: members_tmp
  column_options:
    - {name: member_id, mode: REQUIRED} # 必須項目
    - {name: name, mode: REQUIRED} # 必須項目
    - {name: regist_date, type: date, mode: required} # 必須項目, 日付をdate型に指定
    - {name: birthday, type: date} # 日付をdate型に指定
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
  - {type: gzip}

ポイント

  • データを必ず同じ順で取得できるようソート
  • この後のシェルファイルで行う環境変数設定によって、一定件数ずつ移行できるようにしています

(3) シェルファイル(共通関数用)

他の処理でも共通して使えそうな内容を関数化したシェルファイルです

common.sh
#!bin/bash

#
# 共通ファイル
# 各シェルファイルで使う共通変数、関数記載用
#

# 共通で使用する変数を定義
# 処理開始日時をUnixTimeに変換
startTimeUt=`date +%s`
# embulkでの処理のリトライ回数上限
retry=3
# リトライ時の待機時間
sleepTime='60s'

#
# テーブルの最終更新日時によって、データ取り込み時のエラー発生有無を判定
#
# $1 テーブル名
# $2 処理開始日時
#
function checkLastModifiedTime(){
  sql='select format_timestamp("%F %T", timestamp_millis(last_modified_time), "Asia/Tokyo") as last_modified_time
       from `sample1.__TABLES__` where table_id = "'$1'"'
  bqResult=`bq query --nouse_legacy_sql $sql`
  lastModifiedTime=`echo $bqResult | sed -e "s/+---------------------+//g" -e "s/ | //g" -e "s/last_modified_time//g"`
  lastModifiedTimeUt=$(date -d "${lastModifiedTime}" +%s)
  # テスト内容をより明確にするために一時的にechoする処理を入れておく
  echo "処理開始日時のタイムスタンプ:"$2
  echo "テーブル最終更新日時のタイムスタンプ:"$lastModifiedTimeUt 
  # 指定日時より先の場合は成功(真)、以前の場合は失敗(偽)
  if [[ $lastModifiedTimeUt -gt $2 ]]; then
    return 0
  else
    return 1
  fi
}

#
# エラーメールの送信
#
# $1 エラーメッセージ
#
function sendErrorMail(){
  HOSTNAME=$(hostname)
  echo -e $1
  echo -e $1 | mail -s "$2@${HOSTNAME}" -r hoge@example.com fuga@example.com
}

ポイント

  • embulkの性質(トランザクション制御でall or nothing)を使用し、処理の成否を判定している
  • データの洗い替えやエラー発生時の終了コードが1にならないケースにも対応できます

(4) シェルファイル(処理実行用)

moveUserVisitLog.sh
#!bin/bash

# シェルファイルのパスに移動
cd $(dirname $0)

# 共通ファイルの読み込み
. ./common.sh

# 1回あたりの処理件数を設定: 今回は、10万件
LIMIT=100000 && export LIMIT
# リトライ処理の回数: 今回は、3回
retry=3
# リトライ処理時の待機時間: 今回は、60秒
sleepTime=60
# 異常終了メールのタイトル
title="membersテーブル更新処理失敗"

echo '処理開始'

# 移行対象テーブルの件数確認
retryCount=$retry # リトライ回数初期化
while :
do
  result=`embulk run ./members_count.yml.liquid --log-level error`
  count=`echo "$result" | head -n 2 | tail -f -n 1`
  expr "$count" + 1 >&/dev/null
  isNum=$?
  if [ $isNum -ge 2 && $retryCount -gt 0 ]; then
  # 実行結果が不正な(数字でない)場合は2以上になる
    echo "membersテーブルの件数取得に失敗しました"
    sleep $sleepTime
    $((retryCount--))
    echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
  else
      if [ $isNum -lt 2 ]; then
        echo "移行対象は"$count"件です"
        break
      else
        # 一定回数以上失敗した場合は処理を中断: membersテーブル更新後の検知だと時間差が大きすぎるため、ここでいったんストップ
        message="membersテーブルの件数取得に失敗しました\n"
        message+="詳細はログをご確認ください\n"
        sendErrorMail $message $title
        exit
      fi
    fi
done

# 一時テーブルのデータを全件削除
bq query \
--use_legacy_sql=false \
"truncate table
  sample1.members_tmp"
   
# 1回あたりの処理件数ずつデータ移行
for((i=0 ; i < $count ; i=$i+$LIMIT))
do
  OFFSET=$i && export OFFSET
  retryCount=$retry
  table='members_tmp'
  while :
  do
    echo "$OFFSET件目から$LIMIT件移行します"
    # 成否判定のため、処理開始日時を前の処理よりもあとのタイムスタンプに更新
    startTimeUt=`date +%s`
    embulk run ./$table.yml.liquid --log-level warn
    checkLastModifiedTime $table $startTimeUt
    result=$?
    if [[ $result -ne 0  && $retryCount -gt 0 ]]; then
      echo "$tableの更新に失敗しました"
      sleep $sleepTime
      $((retryCount--))
      echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
    else
      if [[ $result -eq 0 ]]; then
        echo "$OFFSET件目から$LIMIT件の移行に成功しました"
        break
      else
        # 一定回数以上失敗した場合は処理を中断
        message="$tableの更新に失敗しました\n"
        message+="詳細はログをご確認ください\n"
        sendErrorMail $message $title
        exit
      fi  
    fi
  done
done

# 一時テーブル → 本テーブルへ移行
retryCount=$retry
table='members'
tableOrigin='members_tmp'
while :
do
  bq query \
  --destination_table sample1.$table \
  --replace \
  --use_legacy_sql=false \
  "select
    *
   from
    sample1.$tableOrigin"
   checkLastModifiedTime $table $startTimeUt
  result=$?
  # テーブルの最終更新日時が処理開始日時より前の場合は、処理失敗→一定回数リトライ
  if [ $result -ne 0  && $retryCount -gt 0 ]; then
    echo "$tableの更新に失敗しました"
    sleep $sleepTime
    $((retryCount--))
    echo "リトライします。リトライ回数:$(($retry-$retryCount)) 回目"
  else
    if [[ $result -eq 0 ]]; then
      echo "$tableの更新に成功しました"
      break
    else
      message="$tableの更新に失敗しました\n"
      message+="詳細はログをご確認ください\n"
      sendErrorMail $message $title
      exit
    fi
  fi
done

# BigQuery移行用の一時ファイルを削除
rm -f ./tmp/.*.csv.gz

echo '処理終了'

ポイント

  • embulkによる処理はそれなりに落ちる可能性があるので、リトライ処理をいれています

終わりに

サーバのスペックの関係もあると思いますが、、大量のデータを一気に移行しようとしてembulkの処理が落ちてしまったので、分割して移行することにしました
一時テーブルに少しずつデータを移行した後、BigQuery側で本テーブルに一気に移行することで、大量データの洗い替える場合でも不具合なく移行することができます
移行件数については、embulkサーバのスペックと要相談(本記事では10万件ずつですが、実際は2,000万件単位で行っています → EC2はt3.medium)

参考

Discussion