Dataplex Auto Data Quality(データ品質)による品質チェックの自動化

2024/09/24に公開

WED株式会社でデータエンジニアをしているthimi0412です。
レシート買取アプリONEでは、撮影されたレシート画像からOCRを行い、レシート情報(購入店舗、商品名、価格等)を抽出して構造化し、DBに保存しています。

他企業とのデータ連携や納品、機械学習を行う上でデータの品質のチェックを自動化したいと思いDataplexのAuto Data Qualityを使用しました。今回はシンプルなデータの重複チェックを行なった例を紹介しようと思います。
https://cloud.google.com/dataplex/docs/auto-data-quality-overview

やったこと

  1. DataplexのAuto Data Qualityを実行
  2. 実行結果をBigQueryのテーブルとして保存
  3. 作成したテーブルに対してSQL実行して品質チェックがNGだったものを表示

Dataplex Auto Data Qualityの作成

ここでは、JANコードを管理しているテーブルに対して、JANコードが重複していないかチェックを行います。弊社では、OCRで取得した商品名からJANコードを推論し、付与を行なっています。
https://zenn.dev/wed_engineering/articles/20240731-indexing-product-names

JANコードは、日々新しいものが連携されてきており、対象のテーブルに追加されていきます。

やりたいことの流れ

  1. 定期的にテーブルをスキャン
  2. スキャン結果をBigQueryのテーブルとして保存
  3. スキャン結果テーブルにSQLで実行して品質チェックに失敗しているテーブルを検知する

今回は、TerraformでDataplex Auto Data Qualityを作成しました。

resource "google_dataplex_datascan" "dmt_v2_jan" {
  project      = data.google_project.main.number
  location     = "us-central1"
  data_scan_id = "dmt-v2-jan"

  # チェック対象テーブル
  data {
    resource = "//bigquery.googleapis.com/${google_bigquery_table.dmt_v2_jan.id}"
  }

  execution_spec {
    trigger {
      schedule {
        cron = "TZ=Asia/Tokyo 0 10 * * 1" # 月曜日の10時
      }
    }
  }
  data_quality_spec {
    sampling_percent = 100
    post_scan_actions {
      bigquery_export {
        # 結果を保存するBQを指定
        results_table = "//bigquery.googleapis.com/${google_bigquery_dataset.dataplex.id}/tables/scan_result"
      }
    }
    rules {
      column    = "jan_code"
      dimension = "UNIQUENESS"
      uniqueness_expectation {}
    }
  }
}

Applyをして設定を見ると、結果を BigQuery と Dataplex Catalog UI に公開するというところが有効化されていないので、コンソール上から有効化しました。
Terraformでplanを行なっても差分として現れなかったので、まだ対応していないのかと思われます。

この設定を行うとBigQuery Studioからでも対象のテーブルで実行結果を見ることができます。

実行結果のテーブル

スキャンが実行されるとテーブルが作成されるので、以下のようなSQLでどのような結果になったのかを確認します。job_quality_result.passedの値がfalseのものをwhereの条件に入れれば、チェックがNGだったものだけを抽出できます。このクエリを定期的に実行し、Slackチャンネル等に通知を行えば、品質チェックを行うことができます。

select
  data_source.dataset_id,
  data_source.table_id,
  rule_column,
  rule_type,
  rule_dimension,
  job_quality_result.passed,
  job_quality_result.score,
  rule_rows_passed_percent,
  datetime(job_start_time, 'Asia/Tokyo') as job_start_time
from
  dataplex.scan_result


↑の画像だとreceipt_itemsというテーブルでreceipt_item_idが重複していることがわかりました。
後にこのテーブルを作成しているSQLの修正を行い、その後の結果ではチェックをパスすることができました。

パーティションフィルタを必須に設定をしているテーブルへのスキャン

大規模なテーブルだとパーティションの設定を行うことでクエリのコストを抑えますが、そのようなテーブルに対してスキャンを実行する際には、row_filterでパーティション設定をしているカラムを設定し、where句の内容を記述します。

今回は、レシートデータという特性上、直近2年のデータでreceipt_idが重複していないかをチェックしました。DATETIME_SUBCURRENT_DATETIME等の関数も使えます。先ほどのスキャン結果のテーブルの内容を見るとわかりますが、内部ではSQLが実行されているので、割と何でも書けます。

data_quality_spec {
    sampling_percent = 100
    post_scan_actions {
      bigquery_export {
        results_table = "//bigquery.googleapis.com/${google_bigquery_dataset.dataplex.id}/tables/scan_result"
      }
    }
    row_filter = "updated_at >= DATETIME_SUB(CURRENT_DATETIME('Asia/Tokyo'), INTERVAL 2 YEAR)"
    rules {
      column    = "receipt_id"
      dimension = "UNIQUENESS"
      uniqueness_expectation {}
    }
  }

終わりに

今回は、UNIQUENESS(一意性) のチェックのみしか行なっていませんでしたが、そのほか外れ値や自分たちで書いたSQLも実行できるので、この辺をカスタムしていき、より多くの観点からデータの品質チェックを行おうと思っております。

WED Engineering Blog

Discussion