🛂

dbt を用いた Dataflow パイプラインのデータテスト

「Happy Elements Advent Calendar 2023」 12月2日の記事です

はじめに

Happy Elements 株式会社 カカリアスタジオ 分析チーム データエンジニア の K.M. です。

ログ連携基盤の構築・運用や、Looker でダッシュボードを作るなど、データの整備を行っております。
今年は、全てのゲームタイトルでログ連携基盤を新しく入れ替えることが出来ました。
使用するリソースが減ったので障害ポイントが減り、障害時の対応もやりやすくなりました。
さらにログ連携基盤の Dataflow ソースコードを更新した際に、ログが正しく連携されているかをテスト出来るようにしたので、そのデータテストについて紹介させていただきたいと思います!

ログ連携基盤について

アプリの行動ログがサーバーから出力され AWS CloudWatch Logs へ入ります。
CloudWatch Logs にはサブスクリプションフィルターが設定されおり、Kinesis Data Firehose へ送られ、S3 バケットに保存されます。
GCP Dataflow が定期的に実行され、S3 にあるログファイルを BigQuery へロードする、という構成になっています。
ログ連携基盤構成図

2022年のアドベントカレンダーに「ゲーム運営を支えるログ配信基盤について」の記事もありますのでそちらもご覧ください!

ビルド時にデータテストを追加

Dataflow のソースコードは Java で書かれています。
開発者がこのソースコードを更新しマージすると、自動的に Cloud Build が起動します。
ビルドプロセスの最後に、新たに作成された Docker イメージを使って Dataflow を実行、さらに dbt を用いたデータテストを追加しました。
このデータテストでエラーが発生した場合、ビルド全体がエラーとなります。

ビルドプロセスの詳細

  1. JAR ファイルのビルド
  2. Docker イメージのビルド
  3. メタデータファイル生成
  4. 成果物のアップロード
  5. 新 Docker イメージで Dataflow 実行
  6. dbt によるデータテスト

dbt のデータテスト

dbt は、SQL を用いたデータ変換を自動化するオープンソースのコマンドラインツールです。
このツールにはデータ品質テスト機能が含まれているため、この機能を活用します。

dbt の簡単な使い方

pip でインストール

dbt は Python パッケージとして提供されているため、pip を使用してインストールすることができます。
以下のコマンドでインストールします。

pip install dbt

dbt プロジェクトの作成

まず dbt プロジェクトを作成します。

dbt init <プロジェクト名>

初期ファイルとディレクトリが作成されます。

└── <プロジェクト名>
    ├── README.md
    ├── analyses
    ├── dbt_project.yml
    ├── macros
    ├── models
    ├── seeds
    ├── snapshots
    └── tests

データベースの接続設定

データベースへの接続設定は、dbt init コマンドを使用して ~/.dbt/profiles.yml ファイルに記述します。
ただし、プロジェクトのルートディレクトリに profiles.yml ファイルを設置すると、その設定が優先されます。

BigQuery にサービスアカウントを使って接続する profiles.yml の例です。

profiles.yml
my_bigquery_db:
  target: dev
  outputs:
    dev:
      type: bigquery
      method: service-account
      project: my-dev-gcp-project
      dataset: my_dev_dataset
      threads: 1
      keyfile: /path/to/dev-service-account.json
    prod:
      type: bigquery
      method: service-account
      project: my-prod-gcp-project
      dataset: my_prod_dataset
      threads: 1
      keyfile: /path/to/prod-service-account.json

モデルを定義

models ディレクトリに SQL ファイルを作成します。
今回はデータテストの目的のみで使用するため、単純な SELECT 文を記述します。

table_01 という名前のテーブルを選択する SQL ファイルの例を示します。

models/table_01/table_01.sql
SELECT 
  *
FROM 
  `{{ model.schema }}.table_01`

スキーマを定義するファイルを作成します。下記の例では、table_01 テーブルに、id name status フィールドがあります。

models/table_01/schema.yml
version: 2

models:
  - name: table_01
    columns: 
      - name: id
      - name: name
      - name: status

dbt の汎用テスト

組み込みテスト

dbt にはすぐに使える4つの組み込みテストがあります。

関数名 用途
unique 値が一意であることを検証します
not_null 値がNULLでないことを検証します
accepted_values 値が設定したリストの値に含まれていることを検証します
relationships 特定のカラムの値が別のテーブルの特定のカラムに存在することを検証します

dbt-utilsdbt-expectations のようなオープンソースパッケージには、さまざまな汎用テストが定義されています。

カスタムテスト

カスタムテストを作成することも可能です。
SQL ファイルで定義し、そのクエリが返す結果が0行であることを期待します。
つまり、テストクエリが行を返す場合、それはテストが失敗したことを意味します。

次の例は、テーブルのレコード数が指定された数であるかをテストします。
引数で渡された count とレコード数が一致しない場合、テストは失敗します。

tests/generic/test_row_count.sql
{% test row_count(model, count) %}

SELECT 
  COUNT(*) AS cnt
FROM 
  `{{ model }}`
HAVING NOT(cnt = {{ count }}) 

{% endtest %}

テストの設定

それでは、テストを設定していきます。
テストは、スキーマを定義している schema.yml ファイルに設定します。

設定したテストケースの例

  • table_01 テーブルのレコード数は100件であること
  • id はユニークであること
  • id は NULL でないこと
  • status は active, inactive, suspended, deleted のいずれかであること
models/table_01/schema.yml
version: 2

models:
  - name: table_01
    tests:
      - row_count: 
          count: 100
    columns: 
      - name: id
        description: id field check.
        tests:
          - unique
          - not_null
      - name: name
      - name: status
        description: status field check.
        tests:
          - accepted_values:
              values: ['active', 'inactive', 'suspended', 'deleted']

dbt test コマンド

dbt test コマンドでテストが実行されます。
これを Cloud Build 上で実行し、テストに1つでも失敗した場合ビルドがエラーとなります。

$ dbt test
07:26:51  Found 1 model, 4 tests, 0 snapshots, 0 analyses, 358 macros, 0 operations, 0 seed files, 0 sources, 0 exposures, 0 metrics, 0 groups
07:26:51  
07:26:52  Concurrency: 1 threads (target='dev')
07:26:52  
07:26:53  1 of 4 START test unique_table_01_id ............................ [RUN]
07:26:53  1 of 4 PASS unique_table_01_id .................................. [PASS in 1.30s]
07:26:54  2 of 4 START test not_null_table_01_id .......................... [RUN]
07:26:54  2 of 4 PASS not_null_table_01_id ................................ [PASS in 1.30s]
07:26:55  3 of 4 START test accepted_values_table_01_status ............... [RUN]
07:26:55  3 of 4 PASS accepted_values_table_01_status ..................... [PASS in 1.30s]
07:26:56  4 of 4 START test row_count_table_01_100 ........................ [RUN]
07:26:56  4 of 4 PASS row_count_table_01_100 .............................. [PASS in 1.30s]
07:26:57  
07:26:57  Finished running 4 tests in 0 hours 0 minutes and 6.30 seconds (6.30s).
07:26:57  
07:26:57  Completed successfully
07:26:57  
07:26:57  Done. PASS=4 WARN=0 ERROR=0 SKIP=0 TOTAL=4

まとめ

Dataflow のソースコード更新後に問題がないかを dbt を使ってテストする方法について紹介しました。
Apache Beam SDK をアップデートした後、通常5分で終わっていた Dataflow のジョブが45分もかかるという問題に遭遇しましたが、テストをしっかり準備していたことで問題を早期発見し、高速にイテレーションを回すことができました。
今回ご紹介したような取り組みでアップデート後の動作確認をより効率的に進めることができ、本番環境への反映も安心して行えるようになりました!

GitHubで編集を提案
Happy Elements

Discussion