Open12

【dbt】GitLab Data Team

YuichiYuichi

わざわざ、監査情報?をクエリに追記している
https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/macros/utils/dbt_audit.sql

~
{{ dbt_audit(
    cte_ref="final",
    created_by="@michellecooper",
    updated_by="@michellecooper",
    created_date="2021-06-21",
    updated_date="2021-06-21",
    ) 
 }}

{%- macro dbt_audit(cte_ref, created_by, updated_by, created_date, updated_date) -%}

    SELECT
      *,
      '{{ created_by }}'::VARCHAR       AS created_by,
      '{{ updated_by }}'::VARCHAR       AS updated_by,
      '{{ created_date }}'::DATE        AS model_created_date,
      '{{ updated_date }}'::DATE        AS model_updated_date,
      CURRENT_TIMESTAMP()               AS dbt_updated_at,

    {% if execute %}

        {% if not flags.FULL_REFRESH and config.get('materialized') == "incremental" %}

            {%- set source_relation = adapter.get_relation(
                database=target.database,
                schema=this.schema,
                identifier=this.table,
                ) -%}

            {% if source_relation != None %}

                {% set min_created_date %}
                    SELECT LEAST(MIN(dbt_created_at), CURRENT_TIMESTAMP()) AS min_ts
                    FROM {{ this }}
                {% endset %}

                {% set results = run_query(min_created_date) %}

                '{{results.columns[0].values()[0]}}'::TIMESTAMP AS dbt_created_at

            {% else %}

                CURRENT_TIMESTAMP()               AS dbt_created_at

            {% endif %}

        {% else %}

            CURRENT_TIMESTAMP()               AS dbt_created_at

        {% endif %}
    {% endif %}

    FROM {{ cte_ref }}

{%- endmacro -%}

{% docs dbt_audit %}
Used to append audit columns to a model.
This model assumes that the final statement in your model is a SELECT * from a CTE. The final SQL will still be a SELECT * just with 6 additional columns added to it. Further SQL DML can be added after the macro call, such as ORDER BY and GROUP BY.
There are two internally calculated date values based on when the table is created and, for an incremental model, when data was inserted.

WITH my_cte AS (...)
{% raw %}
{{ dbt_audit(
cte_ref="my_cte",
created_by="@gitlab_user1",
updated_by="@gitlab_user2",
created_date="2019-02-12",
updated_date="2020-08-20"
) }}
{% endraw %}
ORDER BY updated_at

{% enddocs %}

{% docs dbt_audit %}
モデルに監査用のカラムを追加するために使用します。
このモデルは、あなたのモデル内で最後のステートメントが「CTE からの SELECT *」であることを前提としています。
最終的な SQL は SELECT * のままですが、そこに追加で6つの監査カラムが付与されます。
このマクロ呼び出しの後には、ORDER BY や GROUP BY といった追加の SQL DML を記述することも可能です。

作成されるテーブルに基づいて内部的に計算される2つの日付値があります。
ひとつはテーブルが作成されたとき、もうひとつはインクリメンタルモデルにおいてデータが挿入されたときの日付です。

WITH my_cte AS (...)
{% raw %}
{{ dbt_audit(
cte_ref="my_cte",
created_by="@gitlab_user1",
updated_by="@gitlab_user2",
created_date="2019-02-12",
updated_date="2020-08-20"
) }}
{% endraw %}
ORDER BY updated_at

{% enddocs %}

📊 追加されるカラム一覧

  1. created_by
    • マクロ呼び出し時の引数で渡された値
    • 型: VARCHAR
  2. updated_by
    • マクロ呼び出し時の引数で渡された値
    • 型: VARCHAR
  3. model_created_date
    • マクロ呼び出し時の引数 created_date
    • 型: DATE
  4. model_updated_date
    • マクロ呼び出し時の引数 updated_date
    • 型: DATE
  5. dbt_updated_at
    • マクロ実行時の CURRENT_TIMESTAMP()
    • 型: TIMESTAMP
    • モデルが更新された日時を示す
  6. dbt_created_at
    • incremental モデルかつ FULL_REFRESH ではない場合に条件分岐
      • 既存テーブルがある場合 → MIN(dbt_created_at)CURRENT_TIMESTAMP() の小さい方
      • 既存テーブルがない場合 → CURRENT_TIMESTAMP()
    • それ以外(FULL_REFRESH / non-incremental)の場合 → CURRENT_TIMESTAMP()
    • 型: TIMESTAMP
YuichiYuichi

日付ディメンションもりもり
https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/models/sources/date/date_details_source.sql

WITH date_spine AS (

  {{ dbt_utils.date_spine(
      start_date="to_date('11/01/2009', 'mm/dd/yyyy')",
      datepart="day",
      end_date="dateadd(year, 40, current_date)"
     )
  }}

),
~

日付ディメンションと繋げる時に日付をID化するマクロ
https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/macros/utils/get_date_id.sql
BigQueryバージョン

macros/utils/get_date_id.sql
{% macro get_date_id(column) -%}
  CAST(FORMAT_DATE('%Y%m%d',{{ column }}) AS INT64)
{%- endmacro %}
YuichiYuichi

複数のCTEを一度に定義しているマクロ
https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/macros/utils/simple_cte.sql
うちのコードスタイルにあわせる

macros/utils/simple_cte.sql
{% macro simple_cte(tuple_list) -%}
WITH
{% for cte_ref in tuple_list %}{{cte_ref[0]}} AS (
  SELECT * 
  FROM {{ ref(cte_ref[1]) }}
)

{% if not loop.last -%},{%- endif -%}
{%- endfor -%}
{%- endmacro -%}

使い方


{{ simple_cte([
    ('geozones', 'prep_geozone'),
    ('location_factor', 'prep_location_factor'),
    ('director_factors','director_location_factor_seed_source')
])}}
, geozone_titles AS (
~
YuichiYuichi

増分更新は割とこんな感じ

    {% if is_incremental() %}

    WHERE behavior_at > (SELECT MAX(max_timestamp) FROM {{this}})

    {% endif %}

    {{ dbt_utils.group_by(n=10) }}


YuichiYuichi

dbt_utils

  FROM actuals
  {{ dbt_utils.group_by(n=7) }}
 data_tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
  SELECT

    {{ dbt_utils.generate_surrogate_key([
      'base.dim_crm_current_account_set_hierarchy_sk',
      'base.close_fiscal_quarter_name',
      'base.snapshot_date',
      'base.sales_qualified_source_live',
      'base.sales_qualified_source_grouped_live',
      'base.order_type_live',
      'base.order_type_grouped_live'])
    }}                                                                                                     AS rpt_pipeline_coverage_daily_pk,


   data_tests:
      - dbt_utils.expression_is_true:
          expression: "installation_creation_date > '2000-01-01' OR dim_installation_id = 'b6a2b5c51f1e36f6fcb8568319633808'"
     data_tests:
          - dbt_utils.accepted_range:
              min_value: 0
    data_tests:
          - dbt_utils.at_least_one
      - name: projects_imported_all_time_event
    data_tests:
          - dbt_utils.at_least_one
      - name: projects_imported_all_time_event
  - name: dim_locality
    description: '{{ doc("dim_locality") }}'
    data_tests:
    - dbt_utils.unique_combination_of_columns:
        combination_of_columns:
          - locality
          - valid_from
YuichiYuichi

https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/macros/utils/macros.md?plain=1
マークダウンの中でsqlを書くのは問題ないがjinja sqlは{% raw %}{% endraw %}で挟む必要あり

```sql
{% raw %}
{{ simple_cte([
    ('map_merged_crm_account','map_merged_crm_account'),
    ('zuora_account','zuora_account_source'),
    ('zuora_contact','zuora_contact_source')
]) }}

, excluded_accounts AS (

    SELECT DISTINCT
      account_id
    FROM {{ref('zuora_excluded_accounts')}}

)
{% endraw %}
WITH map_merged_crm_account AS (

    SELECT * 
    FROM "PROD".common.map_merged_crm_account

), zuora_account AS (

    SELECT * 
    FROM "PREP".zuora.zuora_account_source

), zuora_contact AS (

    SELECT * 
    FROM "PREP".zuora.zuora_contact_source

)

, excluded_accounts AS (

    SELECT DISTINCT
      account_id
    FROM "PROD".legacy.zuora_excluded_accounts

)
YuichiYuichi

https://gitlab.com/gitlab-data/analytics/-/blob/master/transform/snowflake-dbt/models/common/dimensions_shared/dim_crm_user_daily_snapshot.sql
dbt_utils.star() 関数は、fromにRelationオブジェクトを渡すと、そのテーブルのカラムを列挙してくれる関数になります。

  • except(~~)でもじつげんできるが、*使わない制約がチームである場合dbt_utils.starを使う
    SELECT 
      {{ dbt_utils.star(
           from=ref('prep_crm_user_daily_snapshot'), 
           except=['CREATED_BY','UPDATED_BY','MODEL_CREATED_DATE','MODEL_UPDATED_DATE','DBT_UPDATED_AT','DBT_CREATED_AT']
           ) 
      }}
    FROM {{ ref('prep_crm_user_daily_snapshot') }}

増分更新はシンプル

ケースによって>=,>だったりまちまち
WHERE snapshot_date >= (SELECT MAX(snapshot_date) FROM {{this}})
WHERE snapshot_date > (SELECT MAX(snapshot_date) FROM {{this}})


{{ config({
        "materialized": "incremental",
        "unique_key": "crm_user_snapshot_id",
    })
}}
WITH final AS (
    SELECT 
      {{ dbt_utils.star(
           from=ref('prep_crm_user_daily_snapshot'), 
           except=['CREATED_BY','UPDATED_BY','MODEL_CREATED_DATE','MODEL_UPDATED_DATE','DBT_UPDATED_AT','DBT_CREATED_AT']
           ) 
      }}
    FROM {{ ref('prep_crm_user_daily_snapshot') }}
    {% if is_incremental() %}
    WHERE snapshot_date >= (SELECT MAX(snapshot_date) FROM {{this}})
    {% endif %}
)
{{ dbt_audit(
    cte_ref="final",
    created_by="@michellecooper",
    updated_by="@jonglee1218",
    created_date="2022-01-20",
    updated_date="2025-02-06"
) }}
YuichiYuichi

https://handbook.gitlab.com/handbook/enterprise-data/platform/dbt-guide/

dbt Macros: 命名規則と構造

dbt プロジェクトにおけるマクロの作成と管理には、一貫した命名規則と構造に従うことが推奨されます。

命名規則

  • ファイル名とマクロ名の一致: マクロを定義するファイルの名前は、そのファイル内で定義されているマクロの名前と一致させる必要があります。例えば、calculate_discount.sql というファイルには、calculate_discount という名前のマクロを定義します。

構造

マクロ関連ファイルは、機能やカテゴリごとにフォルダに格納することが推奨されます。各フォルダには、以下のファイルを含めることができます。

  • 格納しているフォルダ名: case_when_entries(例)のように、関連するマクロの機能を反映した名前を付けます。
  • ドキュメントファイル: _{フォルダ名}__docs.md の形式で、フォルダ内のマクロ全体に関する説明や、個々のマクロの詳細な使用例などを記述します。例:_case_when_entries__docs.md
  • 設定ファイル: _{フォルダ名}__macros.yml の形式で、フォルダ内のマクロの引数や戻り値などのメタデータを記述します。例:_case_when_entries__macros.yml
  • 引数の記述: マクロが受け取る入力変数(引数)については、対応する _case_when_entries__macros.yml ファイルの arguments プロパティを使用して、名前と説明を記述します。これにより、マクロの利用者がその使い方を理解しやすくなります。

dbt-utils

私たちの dbt プロジェクトでは、dbt-utils パッケージを活用しています。このパッケージは、一般的によく利用される便利なマクロを多数提供しており、特に重要なものは以下の通りです。

  • group_by(n): このマクロは、指定された数値 n までのフィールド(通常はモデル内のカラム位置)に対する GROUP BY ステートメントを生成します。

    • 例:{{ dbt_utils.group_by(3) }} は、最初の 3 つのカラム (1, 2, 3) を指定した GROUP BY 句を生成します。
  • surrogate_key(field_list): このマクロは、指定されたフィールド名のリスト (field_list) の値を連結し、そのハッシュ値を生成することで一意のキーを作成します。

    • 複数のカラムの組み合わせに基づいて一意の ID を生成する際に役立ちます。
    • 例:{{ dbt_utils.surrogate_key(['order_id', 'customer_id']) }} は、order_idcustomer_id の値を組み合わせて生成された一意のハッシュ値を返します。

これらの規則と dbt-utils のマクロを効果的に活用することで、dbt プロジェクトの保守性と可読性を向上させることができます。

YuichiYuichi

https://gitlab.com/gitlab-data/analytics/-/blob/master/.gitlab-ci.yml?ref_type=heads

この .gitlab-ci.yml は、Snowflake と Python を中心に データ基盤のCI/CDパイプラインを構築している設定です。目的は「Snowflakeクローン環境の準備 → Pythonコードのチェックとテスト → dbtの実行とDocs生成 → 通知・triage」といった一連のフローを自動化することです。ポイントごとに解説します。


グローバル設定

  • PYTHONPATH をカスタムディレクトリに指定。Extract層や shared_modules を含め、CI環境内でモジュールをインポート可能にしている。
  • BRANCH_NAME$CI_COMMIT_REF_NAME から設定。Snowflakeクローンジョブなどの条件分岐で利用。

Workflow (パイプラインの発火条件)

GitLab CI の workflow: rules を利用し、次を実現:

  • MRが開いているブランチでPush → pipelineは発火しない
  • Merge request event (MR作成/更新) → 実行される
  • Open MRがある場合のpush → 実行しない
  • 通常のbranch push (open MRがなければ) → 実行

つまり MR主導で動作させる想定。


ステージ構成

ステージは開発フローに沿って以下の順番で実行される:

  1. ❄️ Snowflake クローン作成用
  2. 🚂 Extract データ抽出層 (外部include)
  3. ⚙️ dbt Run Snowflakeでの変換処理 (dbtモデル実行)
  4. 🛠 dbt Misc その他のdbt関連処理
  5. 📚 dbt Docs dbtドキュメント生成 & GitLab Pages公開
  6. 🛑🐍 Python Critical (ブラックフォーマッタ、mypy などの必須チェック)
  7. 🐍 Python (flake8, pylint, complexity, vulture, pytestなど)
  8. 🛑 Snowflake Stop MR用に作ったSnowflakeクローンを削除
  9. notify 通知系
  10. triage / triage run GitLab Issue triage の自動実行

Snowflake Database Clones

MR環境で安全に検証できるように、SnowflakeのDBをクローン:

  • .snowflake_clone_template: 共通テンプレ。registry.gitlab.com/gitlab-data/data-image/data-image:v2.0.12 イメージ利用。
  • 📈clone_prod : prod, prep, raw を empty clone
  • 📈❗️clone_prod_real: prod, prep を force clone
  • 📈⚙clone_prep_specific_schema: prep の特定スキーマだけ
  • 🥩📜clone_raw_sheetload など生rawスキーマ系クローン
  • clone_stop : review 環境と紐づけられた Snowflake クローン削除(CI環境終了時に大事なガーベジ回収ジョブ)

Python コードチェック

重大度に応じてステージを分けているのが特徴:

Critical (必須チェック - fail時にCI停止)

  • ⚫python_black : black整形チェック
  • ✏️python_mypy : 型チェック

Non-critical (allow_failure: true)

  • 🌽python_flake8: コーディング規約
  • 🗒️python_pylint: lint
  • 🤔python_complexity: xenonで複雑度チェック
  • 🦅python_vulture: 未使用コード検出
  • ✅python_pytest: Pytest 実行 (Junit + coverage出力, 依存する GCP サービスアカウント認証に base64 エンコード利用)

Snowflake Permissions Validator

permissions/snowflake/roles.yml 等が更新された場合に YAML を pyyamlsafe_load で検証。単純だが堅牢。


dbt Docs

  • dbt docs を生成して GitLab Pages へデプロイ
  • dbt deps & dbt docs generate 実行。
  • doc生成物を public/ に格納 → Pages 公開。
  • masterブランチ + $DEPLOY_DBT_PAGES がある時のみ動作。

Triage (GitLab Issues管理)

  • gitlab-triage を使い、IssueやMR整理を自動化。
  • dry-run:triage : 手動実行用の dry run
  • policy:run-triage : master上での手動実行
  • schedule:run-triage : $RUN_GITLAB_TRIAGE があるとスケジュール実行

全体設計の意図

  • MR駆動でSnowflakeクローンを作成 → dbt実行・Pythonチェック → dbt docsデプロイ → マージ後にクローン削除
  • Pythonチェックを critical と non-critical に分離し、最低限の品質を強制
  • Snowflake権限・YAML Validation・CI安全ガードなど、データチーム特化CI/CDベストプラクティスが網羅

この構成は「AnalyticsチームのGitLab CI/CDのテンプレ」として完成度がかなり高く、Snowflake+dbt+Python の実務ワークフローに最適化されています。