dbt を用いた Dataflow パイプラインのデータテスト
「Happy Elements Advent Calendar 2023」 12月2日の記事です
はじめに
Happy Elements 株式会社 カカリアスタジオ 分析チーム データエンジニア の K.M. です。
ログ連携基盤の構築・運用や、Looker でダッシュボードを作るなど、データの整備を行っております。
今年は、全てのゲームタイトルでログ連携基盤を新しく入れ替えることが出来ました。
使用するリソースが減ったので障害ポイントが減り、障害時の対応もやりやすくなりました。
さらにログ連携基盤の Dataflow ソースコードを更新した際に、ログが正しく連携されているかをテスト出来るようにしたので、そのデータテストについて紹介させていただきたいと思います!
ログ連携基盤について
アプリの行動ログがサーバーから出力され AWS CloudWatch Logs へ入ります。
CloudWatch Logs にはサブスクリプションフィルターが設定されおり、Kinesis Data Firehose へ送られ、S3 バケットに保存されます。
GCP Dataflow が定期的に実行され、S3 にあるログファイルを BigQuery へロードする、という構成になっています。
2022年のアドベントカレンダーに「ゲーム運営を支えるログ配信基盤について」の記事もありますのでそちらもご覧ください!
ビルド時にデータテストを追加
Dataflow のソースコードは Java で書かれています。
開発者がこのソースコードを更新しマージすると、自動的に Cloud Build が起動します。
ビルドプロセスの最後に、新たに作成された Docker イメージを使って Dataflow を実行、さらに dbt を用いたデータテストを追加しました。
このデータテストでエラーが発生した場合、ビルド全体がエラーとなります。
ビルドプロセスの詳細
- JAR ファイルのビルド
- Docker イメージのビルド
- メタデータファイル生成
- 成果物のアップロード
- 新 Docker イメージで Dataflow 実行
- dbt によるデータテスト
dbt のデータテスト
dbt は、SQL を用いたデータ変換を自動化するオープンソースのコマンドラインツールです。
このツールにはデータ品質テスト機能が含まれているため、この機能を活用します。
dbt の簡単な使い方
pip でインストール
dbt は Python パッケージとして提供されているため、pip を使用してインストールすることができます。
以下のコマンドでインストールします。
pip install dbt
dbt プロジェクトの作成
まず dbt プロジェクトを作成します。
dbt init <プロジェクト名>
初期ファイルとディレクトリが作成されます。
└── <プロジェクト名>
├── README.md
├── analyses
├── dbt_project.yml
├── macros
├── models
├── seeds
├── snapshots
└── tests
データベースの接続設定
データベースへの接続設定は、dbt init
コマンドを使用して ~/.dbt/profiles.yml
ファイルに記述します。
ただし、プロジェクトのルートディレクトリに profiles.yml
ファイルを設置すると、その設定が優先されます。
BigQuery にサービスアカウントを使って接続する profiles.yml
の例です。
my_bigquery_db:
target: dev
outputs:
dev:
type: bigquery
method: service-account
project: my-dev-gcp-project
dataset: my_dev_dataset
threads: 1
keyfile: /path/to/dev-service-account.json
prod:
type: bigquery
method: service-account
project: my-prod-gcp-project
dataset: my_prod_dataset
threads: 1
keyfile: /path/to/prod-service-account.json
モデルを定義
models
ディレクトリに SQL ファイルを作成します。
今回はデータテストの目的のみで使用するため、単純な SELECT 文を記述します。
table_01
という名前のテーブルを選択する SQL ファイルの例を示します。
SELECT
*
FROM
`{{ model.schema }}.table_01`
スキーマを定義するファイルを作成します。下記の例では、table_01
テーブルに、id
name
status
フィールドがあります。
version: 2
models:
- name: table_01
columns:
- name: id
- name: name
- name: status
dbt の汎用テスト
組み込みテスト
dbt にはすぐに使える4つの組み込みテストがあります。
関数名 | 用途 |
---|---|
unique | 値が一意であることを検証します |
not_null | 値がNULLでないことを検証します |
accepted_values | 値が設定したリストの値に含まれていることを検証します |
relationships | 特定のカラムの値が別のテーブルの特定のカラムに存在することを検証します |
dbt-utils や dbt-expectations のようなオープンソースパッケージには、さまざまな汎用テストが定義されています。
カスタムテスト
カスタムテストを作成することも可能です。
SQL ファイルで定義し、そのクエリが返す結果が0行であることを期待します。
つまり、テストクエリが行を返す場合、それはテストが失敗したことを意味します。
次の例は、テーブルのレコード数が指定された数であるかをテストします。
引数で渡された count
とレコード数が一致しない場合、テストは失敗します。
{% test row_count(model, count) %}
SELECT
COUNT(*) AS cnt
FROM
`{{ model }}`
HAVING NOT(cnt = {{ count }})
{% endtest %}
テストの設定
それでは、テストを設定していきます。
テストは、スキーマを定義している schema.yml
ファイルに設定します。
設定したテストケースの例
- table_01 テーブルのレコード数は100件であること
- id はユニークであること
- id は NULL でないこと
- status は
active
,inactive
,suspended
,deleted
のいずれかであること
version: 2
models:
- name: table_01
tests:
- row_count:
count: 100
columns:
- name: id
description: id field check.
tests:
- unique
- not_null
- name: name
- name: status
description: status field check.
tests:
- accepted_values:
values: ['active', 'inactive', 'suspended', 'deleted']
dbt test コマンド
dbt test
コマンドでテストが実行されます。
これを Cloud Build 上で実行し、テストに1つでも失敗した場合ビルドがエラーとなります。
$ dbt test
07:26:51 Found 1 model, 4 tests, 0 snapshots, 0 analyses, 358 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
07:26:51
07:26:52 Concurrency: 1 threads (target='dev')
07:26:52
07:26:53 1 of 4 START test unique_table_01_id ............................ [RUN]
07:26:53 1 of 4 PASS unique_table_01_id .................................. [PASS in 1.30s]
07:26:54 2 of 4 START test not_null_table_01_id .......................... [RUN]
07:26:54 2 of 4 PASS not_null_table_01_id ................................ [PASS in 1.30s]
07:26:55 3 of 4 START test accepted_values_table_01_status ............... [RUN]
07:26:55 3 of 4 PASS accepted_values_table_01_status ..................... [PASS in 1.30s]
07:26:56 4 of 4 START test row_count_table_01_100 ........................ [RUN]
07:26:56 4 of 4 PASS row_count_table_01_100 .............................. [PASS in 1.30s]
07:26:57
07:26:57 Finished running 4 tests in 0 hours 0 minutes and 6.30 seconds (6.30s).
07:26:57
07:26:57 Completed successfully
07:26:57
07:26:57 Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4
まとめ
Dataflow のソースコード更新後に問題がないかを dbt を使ってテストする方法について紹介しました。
Apache Beam SDK をアップデートした後、通常5分で終わっていた Dataflow のジョブが45分もかかるという問題に遭遇しましたが、テストをしっかり準備していたことで問題を早期発見し、高速にイテレーションを回すことができました。
今回ご紹介したような取り組みでアップデート後の動作確認をより効率的に進めることができ、本番環境への反映も安心して行えるようになりました!
Discussion