Marquez・OpenLineage触ってみた

2022/12/07に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#7です。

OpenLineageとMarquezを調べて・触ってみました。

tl;dr

  • OpenLineageはリネージュ登録の標準仕様、Marquezはそのリファレンス実装だよ
  • データがどこから来て、どこに行って、どんな処理を噛ませたかを記録することができるよ
  • 連携できるシステムには注意だよ
    • Airflowは連携できる。Spark・Great Expectatinosはドキュメントにあるけど要注意

OpenLineageとは

公式ページ曰く、

OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata.

色々なシステムがリネージュのメタデータを相互運用できるような仕様を中心としたフレームワーク(ここでは枠組みの意味?)です。
そもそもリネージュ(data lineage)とはなんぞやという話・リネージュのご利益については、datakinさんの記事あたりを参照してください。

リネージュの管理には、

  • リネージュを提供するシステム(下図の上)
  • リネージュを管理するシステム(下図の下)

の二種類のシステムが必要です。共通仕様があれば、実装の手間や関係システムの交換が楽になるのでは?というモチベーションで標準化仕様を決めたいようです。


OpenLineageの公式サイトより

仕様の中身としては、

が公開されています。

連携

メタデータ管理

先程のOpenLineageの概要図では、

の4つのシステムがリネージュを管理するシステムとして挙げられています。これらシステムとOpenLineageの対応状況は

  • MaruquezはOpenLineageのリファレンス実装として実装されている
  • AmundsenとEgeriaは連携がありそう
    • (試していないので、どの程度実用かは不明)
  • Apache Atlasは連携見つからず…

となっています。
(他のデータカタログ(e.g. GCPのData Catalog)で対応しているシステムがあるかは不明…)

データ提供元

2022/12現在、OpenLineageに対応しているデータの提供元は

6種類の記載があります。これだけ見ると連携多いように思えますが、いくつか注意が必要な点があります:

  • FlinkはExperimentalです
  • dbtは対応するDBが限られます
  • (Marquezの話)SparkとGreat Expectationsは連携できる情報に制限があります
    • OpenLineageの仕様が変わった影響です
    • 例えば、Great Expectationsではバリデーションの成否やバリデーションの名前などは、Great Expectations->Marquezからのリクエストには含まれますが、Marquez側で無視され、WebUIでは確認できません

連携が組み込みで提供されていないツールを利用する場合は、

のいずれかで対応することになりそうです。

Marquezとは

OpenLineageはリネージュ登録のAPIの枠組み・仕様です。具体的な実装としては、Marquezというデータカタログがあり、

  • OpenLineageの(リネージュの登録)のリファレンス実装API
  • OpenLineage以外のAPI(検索等
    • ちなみに、MarquezのOpenLineageでない部分のクライアントライブラリは別に存在
  • WebUI

を提供してくれます。

Maquezに関しては、日本語の記事に限っても

などの方が紹介記事を書いてくださっています。

アーキテクチャ・コンポーネント

https://marquezproject.github.io/marquez/deployment-overview.html

実装のアーキテクチャは比較的シンプルなようで(下図)、

Marquez公式ページのDeploymentページより)

  • APIのサーバー
  • WebUIのサーバー
  • メタデータを保存するRDB

の3つのコンポーネントからなります(Schedulerの部分はMarquezではなく、Marquezと連携するリネージュの提供元です)。

デプロイ環境としては、Kubernetesで動かす用のHelmチャートと、AWSで動かすためのマニュアルが用意されています。

OpenLineage・Marquez関係の動き

インターネットで公開されている情報をまとめてみました。流れとしてはMarquezが先にあって、その後OpenLineageが仕様として提案されたようです。

Marquez触ってみる

ローカルのコンテナで

  1. Marquez(API、WebUI、DB)をコンテナで起動
  2. curlでリクエスト
  3. Airflowからの連携

を試してみます。

(Ubuntu20.04、WSL2(Windows 10)、Airflow 2.4.3で確認)

Marquez(API、WebUI、DB)をコンテナで起動

https://openlineage.io/getting-started/#collect-run-level-metadata

git clone git@github.com:MarquezProject/marquez.git && cd marquez
./docker/up.sh

このシェルスクリプトはdocker-composeの薄いラッパーで、

  • データベース(PostgreSQL)
  • APIサーバ
  • Webを提供するサーバ

が起動します。

curlでリクエスト

例通りに、リネージュを登録するAPIをcurlで呼んでみます。

 curl -X POST http://localhost:5000/api/v1/lineage \
  -H 'Content-Type: application/json' \
  -d '{
        "eventType": "START",
        "eventTime": "2020-12-28T19:52:00.001+10:00",
        "run": {
          "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
        },
        "job": {
          "namespace": "my-namespace",
          "name": "my-job"
        },
        "inputs": [{
          "namespace": "my-namespace",
          "name": "my-input"
        }],  
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
      }'

Webブラウザでhttp://localhost:3000を開き、namespace(右上)を「my-namespace」選ぶと、APIで登録した「my-job」がジョブ一覧にrunningで表示されています。

もう一回リクエストすると(eventTypeをCOMPLETE)、StateがCompletedに変わり、入力(myInput)・出力(myOutput)がリネージュグラフに追加されます。

curl -X POST http://localhost:5000/api/v1/lineage \
  -H 'Content-Type: application/json' \
  -d '{
        "eventType": "COMPLETE",
        "eventTime": "2020-12-28T20:52:00.001+10:00",
        "run": {
          "runId": "d46e465b-d358-4d32-83d4-df660ff614dd"
        },
        "job": {
          "namespace": "my-namespace",
          "name": "my-job"
        },
        "outputs": [{
          "namespace": "my-namespace",
          "name": "my-output",
          "facets": {
            "schema": {
              "_producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client",
              "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet",
              "fields": [
                { "name": "a", "type": "VARCHAR"},
                { "name": "b", "type": "VARCHAR"}
              ]
            }
          }
        }],
        "producer": "https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"
      }'

Airflowからの連携

Airflowの設定

AirflowとMarquezを連携させるには、

が必要です。
(なお、OpenLineageのドキュメントにはAIRFLOW__LINEAGE__BACKEND 設定しろとありますが、設定しなくても動きました。pypi側にもAirflow2.3以上の場合は、設定いらない旨の記載あり)

まずは、必要なパッケージを入れたDockerイメージを作ります。docker-compose.yamlと同じディレクトリでDockerfileを作ります。

FROM apache/airflow:2.4.3
RUN pip install --no-cache-dir  openlineage-airflow

docker-compose.yamlを変更して、カスタムしたイメージを使うようにします。
(x-airflow-commonのimageをコメントアウトして、build .を追加)

x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
  build: .

Marquez(というかOpenLineage)のURLを指定します。真面目にやるならコンテナネットワーク設定すべきかもしれませんが、AirflowのコンテナからMarquezのAPIには、ホスト経由でアクセスさせます(Airflowのコンテナ->ホスト->Marquez)。
まずは、コンテナからホストにアクセスするためのIPアドレスを確認します。

 docker network inspect airflow_default | jq '.[] | .IPAM.Config[] | .Gateway '
"172.19.0.1"

docker-compose.yamlを変更して、OPENLINEAGE_URLとOPENLINEAGE_NAMESPACEを設定します。
(x-airflow-commonのenvironmentに追加)

x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  # image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.4.3}
  build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    #  今回追加の部分
    OPENLINEAGE_URL: http://172.19.0.1:5000
    OPENLINEAGE_NAMESPACE: airflow

DAGを準備

動作確認のDAGとして、Astronomerの例のDAGを使います。この例では、PostgresqlOperator(Marquezに対応しているOperatorの一つ)を使い

  • animal_adoptions_combinedというテーブルを作成
  • adoption_center_1テーブルとadoption_center_2(事前作成)をanimal_adoptions_combinedテーブルに挿入

するDAGと、

  • adoption_reporting_longというテーブルを作成
  • animal_adoptions_combinedからadoption_reporting_longに集計し挿入

するDAGの、二つのDAGからなります。なお、DAGのコード上は特にOpenLineage・Marquezの設定はありません。

docker-compose.yamlのあるディレクトリの、dagsディレクトリに適当なファイル名(ここではopenlineage_1.pyとopenlineage_2.py)で保存します。

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

from datetime import datetime, timedelta

create_table_query= '''
CREATE TABLE IF NOT EXISTS animal_adoptions_combined (date DATE, type VARCHAR, name VARCHAR, age INTEGER);
'''

combine_data_query= '''
INSERT INTO animal_adoptions_combined (date, type, name, age) 
SELECT * 
FROM adoption_center_1
UNION 
SELECT *
FROM adoption_center_2;
'''


with DAG('lineage-combine-postgres',
         start_date=datetime(2020, 6, 1),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args = {
            'retries': 1,
            'retry_delay': timedelta(minutes=1)
        },
         catchup=False
         ) as dag:

    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_default',
        sql=create_table_query
    ) 

    insert_data = PostgresOperator(
        task_id='combine',
        postgres_conn_id='postgres_default',
        sql=combine_data_query
    ) 

    create_table >> insert_data
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

from datetime import datetime, timedelta

aggregate_reporting_query = '''
INSERT INTO adoption_reporting_long (date, type, number)
SELECT c.date, c.type, COUNT(c.type)
FROM animal_adoptions_combined c
GROUP BY date, type;
'''

with DAG('lineage-reporting-postgres',
         start_date=datetime(2020, 6, 1),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args={
            'retries': 1,
            'retry_delay': timedelta(minutes=1)
        },
         catchup=False
         ) as dag:

    create_table = PostgresOperator(
        task_id='create_reporting_table',
        postgres_conn_id='postgres_default',
        sql='CREATE TABLE IF NOT EXISTS adoption_reporting_long (date DATE, type VARCHAR, number INTEGER);',
    ) 

    insert_data = PostgresOperator(
        task_id='reporting',
        postgres_conn_id='postgres_default',
        sql=aggregate_reporting_query
    ) 

    create_table >> insert_data

DAGが失敗した時の挙動を確認

DAGを実行してみます(Airflow UIからTrigger DAG Run)。この時点ではPostgreSQLのConnection(postgres_conn_id)を設定していないので、DAG Runはエラーになりますが、MarquezにAirflowのジョブに対応する履歴が追加されているはずです。
(一つRunningになっている理由は不明…リトライが起きるとなるのかな)

DAGが成功した時の挙動を確認

DAGが成功するように、

  • Airflow Connectionの設定
  • 動作に必要なデータベース・テーブルの作成

を行います。別にPostgreSQLのデータベースを用意しても良いのですが、メタデータデータベースがPostgreSQLなので横着してそれを使います。

メタデータデータベースのPodのIPアドレスを確認します。

docker inspect airflow-postgres-1  | jq '.[] | .NetworkSettings.Networks.airflow_default.IPAddress'
"172.19.0.3"

Airflow Connectionを設定します(パスワードの部分もairflow)。

動作に使うデータベース・テーブルを作成します。

# メタデータデータベースのコンテナに入って
docker exec -it airflow-postgres-1  /bin/bash
# (コンテナ内)psqlでデータベースに接続し、https://docs.astronomer.io/learn/airflow-openlineage#generate-and-view-lineage-dataのSQLを実行します

psql -h localhost -p 5432 -U airflow
create database lineagedemo;
\c lineagedemo
CREATE TABLE IF NOT EXISTS adoption_center_1
(date DATE, type VARCHAR, name VARCHAR, age INTEGER);

CREATE TABLE IF NOT EXISTS adoption_center_2
(date DATE, type VARCHAR, name VARCHAR, age INTEGER);

INSERT INTO
    adoption_center_1 (date, type, name, age)
VALUES
    ('2022-01-01', 'Dog', 'Bingo', 4),
    ('2022-02-02', 'Cat', 'Bob', 7),
    ('2022-03-04', 'Fish', 'Bubbles', 2);

INSERT INTO
    adoption_center_2 (date, type, name, age)
VALUES
    ('2022-06-10', 'Horse', 'Seabiscuit', 4),
    ('2022-07-15', 'Snake', 'Stripes', 8),
    ('2022-08-07', 'Rabbit', 'Hops', 3);

再度DAGを実行すると今度は成功するはずで、MarquezにもStateがCompletedで登録されているはずです。

クールなことに、Marquezの画面ではジョブの履歴以外にも、

  • 入力・出力テーブルのカラム
  • 処理に使ったクエリ
  • 関係するテーブル・タスクの紐づけ

を閲覧することができます。DAG Runエラー時の調査や、データ元の仕様変更の影響調査などに役立ちそうです。

Discussion