Airflowで構築したワークフローを検証する
データ基盤の監視
データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。
同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。まず、Data Pipelines Pocket Referenceではデータ基盤検証の原則が紹介されました。
Validate Early, Validate Often
要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。
Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。
Extractの検証
ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcs
を利用すると簡単にできます。
一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensor
よりGCSObjectsWithPrefixExistenceSensor
の方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。
check_extract = GCSObjectsWithPrefixExistenceSensor(
task_id="check_extract",
bucket="{YOUR_BUCKET}",
prefix="{YOUR_PREFIX}",
)
Loadの検証
ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator
を利用できます。
下記のタスクをLoad処理の後ろに追加すれば良いです。
check_load = BigQueryCheckOperator(
task_id="check_load",
sql={YOUR_SQL},
use_legacy_sql=False,
params={
"bq_suffix": Variable.get("bq_suffix"),
"dataset": setting.bq.dataset,
"table": setting.bq.table_name,
},
location="asia-northeast1",
)
SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えばこの記事が紹介されたクエリでテーブルが空になっているかを確認できます。
SELECT
total_rows
FROM
${dataset_id}.INFORMATION_SCHEMA.PARTITIONS
WHERE
table_name = '${table_name}'
ORDER BY
last_modified_time DESC
LIMIT 1;
GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。
皆さんのワークフロー設計にご参考になれば幸いです。
Discussion