🧭

[embulk]日付範囲の指定でプレペアドステートメントを使用する

2022/07/03に公開

概要

embulkで日付範囲の指定でプレペアドステートメントを使用する方法です
例えば、タイムスタンプが昨日のデータのみ移行する場合を考えてみます

環境

  • 移行元: Amazon Aurora(PostgreSQL互換)
  • 移行先: BigQuery
  • embulk: バージョン0.9.24

前提条件

テーブル定義と移行データ

移行元(PostgreSQL)と移行先(BigQuery)のテーブル定義です
内容は、社内メンバーの社内システムへのアクセスログを記録するinternal_system_access_logテーブルとします

移行元(PostgreSQL)のテーブル定義

移行元(PostgreSQL)のテーブル定義です
対象テーブルは、officeというデータベースにあるものとし、月単位でパーティションが切られているものとします

internal_system_access_log
-- データベースofficeに入り、パーティションテーブルを作成
create table internal_system_access_log (
  access_time timestamp(6) without time zone
  , system_id character varying(10)
  , member_id integer
) partition by range(access_time);

-- パーティションの作成(いったん手動で3ヶ月分)
create table internal_system_access_log_202205 partition of internal_system_access_log for values from ('2022-05-01 00:00:00') to ('2022-05-31 23:59:59'); 
create table internal_system_access_log_202206 partition of internal_system_access_log for values from ('2022-06-01 00:00:00') to ('2022-06-30 23:59:59'); 
create table internal_system_access_log_202207 partition of internal_system_access_log for values from ('2022-07-01 00:00:00') to ('2022-07-31 23:59:59'); 

移行データ

今日の日付が2022年7月3日として、前日のデータを移行するため、以下のデータを上記のテーブルに投入しておきます。
※今回は最低、前日の2022年7月2日分があればよいので他はスキップでもOK

-- 2022年6月30日分
insert into internal_system_access_log values ('2022-06-30 23:08:29', 'B008', '300049');
-- 2022年7月1日分
insert into internal_system_access_log values ('2022-07-01 08:08:05', 'A001', '100001');
insert into internal_system_access_log values ('2022-07-01 10:05:21', 'D012', '200037');
-- 2022年7月2日分
insert into internal_system_access_log values ('2022-07-02 09:07:23', 'B027', '100068');
insert into internal_system_access_log values ('2022-07-02 13:52:42', 'C019', '100001');
insert into internal_system_access_log values ('2022-07-02 15:29:28', 'D014', '300042');
-- 2022年7月3日分
insert into internal_system_access_log values ('2022-07-03 11:03:37', 'A001', '100001');

移行先(BigQuery)のテーブル定義

移行先(BigQuery)のテーブル定義です
こちらも、access_time列をベースに、月単位で分割されているものとし、保持期間は400日とします
access_timeは元テーブルではtimestamp型ですが、BigQueryでは、デフォルトでUTC表記&扱いになってしまうため、datetime型にしておきます

internal_system_access_log
-- データセットofficeに、分割テーブルを作成(時間単位の列で分割)
create table `office.internal_system_access_log` (
  access_time datetime not null
  , system_id string not null
  , member_id int64 not null
)partition by
 datetime_trunc(access_time, month)
options(
 partition_expiration_days=400 
 , require_partition_filter=true
);

embulkの設定ファイルの作成

データ移行用のembulkの設定ファイルを作成します
今回、環境変数を使って日付の指定を行うので、ファイルの末尾を「.yml.liquid」にします。

Liquid template engineを使うために、そうしているのですが、詳細については、using-variables | Embulk configuration file format | embulk.orgをご覧ください。

設定ファイルの中身は以下のようになります。

internal_system_access_log.yml.liquid
in:
  type: postgresql
  host: xxxxxxxxxxxxxxxxxxxxxx.ap-northeast-1.rds.amazonaws.com # ホスト名を入力
  user: yyyyyyyyyy # PostgreSQL接続ユーザー名を入力
  password: zzzzzzz # 上記ユーザーのパスワードを入力
  database: office
  query: |
    select
     *
    from
     internal_system_access_log
    where
      '{{ env.YESTERDAY }}' <= access_time
      and access_time < '{{ env.TODAY }}'
  column_options:
    access_time: {type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo"}
out:
  type: bigquery
  auth_method: json_key
  json_keyfile: /office/aaaaa.json # /office ディレクトリにBigQuery接続用のキーaaaaa.jsonを配置
  path_prefix: /office/tmp/embulk/ # 一時ディレクトリとして/office/tmp/embulkを作成
  file_ext: .csv.gz
  source_format: CSV
  project: bbbbb # プロジェクト名を入力
  dataset: office
  auto_create_table: true
  table: internal_system_access_log${{ env.TARGET_MONTH }}
  column_options:
    - {name: access_time, type: datetime, mode: REQUIRED}
    - {name: system_id, mode: REQUIRED}
    - {name: member_id, mode: REQUIRED}
  formatter: {type: csv, charset: UTF-8, delimiter: ',', header_line: false}
  encoders:
  - {type: gzip}
  time_partitioning: # 分割テーブルの設定に関する記述
    type: MONTH
    field: access_time

環境変数を用いる場合、 {{ env.xxx }} (「xxx」は環境変数名)とするのがポイント
文字列として投入するため、クォートするのを忘れずに

他のポイントとしては、

  • 日本時間で移行するため、inのcolumn_optionsで、タイムゾーンをAsia/Tokyoに設定しながらフォーマット
  • 移行先のBigQueryで分割テーブルを用いているため、outにて、time_partitioningに関する記述を実施

しているところでしょうか

シェルファイルの作成

上のembulkの設定ファイルの実行に必要な環境変数の設定から、データ移行の実行まで、シェルファイルを使って実行してしまうことにします。

execMove.sh
#!bin/bash

# 環境変数の設定
YESTERDAY=`date -d "1 day ago" +'%Y-%m-%d'` && export YESTERDAY
TODAY=`date +'%Y-%m-%d'` && export TODAY
TARGET_MONTH=`date -d "1 day ago" +'%Y%m'` && export TARGET_MONTH

# データ移行の実行
embulk run internal_system_access_log.yml.liquid

以下の通り実行して、前日の2022年7月2日分の3レコードのみがBigQueryに移行できていればOK

$ bash execMove.sh
確認用SQL(BigQueryで実行のこと)
select * from `office.internal_system_access_log`
where  '2022-07-02' <= access_time
 and access_time < '2022-07-03';

終わりに

プレペアドステートメントを使う方法については、
https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-postgresql#use-incremental-loading-with-raw-query
に記載の方法が正規のようですが、
https://github.com/embulk/embulk-input-jdbc/blob/master/embulk-input-jdbc/src/main/java/org/embulk/input/jdbc/AbstractJdbcInputPlugin.java#L357
によれば、バインド変数名が対象のテーブル自体にないといけないらしく、access_timeに関し、where句で2つバインドする今回のパターンでは難しいため、苦肉の策で環境変数を使用しました(厳密にはプレペアドステートメントではないですね汗)。

もっと良い方法ご存じの方がいらしたら、ご共有いただけると嬉しいです。

参考

Discussion