[embulk]PostgreSQLでtimestamp型(without time zone)のデータをBigQueryに移行する
PostgreSQLでwithout time zoneのtimestamp型のデータをBigQueryに移行する際にはまったので覚書き
移行は、embulkというバルクデータローダー(データを抽出→転送するためのツール)を用いて行います
環境
- 移行元: PostgreSQL 13.4(厳密にはAmazon Aurora(PostgreSQL互換))
→ timezoneはAsia/Tokyoとします(「show timezone;」コマンドで確認できます) - 移行先: BigQuery
- embulk稼働用: Amazon Linux2
→ わりとメモリを食うので、データ量が多い場合はt3.microとかではなくt3.mediumなどそこそこメモリサイズ大きめのものにしておくとよい
1. 事前準備
まず事前準備を行います
embulkの動作するサーバは使用環境の整備から、PostgtreSQL, BigQueryについてはデータベース自体の準備はできているものとします
Cloud SDK
embulkの動作環境(サーバ)にCloud SDKをインストールします
# ホームに移動
$ cd ~
$ pwd
# DL
$ curl -O https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-370.0.0-linux-x86_64.tar.gz
# 解凍・展開
$ tar xfvz google-cloud-sdk-370.0.0-linux-x86_64.tar.gz
# インストール: いくつか質問が出るが全て「y」でOK
$ ./google-cloud-sdk/install.sh
# パスを通しておく
echo 'export PATH="$HOME/google-cloud-sdk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
# 初期化する
gcloud init
# [ACCOUNT]には、BigQuery接続用のサービス アカウントのメール欄の値、[KEY_FILE]には接続用の鍵ファイル名(embulkサーバに配置してある想定)を入れる(※)
$ gcloud auth activate-service-account [ACCOUNT] --key-file=[KEY_FILE]
# 以下のようなメッセージが出ていた場合は、
Updates are available for some Cloud SDK components. To install them,
please run:
$ gcloud components update
# 下記のコマンドも実行しておく
$ gcloud components update
※BigQuery接続用のサービスアカウントの作成がまだの場合はサービス アカウントの作成と管理 | IAM -Google Cloudを参考に作成すること
embulk
続いて、embulkの使用環境の整備を行います
(1) Java8(JRE)のインストール
embulkの動作に必要なJava8(JRE)をインストールします
今回、OSがAmazon Linux2のため同じくAmazonの提供しているOpenJDKであるAmazon Correto8をチョイスしました
※2022/06/24現在、安定板のv0.9は、Java9以降には対応していないので、8より上のバージョンをインストールしないよう注意!
インストール方法は以下の通り
# インストール
$ sudo amazon-linux-extras enable corretto8
$ sudo yum install java-1.8.0-amazon-corretto
$ sudo yum install java-1.8.0-amazon-corretto-devel
# バージョン確認
$ java -version
なお、OSがAmazon Linux2でない場合のインストールコマンドは以下の通り
# インストール
$ sudo rpm --import https://yum.corretto.aws/corretto.key
$ sudo curl -L -o /etc/yum.repos.d/corretto.repo https://yum.corretto.aws/corretto.repo
$ sudo yum install -y java-1.8.0-amazon-corretto-devel
# バージョン確認
$ java -version
(2) embulkのインストール
以下のコマンドを順に実行してインストールします(公式のTOPページに載っている内容そのままです
embulkを使用する予定のユーザーで実行しましょう
# インストール
$ curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc
# バージョン確認
$ embulk -version
(3) 必要なプラグインのインストール
今回は、PostgreSQL → BigQueryに移行するので、
①embulk-input-postgresql: PostgreSQLからのデータ抽出用
②embulk-output-bigquery: BigQueryへのデータ転送用
のプラグインを入れます。
(2) に引き続き、以下のとコマンドを実行
# embulk-input-postgresqlのインストール
$ embulk gem install embulk-input-postgresql
# embulk-output-bigqueryのインストール
$ embulk gem install jwt:2.3.0 # 旧バージョンのjwtをインストール
$ embulk gem install embulk-output-bigquery
# インストールの確認: インストールしたプラグインが表示されればOK
$ embulk gem list
→ embulk-output-bigqueryが依存しているjwtのバージョンと対応するRubyのバージョン, embulkのバージョンの関係で旧バージョンのjwtをインストールする手順を入れています
詳細は、
embulk-output-bigqueryインストール時にRuby version >= 2.5でエラーになる問題 | Qiita
を参考に
PostgreSQL
sample1というスキーマに、user_visit_logというテーブルがあるという想定でいきます
DDLは以下の通り
create table sample1.user_visit_log (
date timestamp(6) without time zone not null
, content_id integer not null
, user_id character varying(20) not null
);
移行用のテストデータも作成しておきます
insert into sample1.user_visit_log values ('2022/06/25 00:00:10', 12345, 'a7dc9j0sk');
insert into sample1.user_visit_log values ('2022/06/25 00:00:13', 12356, 'b2d3kj1as');
insert into sample1.user_visit_log values ('2022/06/25 00:00:17', 12345, 'z1a9kj3pa');
insert into sample1.user_visit_log values ('2022/06/25 00:00:21', 12368, 'l5az7j8rb');
insert into sample1.user_visit_log values ('2022/06/25 00:00:22', 12362, 'u8vb6j8xg');
-- 結果確認用
select * from sample1.user_visit_log;
BigQuery
PostgreSQL環境に合わせ、sample1というデータセットに、user_visit_logというテーブルを作ります
DDLは以下の通りで、コンソールのクエリエディタから実行しておきます
create table `sample1.user_visit_log` (
date timestamp not null
, content_id int64 not null
, user_id string not null
);
→ 「データセット名.テーブル名」は、バッククォートで囲わなくても実行できますが、SQLが入れ子構造になった場合などに失敗することがあるので、日ごろから上述のようにバッククォートで囲うことを意識するとよいです
※PostgreSQLとBigQueryのデータ型は完全には一致していない(PostgreSQLのデータ型の分類が細かく、BigQueryがザックリしている感じ)ので、以下を参考に対応するデータ型を設定しました。
- 第8章 データ型 | PostgreSQL 13.1文書
- データ型 | BigQuery | Google Cloud
- embulk-input-postgresql | GitHub
- embulk-output-bigquery | GitHub
2. データ移行の実施
(1) データ移行情報記載ファイルの作成
embulk実行用のサーバにて、データ移行情報を記載したymlファイルを作成します
in:
type: postgresql
host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
password: zzzzzzz # 上記ユーザーのパスワードを入力
database: sample1
table: user_visit_log
out:
type: bigquery
auth_method: json_key
json_keyfile: /sample1/aaaaa.json # /sample1 ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
path_prefix: /sample1/tmp/embulk/ # 一時ディレクトリとして/sample1/tmp/embulkを作成
file_ext: .csv.gz
source_format: CSV
project: sample # プロジェクト名を入力
dataset: sample1
auto_create_table: true
table: user_visit_log
column_options:
- {name: date, mode: REQUIRED}
- {name: content_id, mode: REQUIRED}
- {name: user_id, mode: REQUIRED}
formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
encoders:
- {type: gzip}
(2) データ移行確認コマンドの実行
以下のように、実行内容の確認コマンドを実行してymlファイルのin配下の記述に問題がないか確認します
# 実行内容の確認
$ embulk preview user_visit_log.yml
# 以下embulk previewコマンド実行後に、想定される表示内容
+-------------------------+-----------------+----------------+
| date:timestamp | content_id:long | user_id:string |
+-------------------------+-----------------+----------------+
| 2022-06-24 15:00:10 UTC | 12,345 | a7dc9j0sk |
| 2022-06-24 15:00:13 UTC | 12,356 | b2d3kj1as |
| 2022-06-24 15:00:17 UTC | 12,345 | z1a9kj3pa |
| 2022-06-24 15:00:21 UTC | 12,368 | l5az7j8rb |
| 2022-06-24 15:00:22 UTC | 12,362 | u8vb6j8xg |
+-------------------------+-----------------+----------------+
上記のように、移行対象のデータが、対応するembulk typeとともに表示されます
これは移行元→embulk取り込み時に変換される内容を表しています
データ型に関し、timestampは概ねそのままですが、integer → long, character varying(20) → stringと変換されていますね
(3) データ移行コマンドの実行
(2) のデータ移行確認コマンドの結果に問題がなければ、データ移行コマンドを実行してBigQueryにデータを取り込みましょう
# データ移行の実行
$ embulk run user_visit_log.yml
※previewコマンドが成功してもembulk→BigQueryの取り込み時エラーなどで、runコマンドが失敗する場合もあります
その場合は、メッセージに従って修正後、再度実行してください
(4) 結果の確認
BigQueryにて、以下のコマンドを実行して中身を確認
select * from `sample1.user_visit_log`;
おそらくdateの日時が-9時間 UTCになっているかと(もっというと、embulk previewの時点でそうなってますが)
PostgreSQLではdateはtimestamp(6) without timezoneとなっていますが、冒頭の「環境」に記載の通りPostgreSQL自体のタイムゾーン設定がAsia/Tokyoなのでそちらに従います
一方、BigQueryのtimestamp型にはタイムゾーンの概念がなく、デフォルト表記がUTCになるため、-9時間で表示されてしまったというわけ
Asia/Tokyoで表示したい場合は以下のようにすればよいです
select
format_timestamp('%Y-%m-%d %H:%M:%S', date, 'Asia/Tokyo') as date,
content_id,
user_id
from
`sample1.user_visit_log`;
余談ですが、上記のようにフォーマットした場合は、string型扱いになる(dateカラム)ので、クエリ結果を別テーブルなどに取り込む場合は注意が必要です
なお、日本時間(Asia/Tokyo)ベースでdateで絞り込みなど行う場合は以下のようにします
select
format_timestamp('%Y-%m-%d %H:%M:%S', date, 'Asia/Tokyo') as date,
content_id,
user_id
from
`sample1.user_visit_log`
where
timestamp('2022-06-01 00:00:00', 'Asia/Tokyo') <= date;
条件句でtimestamp関数を用い、タイムゾーンを設定するのがポイントですね
なお、timestamp関数を使わなくてもクエリの実行自体は可能ですが、タイムゾーンがUTC扱いになってしまうので、想定通りの結果が得られません
3. 代替案
とはいえ、上述の方法だとBigQueryの知見がない方が来た際に勘違いやミスの原因にもなりかねません
タイムゾーンを意識しないなら、BigQuery側では、datetime型を用いるのもひとつの手
create table `sample1.user_visit_log2` (
date datetime not null
, content_id int64 not null
, user_id string not null
);
テーブル名は重複を防ぐため、user_visit_log2としています
データ移行用のymlファイルは以下のようになります
in:
type: postgresql
host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
password: zzzzzzz # 上記ユーザーのパスワードを入力
database: sample1
table: user_visit_log
column_options:
date: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo"}
out:
type: bigquery
auth_method: json_key
json_keyfile: /sample1/aaaaa.json # /sample1 ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
path_prefix: /sample1/tmp/embulk/ # 一時ディレクトリとして/sample1/tmp/embulkを作成
file_ext: .csv.gz
source_format: CSV
project: sample # プロジェクト名を入力
dataset: sample1
auto_create_table: true
table: user_visit_log2
column_options:
- {name: date, type: datetime, mode: REQUIRED}
- {name: content_id, mode: REQUIRED}
- {name: user_id, mode: REQUIRED}
formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
encoders:
- {type: gzip}
PostgreSQLからの抽出時に一度string型に変換し、BigQueryへの移行時にdatetimeに変換するという点がポイント
以下によれば、PostgreSQLでのデータ型がtimestamp型のwithout timezoneだとembulkがタイムゾーンを考慮してくれないらしいので(実際やってみてもそうなった)、抽出時にtimezoneも指定しておきます(触れてませんでしたが、2-(2)でembulk previewした時の時刻が-9時間表記になっているのも、この点が原因かと)
Embulkと日付データ(その2: PostgreSQLへのデータ投入) | Qiita
続いて、確認コマンドの実行→問題なければ、移行コマンドを実行しましょう
# 確認コマンドの実行
$ embulk preview user_visit_log2.yml
# 確認コマンドの実行結果
+---------------------+-----------------+----------------+
| date:string | content_id:long | user_id:string |
+---------------------+-----------------+----------------+
| 2022-06-25 00:00:10 | 12,345 | a7dc9j0sk |
| 2022-06-25 00:00:13 | 12,356 | b2d3kj1as |
| 2022-06-25 00:00:17 | 12,345 | z1a9kj3pa |
| 2022-06-25 00:00:21 | 12,368 | l5az7j8rb |
| 2022-06-25 00:00:22 | 12,362 | u8vb6j8xg |
+---------------------+-----------------+----------------+
# データ移行の実行
$ embulk run user_visit_log2.yml
確認コマンドの結果を見ると、dateのembulk typeがstring型となっており、日本時間で表示されていることがわかります
これなら特にフォーマットせず、想定通りの結果が得られます
BigQueryでの結果確認用SQLを実行して確認してみましょう
select
*
from
`sample1.user_visit_log2`;
日時で絞り込む場合は以下の通り
select
*
from
`sample1.user_visit_log2`
where
datetime('2022-06-01 00:00:00') <= date;
-- 以下でもOK
select
*
from
`sample1.user_visit_log2`
where
'2022-06-01 00:00:00' <= date;
まとめ
今回のポイントは、
- BigQueryのtimestamp型自体にはタイムゾーンがなく、デフォルト表記はUTCである
- PostgreSQLでtimezone型(without timezone)の場合は、embulkがtimezoneを考慮してくれない(ので、datetime型などにする場合は指定が必要)
というところかと
上記を考慮の上、timezone型のデータを適切な形で移行するようにしたいですね
補足事項(2022年8月15日追記)
date型に関しても、そのままだとUTC扱いとなるようで、BigQuery移行時に-1日になってしまう現象が見られました。
timestamp型のカラムと同様、in配下のcolumn_optionsでフォーマットとtimezoneを指定することで本現象を防ぐことができたので、こちらに記載しておきます。
以下、サンプルです。
in:
### 省略 ###
table: sample_date
column_options: # date型なので、時分秒は不要
date: {type: string, timestamp_format: "%Y-%m-%d", timezone: "Asia/Tokyo"}
out:
### 省略 ###
column_options:
- {name: date, type: date, mode: REQUIRED}
### 省略 ###
日付関連のデータ型については、想定通りの値が投入されるか一通り確認するのがよさそう。
参考
1. 事前準備
Cloud SDK
embulk
(1) Java8(JRE)のインストール
- Amazon Linux 2 に Amazon Corretto 8 をインストールする手順 | aws
- Debian ベース Linux、RPM ベース Linux、Alpine Linux ディストリビューションに Amazon Corretto 8 をインストールする手順 | aws
(2) embulkのインストール
2. データ移行の実施
(1) データ移行情報記載ファイルの作成
(3) 結果の確認
3. 代替案
更新履歴
更新日 | 更新内容 |
---|---|
2022/10/04 | Cloud SDKのインストール手順の追加 |
Discussion