embulkを使ってなんちゃってETL基盤の構築
はじめに
普段は大学生をしています.embulkに触れる機会があったので簡単な例を用いてなんちゃってETL基盤の構築していきます.なんちゃってとつけているのは自分がまだまだショボいからです!!!!
ソースコード
今回使うソースコードです
今回やりたいこと
- バスケットボール部の部員をとバレーボール部の部員を管理しているテーブルがそれぞれ存在しています.項目としてid(主キー), name(名前), height(身長), age(年齢), created_at(データ登録日)が挙げられます.
- バスケットボール部の部員とバレーボール部の部員の情報まとめたオールスターテーブルを作りたいです.項目としてid(主キー), name(名前), height(身長), age(年齢) team(所属部活), origin_id(元テーブルのid), origin_created_at(元テーブルのデータ登録日), created_at(データ登録日)が挙げられます.
環境構築
ディレクトリ構成
.
├── docker-compose.yml
├── embulk
├── Dockerfile
└── config.yml
docker-composeを使って構築する
今回は複数個のコンテナを立てるので、docker-compose.ymlに下記のように記述します.Mariadbに関してはdockerhubを参考にしました.
version: '3'
services:
input1:
container_name: input1
image: mariadb
restart: always
ports:
- 3306:3306
environment:
MARIADB_ROOT_PASSWORD: password
TZ: Asia/Tokyo
input2:
container_name: input2
image: mariadb
restart: always
ports:
- 3307:3306
environment:
MARIADB_ROOT_PASSWORD: password
TZ: Asia/Tokyo
output:
container_name: output
image: mariadb
restart: always
ports:
- 3308:3306
environment:
MARIADB_ROOT_PASSWORD: password
TZ: Asia/Tokyo
embulk:
container_name: embulk
build: ./embulk
tty: true
embulkに関してはパッとしたimageがなかったのでDockerfileを使って構築していきます.下記のように記述しました.プラグインに関しては後ほど詳しく触れます.
FROM openjdk:8-slim
ENV LANG=C.UTF-8 \
PATH_TO_EMBULK=/opt/embulk \
PATH=${PATH}:/opt/embulk
# タイムゾーンを変更
RUN ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime
# vim, ping などをインストール
RUN apt-get update && apt-get install -y curl vim
RUN apt-get install -y iputils-ping
# Embulk をインストール
RUN mkdir -p ${PATH_TO_EMBULK} \
&& curl --create-dirs -o ${PATH_TO_EMBULK}/embulk -L "https://dl.embulk.org/embulk-0.9.23.jar" \
&& chmod +x ${PATH_TO_EMBULK}/embulk
# mysql用のプラグインをインストール
RUN embulk gem install embulk-input-mysql
RUN embulk gem install embulk-output-mysql
# filter用プラグインをインストール
RUN embulk gem install embulk-filter-column
WORKDIR /app
COPY ./config.yml ./
CMD [ "bash" ]
とりあえず、ここまででembulkを使ってなんちゃってETLを行う環境が整いました.
embulk
ここでは、embulkに関することを詳しくまとめていきます.
今回利用するプラグイン
Dockerfileに記述してインストール済みなので何かする必要はないです.
MySQL(Mariadb)をExtract(抽出)する際に使います.
データを「Transform(変換)」する際に使います.
MySQL(Mariadb)を「Load(書き出し)」する際に使います.
設定ファイル(config.yml)
embulkの設定ファイルの記述を行なっていきます.設定ファイルはDBごとに用意する必要があるので今回の場合だと、バスケットボール部用とバレーボール部用それぞれ用意する必要があります.下記は例としてバスケットボール部用を取り上げます.
in:
type: mysql
host: input1
database: input1
user: root
password: password
table: "basketballteams"
select: "*"
incremental: true
incremental_columns: [id]
filters:
- type: column
columns:
- {name: name}
- {name: age}
- {name: height}
add_columns:
- {name: team, type: string, default: 'basketball'}
- {name: origin_id, src: id}
- {name: origin_created_at, type: timestamp, src: created_at}
default_timezone: "Asia/Tokyo"
out:
type: mysql
host: output
database: output
user: root
password: password
table: "allstars"
mode: insert
ここで全て説明するよりも、今回利用するプラグインのところで挙げたドキュメントを参照したほうが絶対にわかりやすいので私が特に重要だと感じたポイントだけ載せます.
差分ローディングを有効にする
実現したいこと↓毎回全てのデータを入れ替えるのは非効率です.(しかもエラーになるケースも)
そのためにconfig.ymlに下記の記述が必要です.idやcreated_atを指定するのが一般的みたいです.
incremental: true
incremental_columns: [id]
抽出したデータを思い通りの形に変換したい(プラグインをフル活用)
設定ファイルの下記の部分になります.やっている中でシンプルにプラグインすげぇと思いました.今回の場合だと元テーブルのidを書き出し先でorigin_idとして保持したかったです.最初は「どうやるんだ?」ととても悩みましたがプラグインを入れたら一発でした.プラグインに関しては丁寧にまとめてくれている方がいたので参考にしてみてください.また、それぞれのプラグインのドキュメントには使用例が載っているのでとても理解しやすかったです.
- type: column
columns:
- {name: name}
- {name: age}
- {name: height}
add_columns:
- {name: team, type: string, default: "basketball"}
- {name: origin_id, src: id}
- {name: origin_created_at, type: timestamp, src: created_at}
default_timezone: "Asia/Tokyo"
modeを理解しておく
下記の部分です.
mode: insert
modeに関してまとめます.TRUNCATEするものと差分ローディングは相性が悪いと感じました.一度テーブルを消すので目的のテーブルに残るのは差分のデータのみとなります.insertが無難な気がしますが、目的に合わせて利用するのが良さそうです.
mode | 挙動 |
---|---|
insert | 一時的にテーブルを作ってそこに追加してから目的テーブルへコピーする. |
insert_direct | 目的テーブルに直接追加する. |
truncate_insert | テーブルをTRUNCATEする以外はinsertと同じ.(resume可) |
merge | 同じキーのデータがあった場合は上書き更新する.それ以外の挙動はinsertと同じ. |
merge_direct | 直接目的テーブルに対して直接追加する.同じキーのデータがあった場合は上書き更新する. |
replace | テーブルをTRUNCATEする以外はinsertと同じ.(resume不可) |
実行してみる
コンテナを立ち上げます.
$ docker-compose build
$ docker-compose up -d
今回はバスケットボール部のデータを転送したいと思います.テストデータは作成済みの程で進めます.コンテナに入って確認してみます.
$ docker exec -it input1 bin/bash
root@b7a5ee5c6e82:/# mysql -p
Enter password:password
....
MariaDB [(none)]> use input1;
Database changed
MariaDB [input1]> desc basketballteams;
+------------+-------------+------+-----+---------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+------------+-------------+------+-----+---------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(10) | NO | | NULL | |
| height | int(11) | NO | | NULL | |
| age | int(11) | NO | | NULL | |
| created_at | timestamp | NO | | current_timestamp() | |
+------------+-------------+------+-----+---------------------+----------------+
MariaDB [input1]> select * from basketballteams;
+----+---------+--------+-----+---------------------+
| id | name | height | age | created_at |
+----+---------+--------+-----+---------------------+
| 9 | keisuke | 176 | 21 | 2022-03-18 17:36:31 |
| 10 | msd | 177 | 21 | 2022-03-19 03:42:59 |
+----+---------+--------+-----+---------------------+
embulkは書き出し先にテーブルが存在しないと自動で作成してくれますが、AUTO_INCREMENTなどは考えてくれないので自分であらかじめ作成しておくのが良さそうです.確認してみます.
$ docker exec -it output bin/bash
root@749af95744e1:/# mysql -p
Enter password: password
....
MariaDB [(none)]> use output;
Database changed
MariaDB [output]> desc allstars;
+-------------------+-------------+------+-----+---------------------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+-------------+------+-----+---------------------+----------------+
| id | int(11) | NO | PRI | NULL | auto_increment |
| name | varchar(10) | YES | | NULL | |
| age | int(11) | YES | | NULL | |
| height | int(11) | YES | | NULL | |
| team | varchar(11) | YES | | NULL | |
| origin_id | int(11) | YES | | NULL | |
| origin_created_at | timestamp | YES | | NULL | |
| created_at | timestamp | NO | | current_timestamp() | |
+-------------------+-------------+------+-----+---------------------+----------------+
MariaDB [output]> select * from allstars;
Empty set (0.001 sec)
長くなりましたが、いよいよembulkを動かしていきます.-c diff.ymlをつけて差分を記録します.これをしないと差分ローディングができません.最後の行でしっかりとidが記録されているのが分かります.不安な人はdiff.ymlの中身を覗いてみてください.
$ docker exec -it embulk bash
root@8cca802ef4ee:/app# embulk run config.yml -c diff.yml
....
2022-03-18 19:00:33.740 +0000 [INFO] (main): Next config diff: {"in":{"last_record":[10]},"out":{}}
実際にデータはうまく反映されているのか見てみましょう.「おおおお!うまくできてる」と思いきやorigin_created_atを見てください.元テーブルのtimestampがUTC戻されてしまっています.コンテナのtimezoneはJSTにしているのでこの辺はembulkが内部的に保持しているのかもしれません.
MariaDB [output]> select * from allstars;
+----+---------+------+--------+------------+-----------+---------------------+---------------------+
| id | name | age | height | team | origin_id | origin_created_at | created_at |
+----+---------+------+--------+------------+-----------+---------------------+---------------------+
| 1 | keisuke | 21 | 176 | basketball | 9 | 2022-03-18 08:36:31 | 2022-03-19 04:00:33 |
| 2 | msd | 21 | 177 | basketball | 10 | 2022-03-18 18:42:59 | 2022-03-19 04:00:33 |
+----+---------+------+--------+------------+-----------+---------------------+---------------------+
2 rows in set (0.001 sec)
origin_created_at問題を除けばうまく行っています.バレーボール部の方も同様に行えばできそうです.
さいごに
初めて投稿しました.にわかブッこいてるとこもあると思うので間違っていたら指摘してください.
Discussion