💯

Great Expectationsを使ったBigQueryでのデータバリデーション

2021/08/07に公開

データウェアハウスとしてBigQueryを使う環境でデータバリデーションを実行し、その結果をGoogle Cloud Storage(GCS)バケットに保存することで、データ品質に関するレポートを組織内で共有できるようにしてみます。バリデーションの実装はSQLとして用意するのではなく、JSONで仕様を記述できるGreat Expectationsというフレームワークを使います。バリデーションの実行はデータパイプラインの実装でよく使われるApache Airflowを使います。

データパイプラインでのデータバリデーションの必要性

データパイプラインでデータの自動連携やETLを運用していると、いつの間にか連携されてくるデータの性質や仕様が変わっていることがあります。その変化に気づかず、データパイプラインでデータを処理してユーザに提供してしまうと、そのデータを利用した業務での事故に繋がります。また、機械学習で使用するデータの場合は間違ったデータで学習することでリカバリのコストが大きくなる傾向にあります。
そのため、データパイプラインの中では適切ポイントでデータのバリデーションを行うことで、提供するデータの品質を一定に保つ必要があると思います。特に、データパイプラインに入ってくるデータは管轄外であることが多く不確実性が高いため、データパイプラインの入り口でのチェックは重要です。

バリデーションの実装

データパイプライン上でのバリデーションの実装方法はいくつか考えられます。一番簡単に思いつく方法としてはSQLでチェックする方法です。チェック対象がそれほど多くなければSQLによる実装でも問題ないですが、対象が増えてくるとそれに伴いボイラープレートも増えてきて管理も冗長になってくると思います。
そこで、データが満たすべき条件をコンフィグファイルとして記述して、それを基にバリデーションを実行(assertion)するフレームワークがいくつか公開されています。例えば、JSONファイルでassertionを記述するGreat ExpectationsやYAMLで記述するSoda SQLなどがあります。ここでは、Great Expectationsを使ったバリデーションの例を紹介します。

Great Expectations

https://greatexpectations.io/

Great ExpectationsはPython製のフレームワークで、great_expectationsコマンドを使ってバリデーションの設定を作成していきます。Great Expectationsでは、データバリデーションを構成する1つ1つのassertionにあたるものをExpectationと呼んでいます。このExpectationの設定をカラムごとに1つずつ1から記述していくのはJSON形式とはいえ骨の折れる作業なので、データプロファイリングの機能により自動でExpectaionを導出することができます。自動プロファイリングにより生成されたExpectationを必要に応じて調整し、最終的なassertionを構成します。出来上がった複数のExpectationを纏めてExpectaion Suiteと呼び、バリデーションしたいデータに対してこのExpectation Suiteに基づいたチェックの実行を行います。どのデータに対してどのExpectaion Suiteによるチェックを実行するのかという設定をCheckpointと呼びます。Checkpointはバリデーション実行だけでなくバリデーション結果の保存先の管理まで行います。バリデーション結果はData Docsと呼ばれるHTML形式のファイルとして出力されます。Data Docsはローカルファイルシステムだけでなく、GCSやAWS S3などにも自動で保存することができます。以上のExpectationやCheckpointなどの細かい調整は、PythonのAPIを使って行います。実際の開発では試行錯誤しながら何度も調整を行うことになるので、それらを行うためのJupyter notebookも自動で生成されます。

Apache Airflowを使ったGreat Expectaionsでのバリデーション実装

ここでは、Apache AirflowのワークフローでGreat ExpectaionsによるBigQueryテーブルに対するバリデーションを実行してみます。

今回は以下の流れで進めていきます。

  1. ローカルでGreat ExpectaionsのExpectaion SuiteとCheckpointを作成
  2. Great Expectationsのバリデーションを実行するAirflowワークフロー(DAG)を作成
  3. AirflowにDAGをデプロイして実行しバリデーション結果(Data Docs)を確認する

バリデーションを作成する際のプロファイリング対象データとして、データサイエンス100本ノックで用意されているデータの中のcategoryテーブルを使うので、GitHubのリポジトリからCSVファイルをダウンロードして事前にBigQueryにロードしておきます。ここでは、100knock.categoryとしてテーブルを作成した前提で進めます。尚、categoryテーブルの中でもcategory_major_cdカラムは04,05,06,07,08,09の6種類のコードで構成されるわかりやすいデータのため、このカラムをバリデーションの対象にします。また、上記データを基に作成したバリデーションを実際に実行する対象として、100knock.category_newというテーブルも別途作成しておきます。今回、バリデーションを意図的に失敗させたいため、以下SQLによりcategorycategory_major_cdカラムの値0400に置換したテーブルをcategory_newとして作成します。

SELECT 
  IF(category_major_cd = '04', '00', category_major_cd) AS category_major_cd,
  * EXCEPT(category_major_cd)
FROM
  `100knock.category`

Great Expectaionsによるバリデーションの作成

はじめにローカルでGreat ExpectationsのExpectation SuiteとCheckpointを作成するため、pipでGreat Expectationsをインストールします。

pip install great_expectations

Great Expectationsはプロジェクトという単位でバリデーションに関わる設定ファイルやそれらを生成・調整するためのJupyter notebookなどを管理します。そのため、まずはプロジェクトを初期化します。

% great_expectations --v3-api init
Using v3 (Batch Request) API

  ___              _     ___                  _        _   _
 / __|_ _ ___ __ _| |_  | __|_ ___ __  ___ __| |_ __ _| |_(_)___ _ _  ___
| (_ | '_/ -_) _` |  _| | _|\ \ / '_ \/ -_) _|  _/ _` |  _| / _ \ ' \(_-<
 \___|_| \___\__,_|\__| |___/_\_\ .__/\___\__|\__\__,_|\__|_\___/_||_/__/
                                |_|
             ~ Always know what to expect from your data ~

Let's create a new Data Context to hold your project configuration.

Great Expectations will create a new directory with the following structure:

    great_expectations
    |-- great_expectations.yml
    |-- expectations
    |-- checkpoints
    |-- notebooks
    |-- plugins
    |-- .gitignore
    |-- uncommitted
        |-- config_variables.yml
        |-- data_docs
        |-- validations

OK to proceed? [Y/n]:Y

================================================================================

Congratulations! You are now ready to customize your Great Expectations configuration.

You can customize your configuration in many ways. Here are some examples:

  Use the CLI to:
    - Run `great_expectations --v3-api datasource new` to connect to your data.
    - Run `great_expectations --v3-api checkpoint new <checkpoint_name>` to bundle data with Expectation Suite(s) in a Checkpoint for later re-validation.
    - Run `great_expectations --v3-api suite --help` to create, edit, list, profile Expectation Suites.
    - Run `great_expectations --v3-api docs --help` to build and manage Data Docs sites.

  Edit your configuration in great_expectations.yml to:
    - Move Stores to the cloud
    - Add Slack notifications, PagerDuty alerts, etc.
    - Customize your Data Docs

Please see our documentation for more configuration options!

カレントディレクトリにgreat_expectationsというディレクトリができます。この中に一連の設定ファイルや作業用のJupyter notebookが保存されます。

バリデーション対象のデータソースに関する設定をDatasourceと呼びます。Datasourceを作成する中で接続確認も実行されます。今回BigQueryへアクセスするため、事前にGCPへの認証設定をしておきます。(参考: https://googleapis.dev/python/google-api-core/latest/auth.html )
BigQueryへアクセスができるようになったらDatasourceを作成していきます。

% great_expectations --v3-api datasource new

Using v3 (Batch Request) API

What data would you like Great Expectations to connect to?
    1. Files on a filesystem (for processing with Pandas or Spark)
    2. Relational database (SQL)
: 2
Great Expectations relies on the library `sqlalchemy` to connect to your data, but the package `sqlalchemy` containing this library is not installed.
    Would you like Great Expectations to try to execute `pip install sqlalchemy` for you? [Y/n]: Y
pip install sqlalchemy  [------------------------------------]    0%

Which database backend are you using?
    1. MySQL
    2. Postgres
    3. Redshift
    4. Snowflake
    5. BigQuery
    6. other - Do you have a working SQLAlchemy connection string?
: 5
Great Expectations relies on the library `pybigquery.sqlalchemy_bigquery` to connect to your data, but the package `pybigquery` containing this library is not installed.
    Would you like Great Expectations to try to execute `pip install pybigquery` for you? [Y/n]: Y
pip install pybigquery  [------------------------------------]    0%

ここからインタラクティブにDatasourceの設定を進めるためJupyter notebookが立ち上がります。

notebookの中で以下変数の値を環境に合わせて変更して実行する。

# Datasource名を設定する
datasource_name = "my_datasource"
# BigQueryのconnection stringを設定する
connection_string = "bigquery://<project_id>/100knock"

BigQueryへ接続するためのDatasourceが作成されたので、ターミナルに戻ってExpectation Suiteの作成に入ります。Expectationの作成には自動プロファイリングを使うようにします。また、今回はcategoryテーブルを対象にします。

% great_expectations --v3-api suite new

Using v3 (Batch Request) API

How would you like to create your Expectation Suite?
    1. Manually, without interacting with a sample batch of data (default)
    2. Interactively, with a sample batch of data
    3. Automatically, using a profiler
: 3

A batch of data is required to edit the suite - let's help you to specify it.

Select data_connector
    1. default_runtime_data_connector_name
    2. default_inferred_data_connector_name
: 2

Which data asset (accessible by data connector "default_inferred_data_connector_name") would you like to use?
    1. category
: 1

Name the new Expectation Suite [category.warning]:

Great Expectations will create a notebook, containing code cells that select from available columns in your dataset and
generate expectations about them to demonstrate some examples of assertions you can make about your data.

When you run this notebook, Great Expectations will store these expectations in a new Expectation Suite "category.warning" here:

  file:///home/koji/work/tmp/python/great_expectations/great_expectations/expectations/category/warning.json

Would you like to proceed? [Y/n]: Y

ここでもExpectation Suiteを設定するためにJupyter notebookが立ち上がります。

この中では、今回バリデーション対象のカラム category_major_cd をコメントアウトします。(ignored_columnsに含まれるカラム名はバリデーション対象から外されます)

ignored_columns = [
    #"category_major_cd",
    "category_major_name",
    "category_medium_cd",
    "category_medium_name",
    "category_small_cd",
    "category_small_name",
]

notebookのセルを最後まで実行するとデータがプロファイリングされてExpectationが作成されます。更にそのExpectationを使って、プロファイルしたデータに対するバリデーションが実行されます。最後に、great_expectations/uncommitted/data_docs/local_site 配下にバリデーション結果のレポート(Data Docs)が保存され、categoryテーブルに対するバリデーション結果のウィンドウが立ち上がります。

プロファイリングしたデータへのバリデーションなのでもちろん結果はSucceededになります。
バリデーション結果レポートを下の方にスクロールしていくと category_major_cd カラムに対するバリデーション結果の詳細が確認できます。このカラムには04,05,06,07,08,09の6種類のコードだけが存在することがわかります。その他にも、NULLを含まないなどのassertionが確認できます。

先程作成されたExpectationは、great_expectations/expectations/category/warning.jsonに保存されており、内容を見てみるとcategory_major_cdカラムは04,05,06,07,08,09の何れかの値を取ること(expect_column_values_to_be_in_set)やNULLを含まないこと(expect_column_values_to_not_be_null)が定義されています。assertionの内容を変更する場合はこのJSONファイルを編集します。

...
{
      "expectation_type": "expect_column_values_to_be_in_set",
      "kwargs": {
        "column": "category_major_cd",
        "value_set": [
          "04",
          "05",
          "06",
          "07",
          "08",
          "09"
        ]
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "category_major_cd"
      },
      "meta": {}
    },
...

Expectation Suiteができたので再度ターミナルに戻って今度はCheckpointを作ります。

great_expectations --v3-api checkpoint new my_checkpoint

notebookの中でCheckpointのコンフィグをYAML形式で設定しているセルがあるので、validations[0].batch_request.data_asset_nameにバリデーション対象のテーブル名 category_new を設定します。

yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-my-run-name-template"
validations:
  - batch_request:
      datasource_name: my_datasource
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: category_new
      data_connector_query:
        index: -1
    expectation_suite_name: category.warning
"""

data_asset_name以外はそのままで全てのセルを実行していき、最後に context.add_checkpoint(**yaml.load(yaml_config)) を実行することで、 great_expectations/checkpoints 配下にCheckpointのコンフィグファイルmy_checkpoint.yml が作成されます。
以上で、category_newテーブルに対してバリデーションを実行する準備が整いました。

Airflowでの実行に向けた準備

ここからは、Airflowのワークフロー上からこのCheckpointを使ってバリデーションを実行する準備をしていきます。
今回はローカルにAirflow環境を構築して、BigQueryのcategory_newテーブルへのバリデーションを実行し、バリデーション結果のData DocsをGCSバケットに保存するようにします。GCSバケットに保存することで、Google App Engineなどによるレポートサイトを立ち上げて社内で共有することができます。GCSにData Docsを出力したい場合はgreat_expectations.ymldata_docs_siteに以下設定を追加します。

data_docs_site:
  ...
  gs_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleGCSStoreBackend
      project: <project_id>
      bucket: <GCS_bucket_name>
    site_index_builder:
      class_name: DefaultSiteIndexBuilder

AirflowのワークフロータスクからBigQueryとGCSにアクセスするためのサービスアカウントを作成して必要なロールをアサインしておきます。併せて、サービスアカウントのキー(JSONファイル)を作成してダウンロードしておきます。(ここではキーをgcp_credential.jsonとして保存しておきます)

Airflowは以下のリポジトリを使ってDocker Composeで立ち上げます。今回はバージョン1.10.14を使います。
https://github.com/xnuinside/airflow_in_docker_compose

git clone https://github.com/xnuinside/airflow_in_docker_compose
cd airflow_in_docker_compose
docker-compose -f docker-compose-with-celery-executor.yml up --build

先程作成したサービスアカウントのキー(JSONファイル)をAirflowのタスクからConnection経由で使用できるようにするため、filesディレクトリにキーを保存します。

cp gcp_credential.json airflow_in_docker_compose/files

filesディレクトリはAirflow webserverコンテナの/opt/airflow/filesにVolumeマウントされます。
Airflowが立ち上がったらWeb UI (http://localhost:8080/admin )にアクセスして、Connectionを登録します。

Airflow環境で、Great Expectationsを使ったBigQueryへのバリデーションの実行に必要なパッケージをインストールします。

# 以下コマンドをAirflow werbserver, worker, schedulerコンテナでそれぞれ実行する
# docker exec -it <Airflowの対象のコンテナ> bash 
pip install apache-airflow-backport-providers-google pybigquery great_expectations

先程作成したgreat_expectaionsの設定ファイル群をAirflowのdagsディレクトリにデプロイします。airflow_files/dagsディレクトリはAirflow webserverコンテナの/opt/airflow/dagsにVolumeマウントされます。

cp -r great_expectations airflow_in_docker_compose/airflow_files/dags/

PythonOperatorを使ってGreat Expectationsによるバリデーションを実行するだけのDAGを作成します。以下スクリプトをge_test.pyとして作成します。

from datetime import datetime
import os

from airflow import DAG
from airflow import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.operators.python_operator import PythonOperator


def validate(**context):
    from great_expectations.data_context import DataContext

    conn = BaseHook.get_connection('my_bq')
    connection_json = conn.extra_dejson
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = connection_json['extra__google_cloud_platform__key_path']

    data_context: DataContext = DataContext(context_root_dir="/opt/airflow/dags/great_expectations")

    result = data_context.run_checkpoint(
        checkpoint_name="my_checkpoint",
        batch_request=None,
        run_name=None,
    )

    data_context.build_data_docs()

    if not result["success"]:
        raise AirflowException("Validation of the data is not successful ")


with DAG(dag_id="ge_test_dag", start_date=datetime(2021, 7, 31),
         schedule_interval=None) as dag:
    PythonOperator(
        task_id='validate',
        python_callable=validate,
        provide_context=True,
    )

DAGをデプロイします。

cp ge_test.py airflow_in_docker_compose/airflow_files/dags/

AirflowのWeb UI(http://localhost:8080/admin )にアクセスしてDAGを確認します。

DAGの実行とバリデーション結果の確認

DAG実行の準備が整ったので、ge_test_dagをTrigger DAGして実行します。そうするとcategory_newテーブルに対してバリデーションするタスクが実行されます。category_newテーブルはcategory_major_cdカラムの値として本来存在しないはずの00を含んでいるので、想定通りバリデーションが失敗します。

バリデーション結果をData Docsで確認します。Data Docsは自動的にGCSに保存されるので確認してみます。

ここで認証済みURLをクリックするとバリデーション結果としてData Docsが表示されます。

Statusが✗になっている欄を見ると想定通り00という想定外の値が含まれていることが確認できます。

Data Docsをもとにデータ品質の状態を共有する

Data Docsをデータ生成元の担当者や利用者と共有することで、データの品質に関する状況を共有し、業務への影響や対応要否・優先度付けなどについて議論することができるようになると思います。尚、業務への影響範囲を把握する際は、データのリネージ情報も併せて用意されていると円滑に、そして正確に確認を進めることができると思います。
また、Data Docsは各テーブル・カラムの詳細な仕様書にもなるため、データカタログとリンクすることでメタ情報の増強にも役立つと思います。

Discussion