👨‍🎤

👨🎤ELT連携の仕様、Singer触ってみた👨🎤

2022/12/23に公開

tl;dr

  • データの抽出・挿入を行うプログラムの間の仕様だよ
  • 仕様に沿ったプログラムを組み合わせることで、ETL・ELTのEL部分を作れるよ
  • 色々なデータソース・シンク向けに、Singerに沿ったプログラムが公開されているよ

Singerとは

公式サイト曰く、

The open-source standard for writing scripts that move data.

だそうです。

ETL・ELT(やEtLT)のデータの抽出(E)と、データの格納(L)の仕様で、

  • どこかからデータを抽出し、標準出力に出力するプログラム(Tap)
  • 標準入力からデータを受け取り、どこかに格納するプログラム(Target)

について、

を定めています。Singerに沿ったプログラムを作成することで、TapとTargetを任意の組み合わせで利用することが出来ます。

対応している連携(Singer対応のTap・Target)

Singerのホームページには、

Tapでは

  • DB(PostgreSQL、MySQL、DynamoDB、Db2)
  • マーケティングのSaaS(SalesforceやGooegle Analytics)

Targetでは

  • (ローカルの)CSV
  • PostgreSQL、BigQuery

などの連携が記載されています。

また、上記のリストに無い連携でも

  • Singerに仕様に合わせて自前で作成
  • MeltanoHubでMeltanoのTap(Meltanoの用語ではExtractor)、Target(Meltanoの用語ではLoader)を見つける
    • MeltanoHubの記載はMeltano向けですが、Singerとして使えるものもあります(全て使えるかは不明)

で対応可能です。

なお、各Tap・Targetは独立に開発されており、用語や品質はばらつきがあります。
(この部分は他のETLツールとの比較で議論になっているようです)

エコシステム

Singerを利用したSaaS・ツールがいくつか存在します(主観ですがStitchとMeltanoをよく聞きます)。

SaaS・ツール毎に提供している機能は違いますが、大まかにはSingerが提供していない

  • GUI画面
  • ホスティング環境
  • パイプライン(SingerはTap、Targetを個別に提供するだけ)
  • 変換処理(ETLやELTのTの部分)
  • 設定ファイルの記載の支援

などを提供しているようです。

Tap・Targetを試してみる

いくつかTapとTargetを動かしてみます。Ubuntu 20.04 (Windows10のWSL2上)、Python3.8で試しました。

MySQL Tap

まずTapの例としてtap-mysqlを試してみます。試す流れとしては、

  • tap-mysqlのインストール
  • 動作確認用のMySQL(Dockerコンテナ)を起動
  • ダミーのテーブル作成
  • tap-mysql用の設定ファイルを作成
    • 接続用情報を記載した設定ファイルと、処理対象のテーブルを指定する設定ファイルの二つが必要です
  • tap-mysqlの実行

の作業を行います。

準備

tap-mysqlをインストールします。なお、SingerのプラクティスとしてはTap・Target毎に環境(venv)を作ることを推奨されています(この記事では横着して省略しています)。

pip install tap-mysql

動作確認用に、MySQLをコンテナを使います。docker-compose.yamlを記載して

version: '3.1'

services:

  db:
    image: mysql:8.0.31
    command: --default-authentication-plugin=mysql_native_password
    ports:
      - 3306:3306
    restart: always
    environment:
      MYSQL_ROOT_PASSWORD: example

起動します。

docker-compose up

ローカルからMySQLに接続して、適当にダミーデータを作っておきます。

sudo apt-get -y install mysql-clients
# パスワードはexample
mysql -h 127.0.0.1 -u root -p -P 3306
create database main;
use main;
CREATE TABLE t1 (x varchar(200), y int);
INSERT INTO t1 VALUES ('key1', 2);

設定

接続情報をmysql_config.jsonに保存します。tap-mysqlではホスト・ポート認証情報を設定しますが、Singerでは設定ファイルの内容を特に決めていない(Tap・Target依存)ので、Tap・Targetのドキュメント・ソースコードを見て確認する必要があります。

{
  "host": "127.0.0.1",
  "port": "3306",
  "user": "root",
  "password": "example"
}

tapがデータを取得するには、接続情報に加えて、取得対象(tap-mysqlの場合はテーブル)を指定したCatalogを準備する必要があります。
Discoveryモード(tap-mysql discover)で実行するとCatalogの雛形を作成してくれるので、それを元に少し編集します。

tap-mysql  --config mysql_config.json --discover > mysql_properties.json
# discoverの結果はシステムテーブルなども含まれるためjqでフィルター
cat mysql_properties.json  | jq '.streams[] | select(.table_name=="t1")  | {streams: [.]} ' > mysql_t1_properties.json

t1テーブルを出力するために、breadcrumbが空なメタデータにselected、replication-methodを追加しました。

{
  "streams": [
    {
      "tap_stream_id": "main-t1",
      "table_name": "t1",
      "schema": {
        "properties": {
          "y": {
            "inclusion": "available",
            "minimum": -2147483648,
            "maximum": 2147483647,
            "type": [
              "null",
              "integer"
            ]
          },
          "x": {
            "inclusion": "available",
            "maxLength": 200,
            "type": [
              "null",
              "string"
            ]
          }
        },
        "type": "object"
      },
      "stream": "t1",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected-by-default": false,
            "database-name": "main",
            "row-count": 1,
            "is-view": false,
            "selected": true,
            "table-key-properties": [],
            "replication-method": "FULL_TABLE"
          }
        },
        {
          "breadcrumb": [
            "properties",
            "y"
          ],
          "metadata": {
            "selected-by-default": true,
            "sql-datatype": "int"
          }
        },
        {
          "breadcrumb": [
            "properties",
            "x"
          ],
          "metadata": {
            "selected-by-default": true,
            "sql-datatype": "varchar(200)"
          }
        }
      ]
    }
  ]
}

実行

Singerの仕様で決まっている、

  • SCHEMA
  • STATE
  • REDCORD

のJSONが標準出力に出力され、また、RECORDにはMySQLに入れたデータが出力されていることがわかります。

(なお、この他にも、ログとACTIVE_VERSIONが出力されています)

tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json
INFO Server Parameters: version: 8.0.31, wait_timeout: 2700, innodb_lock_wait_timeout: 2700, max_allowed_packet: 67108864, interactive_timeout: 28800
INFO Server SSL Parameters (blank means SSL is not active): [ssl_version: ], [ssl_cipher: ]
{"type": "STATE", "value": {"currently_syncing": "main-t1"}}
INFO Beginning sync for InnoDB table main.t1
INFO Stream t1 is using full table replication
{"type": "SCHEMA", "stream": "t1", "schema": {"properties": {"y": {"inclusion": "available", "minimum": -2147483648, "maximum": 2147483647, "type": ["null", "integer"]}, "x": {"inclusion": "available", "maxLength": 200, "type": ["null", "string"]}}, "type": "object"}, "key_properties": []}
{"type": "ACTIVATE_VERSION", "stream": "t1", "version": 1671627031099}
INFO Running SELECT `y`,`x` FROM `main`.`t1`
{"type": "RECORD", "stream": "t1", "record": {"y": 2, "x": "key1"}, "version": 1671627031099, "time_extracted": "2022-12-21T12:50:31.101311Z"}
{"type": "RECORD", "stream": "t1", "record": {"y": 3, "x": "key2"}, "version": 1671627031099, "time_extracted": "2022-12-21T12:50:31.101311Z"}
INFO METRIC: {"type": "counter", "metric": "record_count", "value": 2, "tags": {"database": "main", "table": "t1"}}
{"type": "STATE", "value": {"currently_syncing": "main-t1"}}
{"type": "ACTIVATE_VERSION", "stream": "t1", "version": 1671627031099}
{"type": "STATE", "value": {"currently_syncing": "main-t1", "bookmarks": {"main-t1": {"initial_full_table_complete": true}}}}
INFO METRIC: {"type": "timer", "metric": "job_duration", "value": 0.005346775054931641, "tags": {"job_type": "sync_table", "database": "main", "table": "t1", "status": "succeeded"}}
{"type": "STATE", "value": {"currently_syncing": null, "bookmarks": {"main-t1": {"initial_full_table_complete": true}}}}

CSV Target

先程のtap-mysqlの例では、MySQLから抽出したデータが標準出力に出力されているだけです。Tap・Targetの連携の例として、標準入力から受け取ったデータを、ローカルのCSVファイルとして出力するtarget-csvと連携させてみます。

まずはパッケージをインストールします。

pip install target-csv

target-csvの場合は設定無しでも動きます(区切り文字などは設定することも可能)。先ほどのtap-mysqlとつなげてみます。

# 出力をわかりやすくするため、標準エラー出力(ログ)は捨てています
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json  2>/dev/null | target-csv 2>/dev/null
{"currently_syncing": null, "bookmarks": {"main-t1": {"initial_full_table_complete": true}}}

新しくCSVファイルが作られ(ファイル名はtarget-csvの実行日時)、tap-mysqlの出力をCSVにしたデータが出力されています。

cat t1-20221221T223018.csv
y,x
2,key1
3,key2

Signerではこのように、Tapが出力した出力を、パイプやファイル経由でTargetに渡すことでELパイプラインを構築できます。

DuckDB Target

SingerのWebページになく、MeltanoHubにある連携を使う例として、DuckDB(target-duckdb)を連携させてみます。DuckDBについては🦆🦆🦆🦆🦆🦆DuckDB入門🦆🦆🦆🦆🦆🦆などを参照してください。

SingerのWebページにはDuckDBのTargetは登録されていません。

一方、MeltanoHubにはDuckDBの連携がありそうです。

MeltanoHubのページにはMeltano経由のインストール方法だけが紹介されていますが、Meltanoに依存しない処理としても使えます。

pip install target-duckdb

接続情報をduckdb_config.jsonに記載します(filepathは適当に設定してください)

{
    "filepath": "/home/notrogue/project/singer/duckdb.file",
    "default_target_schema": "main"
}

入力としてtap-mysqlを使います。
DuckDBはkey_propertiesが必要らしい(設定しないとkey_properties field is required)ので、tap-mysqlの設定ファイル(table-key-properties)に設定しておきます。

      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected-by-default": false,
            "database-name": "main",
            "row-count": 1,
            "is-view": false,
            "selected": true,
            "table-key-properties": ["x"],
            "replication-method": "FULL_TABLE"
          }
        },

(変更付近だけ抜粋。全体はtap-mysqlの章を参照してください)

実行します

tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json | target-duckdb -c duckdb_config.json

結果を確認します

 duckdb duckdb.file -s "SELECT * FROM main.t1"
┌─────────┬───────┐
│    x    │   y   │
│ varchar │ int32 │
├─────────┼───────┤
│ key1    │     2 │
│ key2    │     3 │
└─────────┴───────┘

良さそうですね。

色々いじる

これまでTapとTargetをリダイレクトで直接つなげていました。Tap・Targetの間のデータは、標準入出力を流れるJSONであれば良いので、適当に編集を挟むことも可能です。

JSONをいじる

処理の間にjqコマンドを挟み、特定の条件にあてはまるレコードをマスクしてみます。

# x=key1だけマスク
tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json  2>/dev/null | jq -c '. | if .type == "RECORD" and .record.x=="key1" then .record += {"x":"****"} else . end' | target-csv
 cat t1-20221221T233333.csv
y,x
2,****
3,key2

Fan-In・Fan-Out

名前付きパイプなどを使うことで、

  • 複数のTapのデータを一つのTargetに送る(Fan-in)
  • 一つのTapから複数のTargetに送る(Fan-Out)

ことも可能です。

動作に使う名前付きパイプを作ります

mkfifo np np2

fan-inの確認に使うために、t2というテーブルをMySQLに作ります。

# テーブルの作成
mysql -h 127.0.0.1 -u root -p -P 3306
use main;
CREATE TABLE t2 (x varchar(200), y int);
INSERT INTO t2 VALUES ('hogehoge', 2);

対応するCatalogをmysql_t2_propeties.jsonとして保存します。mysql_t2_propeties.jsonは、mysql_propeties.jsonと取得するテーブルを変えただけの設定ファイルです。

{
  "streams": [
    {
      "tap_stream_id": "main-t2",
      "table_name": "t2",
      "schema": {
        "properties": {
          "y": {
            "inclusion": "available",
            "minimum": -2147483648,
            "maximum": 2147483647,
            "type": [
              "null",
              "integer"
            ]
          },
          "x": {
            "inclusion": "available",
            "maxLength": 200,
            "type": [
              "null",
              "string"
            ]
          }
        },
        "type": "object"
      },
      "stream": "t1",
      "metadata": [
        {
          "breadcrumb": [],
          "metadata": {
            "selected-by-default": false,
            "database-name": "main",
            "row-count": 1,
            "is-view": false,
            "selected": true,
            "table-key-properties": ["x"],
            "replication-method": "FULL_TABLE"
          }
        },
        {
          "breadcrumb": [
            "properties",
            "y"
          ],
          "metadata": {
            "selected-by-default": true,
            "sql-datatype": "int"
          }
        },
        {
          "breadcrumb": [
            "properties",
            "x"
          ],
          "metadata": {
            "selected-by-default": true,
            "sql-datatype": "varchar(200)"
          }
        }
      ]
    }
  ]
}

二つのMySQLテーブルを、名前付きパイプ経由で一つのtarget-csvに送ります。

tap-mysql -c mysql_config.json --catalog mysql_t1_properties.json > np &
tap-mysql -c mysql_config.json --catalog mysql_t2_properties.json > np &
target-csv < np

一つのファイルに出力されました。

cat t1-20221222T194626.csv
x,y
key1,2
2,hogehoge
key2,3

逆にtap-mysqlのデータを複数のtargetに送ってみます。ここでは、target-csvとtarget-duckの二つに送ってみます。

tap-mysql -c mysql_config.json --catalog mysql_t2_properties.json | tee np np2

CSVファイルにも

cat t1-20221222T200441.csv
y,x
2,hogehoge

DuckDBにも

duckdb duckdb.file -s "SELECT * FROM main.t1"
┌──────────┬───────┐
│    x     │   y   │
│ varchar  │ int32 │
├──────────┼───────┤
│ hogehoge │     2 │
└──────────┴───────┘

データが送られています。

Discussion

ログインするとコメントできます