🐼

Great Expectations入門(Pandas DataFrame編)

2022/03/21に公開

Great Expectationsとは

データの品質をチェックする、Pythonのライブラリです(Data Quality・Data Monitoring・Data Testingとか呼ばれるカテゴリー)。

公式ページ曰く「Great Expectations helps data teams eliminate pipeline debt, through data testing, documentation, and profiling.」で、つまり、

  • データのテスト
    • このテストのことをExpectationと呼びます
    • あるカラムがあるか、カラムの値が特定の範囲か、NULLが一定の割合かなどをチェックできます
    • 組み込みのExpectationの一覧はこちら
  • データの文章化
    • どんなテストをパスしたか、どのテストで失敗したか、失敗した値や割合をハイカラなHTMLで閲覧できます
  • プロファイリング
    • Expectationの雛形をデータから作成することができます

の3つの機能で、"Pipeline debt"を解消することを目指しています。

この記事

Great Expectationsがサポートしているデータソースは

  • クラウドのDWH(Athena,BigQuery,RedShift,Snowflake)
  • RDB(SQLServer,MySQL,PostgreSQL,SQLite)
  • ファイル

がありますが、それ以外のデータを対象としたい場合、Pandas(Spark) DataFrame経由で対応できそうです。

チュートリアルではローカルファイルの説明がありますし、RDB・DWHへのアクセスはWeb上に記事があったのですが、DataFrame経由のアクセスに戸惑ったのでそのメモです。

流れ

チュートリアルでは、great_expectations CLIによるひな形(Jupyter Notebook)を使って、

  1. 準備(パッケージのインストール、ディレクトリの初期化)
  2. データソースの設定(Pandas経由でのローカルのファイルを読むように)
  3. Expectationsの作成(1.で設定したファイルから自動プロファイル)
  4. バリデーション(CheckPointの実行)

と進みます。

同じようにしても芸がないので、この記事では、

  1. 準備(チュートリアルと同じ)
  2. データソースの設定(バリデーション実行時にPandasのDataFrameを読むように)
  3. Expectationsの作成(手動での追加)
  4. バリデーション(CheckPointの実行)

の流れを、great_expectations CLIによるひな形なしで行います。

やってみる

準備

(チュートリアルと同じです)

> pip install great-expectations
> pip freeze | grep great
great-expectations==0.14.11

> great_expectations -y init
# 結果省略

> tree
.
└── great_expectations
    ├── checkpoints
    ├── expectations
    ├── great_expectations.yml
    ├── plugins
    │   └── custom_data_docs
    │       ├── renderers
    │       ├── styles
    │       │   └── data_docs_custom_styles.css
    │       └── views
    ├── profilers
    └── uncommitted
        ├── config_variables.yml
        ├── data_docs
        └── validations

データソースの設定

データソースを設定します。ここで設定したデータソースは、

  1. バリデーションの実行(CheckPointの実行)
  2. Expectationの準備

の二箇所で使用します。

なお、Expectationの準備は以下の四種類の方法があり、

  1. 設定のJSONをそのまま書く
  2. Pythonでインタラクティブに設定する(この記事の方法)
  3. プロファイリングして設定する(チュートリアルの方法)
  4. 自作する

このうち、1と4に関してはデータソースの設定とは独立に設定することができます。

ドキュメントの記載に従い、

を設定したデータソースを設定します。

(以下特に記載の無い限り、Pythonインタプリタでの処理です)

import great_expectations as ge
context = ge.data_context.DataContext()

from ruamel import yaml

datasource_config = {
    "name": "example_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "PandasExecutionEngine",
    },
    "data_connectors": {
        "default_runtime_data_connector_name": {
            "class_name": "RuntimeDataConnector",
            "module_name": "great_expectations.datasource.data_connector",
            "batch_identifiers": ["default_identifier_name"],
        },
    },
}
context.test_yaml_config(yaml.dump(datasource_config))
context.add_datasource(**datasource_config)

Expectationの設定

  • 上記で作成したデータソースからデータを取得する、BatchRequest
  • Expectationの集まりである、Expectation Suite
  • BatchRequestからデータを取得しExpectation Suiteを実行する、Validator

の3つのオブジェクトを作成し、カラム「a」が存在するExpectationを登録します。

データを渡さずExpectationを設定することもできますが、データ(BatchRequest)に対してインタラクティブにExpectationを実行するため、成功・失敗を確認しながら進めることができます。
(今回の例だとメリット、わかりにくいですが)

import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest

suite = context.create_expectation_suite(
    'pandas_dataframe',
    overwrite_existing=True
)

df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["a", "b", "c"])

batch_request = RuntimeBatchRequest(
    datasource_name="example_datasource",
    data_connector_name="default_runtime_data_connector_name",
    data_asset_name="pandas_test",
    runtime_parameters={"batch_data": df}, 
    batch_identifiers={"default_identifier_name": "default_identifier"},
)

validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="pandas_dataframe"
)
validator.expect_column_to_exist('a')

DataFrame dfに「a」というカラムはあるので、一番最後のExpectationsは成功を返します。

{
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "result": {},
  "success": true,
  "meta": {}
}

テストに使うExpectation Suite(Expectationの集まり)をJSONファイルに保存します。

validator.save_expectation_suite()

保存されているか見てみましょう

cat great_expectations/expectations/pandas_dataframe.json
{
  "data_asset_type": null,
  "expectation_suite_name": "pandas_dataframe",
  "expectations": [
    {
      "expectation_type": "expect_column_to_exist",
      "kwargs": {
        "column": "a"
      },
      "meta": {}
    }
  ],
  "ge_cloud_id": null,
  "meta": {
    "great_expectations_version": "0.14.11"
  }
}

CheckPoint

CheckPointは、

  • Expectation Suite(Expectationの集まり)
  • Batch(データソースから取得したデータの集まり)
  • Action(検証後に行う行動)

をまとめたものです。今回はActionは特に設定せず、上記で設定したExpectation Suiteと、データソースを設定します。

checkpoint_config = {
    "name": "column_a_check",
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "example_datasource",
                "data_connector_name": "default_runtime_data_connector_name",
                "data_asset_name": "pandas_df",
            },
            "expectation_suite_name": "pandas_dataframe",
        }
    ],
}
context.add_checkpoint(**checkpoint_config)

(なお、data_asset_nameの役割が何かわかってないです…)

動かしてみる

Expectationの作成時に使用したのと同じDataFrameで、CheckPointを実行します。

results = context.run_checkpoint(
    checkpoint_name="column_a_check",
    batch_request={
        "runtime_parameters": {"batch_data": df},
        "batch_identifiers": {
            "default_identifier_name": "validation_df"
        },
    },
)

それとは別に、必要なカラムがない(カラムaの代わりにaaa)でもCheckPointを実行します。


failed_df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["aaa", "b", "c"])

results = context.run_checkpoint(
    checkpoint_name="column_a_check",
    batch_request={
        "runtime_parameters": {"batch_data": failed_df},
        "batch_identifiers": {
            "default_identifier_name": "failed_df"
        },
    },
)

見てみる

context.open_data_docs()

成功した方のCheckPoint

失敗した方のCheckPoint

補足:UserConfigurableProfiler

チュートリアルではファイルを元にExpectationを作成しましたが、DataFrameでも同様の事ができます。

DataFrameの内容を元にExpectationを作成

from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler

suite = context.create_expectation_suite(
    'pandas_dataframe_profiler',
    overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request, expectation_suite_name="pandas_dataframe_profiler"
)
profiler = UserConfigurableProfiler(profile_dataset=validator)
suite = profiler.build_suite()
validator.save_expectation_suite(discard_failed_expectations=False)

CheckPointの作成と実行

checkpoint_config = {
    "name": "profiler_checkpoint",
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": {
                "datasource_name": "example_datasource",
                "data_connector_name": "default_runtime_data_connector_name",
                "data_asset_name": "taxi_data",
            },
            "expectation_suite_name": "pandas_dataframe_profiler",
        }
    ],
}
context.add_checkpoint(**checkpoint_config)

context.run_checkpoint(
    checkpoint_name="profiler_checkpoint",
    batch_request={
        "runtime_parameters": {"batch_data": df},
        "batch_identifiers": {
            "default_identifier_name": "<YOUR MEANINGFUL IDENTIFIER>"
        },
    },
)
context.build_data_docs()

結果

Discussion