👽

TROCCOのdbt連携を使ってワークフローにデータテストを組み込む

2024/12/19に公開

はじめに

2024年9月に開催されたTROCCOのユーザー会「TROCCO UG」(現:pUG)で
yappli 山本さんの「私のdbt布教用資料 〜TROCCOUG Ver.〜」を聞いて
dbtを触ってみたいと思ってから3か月が経っていました。
年を越す前にやりたいことはやっておこうということでdbt連携を試してみました。

山本さんの登壇内容や当日の様子はこちらをご覧ください。

ユーザー会のみなさま、いつも行動のきっかけを与えてくださって大変ありがたいです。

dbtでのモデル作成とテスト実行

環境

  • Windows11
  • dbt-core 1.8.9
  • Google BigQuery

ソースの宣言

今回は Top Streamers on Twitch のデータセットを使用しました。
BigQueryにtop_streamers_on_twitchテーブルを作成してymlファイルでソースとして宣言します。

sources.yml
version: 2

sources:
  - name: dbt_trocco_integration # ソース名
    schema: dbt_trocco_integration  # データセット名
    tables:
      - name: top_streamers_on_twitch

dbt_project.yml の models-path に設定したディレクトリ models に ymlファイルを置き、
dbt_trocco_integration という名前で dbt_trocco_integration データセットの
top_streamers_on_twitch テーブルをソースとして宣言しています。

モデルの作成

以下のようなモデルを作成しました。

channel_language.sql
SELECT
  channel
  , language
FROM
  {{ source('dbt_trocco_integration', 'top_streamers_on_twitch') }}
channel_language.sql
SELECT
  channel
  , watch_time
  , stream_time
  , followers
  , partnered
FROM
  {{ source('dbt_trocco_integration', 'top_streamers_on_twitch') }}
WHERE
  partnered IS TRUE

sources関数を使ってsource('ソース名','テーブル名')でソースデータを参照しています。

partnered_top_streamers.sql
WITH partnered_channel AS (

  SELECT * FROM {{ ref('partnered_channel') }}

)

, channel_language AS (

  SELECT * FROM {{ ref('channel_language') }}

)

, partnered_top_streamers AS (
  SELECT
    partnered_channel.channel
    , partnered_channel.watch_time
    , partnered_channel.stream_time
    , partnered_channel.followers
    , partnered_channel.partnered
    , channel_language.language
  FROM
    partnered_channel
  LEFT JOIN
    channel_language
    ON
      partnered_channel.channel = channel_language.channel
)

SELECT * FROM partnered_top_streamers

ソースから作成した2つのモデルを ref('モデル名') で参照しています。

テストの実行

Singularテスト

存在してはいけないデータの有無を検証する Singularテスト を実行します。

always_failtest.sql
SELECT partnered
FROM {{ ref('partnered_top_streamers') }}
WHERE partnered = TRUE

dbt_project.yml の test-paths に設定したディレクトリ tests に
always_failtest.sql を作成して partnered カラムがTRUEのデータが
抽出されるテストを書きました。

Genericテスト

dbtでは規定の Genericテスト を利用することもできます。

schema.yml
version: 2

models:
  - name: partnered_top_streamers
    columns:
      - name: channel
        data_tests:
          - unique
          - not_null
          - relationships:
              to: ref('channel_language')
              field: channel

テスト対象のモデルが置いてあるディレクトリに schema.yml という名前でテストを設定しています。
モデル partnered_top_streamerschannel カラムに対して以下のテストを実行します。

  • uniquechannel のデータが全てユニークである
  • not_null : channel に NULL が含まれていない
  • relationshipschannel のデータが channel_language モデル の channel に含まれている

テストの実行結果

dbt testでテストを実行しました。

Running with dbt=1.8.9
Registered adapter: bigquery=1.8.3
Found 3 models, 1 test, 2 sources, 484 macros

Concurrency: 1 threads (target='dev')

1 of 1 START test always_failtest........................................... [RUN]
1 of 1 FAIL 978 always_failtest .............................. [FAIL 978 in 2.20s]

Finished running 1 test in 0 hours 0 minutes and 3.38 seconds (3.38s).

Completed with 1 error and 0 warnings:

Failure in test always_failtest (tests\always_failtest.sql)
Got 978 results, configured to fail if != 0

compiled code at target\compiled\dbt_trocco_integration_test\tests\always_failtest.sql

Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1

always_failtest に失敗しています。

TROCCOでのdbt連携

上記テストをTROOCOでスケジュール実行するためにまずはTROCCO側のdbt連携設定を行います。

Gitリポジトリ連携


リポジトリのURL、ブランチ名、dbtのバージョンを入力してdbtのGitリポジトリを連携させます。

上記の通り登録されました。

dbtジョブ設定

TROCCOでdbtジョブを設定します。

上で登録したGitリポジトリを選択します。
今回はテストを実行したいので実行コマンドにはdbt testを設定します。

ワークフローへの組み込み

ワークフローの作成

テストをスケジュール実行するためにワークフローを作成します。


データ集計を行うデータマートにdbtでテストを行うジョブを接続しています。

ワークフローはスケジュール設定機能で実行頻度(毎時・毎日・毎週・毎月)と実行タイミング(時刻や曜日、日付)を設定できます。
また、通知設定機能で終了時または失敗時のみに限定してSlackなどで通知を飛ばすことも可能です。

以上で設定は完了です。TROCCO側のdbt連携はあっという間にできました。

結果の確認


dbtで dbt test を実行した時と同様にテストが失敗してエラーが発生しています。
結果がSlackやメールで通知されるので早期の発見と対応ができるようになります。

おわりに

dbtを導入するメリットについておおまかには理解していましたがTROCCOとCOMETAを既に導入している場合、恩恵はそう大きくないのではないかと思っていました。
しかし、TROCCOとCOMETAのみでは得られない「データテストのやりやすさ」については実際にテストをdbtで設計してTROCCOでdbt連携試してみて、それだけでも十分導入する価値があると感じました。

※今回の実践では dbt入門 を大変参考にさせていただきました。

Discussion