TROCCOのdbt連携を使ってワークフローにデータテストを組み込む
はじめに
2024年9月に開催されたTROCCOのユーザー会「TROCCO UG」(現:pUG)で
yappli 山本さんの「私のdbt布教用資料 〜TROCCOUG Ver.〜」を聞いて
dbtを触ってみたいと思ってから3か月が経っていました。
年を越す前にやりたいことはやっておこうということでdbt連携を試してみました。
山本さんの登壇内容や当日の様子はこちらをご覧ください。
ユーザー会のみなさま、いつも行動のきっかけを与えてくださって大変ありがたいです。
dbtでのモデル作成とテスト実行
環境
- Windows11
- dbt-core 1.8.9
- Google BigQuery
ソースの宣言
今回は Top Streamers on Twitch のデータセットを使用しました。
BigQueryにtop_streamers_on_twitchテーブルを作成してymlファイルでソースとして宣言します。
version: 2
sources:
- name: dbt_trocco_integration # ソース名
schema: dbt_trocco_integration # データセット名
tables:
- name: top_streamers_on_twitch
dbt_project.yml の models-path に設定したディレクトリ models に ymlファイルを置き、
dbt_trocco_integration
という名前で dbt_trocco_integration
データセットの
top_streamers_on_twitch
テーブルをソースとして宣言しています。
モデルの作成
以下のようなモデルを作成しました。
SELECT
channel
, language
FROM
{{ source('dbt_trocco_integration', 'top_streamers_on_twitch') }}
SELECT
channel
, watch_time
, stream_time
, followers
, partnered
FROM
{{ source('dbt_trocco_integration', 'top_streamers_on_twitch') }}
WHERE
partnered IS TRUE
sources関数を使ってsource('ソース名','テーブル名')
でソースデータを参照しています。
WITH partnered_channel AS (
SELECT * FROM {{ ref('partnered_channel') }}
)
, channel_language AS (
SELECT * FROM {{ ref('channel_language') }}
)
, partnered_top_streamers AS (
SELECT
partnered_channel.channel
, partnered_channel.watch_time
, partnered_channel.stream_time
, partnered_channel.followers
, partnered_channel.partnered
, channel_language.language
FROM
partnered_channel
LEFT JOIN
channel_language
ON
partnered_channel.channel = channel_language.channel
)
SELECT * FROM partnered_top_streamers
ソースから作成した2つのモデルを ref('モデル名')
で参照しています。
テストの実行
Singularテスト
存在してはいけないデータの有無を検証する Singularテスト を実行します。
SELECT partnered
FROM {{ ref('partnered_top_streamers') }}
WHERE partnered = TRUE
dbt_project.yml の test-paths に設定したディレクトリ tests に
always_failtest.sql を作成して partnered
カラムがTRUEのデータが
抽出されるテストを書きました。
Genericテスト
dbtでは規定の Genericテスト を利用することもできます。
version: 2
models:
- name: partnered_top_streamers
columns:
- name: channel
data_tests:
- unique
- not_null
- relationships:
to: ref('channel_language')
field: channel
テスト対象のモデルが置いてあるディレクトリに schema.yml という名前でテストを設定しています。
モデル partnered_top_streamers
の channel
カラムに対して以下のテストを実行します。
-
unique
:channel
のデータが全てユニークである -
not_null
:channel
に NULL が含まれていない -
relationships
:channel
のデータがchannel_language
モデル のchannel
に含まれている
テストの実行結果
dbt test
でテストを実行しました。
Running with dbt=1.8.9
Registered adapter: bigquery=1.8.3
Found 3 models, 1 test, 2 sources, 484 macros
Concurrency: 1 threads (target='dev')
1 of 1 START test always_failtest........................................... [RUN]
1 of 1 FAIL 978 always_failtest .............................. [FAIL 978 in 2.20s]
Finished running 1 test in 0 hours 0 minutes and 3.38 seconds (3.38s).
Completed with 1 error and 0 warnings:
Failure in test always_failtest (tests\always_failtest.sql)
Got 978 results, configured to fail if != 0
compiled code at target\compiled\dbt_trocco_integration_test\tests\always_failtest.sql
Done. PASS=0 WARN=0 ERROR=1 SKIP=0 TOTAL=1
always_failtest に失敗しています。
TROCCOでのdbt連携
上記テストをTROOCOでスケジュール実行するためにまずはTROCCO側のdbt連携設定を行います。
Gitリポジトリ連携
リポジトリのURL、ブランチ名、dbtのバージョンを入力してdbtのGitリポジトリを連携させます。
上記の通り登録されました。
dbtジョブ設定
TROCCOでdbtジョブを設定します。
上で登録したGitリポジトリを選択します。
今回はテストを実行したいので実行コマンドにはdbt test
を設定します。
ワークフローへの組み込み
ワークフローの作成
テストをスケジュール実行するためにワークフローを作成します。
データ集計を行うデータマートにdbtでテストを行うジョブを接続しています。
ワークフローはスケジュール設定機能で実行頻度(毎時・毎日・毎週・毎月)と実行タイミング(時刻や曜日、日付)を設定できます。
また、通知設定機能で終了時または失敗時のみに限定してSlackなどで通知を飛ばすことも可能です。
以上で設定は完了です。TROCCO側のdbt連携はあっという間にできました。
結果の確認
dbtで dbt test
を実行した時と同様にテストが失敗してエラーが発生しています。
結果がSlackやメールで通知されるので早期の発見と対応ができるようになります。
おわりに
dbtを導入するメリットについておおまかには理解していましたがTROCCOとCOMETAを既に導入している場合、恩恵はそう大きくないのではないかと思っていました。
しかし、TROCCOとCOMETAのみでは得られない「データテストのやりやすさ」については実際にテストをdbtで設計してTROCCOでdbt連携試してみて、それだけでも十分導入する価値があると感じました。
※今回の実践では dbt入門 を大変参考にさせていただきました。
Discussion