Great Expectationsとdbtとairflowでロバストなデータパイプラインを構築する
これは何か
Building a robust data pipeline with dbt, Airflow, and Great Expectationsという2020年のdbt関連のセッションの動画を見て、Great Expectationsとdbtでテストしながらデータモデルを構築するパイプラインをAirflowで動かすことで、ロバストなデータパイプラインを作るのだという内容が気になったので、実際に自分でも作って動かしてみる。
一連の流れ
データパイプラインの概要は下記の通り。
①Great ExpectationsでBigQueryのテーブルに対してバリデーションチェックをかける。バリデーション結果はGCSにアップロードし、閲覧できるようにする。
②バリデーションが成功したら、dbtでデータのTransformを実行する。
③Transformationが成功したら、テストを実行する。
④①〜③をAirflowで動かす。
準備
Great ExpectationsやBigQuery周りは、基本的に前回の記事で書いたものを流用する。
FROM python:3.9-slim
RUN apt-get update -y && \
apt-get install --no-install-recommends -y -q \
git libpq-dev python3-dev build-essential && \
apt-get clean && \
rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN pip install --upgrade pip && \
pip install great_expectations && \
pip install sqlalchemy==1.4.25 && \
pip install sqlalchemy_bigquery && \
pip install pybigquery && \
pip install dbt-bigquery && \
pip install apache-airflow[gcp] && \
pip install airflow-dbt
RUN curl -sSL https://sdk.cloud.google.com | bash
ENV PATH $PATH:/root/google-cloud-sdk/bin
ENV PYTHONIOENCODING=utf-8
ENV LANG C.UTF-8
version: '3.9'
services:
great_expectations:
container_name: great_expectations
build: .
ports:
- "8888:8888"
- "8080:8080"
- "8081:8081"
tty: true
working_dir: /usr/app
environment:
- DBT_PROFILES_DIR=/usr/app/dbt
- AIRFLOW_HOME=/usr/app/dag
volumes:
- ./scr:/usr/app
- gcloud-config:/root/.config
secrets:
- gcp_secret
terraform:
container_name: terraform
entrypoint: ash
image: hashicorp/terraform:latest
working_dir: /tmp/terraform
volumes:
- ./scr/terraform:/tmp/terraform
- gcloud-config:/root/.config
tty: true
secrets:
- gcp_secret
gcloud:
container_name: gcloud
entrypoint: "gcloud"
image: google/cloud-sdk:alpine
volumes:
- gcloud-config:/root/.config
volumes:
gcloud-config:
secrets:
gcp_secret:
file:
{サービスアカウントのキーファイル}
GCSやBigQuery周りはterraformで構成した。
provider "google" {
project = var.gcp_project_id
region = "us-central1"
credentials = "${file("${var.GOOGLE_APPLICATION_CREDENTIALS}")}"
}
resource "google_bigquery_dataset" "bigquery_dataset" {
dataset_id = "sasakky_data_infra_dataset"
friendly_name = "sasakky_data_infra_dataset"
location = "us-central1"
}
resource "google_storage_bucket" "cloud_storage_bucket" {
name = "sasakky_gcs_bucket"
location = "us-central1"
force_destroy = true
website {
main_page_suffix = "index.html"
not_found_page = "404.html"
}
}
variable "gcp_project_id" {
default = "{project_id}"
}
variable "GOOGLE_APPLICATION_CREDENTIALS" {
default = "/run/secrets/gcp_secret"
}
構成
ディレクトリ構成は以下。
.
├── Dockerfile
├── docker-compose.yml
└── scr
├── dag(airflowの設定、DAGの配置など)
├── dbt(dbtプロジェクト)
└── great_expectations(Great Expectationsプロジェクト)
Great Expectationsの設定
BigQuery内のcustomerテーブルに対してバリデーションをかける。基本は前回の記事のものを流用するが、一部修正した。
①Data DocsをGCSにアップロードするために、great_expectations.ymlに以下の内容を追記する。
data_docs_sites:
gs_site:
class_name: SiteBuilder
store_backend:
class_name: TupleGCSStoreBackend
project: {プロジェクトID}
bucket: sasakky_gcs_bucket
site_index_builder:
class_name: DefaultSiteIndexBuilder
②Suiteから不要なexpectationsを除外した。PKチェックとNullチェックだけ残す。
{
"data_asset_type": null,
"expectation_suite_name": "customer_suite",
"expectations": [
{
"expectation_type": "expect_column_values_to_be_unique",
"kwargs": {
"column": "customer_id"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "customer_id"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "customer_name"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "gender_cd"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "gender"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "birth_day"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "age"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "postal_cd"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "address"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "application_store_cd"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "application_date"
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "status_cd"
},
"meta": {}
}
],
"ge_cloud_id": null,
"meta": {
"great_expectations_version": "0.14.12"
}
}
dbtの設定
customerテーブルから男性を抜き出したビューを作成する。
select
*
from
{{ source('sasakky_data_infra_dataset', 'customer') }}
where
gender_cd = "0" -- 0はgender="男性"
customerテーブルをsource登録し、customers_menに対して正しくtransformが実行できているかテストを記述する。
version: 2
sources:
- name: sasakky_data_infra_dataset
tables:
- name: customer
models:
- name: customers_men
columns:
- name: customer_id
description: Primary key
tests:
- unique
- not_null
- name: gender_cd
description: 性別コード(0:男性)
tests:
- accepted_values:
values: ['0']
- name: gender
description: 性別名
tests:
- accepted_values:
values: ['男性']
airflowの設定
前回に引き続きこちらの記事を参考にさせていただき、Great ExpectationsのDAGの処理は記述した。
Airflowのウェブサーバーを立ち上げたら、connectionを作成する。
DAGファイルは以下のように記述した。Airflowでdbtを扱うために、airflow-dbtというライブラリを使用した。
from datetime import timedelta
import os
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.base_hook import BaseHook
from airflow import AirflowException
from airflow_dbt.operators.dbt_operator import (
DbtRunOperator,
DbtTestOperator
)
from airflow.utils.dates import days_ago
default_args = {
'start_date': days_ago(0),
'retries': 0,
}
def validate(**context):
from great_expectations.data_context import DataContext
conn = BaseHook.get_connection('sasakky_bigquery')
connection_json = conn.extra_dejson
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = connection_json['extra__google_cloud_platform__key_path']
data_context: DataContext = DataContext(context_root_dir="/usr/app/great_expectations")
result = data_context.run_checkpoint(
checkpoint_name="checkpoint_customer",
batch_request=None,
run_name=None,
)
data_context.build_data_docs()
if not result["success"]:
raise AirflowException("Validation of the data is not successful ")
with DAG(dag_id='etl', default_args=default_args, schedule_interval='@daily') as dag:
ge_check = PythonOperator(
task_id='validate',
python_callable=validate,
provide_context=True,
)
dbt_test = DbtTestOperator(
task_id='dbt_test',
retries=0,
profiles_dir='/usr/app/dbt',
dbt_bin='/usr/local/bin/dbt',
dir='/usr/app/dbt'
)
dbt_run = DbtRunOperator(
task_id='dbt_run',
profiles_dir='/usr/app/dbt',
dbt_bin='/usr/local/bin/dbt',
dir='/usr/app/dbt'
)
ge_check >> dbt_run >> dbt_test
実行
AirflowからDAGをトリガーして実行する。
処理が最後まで成功したので、GCSを確認してアップロードしたData Docsを確認してみる。
次に、Great Expectationsにバリデーションが失敗した場合の挙動を確かめる。
birth_dayカラムをnullにして、再度DAGを実行してみる。
バリデーションが失敗したため、dbtのタスクは実行されない。
Data Docsを見ても、failedになっている。
考察
Great Expectationsとdbtでデータパイプラインのテストを実行することができた。
本番運用時にはAirflowでスケジューリングしながら、定期的に最新版のバリデーション結果を取得し、データの異常検知ができそうだ。
dbtでもデータソースに対するテストを実行できると思うので、使い分けをどうするかについては悩ましいところ。
動画内でも言われているように、実際のデータのテスト(今回で言うとNullチェックや主キーのチェック、値が範囲内に含まれているかのチェックなど)はGreat Expectationsで行い、それが担保された上でデータの変換ロジックのテスト(今回で言うと、customers_menに男性データのみを正しく抽出できているか)をdbtで実行するようにすべきか。
お互いのツールの得意不得意もありそうなので、dbt側のテストについてはもう少し深ぼって分析したい。
あとはGreat ExpectationsでBigQueryにtmpのテーブルが作成されてしまう謎があるので、その回避方法を調べたい。
Discussion