Marquez・OpenLineage触ってみた
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#7です。
OpenLineageとMarquezを調べて・触ってみました。
tl;dr
- OpenLineageはリネージュ登録の標準仕様、Marquezはそのリファレンス実装だよ
- データがどこから来て、どこに行って、どんな処理を噛ませたかを記録することができるよ
- 連携できるシステムには注意だよ
- Airflowは連携できる。Spark・Great Expectatinosはドキュメントにあるけど要注意
OpenLineageとは
公式ページ曰く、
OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.
色々なシステムがリネージュのメタデータを相互運用できるような仕様を中心としたフレームワーク(ここでは枠組みの意味?)です。
そもそもリネージュ(data lineage)とはなんぞやという話・リネージュのご利益については、datakinさんの記事あたりを参照してください。
リネージュの管理には、
- リネージュを提供するシステム(下図の上)
- リネージュを管理するシステム(下図の下)
の二種類のシステムが必要です。共通仕様があれば、実装の手間や関係システムの交換が楽になるのでは?というモチベーションで標準化仕様を決めたいようです。
仕様の中身としては、
が公開されています。
連携
メタデータ管理
先程のOpenLineageの概要図では、
の4つのシステムがリネージュを管理するシステムとして挙げられています。これらシステムとOpenLineageの対応状況は
- MaruquezはOpenLineageのリファレンス実装として実装されている
- AmundsenとEgeriaは連携がありそう
- (試していないので、どの程度実用かは不明)
- Apache Atlasは連携見つからず…
となっています。
(他のデータカタログ(e.g. GCPのData Catalog)で対応しているシステムがあるかは不明…)
データ提供元
2022/12現在、OpenLineageに対応しているデータの提供元は
の6種類の記載があります。これだけ見ると連携多いように思えますが、いくつか注意が必要な点があります:
- FlinkはExperimentalです
- dbtは対応するDBが限られます
- (Marquezの話)SparkとGreat Expectationsは連携できる情報に制限があります
- OpenLineageの仕様が変わった影響です
- 例えば、Great Expectationsではバリデーションの成否やバリデーションの名前などは、Great Expectations->Marquezからのリクエストには含まれますが、Marquez側で無視され、WebUIでは確認できません
連携が組み込みで提供されていないツールを利用する場合は、
- Javaのクライアントライブラリ
- Pythonのクライアントライブラリ
- APIを直接叩く
のいずれかで対応することになりそうです。
Marquezとは
OpenLineageはリネージュ登録のAPIの枠組み・仕様です。具体的な実装としては、Marquezというデータカタログがあり、
- OpenLineageの(リネージュの登録)のリファレンス実装API
- OpenLineage以外のAPI(検索等)
- ちなみに、MarquezのOpenLineageでない部分のクライアントライブラリは別に存在
- WebUI
を提供してくれます。
Maquezに関しては、日本語の記事に限っても
などの方が紹介記事を書いてくださっています。
アーキテクチャ・コンポーネント
実装のアーキテクチャは比較的シンプルなようで(下図)、
(Marquez公式ページのDeploymentページより)
- APIのサーバー
- WebUIのサーバー
- メタデータを保存するRDB
の3つのコンポーネントからなります(Schedulerの部分はMarquezではなく、Marquezと連携するリネージュの提供元です)。
デプロイ環境としては、Kubernetesで動かす用のHelmチャートと、AWSで動かすためのマニュアルが用意されています。
OpenLineage・Marquez関係の動き
インターネットで公開されている情報をまとめてみました。流れとしてはMarquezが先にあって、その後OpenLineageが仕様として提案されたようです。
-
WeWorkの社内ツールとして、Marquezが開発される
- 2018年
-
MarquezがOSS化・Linux Foundation(LF AI&Data)参加
- 2019年
-
datakin創立
- MarquezをベースとしたSaaSの提供や、Marquezの開発している(していた)企業
*[2019年
- MarquezをベースとしたSaaSの提供や、Marquezの開発している(していた)企業
-
datakinがOpenLineageを提案
- 2020年
-
OpenLineageがLinux Foundation(LF AI&Data)に参加
- 2021年
-
AstronomerがDatakinを買収
- 2022年
- AstronomerはAirflowのSaaSを提供している企業です
Marquez触ってみる
ローカルのコンテナで
- Marquez(API、WebUI、DB)をコンテナで起動
- curlでリクエスト
- Airflowからの連携
を試してみます。
(Ubuntu20.04、WSL2(Windows 10)、Airflow 2.4.3で確認)
Marquez(API、WebUI、DB)をコンテナで起動
git clone git@github.com:MarquezProject/marquez.git && cd marquez
./docker/up.sh
このシェルスクリプトはdocker-composeの薄いラッパーで、
- データベース(PostgreSQL)
- APIサーバ
- Webを提供するサーバ
が起動します。
curlでリクエスト
例通りに、リネージュを登録するAPIをcurlで呼んでみます。
curl -X POST http://localhost:5000/api/v1/lineage \
-H 'Content-Type: application/json' \
-d '{
"eventType": "START",
"eventTime": "2020-12-28T19:52:00.001+10:00",
"run": {
"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
},
"job": {
"namespace": "my-namespace",
"name": "my-job"
},
"inputs": [{
"namespace": "my-namespace",
"name": "my-input"
}],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
}'
Webブラウザでhttp://localhost:3000を開き、namespace(右上)を「my-namespace」選ぶと、APIで登録した「my-job」がジョブ一覧にrunningで表示されています。
もう一回リクエストすると(eventTypeをCOMPLETE)、StateがCompletedに変わり、入力(myInput)・出力(myOutput)がリネージュグラフに追加されます。
curl -X POST http://localhost:5000/api/v1/lineage \
-H 'Content-Type: application/json' \
-d '{
"eventType": "COMPLETE",
"eventTime": "2020-12-28T20:52:00.001+10:00",
"run": {
"runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
},
"job": {
"namespace": "my-namespace",
"name": "my-job"
},
"outputs": [{
"namespace": "my-namespace",
"name": "my-output",
"facets": {
"schema": {
"_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
"_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
"fields": [
{ "name": "a", "type": "VARCHAR"},
{ "name": "b", "type": "VARCHAR"}
]
}
}
}],
"producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
}'
Airflowからの連携
Airflowの設定
AirflowとMarquezを連携させるには、
- (AirflowとMarquezクラスタの用意)
- Airflowのクラスタの用意には、公式が用意してくれているdocker-compose.yamlを用いました。このドキュメントに記載の(Marquez関係ない部分の)設定を行ってください
- 連携パッケージ(openlineage-airflow)をインストールした、カスタムイメージを使うように変更
- Marquez(というかOpenLineage)のURLとNamespaceを環境変数に設定
-
連携するOperatorを使ったDAGを実行
- Airflowには色々なOperatorがありますが、2022/12現在で対応しているのは、PostgresOperator、MySqlOperator、BigQueryOperator、SnowflakeOperator、GreatExpecationsOperator、PythonOperatorだけです
- 自前のOperatorを対応させるには、BaseExtractorを継承させメソッドを実装します
が必要です。
(なお、OpenLineageのドキュメントにはAIRFLOW__LINEAGE__BACKEND 設定しろとありますが、設定しなくても動きました。pypi側にもAirflow2.3以上の場合は、設定いらない旨の記載あり)
まずは、必要なパッケージを入れたDockerイメージを作ります。docker-compose.yamlと同じディレクトリでDockerfileを作ります。
FROM apache/airflow:2.4.3
RUN pip install --no-cache-dir openlineage-airflow
docker-compose.yamlを変更して、カスタムしたイメージを使うようにします。
(x-airflow-commonのimageをコメントアウトして、build .を追加)
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
build: .
Marquez(というかOpenLineage)のURLを指定します。真面目にやるならコンテナネットワーク設定すべきかもしれませんが、AirflowのコンテナからMarquezのAPIには、ホスト経由でアクセスさせます(Airflowのコンテナ->ホスト->Marquez)。
まずは、コンテナからホストにアクセスするためのIPアドレスを確認します。
docker network inspect airflow_default | jq '.[] | .IPAM.Config[] | .Gateway '
"172.19.0.1"
docker-compose.yamlを変更して、OPENLINEAGE_URLとOPENLINEAGE_NAMESPACEを設定します。
(x-airflow-commonのenvironmentに追加)
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, Then run `docker-compose build` to build the images.
# image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# 今回追加の部分
OPENLINEAGE_URL: http://172.19.0.1:5000
OPENLINEAGE_NAMESPACE: airflow
DAGを準備
動作確認のDAGとして、Astronomerの例のDAGを使います。この例では、PostgresqlOperator(Marquezに対応しているOperatorの一つ)を使い
- animal_adoptions_combinedというテーブルを作成
- adoption_center_1テーブルとadoption_center_2(事前作成)をanimal_adoptions_combinedテーブルに挿入
するDAGと、
- adoption_reporting_longというテーブルを作成
- animal_adoptions_combinedからadoption_reporting_longに集計し挿入
するDAGの、二つのDAGからなります。なお、DAGのコード上は特にOpenLineage・Marquezの設定はありません。
docker-compose.yamlのあるディレクトリの、dagsディレクトリに適当なファイル名(ここではopenlineage_1.pyとopenlineage_2.py)で保存します。
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
create_table_query= '''
CREATE TABLE IF NOT EXISTS animal_adoptions_combined (date DATE, type VARCHAR, name VARCHAR, age INTEGER);
'''
combine_data_query= '''
INSERT INTO animal_adoptions_combined (date, type, name, age)
SELECT *
FROM adoption_center_1
UNION
SELECT *
FROM adoption_center_2;
'''
with DAG('lineage-combine-postgres',
start_date=datetime(2020, 6, 1),
max_active_runs=1,
schedule_interval='@daily',
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=1)
},
catchup=False
) as dag:
create_table = PostgresOperator(
task_id='create_table',
postgres_conn_id='postgres_default',
sql=create_table_query
)
insert_data = PostgresOperator(
task_id='combine',
postgres_conn_id='postgres_default',
sql=combine_data_query
)
create_table >> insert_data
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
aggregate_reporting_query = '''
INSERT INTO adoption_reporting_long (date, type, number)
SELECT c.date, c.type, COUNT(c.type)
FROM animal_adoptions_combined c
GROUP BY date, type;
'''
with DAG('lineage-reporting-postgres',
start_date=datetime(2020, 6, 1),
max_active_runs=1,
schedule_interval='@daily',
default_args={
'retries': 1,
'retry_delay': timedelta(minutes=1)
},
catchup=False
) as dag:
create_table = PostgresOperator(
task_id='create_reporting_table',
postgres_conn_id='postgres_default',
sql='CREATE TABLE IF NOT EXISTS adoption_reporting_long (date DATE, type VARCHAR, number INTEGER);',
)
insert_data = PostgresOperator(
task_id='reporting',
postgres_conn_id='postgres_default',
sql=aggregate_reporting_query
)
create_table >> insert_data
DAGが失敗した時の挙動を確認
DAGを実行してみます(Airflow UIからTrigger DAG Run)。この時点ではPostgreSQLのConnection(postgres_conn_id)を設定していないので、DAG Runはエラーになりますが、MarquezにAirflowのジョブに対応する履歴が追加されているはずです。
(一つRunningになっている理由は不明…リトライが起きるとなるのかな)
DAGが成功した時の挙動を確認
DAGが成功するように、
- Airflow Connectionの設定
- 動作に必要なデータベース・テーブルの作成
を行います。別にPostgreSQLのデータベースを用意しても良いのですが、メタデータデータベースがPostgreSQLなので横着してそれを使います。
メタデータデータベースのPodのIPアドレスを確認します。
docker inspect airflow-postgres-1 | jq '.[] | .NetworkSettings.Networks.airflow_default.IPAddress'
"172.19.0.3"
Airflow Connectionを設定します(パスワードの部分もairflow)。
動作に使うデータベース・テーブルを作成します。
# メタデータデータベースのコンテナに入って
docker exec -it airflow-postgres-1 /bin/bash
# (コンテナ内)psqlでデータベースに接続し、https://docs.astronomer.io/learn/airflow-openlineage#generate-and-view-lineage-dataのSQLを実行します
psql -h localhost -p 5432 -U airflow
create database lineagedemo;
\c lineagedemo
CREATE TABLE IF NOT EXISTS adoption_center_1
(date DATE, type VARCHAR, name VARCHAR, age INTEGER);
CREATE TABLE IF NOT EXISTS adoption_center_2
(date DATE, type VARCHAR, name VARCHAR, age INTEGER);
INSERT INTO
adoption_center_1 (date, type, name, age)
VALUES
('2022-01-01', 'Dog', 'Bingo', 4),
('2022-02-02', 'Cat', 'Bob', 7),
('2022-03-04', 'Fish', 'Bubbles', 2);
INSERT INTO
adoption_center_2 (date, type, name, age)
VALUES
('2022-06-10', 'Horse', 'Seabiscuit', 4),
('2022-07-15', 'Snake', 'Stripes', 8),
('2022-08-07', 'Rabbit', 'Hops', 3);
再度DAGを実行すると今度は成功するはずで、MarquezにもStateがCompletedで登録されているはずです。
クールなことに、Marquezの画面ではジョブの履歴以外にも、
- 入力・出力テーブルのカラム
- 処理に使ったクエリ
- 関係するテーブル・タスクの紐づけ
を閲覧することができます。DAG Runエラー時の調査や、データ元の仕様変更の影響調査などに役立ちそうです。
Discussion