🤩

embulkを使ってなんちゃってETL基盤の構築

2022/03/18に公開約9,500字

はじめに

普段は大学生をしています.embulkに触れる機会があったので簡単な例を用いてなんちゃってETL基盤の構築していきます.なんちゃってとつけているのは自分がまだまだショボいからです!!!!

ソースコード

今回使うソースコードです

https://github.com/msd05keisuke/nanchatte-etl

今回やりたいこと

  • バスケットボール部の部員をとバレーボール部の部員を管理しているテーブルがそれぞれ存在しています.項目としてid(主キー), name(名前), height(身長), age(年齢), created_at(データ登録日)が挙げられます.
  • バスケットボール部の部員とバレーボール部の部員の情報まとめたオールスターテーブルを作りたいです.項目としてid(主キー), name(名前), height(身長), age(年齢) team(所属部活), origin_id(元テーブルのid), origin_created_at(元テーブルのデータ登録日), created_at(データ登録日)が挙げられます.

環境構築

ディレクトリ構成

.
├── docker-compose.yml
├── embulk
    ├── Dockerfile
    └── config.yml

docker-composeを使って構築する

今回は複数個のコンテナを立てるので、docker-compose.ymlに下記のように記述します.Mariadbに関してはdockerhubを参考にしました.

docker-compose.yml
version: '3'
services:
    input1:
        container_name: input1
        image: mariadb
        restart: always
        ports:
          - 3306:3306
        environment:
          MARIADB_ROOT_PASSWORD: password
          TZ: Asia/Tokyo
    input2:
        container_name: input2
        image: mariadb
        restart: always
        ports:
          - 3307:3306
        environment:
          MARIADB_ROOT_PASSWORD: password
          TZ: Asia/Tokyo
    output:
        container_name: output
        image: mariadb
        restart: always
        ports:
          - 3308:3306
        environment:
          MARIADB_ROOT_PASSWORD: password
          TZ: Asia/Tokyo
    embulk:
        container_name: embulk
        build: ./embulk
        tty: true

embulkに関してはパッとしたimageがなかったのでDockerfileを使って構築していきます.下記のように記述しました.プラグインに関しては後ほど詳しく触れます.

Dockerfile
FROM openjdk:8-slim

ENV LANG=C.UTF-8 \
    PATH_TO_EMBULK=/opt/embulk \
    PATH=${PATH}:/opt/embulk

# タイムゾーンを変更
RUN ln -sf /usr/share/zoneinfo/Asia/Tokyo /etc/localtime

# vim, ping などをインストール
RUN apt-get update && apt-get install -y curl vim
RUN apt-get install -y iputils-ping

# Embulk をインストール
RUN mkdir -p ${PATH_TO_EMBULK} \
    && curl --create-dirs -o ${PATH_TO_EMBULK}/embulk -L "https://dl.embulk.org/embulk-0.9.23.jar" \
    && chmod +x ${PATH_TO_EMBULK}/embulk

# mysql用のプラグインをインストール
RUN embulk gem install embulk-input-mysql 
RUN embulk gem install embulk-output-mysql
# filter用プラグインをインストール
RUN embulk gem install embulk-filter-column

WORKDIR /app
COPY ./config.yml ./

CMD [ "bash" ]

とりあえず、ここまででembulkを使ってなんちゃってETLを行う環境が整いました.

embulk

ここでは、embulkに関することを詳しくまとめていきます.

今回利用するプラグイン

Dockerfileに記述してインストール済みなので何かする必要はないです.

embulk-input-mysql

MySQL(Mariadb)をExtract(抽出)する際に使います.

https://github.com/embulk/embulk-input-jdbc/tree/master/embulk-input-mysql

embulk-filter-column

データを「Transform(変換)」する際に使います.

https://github.com/sonots/embulk-filter-column

embulk-output-mysql

MySQL(Mariadb)を「Load(書き出し)」する際に使います.

https://github.com/embulk/embulk-output-jdbc/tree/master/embulk-output-mysql

設定ファイル(config.yml)

embulkの設定ファイルの記述を行なっていきます.設定ファイルはDBごとに用意する必要があるので今回の場合だと、バスケットボール部用とバレーボール部用それぞれ用意する必要があります.下記は例としてバスケットボール部用を取り上げます.

config.yml
in:
  type: mysql
  host: input1
  database: input1
  user: root
  password: password
  table: "basketballteams"
  select: "*"
  incremental: true
  incremental_columns: [id]
filters:
  - type: column
    columns:
        - {name: name}
        - {name: age}
        - {name: height}
    add_columns:
      - {name: team, type: string, default: 'basketball'}
      - {name: origin_id, src: id}
      - {name: origin_created_at, type: timestamp, src: created_at}
    default_timezone: "Asia/Tokyo"
out:
  type: mysql
  host: output
  database: output
  user: root
  password: password
  table: "allstars"
  mode: insert

ここで全て説明するよりも、今回利用するプラグインのところで挙げたドキュメントを参照したほうが絶対にわかりやすいので私が特に重要だと感じたポイントだけ載せます.

差分ローディングを有効にする

実現したいこと↓毎回全てのデータを入れ替えるのは非効率です.(しかもエラーになるケースも)

そのためにconfig.ymlに下記の記述が必要です.idやcreated_atを指定するのが一般的みたいです.

incremental: true
incremental_columns: [id]

抽出したデータを思い通りの形に変換したい(プラグインをフル活用)

設定ファイルの下記の部分になります.やっている中でシンプルにプラグインすげぇと思いました.今回の場合だと元テーブルのidを書き出し先でorigin_idとして保持したかったです.最初は「どうやるんだ?」ととても悩みましたがプラグインを入れたら一発でした.プラグインに関しては丁寧にまとめてくれている方がいたので参考にしてみてください.また、それぞれのプラグインのドキュメントには使用例が載っているのでとても理解しやすかったです.

- type: column
    columns:
        - {name: name}
        - {name: age}
        - {name: height}
    add_columns:
      - {name: team, type: string, default: "basketball"}
      - {name: origin_id, src: id}
      - {name: origin_created_at, type: timestamp, src: created_at}
    default_timezone: "Asia/Tokyo"

modeを理解しておく

下記の部分です.

mode: insert

modeに関してまとめます.TRUNCATEするものと差分ローディングは相性が悪いと感じました.一度テーブルを消すので目的のテーブルに残るのは差分のデータのみとなります.insertが無難な気がしますが、目的に合わせて利用するのが良さそうです.

mode 挙動
insert 一時的にテーブルを作ってそこに追加してから目的テーブルへコピーする.
insert_direct 目的テーブルに直接追加する.
truncate_insert テーブルをTRUNCATEする以外はinsertと同じ.(resume可)
merge 同じキーのデータがあった場合は上書き更新する.それ以外の挙動はinsertと同じ.
merge_direct 直接目的テーブルに対して直接追加する.同じキーのデータがあった場合は上書き更新する.
replace テーブルをTRUNCATEする以外はinsertと同じ.(resume不可)

実行してみる

コンテナを立ち上げます.

$ docker-compose build
$ docker-compose up -d

今回はバスケットボール部のデータを転送したいと思います.テストデータは作成済みの程で進めます.コンテナに入って確認してみます.

$ docker exec -it input1 bin/bash
root@b7a5ee5c6e82:/# mysql -p
Enter password:password
....
MariaDB [(none)]> use input1;
Database changed
MariaDB [input1]> desc basketballteams;
+------------+-------------+------+-----+---------------------+----------------+
| Field      | Type        | Null | Key | Default             | Extra          |
+------------+-------------+------+-----+---------------------+----------------+
| id         | int(11)     | NO   | PRI | NULL                | auto_increment |
| name       | varchar(10) | NO   |     | NULL                |                |
| height     | int(11)     | NO   |     | NULL                |                |
| age        | int(11)     | NO   |     | NULL                |                |
| created_at | timestamp   | NO   |     | current_timestamp() |                |
+------------+-------------+------+-----+---------------------+----------------+
MariaDB [input1]> select * from basketballteams;
+----+---------+--------+-----+---------------------+
| id | name    | height | age | created_at          |
+----+---------+--------+-----+---------------------+
|  9 | keisuke |    176 |  21 | 2022-03-18 17:36:31 |
| 10 | msd     |    177 |  21 | 2022-03-19 03:42:59 |
+----+---------+--------+-----+---------------------+

embulkは書き出し先にテーブルが存在しないと自動で作成してくれますが、AUTO_INCREMENTなどは考えてくれないので自分であらかじめ作成しておくのが良さそうです.確認してみます.

$ docker exec -it output bin/bash
root@749af95744e1:/# mysql -p
Enter password: password
....
MariaDB [(none)]> use output;
Database changed
MariaDB [output]> desc allstars;
+-------------------+-------------+------+-----+---------------------+----------------+
| Field             | Type        | Null | Key | Default             | Extra          |
+-------------------+-------------+------+-----+---------------------+----------------+
| id                | int(11)     | NO   | PRI | NULL                | auto_increment |
| name              | varchar(10) | YES  |     | NULL                |                |
| age               | int(11)     | YES  |     | NULL                |                |
| height            | int(11)     | YES  |     | NULL                |                |
| team              | varchar(11) | YES  |     | NULL                |                |
| origin_id         | int(11)     | YES  |     | NULL                |                |
| origin_created_at | timestamp   | YES  |     | NULL                |                |
| created_at        | timestamp   | NO   |     | current_timestamp() |                |
+-------------------+-------------+------+-----+---------------------+----------------+
MariaDB [output]> select * from allstars;
Empty set (0.001 sec)

長くなりましたが、いよいよembulkを動かしていきます.-c diff.ymlをつけて差分を記録します.これをしないと差分ローディングができません.最後の行でしっかりとidが記録されているのが分かります.不安な人はdiff.ymlの中身を覗いてみてください.

$ docker exec -it embulk bash
root@8cca802ef4ee:/app# embulk run config.yml -c diff.yml
....
2022-03-18 19:00:33.740 +0000 [INFO] (main): Next config diff: {"in":{"last_record":[10]},"out":{}}

実際にデータはうまく反映されているのか見てみましょう.「おおおお!うまくできてる」と思いきやorigin_created_atを見てください.元テーブルのtimestampがUTC戻されてしまっています.コンテナのtimezoneはJSTにしているのでこの辺はembulkが内部的に保持しているのかもしれません.

MariaDB [output]> select * from allstars;
+----+---------+------+--------+------------+-----------+---------------------+---------------------+
| id | name    | age  | height | team       | origin_id | origin_created_at   | created_at          |
+----+---------+------+--------+------------+-----------+---------------------+---------------------+
|  1 | keisuke |   21 |    176 | basketball |         9 | 2022-03-18 08:36:31 | 2022-03-19 04:00:33 |
|  2 | msd     |   21 |    177 | basketball |        10 | 2022-03-18 18:42:59 | 2022-03-19 04:00:33 |
+----+---------+------+--------+------------+-----------+---------------------+---------------------+
2 rows in set (0.001 sec)

origin_created_at問題を除けばうまく行っています.バレーボール部の方も同様に行えばできそうです.

さいごに

初めて投稿しました.にわかブッこいてるとこもあると思うので間違っていたら指摘してください.

Discussion

ログインするとコメントできます