👨🎤ELT連携の仕様、Singer触ってみた👨🎤
tl;dr
- データの抽出・挿入を行うプログラムの間の仕様だよ
- 仕様に沿ったプログラムを組み合わせることで、ETL・ELTのEL部分を作れるよ
- 色々なデータソース・シンク向けに、Singerに沿ったプログラムが公開されているよ
Singerとは
公式サイト曰く、
The open-source standard for writing scripts that move data.
だそうです。
ETL・ELT(やEtLT)のデータの抽出(E)と、データの格納(L)の仕様で、
- どこかからデータを抽出し、標準出力に出力するプログラム(Tap)
- 標準入力からデータを受け取り、どこかに格納するプログラム(Target)
について、
-
設定ファイル
- Config、State、Catalogの三種類のJSONファイルを設定として受け取ります
-
TapとTargetの間のデータの形式
- データ、スキーマ、状態(State)の三種類のデータを、JSON(JSONL?)やり取りします
を定めています。Singerに沿ったプログラムを作成することで、TapとTargetを任意の組み合わせで利用することが出来ます。
対応している連携(Singer対応のTap・Target)
Singerのホームページには、
Tapでは
- DB(PostgreSQL、MySQL、DynamoDB、Db2)
- マーケティングのSaaS(SalesforceやGooegle Analytics)
Targetでは
- (ローカルの)CSV
- PostgreSQL、BigQuery
などの連携が記載されています。
また、上記のリストに無い連携でも
- Singerに仕様に合わせて自前で作成
-
MeltanoHubでMeltanoのTap(Meltanoの用語ではExtractor)、Target(Meltanoの用語ではLoader)を見つける
- MeltanoHubの記載はMeltano向けですが、Singerとして使えるものもあります(全て使えるかは不明)
で対応可能です。
なお、各Tap・Targetは独立に開発されており、用語や品質はばらつきがあります。
(この部分は他のETLツールとの比較で議論になっているようです)
エコシステム
Singerを利用したSaaS・ツールがいくつか存在します(主観ですがStitchとMeltanoをよく聞きます)。
SaaS・ツール毎に提供している機能は違いますが、大まかにはSingerが提供していない
- GUI画面
- ホスティング環境
- パイプライン(SingerはTap、Targetを個別に提供するだけ)
- 変換処理(ETLやELTのTの部分)
- 設定ファイルの記載の支援
などを提供しているようです。
Tap・Targetを試してみる
いくつかTapとTargetを動かしてみます。Ubuntu 20.04 (Windows10のWSL2上)、Python3.8で試しました。
MySQL Tap
まずTapの例としてtap-mysqlを試してみます。試す流れとしては、
- tap-mysqlのインストール
- 動作確認用のMySQL(Dockerコンテナ)を起動
- ダミーのテーブル作成
- tap-mysql用の設定ファイルを作成
- 接続用情報を記載した設定ファイルと、処理対象のテーブルを指定する設定ファイルの二つが必要です
- tap-mysqlの実行
の作業を行います。
準備
tap-mysqlをインストールします。なお、SingerのプラクティスとしてはTap・Target毎に環境(venv)を作ることを推奨されています(この記事では横着して省略しています)。
pip install tap-mysql
動作確認用に、MySQLをコンテナを使います。docker-compose.yamlを記載して
version: '3.1'
services:
db:
image: mysql:8.0.31
command: --default-authentication-plugin=mysql_native_password
ports:
- 3306:3306
restart: always
environment:
MYSQL_ROOT_PASSWORD: example
起動します。
docker-compose up
ローカルからMySQLに接続して、適当にダミーデータを作っておきます。
sudo apt-get -y install mysql-clients
# パスワードはexample
mysql -h 127.0.0.1 -u root -p -P 3306
create database main;
use main;
CREATE TABLE t1 (x varchar(200), y int);
INSERT INTO t1 VALUES ('key1', 2);
設定
接続情報をmysql_config.jsonに保存します。tap-mysqlではホスト・ポート認証情報を設定しますが、Singerでは設定ファイルの内容を特に決めていない(Tap・Target依存)ので、Tap・Targetのドキュメント・ソースコードを見て確認する必要があります。
{
"host": "127.0.0.1",
"port": "3306",
"user": "root",
"password": "example"
}
tapがデータを取得するには、接続情報に加えて、取得対象(tap-mysqlの場合はテーブル)を指定したCatalogを準備する必要があります。
Discoveryモード(tap-mysql discover)で実行するとCatalogの雛形を作成してくれるので、それを元に少し編集します。
tap-mysql --config mysql_config.json --discover > mysql_properties.json
# discoverの結果はシステムテーブルなども含まれるためjqでフィルター
cat mysql_properties.json | jq '.streams[] | select(.table_name=="t1") | {streams: [.]} ' > mysql_t1_properties.json
t1テーブルを出力するために、breadcrumbが空なメタデータにselected、replication-methodを追加しました。
{
"streams": [
{
"tap_stream_id": "main-t1",
"table_name": "t1",
"schema": {
"properties": {
"y": {
"inclusion": "available",
"minimum": -2147483648,
"maximum": 2147483647,
"type": [
"null",
"integer"
]
},
"x": {
"inclusion": "available",
"maxLength": 200,
"type": [
"null",
"string"
]
}
},
"type": "object"
},
"stream": "t1",
"metadata": [
{
"breadcrumb": [],
"metadata": {
"selected-by-default": false,
"database-name": "main",
"row-count": 1,
"is-view": false,
"selected": true,
"table-key-properties": [],
"replication-method": "FULL_TABLE"
}
},
{
"breadcrumb": [
"properties",
"y"
],
"metadata": {
"selected-by-default": true,
"sql-datatype": "int"
}
},
{
"breadcrumb": [
"properties",
"x"
],
"metadata": {
"selected-by-default": true,
"sql-datatype": "varchar(200)"
}
}
]
}
]
}
実行
Singerの仕様で決まっている、
- SCHEMA
- STATE
- REDCORD
のJSONが標準出力に出力され、また、RECORDにはMySQLに入れたデータが出力されていることがわかります。
(なお、この他にも、ログとACTIVE_VERSIONが出力されています)
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json
INFO Server Parameters: version: 8.0.31, wait_timeout: 2700, innodb_lock_wait_timeout: 2700, max_allowed_packet: 67108864, interactive_timeout: 28800
INFO Server SSL Parameters (blank means SSL is not active): [ssl_version: ], [ssl_cipher: ]
{"type": "STATE", "value": {"currently_syncing": "main-t1"}}
INFO Beginning sync for InnoDB table main.t1
INFO Stream t1 is using full table replication
{"type": "SCHEMA", "stream": "t1", "schema": {"properties": {"y": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "x": {"inclusion": "available", "maxLength": 200, "type": ["null", "string"]}}, "type": "object"}, "key_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "t1", "version": 1671627031099}
INFO Running SELECT `y`,`x` FROM `main`.`t1`
{"type": "RECORD", "stream": "t1", "record": {"y": 2, "x": "key1"}, "version": 1671627031099, "time_extracted": "2022-12-21T12:50:31.101311Z"}
{"type": "RECORD", "stream": "t1", "record": {"y": 3, "x": "key2"}, "version": 1671627031099, "time_extracted": "2022-12-21T12:50:31.101311Z"}
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {"database": "main", "table": "t1"}}
{"type": "STATE", "value": {"currently_syncing": "main-t1"}}
{"type": "ACTIVATE_VERSION", "stream": "t1", "version": 1671627031099}
{"type": "STATE", "value": {"currently_syncing": "main-t1", "bookmarks": {"main-t1": {"initial_full_table_complete": true}}}}
INFO METRIC: {"type": "timer", "metric": "job_duration", "value": 0.005346775054931641, "tags": {"job_type": "sync_table", "database": "main", "table": "t1", "status": "succeeded"}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"main-t1": {"initial_full_table_complete": true}}}}
CSV Target
先程のtap-mysqlの例では、MySQLから抽出したデータが標準出力に出力されているだけです。Tap・Targetの連携の例として、標準入力から受け取ったデータを、ローカルのCSVファイルとして出力するtarget-csvと連携させてみます。
まずはパッケージをインストールします。
pip install target-csv
target-csvの場合は設定無しでも動きます(区切り文字などは設定することも可能)。先ほどのtap-mysqlとつなげてみます。
# 出力をわかりやすくするため、標準エラー出力(ログ)は捨てています
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json 2>/dev/null | target-csv 2>/dev/null
{"currently_syncing": null, "bookmarks": {"main-t1": {"initial_full_table_complete": true}}}
新しくCSVファイルが作られ(ファイル名はtarget-csvの実行日時)、tap-mysqlの出力をCSVにしたデータが出力されています。
cat t1-20221221T223018.csv
y,x
2,key1
3,key2
Signerではこのように、Tapが出力した出力を、パイプやファイル経由でTargetに渡すことでELパイプラインを構築できます。
DuckDB Target
SingerのWebページになく、MeltanoHubにある連携を使う例として、DuckDB(target-duckdb)を連携させてみます。DuckDBについては🦆🦆🦆🦆🦆🦆DuckDB入門🦆🦆🦆🦆🦆🦆などを参照してください。
SingerのWebページにはDuckDBのTargetは登録されていません。
一方、MeltanoHubにはDuckDBの連携がありそうです。
MeltanoHubのページにはMeltano経由のインストール方法だけが紹介されていますが、Meltanoに依存しない処理としても使えます。
pip install target-duckdb
接続情報をduckdb_config.jsonに記載します(filepathは適当に設定してください)
{
"filepath": "/home/notrogue/project/singer/duckdb.file",
"default_target_schema": "main"
}
入力としてtap-mysqlを使います。
DuckDBはkey_propertiesが必要らしい(設定しないとkey_properties field is required
)ので、tap-mysqlの設定ファイル(table-key-properties)に設定しておきます。
"metadata": [
{
"breadcrumb": [],
"metadata": {
"selected-by-default": false,
"database-name": "main",
"row-count": 1,
"is-view": false,
"selected": true,
"table-key-properties": ["x"],
"replication-method": "FULL_TABLE"
}
},
(変更付近だけ抜粋。全体はtap-mysqlの章を参照してください)
実行します
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json | target-duckdb -c duckdb_config.json
結果を確認します
duckdb duckdb.file -s "SELECT * FROM main.t1"
┌─────────┬───────┐
│ x │ y │
│ varchar │ int32 │
├─────────┼───────┤
│ key1 │ 2 │
│ key2 │ 3 │
└─────────┴───────┘
良さそうですね。
色々いじる
これまでTapとTargetをリダイレクトで直接つなげていました。Tap・Targetの間のデータは、標準入出力を流れるJSONであれば良いので、適当に編集を挟むことも可能です。
JSONをいじる
処理の間にjqコマンドを挟み、特定の条件にあてはまるレコードをマスクしてみます。
# x=key1だけマスク
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json 2>/dev/null | jq -c '. | if .type == "RECORD" and .record.x=="key1" then .record += {"x":"****"} else . end' | target-csv
cat t1-20221221T233333.csv
y,x
2,****
3,key2
Fan-In・Fan-Out
名前付きパイプなどを使うことで、
- 複数のTapのデータを一つのTargetに送る(Fan-in)
- 一つのTapから複数のTargetに送る(Fan-Out)
ことも可能です。
動作に使う名前付きパイプを作ります
mkfifo np np2
fan-inの確認に使うために、t2というテーブルをMySQLに作ります。
# テーブルの作成
mysql -h 127.0.0.1 -u root -p -P 3306
use main;
CREATE TABLE t2 (x varchar(200), y int);
INSERT INTO t2 VALUES ('hogehoge', 2);
対応するCatalogをmysql_t2_propeties.jsonとして保存します。mysql_t2_propeties.jsonは、mysql_propeties.jsonと取得するテーブルを変えただけの設定ファイルです。
{
"streams": [
{
"tap_stream_id": "main-t2",
"table_name": "t2",
"schema": {
"properties": {
"y": {
"inclusion": "available",
"minimum": -2147483648,
"maximum": 2147483647,
"type": [
"null",
"integer"
]
},
"x": {
"inclusion": "available",
"maxLength": 200,
"type": [
"null",
"string"
]
}
},
"type": "object"
},
"stream": "t1",
"metadata": [
{
"breadcrumb": [],
"metadata": {
"selected-by-default": false,
"database-name": "main",
"row-count": 1,
"is-view": false,
"selected": true,
"table-key-properties": ["x"],
"replication-method": "FULL_TABLE"
}
},
{
"breadcrumb": [
"properties",
"y"
],
"metadata": {
"selected-by-default": true,
"sql-datatype": "int"
}
},
{
"breadcrumb": [
"properties",
"x"
],
"metadata": {
"selected-by-default": true,
"sql-datatype": "varchar(200)"
}
}
]
}
]
}
二つのMySQLテーブルを、名前付きパイプ経由で一つのtarget-csvに送ります。
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json > np &
tap-mysql -c mysql_config.json --catalog mysql_t2_properties.json > np &
target-csv < np
一つのファイルに出力されました。
cat t1-20221222T194626.csv
x,y
key1,2
2,hogehoge
key2,3
逆にtap-mysqlのデータを複数のtargetに送ってみます。ここでは、target-csvとtarget-duckの二つに送ってみます。
tap-mysql -c mysql_config.json --catalog mysql_t2_properties.json | tee np np2
CSVファイルにも
cat t1-20221222T200441.csv
y,x
2,hogehoge
DuckDBにも
duckdb duckdb.file -s "SELECT * FROM main.t1"
┌──────────┬───────┐
│ x │ y │
│ varchar │ int32 │
├──────────┼───────┤
│ hogehoge │ 2 │
└──────────┴───────┘
データが送られています。
Discussion