📊

GCPとMarquezで始めるデータリネージ

2021/07/03に公開

データリネージの自動収集と可視化について調べた内容をもとに、実際にGCPで実装する方法をまとめていきます。

データリネージとは


業務システムから生成・抽出されるデータをなんらかの処理をして別のシステムへ連携したり、データ分析のために利用したりすることは企業内で日常的に行われています。また、これらデータのライフサイクルは適切に管理されていないと複雑化する傾向にあると思います。データのライフサイクルの複雑化により、利用されていないもしくは重複して保持しているデータが放置され無駄なコストが発生したり、データ品質の低下や機密情報への不適切なアクセスなどのトラブルに繋がることも予想されます。
このような問題に対処する手段の1つとして、データの生成元から利用先までの流れを常に追跡可能な状態に維持することが挙げられます。このデータの生成元から利用先までの流れをデータリネージといいます。

データリネージの用途

業務の効率化やコンプライアンス対応などを主な目的としてデータリネージ管理の仕組みを取り入れることになると思います。具体的には以下のようなタスクを効率的に行えるようにします。

  • データ周りの変更による影響の把握
  • データの異常の原因調査
  • データへのアクセスポリシーの管理
  • 分析レポートにおけるソースの明確化

データリネージシステムの種類

データリネージは、データの入力・変換処理・出力の情報を基に構成されます。データリネージを収集して管理するシステムは、データの収集方法によってActiveとPassiveの2種類に分類されます。

  • Active: データパイプライン側からメタデータを提供することでリネージ情報を登録する。
  • Passive: データ変換で実行するSQLを静的に解析することでリネージ情報を登録する。

GCPのリファレンスアーキテクチャ

https://cloud.google.com/architecture/building-a-bigquery-data-lineage-solution?hl=ja
BigQueryを利用したデータパイプラインにおけるデータリネージシステムのリファレンスアーキテクチャが紹介されています。

(監査ログ、Pub/Sub、ZetaSQL、Dataflow、Data Catalog を使用した BigQuery データリネージ システムの構築 より引用)

このリファレンスアーキテクチャは、BigQueryでのクエリ実行によるデータ変換の情報をオペレーションログからキャプチャして、そのログから変換元テーブル・変換SQL・出力先テーブルをリネージデータとして抽出し記録します。具体的な処理としてはSQLを解析して情報を抽出するためPassiveタイプのシステムに分類されます。

データリネージの情報はBigQueryのテーブルに保存され、Data Catalogのテーブルメタ情報にもタグ付けされます。また、抽出したリネージを他のシステムでも利用できるようにCloud Pub/Subのトピックにも出力されます。

リネージの可視化

Data Catalogにリネージの情報をタグ付けするだけでも便利なのですが、更にこの情報を統合して可視化することで、管理しているデータ間の依存関係を効率的に把握することができるようになると思います。GCPのリファレンスアーキテクチャを使えば、少ない手間でリネージの可視化が可能になります。簡単に考えられる方法は以下2つあります。

  • BigQueryに保存されているリネージ情報を抽出して可視化する。
  • Cloud Pub/Subのトピックに出力されるリネージ情報をサブスクライブして可視化する。

今回、2番目のPub/Sub連携を使った方法でリネージを可視化してみます。

リネージの可視化にはデータと処理の流れをグラフで表現する方法が一般的なようです。データリネージの管理システムとして、CollibraのData LineageやWeWorkがOSS化したMarquez、ASFのOSSプロジェクトであるApache Atlasなどがあります。

また、このデータリネージのデータモデルをオープンスタンダードとして定義するプロジェクトとしてOpenLineageというものがあります。OpenLineageはデータリネージを表現するデータセットやデータ処理のジョブを一般化してモデルとして定義しているものです。先程挙げたMarquezはこのOpenLineageのリファレンス実装となっています。

Marquez

MarquezはOSSのデータリネージの管理システムで、リネージシステム的にはActiveに分類されるものです。リネージデータの登録にはREST APIを使って登録しますが、その際に使用するデータモデルがOpenLineageに準拠しています。

GCPリファレンスアーキテクチャでリネージの可視化

GCPのリファレンスアーキテクチャでCloud Pub/Subのトピックに出力されたリネージ情報をMarquezへ連携させることでリネージを可視化します。そのために、Cloud Pub/Subのトピックに出力されたメッセージをpushサブスクリプションを使ってMarquezへ連携します。但し、リネージ情報をOpenLineageのデータモデルに変換する必要があるので、Cloud Pub/SubとMarquezの間に変換Proxyを実装します。変換Proxyは簡易的なHTTPサーバで良いのでCloud Functionsで実装することにします。また、MarquezはWeb-UIサーバ、APIサーバ、DBサーバで構成され、これらを起動するdocker-composeが用意されていますので、Kubernetesにも簡単にデプロイできるようになっています。今回は、簡易的にGoogle Compute Engine(GCE)でUbuntuのインスタンスを1つ立てて、その上でdocker-composeを起動します。

リファレンスアーキテクチャをデプロイする

以下の記事の手順に従ってGCP上に各コンポーネントをデプロイします。

https://cloud.google.com/architecture/building-a-bigquery-data-lineage-solution?hl=ja

リネージ情報→OpenLineage変換Proxyのデプロイ

まず始めにMarquezサーバをデプロイします。GCEインスタンスを作成して、Marquezサーバを起動します。(コマンドの中の環境変数の値は各自の環境に合わせて設定して下さい)

# MarquezのWeb UIにアクセスできるようにFireWallルールを作成する
gcloud compute firewall-rules create marquez-web-ui \
    --allow=tcp:3000 \
    --direction=INGRESS \
    --source-ranges='0.0.0.0/0' \
    --target-tags=$FW_TARGET_TAG

# GCEでUbuntu 20.04のインスタンスを作成
# FireWallルールで指定したtarget-tagを指定する
gcloud compute instances create $INSTANCE_NAME \
    --image=ubuntu-2004-focal-v20210610 \
    --image-project=ubuntu-os-cloud \
    --zone=$ZONE \
    --machine-type=e2-medium \
    --tags=$FW_TARGET_TAG

# インスタンスにSSH接続する
gcloud compute ssh $INSTANCE_NAME \
    --project $PROJECT_ID \
    --zone $ZONE

# 以降GCEインスタンス上で実行する

# インストールの事前準備
sudo apt remove docker docker-engine docker.io containerd runc
sudo apt update
sudo apt install \
    apt-transport-https \
    ca-certificates \
    curl \
    gnupg \
    lsb-release

# Dockerをインストールする
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

echo \
  "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu \
  $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

sudo apt update
sudo apt install docker-ce docker-ce-cli containerd.io

sudo gpasswd -a $USER docker
# 再ログインする

# docker-composeをインストールする
sudo curl -L https://github.com/docker/compose/releases/download/1.29.2/docker-compose-`uname -s`-`uname -m` -o /usr/local/bin/docker-compose
sudo chmod +x /usr/local/bin/docker-compose

# Marquezをダウンロードして起動する
git clone https://github.com/MarquezProject/marquez.git
cd marquez
./docker/up.sh --tag 0.14.2

Marquezの起動が完了したらWebブラウザで<GCEの外部IPアドレス>:3000にアクセスしてみます。まだ、何も登録していないので以下のようなまっさらな画面が表示されると思います。

Cloud FunctionsからMarquezサーバへHTTPアクセスする際、GCPの内部ネットワーク経由でアクセスできるようにするためServerless VPC Accessを利用します。

# Serverless VPC Accessを構成する
gcloud services enable vpcaccess.googleapis.com
gcloud compute networks vpc-access connectors create $CONNECTOR_NAME \
  --network default \
  --region $REGION_ID \
  --max-instances 4 \
  --range 10.8.0.0/28

Cloud FunctionsにデプロイするProxyのコードを用意します。このProxyでは、リネージ情報をOpenLineageに変換してREST APIでMarquezに登録します。GCPリファレンスアーキテクチャのPub/Subトピックに出力されるリネージデータは、こちらの定義のProtocol BuffersでシリアライズされたバイナリをBASE64でエンコードしたものとなります。
これをOpenLineageのモデルに変換するため、BASE64をデコードしたProtocol Buffersのバイナリをデシリアライズする必要があります。

Protocol Buffersのデシリアライズ用のPythonコードを生成します。

# 以降ローカルで実行
# Protocol Buffersのコンパイラをインストール
apt install -y protobuf-compiler

# リファレンスアーキテクチャのリポジトリ内のProtocol Buffers定義ファイルがあるディレクトリを指定します
export SRC_DIR='bigquery-data-lineage/src/main/proto'
export DST_DIR=.
# Protocol Buffersを扱うコード lineage_messages_pb2.pyが生成される
protoc -I=$SRC_DIR --python_out=$DST_DIR $SRC_DIR/lineage_messages.proto

生成されたPythonモジュールでリネージのデータをデシリアライズし、OpenLineageのモデルに変換してMarquezに登録します。以下がその処理を行うProxyのコードになります。(Cloud Functionsにデプロイするコード)

def marquez_proxy(event, context):
    import base64
    from datetime import datetime, timezone
    import json
    import os
    import urllib
    import lineage_messages_pb2 as pb

    HOST = os.environ['MARQUEZ_SERVER']
    URL = f'http://{HOST}:5000/api/v1/lineage'
    job_start_req = {
        'eventType': 'START',
        'run': {},
        'job': {
            'namespace': 'my-lineage',
            'name': 'my-transform'
        },
        'producer': 'https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client'
    }
    job_complete_req = {
        'eventType': 'COMPLETE',
        'run': {},
        'job': {
            'namespace': 'my-lineage',
            'name': 'my-transform'
        },
        'outputs': [{
            'namespace': 'my-lineage',
            'facets': {
                'schema': {
                    '_producer': 'https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client',
                    '_schemaURL': 'https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet',
                }
            }
        }],
        'producer': 'https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client'
    }
    headers = {'Content-Type': 'application/json'}

    if 'data' in event:
        lineage_raw = base64.b64decode(event['data'])
	
	# Porotocol Buffersデータをデシリアライズする
        lineage = pb.CompositeLineage()
        lineage.ParseFromString(lineage_raw)

        job_start_req['eventTime'] = datetime.fromtimestamp(lineage.jobInformation.jobTime / 1000, timezone.utc).isoformat(timespec='milliseconds')
        job_start_req['run']['runId'] = lineage.jobInformation.jobId
        job_start_req['inputs'] = [
            {
                'namespace': 'my-lineage',
                'name': '.'.join(tbl.sqlResource.split('.')[-3:])
            } for tbl in lineage.tableLineage.parents
        ]

        req = urllib.request.Request(URL, json.dumps(job_start_req).encode(), headers)
        with urllib.request.urlopen(req) as res:
            print(f'Marquez JOB START response: {res.read()}')

        job_complete_req['eventTime'] = datetime.fromtimestamp(lineage.jobInformation.jobTime / 1000, timezone.utc).isoformat(timespec='milliseconds')
        job_complete_req['run']['runId'] = lineage.jobInformation.jobId
        job_complete_req['outputs'][0]['name'] = '.'.join(lineage.tableLineage.target.sqlResource.split('.')[-3:])
        job_complete_req['outputs'][0]['facets']['schema']['fields'] = [
            {'name': col.target.column, 'type': 'VARCHAR'} for col in lineage.columnsLineage
        ]

        req = urllib.request.Request(URL, json.dumps(job_complete_req).encode(), headers)
        with urllib.request.urlopen(req) as res:
            print(f'Marquez JOB COMPLETE response: {res.read()}')

OpenLineageでは、ジョブの開始と完了といったステータスを各リクエストのeventTypeフィールドに持ちます。Marquezではこれらのメッセージを受けて、データ処理の実行状況もリアルタイムで可視化されます。今回はジョブ開始(eventType=START)のメッセージのinputsフィールドに処理の入力となるBigQueryテーブル名を設定しています。そして、ジョブ完了(eventType=COMPLETE)のメッセージのoutputsフィールドに処理の出力となるBigQueryテーブル名を設定しています。このジョブ開始・完了のメッセージを受けてMarquezにリネージ情報が登録されグラフ表示することができます。

尚、本来はBigQueryのジョブ開始ログをeventType=STARTのメッセージに変換し、ジョブ完了ログをeventType=COMPLETEのメッセージに変換して、Marquezに対して個別にリクエストを送るべきなのですが、GCPのリファレンスアーキテクチャの実装ではジョブ完了のログのみ連携されるので、Proxyではこの完了ログを受け取ったらジョブ開始と完了のメッセージを一度に連続してMarquezに送信する実装にしています。
以上のコードをCloud Functionsにデプロイします。

export LINEAGE_TRANSFORM_FUNCTION=marquez_proxy
export LINEAGE_OUTPUT_PUBSUB_TOPIC=<GCPリファレンスアーキテクチャのPub/SubトピックID>
export MARQUEZ_HOST=<Marquezを起動するGCEのプライベートIPアドレス>

gcloud functions deploy $LINEAGE_TRANSFORM_FUNCTION \
--runtime python38 \
--trigger-topic $LINEAGE_OUTPUT_PUBSUB_TOPIC \
--vpc-connector $CONNECTOR_NAME \
--set-env-vars MARQUEZ_SERVER=$MARQUEZ_HOST

これで準備ができたので、以下記事の「抽出パイプラインの実行」の手順を実行してCloud Dataflowのストリーミングパイプラインを開始します。
https://cloud.google.com/architecture/building-a-bigquery-data-lineage-solution?hl=ja#running_the_extraction_pipeline

実行を開始したら、早速「サンプルデータに対してクエリを実行する」に記載の通りBigQueryでSQLを実行してテーブルを作成してみます。このSQLではデータセットbigquery-public-data.ncaa_basketballのテーブルmbb_teamsとteam_colorsをINNER JOINして新しいテーブルを作っています。
テーブルが作成されたら、Marquezにリネージ情報が登録されたか確認します。

2つのテーブルから1つのテーブルが作成されるリネージが表示されれば成功です。テーブルのアイコンをクリックするとスキーマ情報も表示されるのでとても便利です。
GCPのリファレンスアーキテクチャの実装とCloud Functions、GCEを使って簡単にデータリネージの可視化ができました。SaaSなどを契約する前に試験的にデータリネージの仕組みを取り込んでみたい場合に試してみても良いのではないでしょうか。
尚、GCPのコストを抑えたい場合、Cloud Dataflowをストリーミングで常時起動するのではなく定時バッチでスケジュール起動するのでも良いかと思います。その場合、リファレンスアーキテクチャでaudit logのsink先をPub/SubからCloud Storageに変えたほうが良さそうです。

Discussion