Great Expectations入門(Pandas DataFrame編)
Great Expectationsとは
データの品質をチェックする、Pythonのライブラリです(Data Quality・Data Monitoring・Data Testingとか呼ばれるカテゴリー)。
公式ページ曰く「Great Expectations helps data teams eliminate pipeline debt, through data testing, documentation, and profiling.」で、つまり、
- データのテスト
- このテストのことをExpectationと呼びます
- あるカラムがあるか、カラムの値が特定の範囲か、NULLが一定の割合かなどをチェックできます
- 組み込みのExpectationの一覧はこちら
- データの文章化
- どんなテストをパスしたか、どのテストで失敗したか、失敗した値や割合をハイカラなHTMLで閲覧できます
- プロファイリング
- Expectationの雛形をデータから作成することができます
の3つの機能で、"Pipeline debt"を解消することを目指しています。
この記事
Great Expectationsがサポートしているデータソースは
- クラウドのDWH(Athena,BigQuery,RedShift,Snowflake)
- RDB(SQLServer,MySQL,PostgreSQL,SQLite)
- ファイル
がありますが、それ以外のデータを対象としたい場合、Pandas(Spark) DataFrame経由で対応できそうです。
チュートリアルではローカルファイルの説明がありますし、RDB・DWHへのアクセスはWeb上に記事があったのですが、DataFrame経由のアクセスに戸惑ったのでそのメモです。
流れ
チュートリアルでは、great_expectations CLIによるひな形(Jupyter Notebook)を使って、
- 準備(パッケージのインストール、ディレクトリの初期化)
- データソースの設定(Pandas経由でのローカルのファイルを読むように)
- Expectationsの作成(1.で設定したファイルから自動プロファイル)
- バリデーション(CheckPointの実行)
と進みます。
同じようにしても芸がないので、この記事では、
- 準備(チュートリアルと同じ)
- データソースの設定(バリデーション実行時にPandasのDataFrameを読むように)
- Expectationsの作成(手動での追加)
- バリデーション(CheckPointの実行)
の流れを、great_expectations CLIによるひな形なしで行います。
やってみる
準備
(チュートリアルと同じです)
> pip install great-expectations
> pip freeze | grep great
great-expectations==0.14.11
> great_expectations -y init
# 結果省略
> tree
.
└── great_expectations
├── checkpoints
├── expectations
├── great_expectations.yml
├── plugins
│ └── custom_data_docs
│ ├── renderers
│ ├── styles
│ │ └── data_docs_custom_styles.css
│ └── views
├── profilers
└── uncommitted
├── config_variables.yml
├── data_docs
└── validations
データソースの設定
データソースを設定します。ここで設定したデータソースは、
- バリデーションの実行(CheckPointの実行)
- Expectationの準備
の二箇所で使用します。
なお、Expectationの準備は以下の四種類の方法があり、
このうち、1と4に関してはデータソースの設定とは独立に設定することができます。
ドキュメントの記載に従い、
- Execute EngineにはPandasExecutionEngine
- DataConnectorにはRuntimeDataConnector
を設定したデータソースを設定します。
(以下特に記載の無い限り、Pythonインタプリタでの処理です)
import great_expectations as ge
context = ge.data_context.DataContext()
from ruamel import yaml
datasource_config = {
"name": "example_datasource",
"class_name": "Datasource",
"module_name": "great_expectations.datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "PandasExecutionEngine",
},
"data_connectors": {
"default_runtime_data_connector_name": {
"class_name": "RuntimeDataConnector",
"module_name": "great_expectations.datasource.data_connector",
"batch_identifiers": ["default_identifier_name"],
},
},
}
context.test_yaml_config(yaml.dump(datasource_config))
context.add_datasource(**datasource_config)
Expectationの設定
- 上記で作成したデータソースからデータを取得する、BatchRequest
- Expectationの集まりである、Expectation Suite
- BatchRequestからデータを取得しExpectation Suiteを実行する、Validator
の3つのオブジェクトを作成し、カラム「a」が存在するExpectationを登録します。
データを渡さずExpectationを設定することもできますが、データ(BatchRequest)に対してインタラクティブにExpectationを実行するため、成功・失敗を確認しながら進めることができます。
(今回の例だとメリット、わかりにくいですが)
import pandas as pd
from great_expectations.core.batch import RuntimeBatchRequest
suite = context.create_expectation_suite(
'pandas_dataframe',
overwrite_existing=True
)
df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["a", "b", "c"])
batch_request = RuntimeBatchRequest(
datasource_name="example_datasource",
data_connector_name="default_runtime_data_connector_name",
data_asset_name="pandas_test",
runtime_parameters={"batch_data": df},
batch_identifiers={"default_identifier_name": "default_identifier"},
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="pandas_dataframe"
)
validator.expect_column_to_exist('a')
DataFrame dfに「a」というカラムはあるので、一番最後のExpectationsは成功を返します。
{
"exception_info": {
"raised_exception": false,
"exception_traceback": null,
"exception_message": null
},
"result": {},
"success": true,
"meta": {}
}
テストに使うExpectation Suite(Expectationの集まり)をJSONファイルに保存します。
validator.save_expectation_suite()
保存されているか見てみましょう
cat great_expectations/expectations/pandas_dataframe.json
{
"data_asset_type": null,
"expectation_suite_name": "pandas_dataframe",
"expectations": [
{
"expectation_type": "expect_column_to_exist",
"kwargs": {
"column": "a"
},
"meta": {}
}
],
"ge_cloud_id": null,
"meta": {
"great_expectations_version": "0.14.11"
}
}
CheckPoint
- Expectation Suite(Expectationの集まり)
- Batch(データソースから取得したデータの集まり)
- Action(検証後に行う行動)
をまとめたものです。今回はActionは特に設定せず、上記で設定したExpectation Suiteと、データソースを設定します。
checkpoint_config = {
"name": "column_a_check",
"config_version": 1,
"class_name": "SimpleCheckpoint",
"validations": [
{
"batch_request": {
"datasource_name": "example_datasource",
"data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": "pandas_df",
},
"expectation_suite_name": "pandas_dataframe",
}
],
}
context.add_checkpoint(**checkpoint_config)
(なお、data_asset_name
の役割が何かわかってないです…)
動かしてみる
Expectationの作成時に使用したのと同じDataFrameで、CheckPointを実行します。
results = context.run_checkpoint(
checkpoint_name="column_a_check",
batch_request={
"runtime_parameters": {"batch_data": df},
"batch_identifiers": {
"default_identifier_name": "validation_df"
},
},
)
それとは別に、必要なカラムがない(カラムaの代わりにaaa)でもCheckPointを実行します。
failed_df = pd.DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["aaa", "b", "c"])
results = context.run_checkpoint(
checkpoint_name="column_a_check",
batch_request={
"runtime_parameters": {"batch_data": failed_df},
"batch_identifiers": {
"default_identifier_name": "failed_df"
},
},
)
見てみる
context.open_data_docs()
成功した方のCheckPoint
失敗した方のCheckPoint
補足:UserConfigurableProfiler
チュートリアルではファイルを元にExpectationを作成しましたが、DataFrameでも同様の事ができます。
DataFrameの内容を元にExpectationを作成
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler
suite = context.create_expectation_suite(
'pandas_dataframe_profiler',
overwrite_existing=True
)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name="pandas_dataframe_profiler"
)
profiler = UserConfigurableProfiler(profile_dataset=validator)
suite = profiler.build_suite()
validator.save_expectation_suite(discard_failed_expectations=False)
CheckPointの作成と実行
checkpoint_config = {
"name": "profiler_checkpoint",
"class_name": "SimpleCheckpoint",
"validations": [
{
"batch_request": {
"datasource_name": "example_datasource",
"data_connector_name": "default_runtime_data_connector_name",
"data_asset_name": "taxi_data",
},
"expectation_suite_name": "pandas_dataframe_profiler",
}
],
}
context.add_checkpoint(**checkpoint_config)
context.run_checkpoint(
checkpoint_name="profiler_checkpoint",
batch_request={
"runtime_parameters": {"batch_data": df},
"batch_identifiers": {
"default_identifier_name": "<YOUR MEANINGFUL IDENTIFIER>"
},
},
)
context.build_data_docs()
結果
Discussion