Great Expectationsを使ったBigQueryでのデータバリデーション
データウェアハウスとしてBigQueryを使う環境でデータバリデーションを実行し、その結果をGoogle Cloud Storage(GCS)バケットに保存することで、データ品質に関するレポートを組織内で共有できるようにしてみます。バリデーションの実装はSQLとして用意するのではなく、JSONで仕様を記述できるGreat Expectationsというフレームワークを使います。バリデーションの実行はデータパイプラインの実装でよく使われるApache Airflowを使います。
データパイプラインでのデータバリデーションの必要性
データパイプラインでデータの自動連携やETLを運用していると、いつの間にか連携されてくるデータの性質や仕様が変わっていることがあります。その変化に気づかず、データパイプラインでデータを処理してユーザに提供してしまうと、そのデータを利用した業務での事故に繋がります。また、機械学習で使用するデータの場合は間違ったデータで学習することでリカバリのコストが大きくなる傾向にあります。
そのため、データパイプラインの中では適切ポイントでデータのバリデーションを行うことで、提供するデータの品質を一定に保つ必要があると思います。特に、データパイプラインに入ってくるデータは管轄外であることが多く不確実性が高いため、データパイプラインの入り口でのチェックは重要です。
バリデーションの実装
データパイプライン上でのバリデーションの実装方法はいくつか考えられます。一番簡単に思いつく方法としてはSQLでチェックする方法です。チェック対象がそれほど多くなければSQLによる実装でも問題ないですが、対象が増えてくるとそれに伴いボイラープレートも増えてきて管理も冗長になってくると思います。
そこで、データが満たすべき条件をコンフィグファイルとして記述して、それを基にバリデーションを実行(assertion)するフレームワークがいくつか公開されています。例えば、JSONファイルでassertionを記述するGreat ExpectationsやYAMLで記述するSoda SQLなどがあります。ここでは、Great Expectationsを使ったバリデーションの例を紹介します。
Great Expectations
Great ExpectationsはPython製のフレームワークで、great_expectations
コマンドを使ってバリデーションの設定を作成していきます。Great Expectationsでは、データバリデーションを構成する1つ1つのassertionにあたるものをExpectationと呼んでいます。このExpectationの設定をカラムごとに1つずつ1から記述していくのはJSON形式とはいえ骨の折れる作業なので、データプロファイリングの機能により自動でExpectaionを導出することができます。自動プロファイリングにより生成されたExpectationを必要に応じて調整し、最終的なassertionを構成します。出来上がった複数のExpectationを纏めてExpectaion Suiteと呼び、バリデーションしたいデータに対してこのExpectation Suiteに基づいたチェックの実行を行います。どのデータに対してどのExpectaion Suiteによるチェックを実行するのかという設定をCheckpointと呼びます。Checkpointはバリデーション実行だけでなくバリデーション結果の保存先の管理まで行います。バリデーション結果はData Docsと呼ばれるHTML形式のファイルとして出力されます。Data Docsはローカルファイルシステムだけでなく、GCSやAWS S3などにも自動で保存することができます。以上のExpectationやCheckpointなどの細かい調整は、PythonのAPIを使って行います。実際の開発では試行錯誤しながら何度も調整を行うことになるので、それらを行うためのJupyter notebookも自動で生成されます。
Apache Airflowを使ったGreat Expectaionsでのバリデーション実装
ここでは、Apache AirflowのワークフローでGreat ExpectaionsによるBigQueryテーブルに対するバリデーションを実行してみます。
今回は以下の流れで進めていきます。
- ローカルでGreat ExpectaionsのExpectaion SuiteとCheckpointを作成
- Great Expectationsのバリデーションを実行するAirflowワークフロー(DAG)を作成
- AirflowにDAGをデプロイして実行しバリデーション結果(Data Docs)を確認する
バリデーションを作成する際のプロファイリング対象データとして、データサイエンス100本ノックで用意されているデータの中のcategory
テーブルを使うので、GitHubのリポジトリからCSVファイルをダウンロードして事前にBigQueryにロードしておきます。ここでは、100knock.category
としてテーブルを作成した前提で進めます。尚、category
テーブルの中でもcategory_major_cd
カラムは04
,05
,06
,07
,08
,09
の6種類のコードで構成されるわかりやすいデータのため、このカラムをバリデーションの対象にします。また、上記データを基に作成したバリデーションを実際に実行する対象として、100knock.category_new
というテーブルも別途作成しておきます。今回、バリデーションを意図的に失敗させたいため、以下SQLによりcategory
のcategory_major_cd
カラムの値04
を00
に置換したテーブルをcategory_new
として作成します。
SELECT
IF(category_major_cd = '04', '00', category_major_cd) AS category_major_cd,
* EXCEPT(category_major_cd)
FROM
`100knock.category`
Great Expectaionsによるバリデーションの作成
はじめにローカルでGreat ExpectationsのExpectation SuiteとCheckpointを作成するため、pipでGreat Expectationsをインストールします。
pip install great_expectations
Great Expectationsはプロジェクトという単位でバリデーションに関わる設定ファイルやそれらを生成・調整するためのJupyter notebookなどを管理します。そのため、まずはプロジェクトを初期化します。
% great_expectations --v3-api init
Using v3 (Batch Request) API
___ _ ___ _ _ _
/ __|_ _ ___ __ _| |_ | __|_ ___ __ ___ __| |_ __ _| |_(_)___ _ _ ___
| (_ | '_/ -_) _` | _| | _|\ \ / '_ \/ -_) _| _/ _` | _| / _ \ ' \(_-<
\___|_| \___\__,_|\__| |___/_\_\ .__/\___\__|\__\__,_|\__|_\___/_||_/__/
|_|
~ Always know what to expect from your data ~
Let's create a new Data Context to hold your project configuration.
Great Expectations will create a new directory with the following structure:
great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- notebooks
|-- plugins
|-- .gitignore
|-- uncommitted
|-- config_variables.yml
|-- data_docs
|-- validations
OK to proceed? [Y/n]:Y
================================================================================
Congratulations! You are now ready to customize your Great Expectations configuration.
You can customize your configuration in many ways. Here are some examples:
Use the CLI to:
- Run `great_expectations --v3-api datasource new` to connect to your data.
- Run `great_expectations --v3-api checkpoint new <checkpoint_name>` to bundle data with Expectation Suite(s) in a Checkpoint for later re-validation.
- Run `great_expectations --v3-api suite --help` to create, edit, list, profile Expectation Suites.
- Run `great_expectations --v3-api docs --help` to build and manage Data Docs sites.
Edit your configuration in great_expectations.yml to:
- Move Stores to the cloud
- Add Slack notifications, PagerDuty alerts, etc.
- Customize your Data Docs
Please see our documentation for more configuration options!
カレントディレクトリにgreat_expectations
というディレクトリができます。この中に一連の設定ファイルや作業用のJupyter notebookが保存されます。
バリデーション対象のデータソースに関する設定をDatasourceと呼びます。Datasourceを作成する中で接続確認も実行されます。今回BigQueryへアクセスするため、事前にGCPへの認証設定をしておきます。(参考: https://googleapis.dev/python/google-api-core/latest/auth.html )
BigQueryへアクセスができるようになったらDatasourceを作成していきます。
% great_expectations --v3-api datasource new
Using v3 (Batch Request) API
What data would you like Great Expectations to connect to?
1. Files on a filesystem (for processing with Pandas or Spark)
2. Relational database (SQL)
: 2
Great Expectations relies on the library `sqlalchemy` to connect to your data, but the package `sqlalchemy` containing this library is not installed.
Would you like Great Expectations to try to execute `pip install sqlalchemy` for you? [Y/n]: Y
pip install sqlalchemy [------------------------------------] 0%
Which database backend are you using?
1. MySQL
2. Postgres
3. Redshift
4. Snowflake
5. BigQuery
6. other - Do you have a working SQLAlchemy connection string?
: 5
Great Expectations relies on the library `pybigquery.sqlalchemy_bigquery` to connect to your data, but the package `pybigquery` containing this library is not installed.
Would you like Great Expectations to try to execute `pip install pybigquery` for you? [Y/n]: Y
pip install pybigquery [------------------------------------] 0%
ここからインタラクティブにDatasourceの設定を進めるためJupyter notebookが立ち上がります。
notebookの中で以下変数の値を環境に合わせて変更して実行する。
# Datasource名を設定する
datasource_name = "my_datasource"
# BigQueryのconnection stringを設定する
connection_string = "bigquery://<project_id>/100knock"
BigQueryへ接続するためのDatasourceが作成されたので、ターミナルに戻ってExpectation Suiteの作成に入ります。Expectationの作成には自動プロファイリングを使うようにします。また、今回はcategory
テーブルを対象にします。
% great_expectations --v3-api suite new
Using v3 (Batch Request) API
How would you like to create your Expectation Suite?
1. Manually, without interacting with a sample batch of data (default)
2. Interactively, with a sample batch of data
3. Automatically, using a profiler
: 3
A batch of data is required to edit the suite - let's help you to specify it.
Select data_connector
1. default_runtime_data_connector_name
2. default_inferred_data_connector_name
: 2
Which data asset (accessible by data connector "default_inferred_data_connector_name") would you like to use?
1. category
: 1
Name the new Expectation Suite [category.warning]:
Great Expectations will create a notebook, containing code cells that select from available columns in your dataset and
generate expectations about them to demonstrate some examples of assertions you can make about your data.
When you run this notebook, Great Expectations will store these expectations in a new Expectation Suite "category.warning" here:
file:///home/koji/work/tmp/python/great_expectations/great_expectations/expectations/category/warning.json
Would you like to proceed? [Y/n]: Y
ここでもExpectation Suiteを設定するためにJupyter notebookが立ち上がります。
この中では、今回バリデーション対象のカラム category_major_cd
をコメントアウトします。(ignored_columns
に含まれるカラム名はバリデーション対象から外されます)
ignored_columns = [
#"category_major_cd",
"category_major_name",
"category_medium_cd",
"category_medium_name",
"category_small_cd",
"category_small_name",
]
notebookのセルを最後まで実行するとデータがプロファイリングされてExpectationが作成されます。更にそのExpectationを使って、プロファイルしたデータに対するバリデーションが実行されます。最後に、great_expectations/uncommitted/data_docs/local_site
配下にバリデーション結果のレポート(Data Docs)が保存され、category
テーブルに対するバリデーション結果のウィンドウが立ち上がります。
プロファイリングしたデータへのバリデーションなのでもちろん結果はSucceededになります。
バリデーション結果レポートを下の方にスクロールしていくと category_major_cd
カラムに対するバリデーション結果の詳細が確認できます。このカラムには04
,05
,06
,07
,08
,09
の6種類のコードだけが存在することがわかります。その他にも、NULL
を含まないなどのassertionが確認できます。
先程作成されたExpectationは、great_expectations/expectations/category/warning.json
に保存されており、内容を見てみるとcategory_major_cd
カラムは04
,05
,06
,07
,08
,09
の何れかの値を取ること(expect_column_values_to_be_in_set
)やNULL
を含まないこと(expect_column_values_to_not_be_null
)が定義されています。assertionの内容を変更する場合はこのJSONファイルを編集します。
...
{
"expectation_type": "expect_column_values_to_be_in_set",
"kwargs": {
"column": "category_major_cd",
"value_set": [
"04",
"05",
"06",
"07",
"08",
"09"
]
},
"meta": {}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "category_major_cd"
},
"meta": {}
},
...
Expectation Suiteができたので再度ターミナルに戻って今度はCheckpointを作ります。
great_expectations --v3-api checkpoint new my_checkpoint
notebookの中でCheckpointのコンフィグをYAML形式で設定しているセルがあるので、validations[0].batch_request.data_asset_name
にバリデーション対象のテーブル名 category_new
を設定します。
yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
- batch_request:
datasource_name: my_datasource
data_connector_name: default_inferred_data_connector_name
data_asset_name: category_new
data_connector_query:
index: -1
expectation_suite_name: category.warning
"""
data_asset_name以外はそのままで全てのセルを実行していき、最後に context.add_checkpoint(**yaml.load(yaml_config))
を実行することで、 great_expectations/checkpoints
配下にCheckpointのコンフィグファイルmy_checkpoint.yml
が作成されます。
以上で、category_new
テーブルに対してバリデーションを実行する準備が整いました。
Airflowでの実行に向けた準備
ここからは、Airflowのワークフロー上からこのCheckpointを使ってバリデーションを実行する準備をしていきます。
今回はローカルにAirflow環境を構築して、BigQueryのcategory_new
テーブルへのバリデーションを実行し、バリデーション結果のData DocsをGCSバケットに保存するようにします。GCSバケットに保存することで、Google App Engineなどによるレポートサイトを立ち上げて社内で共有することができます。GCSにData Docsを出力したい場合はgreat_expectations.yml
のdata_docs_site
に以下設定を追加します。
data_docs_site:
...
gs_site:
class_name: SiteBuilder
store_backend:
class_name: TupleGCSStoreBackend
project: <project_id>
bucket: <GCS_bucket_name>
site_index_builder:
class_name: DefaultSiteIndexBuilder
AirflowのワークフロータスクからBigQueryとGCSにアクセスするためのサービスアカウントを作成して必要なロールをアサインしておきます。併せて、サービスアカウントのキー(JSONファイル)を作成してダウンロードしておきます。(ここではキーをgcp_credential.jsonとして保存しておきます)
Airflowは以下のリポジトリを使ってDocker Composeで立ち上げます。今回はバージョン1.10.14を使います。
git clone https://github.com/xnuinside/airflow_in_docker_compose
cd airflow_in_docker_compose
docker-compose -f docker-compose-with-celery-executor.yml up --build
先程作成したサービスアカウントのキー(JSONファイル)をAirflowのタスクからConnection経由で使用できるようにするため、files
ディレクトリにキーを保存します。
cp gcp_credential.json airflow_in_docker_compose/files
files
ディレクトリはAirflow webserverコンテナの/opt/airflow/files
にVolumeマウントされます。
Airflowが立ち上がったらWeb UI (http://localhost:8080/admin )にアクセスして、Connectionを登録します。
Airflow環境で、Great Expectationsを使ったBigQueryへのバリデーションの実行に必要なパッケージをインストールします。
# 以下コマンドをAirflow werbserver, worker, schedulerコンテナでそれぞれ実行する
# docker exec -it <Airflowの対象のコンテナ> bash
pip install apache-airflow-backport-providers-google pybigquery great_expectations
先程作成したgreat_expectaionsの設定ファイル群をAirflowのdagsディレクトリにデプロイします。airflow_files/dags
ディレクトリはAirflow webserverコンテナの/opt/airflow/dags
にVolumeマウントされます。
cp -r great_expectations airflow_in_docker_compose/airflow_files/dags/
PythonOperatorを使ってGreat Expectationsによるバリデーションを実行するだけのDAGを作成します。以下スクリプトをge_test.pyとして作成します。
from datetime import datetime
import os
from airflow import DAG
from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator
def validate(**context):
from great_expectations.data_context import DataContext
conn = BaseHook.get_connection('my_bq')
connection_json = conn.extra_dejson
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = connection_json['extra__google_cloud_platform__key_path']
data_context: DataContext = DataContext(context_root_dir="/opt/airflow/dags/great_expectations")
result = data_context.run_checkpoint(
checkpoint_name="my_checkpoint",
batch_request=None,
run_name=None,
)
data_context.build_data_docs()
if not result["success"]:
raise AirflowException("Validation of the data is not successful ")
with DAG(dag_id="ge_test_dag", start_date=datetime(2021, 7, 31),
schedule_interval=None) as dag:
PythonOperator(
task_id='validate',
python_callable=validate,
provide_context=True,
)
DAGをデプロイします。
cp ge_test.py airflow_in_docker_compose/airflow_files/dags/
AirflowのWeb UI(http://localhost:8080/admin )にアクセスしてDAGを確認します。
DAGの実行とバリデーション結果の確認
DAG実行の準備が整ったので、ge_test_dag
をTrigger DAGして実行します。そうするとcategory_new
テーブルに対してバリデーションするタスクが実行されます。category_new
テーブルはcategory_major_cd
カラムの値として本来存在しないはずの00
を含んでいるので、想定通りバリデーションが失敗します。
バリデーション結果をData Docsで確認します。Data Docsは自動的にGCSに保存されるので確認してみます。
ここで認証済みURLをクリックするとバリデーション結果としてData Docsが表示されます。
Statusが✗になっている欄を見ると想定通り00
という想定外の値が含まれていることが確認できます。
Data Docsをもとにデータ品質の状態を共有する
Data Docsをデータ生成元の担当者や利用者と共有することで、データの品質に関する状況を共有し、業務への影響や対応要否・優先度付けなどについて議論することができるようになると思います。尚、業務への影響範囲を把握する際は、データのリネージ情報も併せて用意されていると円滑に、そして正確に確認を進めることができると思います。
また、Data Docsは各テーブル・カラムの詳細な仕様書にもなるため、データカタログとリンクすることでメタ情報の増強にも役立つと思います。
Discussion