🧭

[embulk]PostgreSQLでtimestamp型(without time zone)のデータをBigQueryに移行する

2022/06/26に公開

PostgreSQLでwithout time zoneのtimestamp型のデータをBigQueryに移行する際にはまったので覚書き
移行は、embulkというバルクデータローダー(データを抽出→転送するためのツール)を用いて行います

環境

  • 移行元: PostgreSQL 13.4(厳密にはAmazon Aurora(PostgreSQL互換))
    → timezoneはAsia/Tokyoとします(「show timezone;」コマンドで確認できます)
  • 移行先: BigQuery
  • embulk稼働用: Amazon Linux2
    → わりとメモリを食うので、データ量が多い場合はt3.microとかではなくt3.mediumなどそこそこメモリサイズ大きめのものにしておくとよい

1. 事前準備

まず事前準備を行います
embulkの動作するサーバは使用環境の整備から、PostgtreSQL, BigQueryについてはデータベース自体の準備はできているものとします

Cloud SDK

embulkの動作環境(サーバ)にCloud SDKをインストールします

# ホームに移動
$ cd ~
$ pwd

# DL
$ curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-370.0.0-linux-x86_64.tar.gz

# 解凍・展開
$ tar xfvz google-cloud-sdk-370.0.0-linux-x86_64.tar.gz

# インストール: いくつか質問が出るが全て「y」でOK
$ ./google-cloud-sdk/install.sh

# パスを通しておく
echo 'export PATH="$HOME/google-cloud-sdk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

# 初期化する
gcloud init

# [ACCOUNT]には、BigQuery接続用のサービス アカウントのメール欄の値、[KEY_FILE]には接続用の鍵ファイル名(embulkサーバに配置してある想定)を入れる(※)
$ gcloud auth activate-service-account [ACCOUNT] --key-file=[KEY_FILE]

# 以下のようなメッセージが出ていた場合は、
Updates are available for some Cloud SDK components.  To install them,
please run:
  $ gcloud components update

# 下記のコマンドも実行しておく
$ gcloud components update

※BigQuery接続用のサービスアカウントの作成がまだの場合はサービス アカウントの作成と管理 | IAM -Google Cloudを参考に作成すること

embulk

続いて、embulkの使用環境の整備を行います

(1) Java8(JRE)のインストール

embulkの動作に必要なJava8(JRE)をインストールします
今回、OSがAmazon Linux2のため同じくAmazonの提供しているOpenJDKであるAmazon Correto8をチョイスしました

※2022/06/24現在、安定板のv0.9は、Java9以降には対応していないので、8より上のバージョンをインストールしないよう注意!

インストール方法は以下の通り

Amazon Linux2の場合
# インストール
$ sudo amazon-linux-extras enable corretto8
$ sudo yum install java-1.8.0-amazon-corretto
$ sudo yum install java-1.8.0-amazon-corretto-devel

# バージョン確認
$ java -version

なお、OSがAmazon Linux2でない場合のインストールコマンドは以下の通り

Amazon Linux2でない場合
# インストール
$ sudo rpm --import https://yum.corretto.aws/corretto.key 
$ sudo curl -L -o /etc/yum.repos.d/corretto.repo https://yum.corretto.aws/corretto.repo
$ sudo yum install -y java-1.8.0-amazon-corretto-devel

# バージョン確認
$ java -version

(2) embulkのインストール

以下のコマンドを順に実行してインストールします(公式のTOPページに載っている内容そのままです
embulkを使用する予定のユーザーで実行しましょう

# インストール
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
# バージョン確認
$ embulk -version

(3) 必要なプラグインのインストール

今回は、PostgreSQL → BigQueryに移行するので、
①embulk-input-postgresql: PostgreSQLからのデータ抽出用
②embulk-output-bigquery: BigQueryへのデータ転送用
のプラグインを入れます。

(2) に引き続き、以下のとコマンドを実行

# embulk-input-postgresqlのインストール
$ embulk gem install embulk-input-postgresql

# embulk-output-bigqueryのインストール
$ embulk gem install jwt:2.3.0 # 旧バージョンのjwtをインストール
$ embulk gem install embulk-output-bigquery

# インストールの確認: インストールしたプラグインが表示されればOK
$ embulk gem list 

→ embulk-output-bigqueryが依存しているjwtのバージョンと対応するRubyのバージョン, embulkのバージョンの関係で旧バージョンのjwtをインストールする手順を入れています

詳細は、
embulk-output-bigqueryインストール時にRuby version >= 2.5でエラーになる問題 | Qiita
を参考に

PostgreSQL

sample1というスキーマに、user_visit_logというテーブルがあるという想定でいきます
DDLは以下の通り

user_visit_log.ddl
create table sample1.user_visit_log (
  date timestamp(6) without time zone not null
  , content_id integer not null
  , user_id character varying(20) not null
);

移行用のテストデータも作成しておきます

user_visit_log テストデータ
insert into sample1.user_visit_log values ('2022/06/25 00:00:10', 12345, 'a7dc9j0sk');
insert into sample1.user_visit_log values ('2022/06/25 00:00:13', 12356, 'b2d3kj1as');
insert into sample1.user_visit_log values ('2022/06/25 00:00:17', 12345, 'z1a9kj3pa');
insert into sample1.user_visit_log values ('2022/06/25 00:00:21', 12368, 'l5az7j8rb');
insert into sample1.user_visit_log values ('2022/06/25 00:00:22', 12362, 'u8vb6j8xg');
-- 結果確認用
select * from sample1.user_visit_log;

BigQuery

PostgreSQL環境に合わせ、sample1というデータセットに、user_visit_logというテーブルを作ります

DDLは以下の通りで、コンソールのクエリエディタから実行しておきます

sample1.user_visit_log
create table `sample1.user_visit_log` (
  date timestamp not null
  , content_id int64 not null
  , user_id string not null
);

→ 「データセット名.テーブル名」は、バッククォートで囲わなくても実行できますが、SQLが入れ子構造になった場合などに失敗することがあるので、日ごろから上述のようにバッククォートで囲うことを意識するとよいです

※PostgreSQLとBigQueryのデータ型は完全には一致していない(PostgreSQLのデータ型の分類が細かく、BigQueryがザックリしている感じ)ので、以下を参考に対応するデータ型を設定しました。

2. データ移行の実施

(1) データ移行情報記載ファイルの作成

embulk実行用のサーバにて、データ移行情報を記載したymlファイルを作成します

user_visit_log.yml
in:
  type: postgresql
  host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
  user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
  password: zzzzzzz # 上記ユーザーのパスワードを入力
  database: sample1
  table: user_visit_log
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /sample1/aaaaa.json # /sample1 ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
  path_prefix: /sample1/tmp/embulk/ # 一時ディレクトリとして/sample1/tmp/embulkを作成
  file_ext: .csv.gz
  source_format: CSV
  project: sample # プロジェクト名を入力
  dataset: sample1
  auto_create_table: true
  table: user_visit_log
  column_options:
    - {name: date, mode: REQUIRED}
    - {name: content_id, mode: REQUIRED}
    - {name: user_id, mode: REQUIRED}
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
  - {type: gzip}

(2) データ移行確認コマンドの実行

以下のように、実行内容の確認コマンドを実行してymlファイルのin配下の記述に問題がないか確認します

データ移行確認コマンドと結果
# 実行内容の確認
$ embulk preview user_visit_log.yml
# 以下embulk previewコマンド実行後に、想定される表示内容
+-------------------------+-----------------+----------------+
|          date:timestamp | content_id:long | user_id:string |
+-------------------------+-----------------+----------------+
| 2022-06-24 15:00:10 UTC |          12,345 |      a7dc9j0sk |
| 2022-06-24 15:00:13 UTC |          12,356 |      b2d3kj1as |
| 2022-06-24 15:00:17 UTC |          12,345 |      z1a9kj3pa |
| 2022-06-24 15:00:21 UTC |          12,368 |      l5az7j8rb |
| 2022-06-24 15:00:22 UTC |          12,362 |      u8vb6j8xg |
+-------------------------+-----------------+----------------+

上記のように、移行対象のデータが、対応するembulk typeとともに表示されます
これは移行元→embulk取り込み時に変換される内容を表しています
データ型に関し、timestampは概ねそのままですが、integer → long, character varying(20) → stringと変換されていますね

(3) データ移行コマンドの実行

(2) のデータ移行確認コマンドの結果に問題がなければ、データ移行コマンドを実行してBigQueryにデータを取り込みましょう

データ移行コマンド
# データ移行の実行
$ embulk run user_visit_log.yml

※previewコマンドが成功してもembulk→BigQueryの取り込み時エラーなどで、runコマンドが失敗する場合もあります
その場合は、メッセージに従って修正後、再度実行してください

(4) 結果の確認

BigQueryにて、以下のコマンドを実行して中身を確認

確認用SQL
select * from `sample1.user_visit_log`;

おそらくdateの日時が-9時間 UTCになっているかと(もっというと、embulk previewの時点でそうなってますが)
PostgreSQLではdateはtimestamp(6) without timezoneとなっていますが、冒頭の「環境」に記載の通りPostgreSQL自体のタイムゾーン設定がAsia/Tokyoなのでそちらに従います

一方、BigQueryのtimestamp型にはタイムゾーンの概念がなく、デフォルト表記がUTCになるため、-9時間で表示されてしまったというわけ

Asia/Tokyoで表示したい場合は以下のようにすればよいです

select 
 format_timestamp('%Y-%m-%d %H:%M:%S', date, 'Asia/Tokyo') as date,
 content_id,
 user_id
from 
 `sample1.user_visit_log`;

余談ですが、上記のようにフォーマットした場合は、string型扱いになる(dateカラム)ので、クエリ結果を別テーブルなどに取り込む場合は注意が必要です

なお、日本時間(Asia/Tokyo)ベースでdateで絞り込みなど行う場合は以下のようにします

dateが日本時間で2022-06-01 00
select 
 format_timestamp('%Y-%m-%d %H:%M:%S', date, 'Asia/Tokyo') as date,
 content_id,
 user_id
from 
 `sample1.user_visit_log`
where
 timestamp('2022-06-01 00:00:00', 'Asia/Tokyo') <= date;

条件句でtimestamp関数を用い、タイムゾーンを設定するのがポイントですね

なお、timestamp関数を使わなくてもクエリの実行自体は可能ですが、タイムゾーンがUTC扱いになってしまうので、想定通りの結果が得られません

3. 代替案

とはいえ、上述の方法だとBigQueryの知見がない方が来た際に勘違いやミスの原因にもなりかねません
タイムゾーンを意識しないなら、BigQuery側では、datetime型を用いるのもひとつの手

sample1.user_visit_log(BigQuery側で実行)
create table `sample1.user_visit_log2` (
  date datetime not null
  , content_id int64 not null
  , user_id string not null
);

テーブル名は重複を防ぐため、user_visit_log2としています

データ移行用のymlファイルは以下のようになります

user_visit_log2.yml
in:
  type: postgresql
  host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
  user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
  password: zzzzzzz # 上記ユーザーのパスワードを入力
  database: sample1
  table: user_visit_log
  column_options:
    date: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo"}
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /sample1/aaaaa.json # /sample1 ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
  path_prefix: /sample1/tmp/embulk/ # 一時ディレクトリとして/sample1/tmp/embulkを作成
  file_ext: .csv.gz
  source_format: CSV
  project: sample # プロジェクト名を入力
  dataset: sample1
  auto_create_table: true
  table: user_visit_log2
  column_options:
    - {name: date, type: datetime, mode: REQUIRED}
    - {name: content_id, mode: REQUIRED}
    - {name: user_id, mode: REQUIRED}
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
  - {type: gzip}

PostgreSQLからの抽出時に一度string型に変換し、BigQueryへの移行時にdatetimeに変換するという点がポイント

以下によれば、PostgreSQLでのデータ型がtimestamp型のwithout timezoneだとembulkがタイムゾーンを考慮してくれないらしいので(実際やってみてもそうなった)、抽出時にtimezoneも指定しておきます(触れてませんでしたが、2-(2)でembulk previewした時の時刻が-9時間表記になっているのも、この点が原因かと)
Embulkと日付データ(その2: PostgreSQLへのデータ投入) | Qiita

続いて、確認コマンドの実行→問題なければ、移行コマンドを実行しましょう

# 確認コマンドの実行
$ embulk preview user_visit_log2.yml
# 確認コマンドの実行結果
+---------------------+-----------------+----------------+
|         date:string | content_id:long | user_id:string |
+---------------------+-----------------+----------------+
| 2022-06-25 00:00:10 |          12,345 |      a7dc9j0sk |
| 2022-06-25 00:00:13 |          12,356 |      b2d3kj1as |
| 2022-06-25 00:00:17 |          12,345 |      z1a9kj3pa |
| 2022-06-25 00:00:21 |          12,368 |      l5az7j8rb |
| 2022-06-25 00:00:22 |          12,362 |      u8vb6j8xg |
+---------------------+-----------------+----------------+
# データ移行の実行
$ embulk run user_visit_log2.yml

確認コマンドの結果を見ると、dateのembulk typeがstring型となっており、日本時間で表示されていることがわかります

これなら特にフォーマットせず、想定通りの結果が得られます
BigQueryでの結果確認用SQLを実行して確認してみましょう

結果確認用SQL
select 
 *
from 
 `sample1.user_visit_log2`;

日時で絞り込む場合は以下の通り

select 
 *
from 
 `sample1.user_visit_log2`
where
 datetime('2022-06-01 00:00:00') <= date;
 
 -- 以下でもOK
 select 
 *
from 
 `sample1.user_visit_log2`
where
 '2022-06-01 00:00:00' <= date;

まとめ

今回のポイントは、

  • BigQueryのtimestamp型自体にはタイムゾーンがなく、デフォルト表記はUTCである
  • PostgreSQLでtimezone型(without timezone)の場合は、embulkがtimezoneを考慮してくれない(ので、datetime型などにする場合は指定が必要)
    というところかと

上記を考慮の上、timezone型のデータを適切な形で移行するようにしたいですね

補足事項(2022年8月15日追記)

date型に関しても、そのままだとUTC扱いとなるようで、BigQuery移行時に-1日になってしまう現象が見られました。
timestamp型のカラムと同様、in配下のcolumn_optionsでフォーマットとtimezoneを指定することで本現象を防ぐことができたので、こちらに記載しておきます。

以下、サンプルです。

sample_date.yml
in:
  ### 省略 ### 
  table: sample_date
  column_options: # date型なので、時分秒は不要
    date: {type: string, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
out:
  ### 省略 ### 
  column_options:
    - {name: date, type: date, mode: REQUIRED}
  ### 省略 ### 

日付関連のデータ型については、想定通りの値が投入されるか一通り確認するのがよさそう。

参考

1. 事前準備

Cloud SDK

embulk

(1) Java8(JRE)のインストール
(2) embulkのインストール

2. データ移行の実施

(1) データ移行情報記載ファイルの作成

(3) 結果の確認

3. 代替案

更新履歴

更新日 更新内容
2022/10/04 Cloud SDKのインストール手順の追加

Discussion