[embulk]日付範囲の指定でプレペアドステートメントを使用する
概要
embulkで日付範囲の指定でプレペアドステートメントを使用する方法です
例えば、タイムスタンプが昨日のデータのみ移行する場合を考えてみます
環境
- 移行元: Amazon Aurora(PostgreSQL互換)
- 移行先: BigQuery
- embulk: バージョン0.9.24
前提条件
- embulkがインストール済であること
- embulkのプラグインembulk-input-postgresql, embulk-output-bigqueryがインストール済であること
テーブル定義と移行データ
移行元(PostgreSQL)と移行先(BigQuery)のテーブル定義です
内容は、社内メンバーの社内システムへのアクセスログを記録するinternal_system_access_logテーブルとします
移行元(PostgreSQL)のテーブル定義
移行元(PostgreSQL)のテーブル定義です
対象テーブルは、officeというデータベースにあるものとし、月単位でパーティションが切られているものとします
-- データベースofficeに入り、パーティションテーブルを作成
create table internal_system_access_log (
access_time timestamp(6) without time zone
, system_id character varying(10)
, member_id integer
) partition by range(access_time);
-- パーティションの作成(いったん手動で3ヶ月分)
create table internal_system_access_log_202205 partition of internal_system_access_log for values from ('2022-05-01 00:00:00') to ('2022-05-31 23:59:59');
create table internal_system_access_log_202206 partition of internal_system_access_log for values from ('2022-06-01 00:00:00') to ('2022-06-30 23:59:59');
create table internal_system_access_log_202207 partition of internal_system_access_log for values from ('2022-07-01 00:00:00') to ('2022-07-31 23:59:59');
移行データ
今日の日付が2022年7月3日として、前日のデータを移行するため、以下のデータを上記のテーブルに投入しておきます。
※今回は最低、前日の2022年7月2日分があればよいので他はスキップでもOK
-- 2022年6月30日分
insert into internal_system_access_log values ('2022-06-30 23:08:29', 'B008', '300049');
-- 2022年7月1日分
insert into internal_system_access_log values ('2022-07-01 08:08:05', 'A001', '100001');
insert into internal_system_access_log values ('2022-07-01 10:05:21', 'D012', '200037');
-- 2022年7月2日分
insert into internal_system_access_log values ('2022-07-02 09:07:23', 'B027', '100068');
insert into internal_system_access_log values ('2022-07-02 13:52:42', 'C019', '100001');
insert into internal_system_access_log values ('2022-07-02 15:29:28', 'D014', '300042');
-- 2022年7月3日分
insert into internal_system_access_log values ('2022-07-03 11:03:37', 'A001', '100001');
移行先(BigQuery)のテーブル定義
移行先(BigQuery)のテーブル定義です
こちらも、access_time列をベースに、月単位で分割されているものとし、保持期間は400日とします
access_timeは元テーブルではtimestamp型ですが、BigQueryでは、デフォルトでUTC表記&扱いになってしまうため、datetime型にしておきます
-- データセットofficeに、分割テーブルを作成(時間単位の列で分割)
create table `office.internal_system_access_log` (
access_time datetime not null
, system_id string not null
, member_id int64 not null
)partition by
datetime_trunc(access_time, month)
options(
partition_expiration_days=400
, require_partition_filter=true
);
embulkの設定ファイルの作成
データ移行用のembulkの設定ファイルを作成します
今回、環境変数を使って日付の指定を行うので、ファイルの末尾を「.yml.liquid」にします。
Liquid template engineを使うために、そうしているのですが、詳細については、using-variables | Embulk configuration file format | embulk.orgをご覧ください。
設定ファイルの中身は以下のようになります。
in:
type: postgresql
host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
password: zzzzzzz # 上記ユーザーのパスワードを入力
database: office
query: |
select
*
from
internal_system_access_log
where
'{{ env.YESTERDAY }}' <= access_time
and access_time < '{{ env.TODAY }}'
column_options:
access_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo"}
out:
type: bigquery
auth_method: json_key
json_keyfile: /office/aaaaa.json # /office ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
path_prefix: /office/tmp/embulk/ # 一時ディレクトリとして/office/tmp/embulkを作成
file_ext: .csv.gz
source_format: CSV
project: bbbbb # プロジェクト名を入力
dataset: office
auto_create_table: true
table: internal_system_access_log${{ env.TARGET_MONTH }}
column_options:
- {name: access_time, type: datetime, mode: REQUIRED}
- {name: system_id, mode: REQUIRED}
- {name: member_id, mode: REQUIRED}
formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
encoders:
- {type: gzip}
time_partitioning: # 分割テーブルの設定に関する記述
type: MONTH
field: access_time
環境変数を用いる場合、 {{ env.xxx }} (「xxx」は環境変数名)とするのがポイント
文字列として投入するため、クォートするのを忘れずに
他のポイントとしては、
- 日本時間で移行するため、inのcolumn_optionsで、タイムゾーンをAsia/Tokyoに設定しながらフォーマット
- 移行先のBigQueryで分割テーブルを用いているため、outにて、time_partitioningに関する記述を実施
しているところでしょうか
シェルファイルの作成
上のembulkの設定ファイルの実行に必要な環境変数の設定から、データ移行の実行まで、シェルファイルを使って実行してしまうことにします。
#!bin/bash
# 環境変数の設定
YESTERDAY=`date -d "1 day ago" +'%Y-%m-%d'` && export YESTERDAY
TODAY=`date +'%Y-%m-%d'` && export TODAY
TARGET_MONTH=`date -d "1 day ago" +'%Y%m'` && export TARGET_MONTH
# データ移行の実行
embulk run internal_system_access_log.yml.liquid
以下の通り実行して、前日の2022年7月2日分の3レコードのみがBigQueryに移行できていればOK
$ bash execMove.sh
select * from `office.internal_system_access_log`
where '2022-07-02' <= access_time
and access_time < '2022-07-03';
終わりに
プレペアドステートメントを使う方法については、
に記載の方法が正規のようですが、 によれば、バインド変数名が対象のテーブル自体にないといけないらしく、access_timeに関し、where句で2つバインドする今回のパターンでは難しいため、苦肉の策で環境変数を使用しました(厳密にはプレペアドステートメントではないですね汗)。もっと良い方法ご存じの方がいらしたら、ご共有いただけると嬉しいです。
Discussion