🫥

dbtで作成したアドホックなモデルをGitHubからもSnowflakeからも全部消す 〜Snowflake Notebooksを使って〜

2024/12/31に公開

こんにちは。harry(@gappy50)です。

みなさんは、dbtで作成した不要になったモデルをどのように削除していますか?

  • 自分で作ったモデルを誰が使ってもらえなくなって、いきなり消してしまったら問題になった
  • 問題にならないように調整するだけで数日溶けた
  • それが面倒なので削除しなかったら、いつの間にかめちゃくちゃ大事な意思決定に使われ始めていて嫌な予感がした
  • そもそも諦めた

などなど。

今後、私の組織ではある程度事業側にオーナーシップをもってもらうことでアジリティを確保しながら、正しくお掃除ができるようなDevOpsを考えています。

https://zenn.dev/dely_jp/articles/2c1d3c42f3bbf6

今回は、上記を実現するためにdbtからSnowflakeに作ったデータモデルを何かしらのきっかけで(今回は作成後3ヶ月経過したら)、Snowflakeのテーブルやビューだけでなくdbtのモデルそのものを削除していきたいと思います。

全体の処理の流れ

以下のような流れを想定しています。

  • dbtのmacroにてpost_hookを実行
    • モデル作成後にdbtのyamlのmetaをSnowflakeのTagとしてアドホック用のテーブルへ設定
  • Snowflakeのタグ情報をにて集計し、アドホック用のテーブル作成後から30日超過した場合に以下の処理を実施
    • 対象モデルとyamlを削除するためのPRをGitHubへ出す
    • 対象テーブルをdrop
    • モデル作成者に対してSlackへ削除したことを通知

作成後30日経過したら否応なしに削除するというno mercyな感じではあるのですが、PRをMergeしなければ救える命でもありますし、ここらへんは組織の要件とかルールに準拠させればいいかなと思います。

AWS Lambdaだったり、ワークフローエンジン的なものを使っても良いと思うのですが、今回はSnowflakeの記事なのでSnowflake Notebooksで実装をしてみたいと思います。

dbtからSnowflakeへタグを付与する

まずは、dbtからSnowflakeへタグを付与するために、以下のようなmacroを作成します。

macros/set_snowflake_tags.sql
macros/set_snowflake_tags.sql
-- macros/set_snowflake_tags.sql
{% macro set_snowflake_tags() %}

{% set raw_meta = config.get('meta', {}) %}

{% if raw_meta is string %}
  {% set meta_dict = {} %}
{% else %}
  {% set meta_dict = raw_meta %}
{% endif %}

{% set tags = meta_dict.get('tags', '') %}
{% set owner = meta_dict.get('owner', '') %}
{% set owner_email = meta_dict.get('owner_email', '') %}
{% set created_at = meta_dict.get('created_at', '') %}
{% set pii = meta_dict.get('pii', '') %}

{% if tags != '' %}
  ALTER TABLE {{ this }} SET TAG dbt_tags.tags="{{ tags }}";
{% endif %}

{% if owner != '' %}
  ALTER TABLE {{ this }} SET TAG dbt_tags.owner='{{ owner }}';
{% endif %}

{% if owner_email != '' %}
  ALTER TABLE {{ this }} SET TAG dbt_tags.owner_email='{{ owner_email }}';
{% endif %}

{% if created_at != '' %}
  ALTER TABLE {{ this }} SET TAG dbt_tags.CREATED_AT='{{ created_at }}';
{% endif %}

{% if pii != '' %}
  ALTER TABLE {{ this }} SET TAG dbt_tags.PII='{{ pii }}';
{% else %}
  ALTER TABLE {{ this }} SET TAG dbt_tags.PII='false';
{% endif %}

{% endmacro %}

事前にSnowflakeにはdbt_tagsスキーマを作成しており、そちらですべてのタグを管理するようにします。

また、今回は言及しないのですが、PIIタグとかdbtそのもののtagとかをSnowflakeへ書き込んでおくと、Snowflake側のテーブル情報もいい感じになったり、マスキングが捗ったりするので積極的に活用していきたいですね。

今回はmarts配下のテーブルに対してタグを付与したいので、モデル作成後にタグが付与されるよう以下のようにpost-hookを設定しておきます。

dbt_project.yml
dbt_project.yml
models:
  dbt_snowflake_sample:
    marts:
      +post-hook:
        - "{{ set_snowflake_tags() }}"

dbtのjobでbuildすると、無事テーブルのタグが付与されていることが確認できました。


SnowflakeのGUI上でもタグが確認できるようになる

SnowflakeからSlack/GitHubへのアクセスをできるようにする

具体的な処理を実装する前に、SnowflakeからSlack/GitHubへのアクセスができるように設定をしましょう。

Slack Appsおよび、GitHub Appsの作成方法は今回説明はしませんが、上記を実現するための最小限の権限の設定にしておくのがよいでしょう。Snowflakeからの外部アクセスは以下のドキュメントを参照してください。

https://docs.snowflake.com/ja/developer-guide/external-network-access/external-network-access-overview
https://docs.snowflake.com/en/user-guide/notifications/webhook-notifications

また、個人的にも外部アクセスに関しては色々と記事を書いてるのでよろしければそちらもご参照ください。

https://zenn.dev/dataheroes/articles/3debab3b527d0a
https://tech.dely.jp/entry/snowflake-external-network-access

Slackへのアクセス

Slackへのアクセスは2種類のアクセスを想定しています。

  • Slack APIを通して、メールアドレスからSlackのUserIdを取得
    • Slackでメンションをできるようにするため
  • SlackのIncoming Webhookを利用して対象のチャンネルへの通知を行う
    • Snowflake Notificationから通知を行う

まず、これらのための設定をSnowflakeで実施します。

実装例
-- Slack AppのBot User OAuth Tokenを安全な文字列として保存(以下はダミー)
create or replace secret dbt_tags.slack_bot_user_oauth_token
    type = generic_string
    secret_string = 'xoxb-**********-************-********************************'
;

-- SnowflakeからSlackへのネットワークアクセスを許可
create or replace network rule slack_bot_user_oauth_rule
    mode = egress
    type = host_port
    value_list = ('slack.com')
;

-- webhookのURLを安全な文字列として保存
create or replace secret dbt_tags.slack_webhook_url
    type = generic_string
    -- https://hooks.slack.com/services/ 以降の文字列を追加する
    secret_string = 'T**********/***********/************************'
;

-- 外部ネットワークアクセスを作成
create or replace external access integration slack_bot_user_api_access
    allowed_network_rules = (slack_bot_user_oauth_rule)
    allowed_authentication_secrets = (slack_bot_user_oauth_token)
    enabled = true
;

今回はSnowflake Notebooks上からSlack APIを呼び出したいのですがPythonからsecretへのアクセスができないため、今回はユーザー関数としてsecretを呼び出せるようにします。[1]

実装例
create or replace function get_slack_bot_user_generic_secret_string()
    returns string
    language python
    runtime_version = 3.8
    handler = 'get_generic_secret_string'
    external_access_integrations = (slack_bot_user_api_access)
    secrets = ('cred' = slack_bot_user_oauth_token)
as
$$
import _snowflake

def get_generic_secret_string():
  return _snowflake.get_generic_secret_string('cred')
$$;

これでSnowflakeからSlackへのアクセスをするための準備が完了しました。

GitHubへのアクセス

GitHubへのアクセスは以下の想定しています。

  • コンテンツ(対象のリポジトリ)の読み取り/書き込み
  • Pull Requestを作成

GitHubへのアクセスも以下のように設定をします。

実装例
-- GitHub Appsから払い出されてたPrivate keyのpemを安全な文字列として保存
create or replace secret dbt_tags.github_pem_secret
    type = generic_string
    secret_string = '-----BEGIN RSA PRIVATE KEY-----\n\n-----END RSA PRIVATE KEY-----'
;

create or replace network rule dbt_tags.github_rule
    mode = egress
    type = host_port
    value_list = ('github.com', 'api.github.com')
;

create or replace external access integration github_api_access
    allowed_network_rules = (dbt_tags.github_rule)
    allowed_authentication_secrets = (dbt_tags.github_pem_secret)
    enabled = true
;

こちらも、pemの値を取り出せるようにユーザー関数を作成します。

実装例
create or replace function dbt_tags.get_github_pem_secret_string()
    returns string
    language python
    runtime_version = 3.8
    handler = 'get_generic_secret_string'
    external_access_integrations = (github_api_access)
    secrets = ('cred' = dbt_tags.github_pem_secret)
as
$$
import _snowflake

def get_generic_secret_string():
  return _snowflake.get_generic_secret_string('cred')
$$;

これらの機密情報を取得するためのユーザー関数は適切な権限を付与するようにしましょう。

GitHubへ削除対象のモデルのPRを出す

続いて、GitHubに存在している削除対象のモデルを削除するための処理を実装します。
Snowflake NotebooksではGitHubへの認証のためのJWT作成のため pyjwtcryptography をパッケージとして追加しておきましょう。

実装例
import jwt
import json
import requests
import time
from snowflake.snowpark.context import get_active_session

session = get_active_session()

GITHUB_PEM = session.sql(f"""
    select dbt_tags.get_github_pem_secret_string()
""").collect()[0][0]
GITHUB_INSTALLATION_ID = '00000000'
GITHUB_APP_ID = '1111111'
GITHUB_REPO = 'my_repo/dbt-snowflake-sample'
BASE_BRANCH = "main"


def generate_jwt(app_id, pem):
    """
    GitHub App 用の JWT を作成
    - iat(発行時刻), exp(有効期限10分以内), iss(App ID)
    """
    private_key = pem

    now_utc = int(time.time())
    payload = {
        "iat": now_utc,
        "exp": now_utc + (10 * 60),  # 10分
        "iss": app_id
    }
    token_jwt = jwt.encode(payload, private_key, algorithm="RS256")
    
    return token_jwt


def get_installation_access_token(app_id, pem, installation_id):
    """
    1. JWT 生成
    2. /app/installations/{installation_id}/access_tokens にPOST
    3. tokenを返す
    """
    token_jwt = generate_jwt(app_id, pem)
    url = f"https://api.github.com/app/installations/{installation_id}/access_tokens"
    headers = {
        "Authorization": f"Bearer {token_jwt}",
        "Accept": "application/vnd.github+json"
    }
    resp = requests.post(url, headers=headers)
    resp.raise_for_status()
    data = resp.json()
    return data["token"]  # 1時間有効


def create_or_update_branch(github_repo, base_branch, new_branch_name, headers):
    """
    1. base_branchのHEAD SHAを取得
    2. new_branch_name で新規ブランチ作成 (既存の場合は422を無視)
    3. 戻り値として base_branchの最新SHA を返す
    """
    # base_branchのSHA取得
    ref_url = f"https://api.github.com/repos/{github_repo}/git/ref/heads/{base_branch}"
    r_ref = requests.get(ref_url, headers=headers)
    r_ref.raise_for_status()
    base_ref_data = r_ref.json()
    base_sha = base_ref_data["object"]["sha"]
    print(f"Base branch {base_branch} SHA: {base_sha}")

    # 新ブランチ作成
    create_ref_url = f"https://api.github.com/repos/{github_repo}/git/refs"
    payload_ref = {
        "ref": f"refs/heads/{new_branch_name}",
        "sha": base_sha
    }
    r_newref = requests.post(create_ref_url, json=payload_ref, headers=headers)
    if r_newref.status_code not in (201, 422):
        r_newref.raise_for_status()
    print(f"Branch {new_branch_name} created (or already existed)")

    return base_sha


def commit_files_deletion(github_repo, branch_name, base_sha, files_to_delete, commit_message, headers):
    """
    1. Tree APIで files_to_delete を一括削除指定 (sha=None)
    2. Commitを作成
    3. Branchを update
    4. コミットSHA を返す
    """
    # Treeエントリ作成 (削除するファイル)
    tree_entries = []
    for file_path in files_to_delete:
        tree_entries.append({
            "path": file_path,
            "mode": "100644",
            "type": "blob",
            "sha": None  # None で削除を表す
        })

    # 新しいツリーを作成
    create_tree_url = f"https://api.github.com/repos/{github_repo}/git/trees"
    payload_tree = {
        "base_tree": base_sha,
        "tree": tree_entries
    }
    r_tree = requests.post(create_tree_url, json=payload_tree, headers=headers)
    r_tree.raise_for_status()
    tree_data = r_tree.json()
    new_tree_sha = tree_data["sha"]
    print(f"Created new tree: {new_tree_sha}")

    # New commit
    create_commit_url = f"https://api.github.com/repos/{github_repo}/git/commits"
    payload_commit = {
        "message": commit_message,
        "tree": new_tree_sha,
        "parents": [base_sha]
    }
    r_commit = requests.post(create_commit_url, json=payload_commit, headers=headers)
    r_commit.raise_for_status()
    commit_data = r_commit.json()
    new_commit_sha = commit_data["sha"]
    print(f"Created commit: {new_commit_sha}")

    # ブランチを新コミットに更新
    update_ref_url = f"https://api.github.com/repos/{github_repo}/git/refs/heads/{branch_name}"
    payload_updateref = {
        "sha": new_commit_sha
    }
    r_update = requests.patch(update_ref_url, json=payload_updateref, headers=headers)
    r_update.raise_for_status()
    print(f"Updated branch {branch_name} to commit {new_commit_sha}")

    return new_commit_sha


def create_pull_request(github_repo, title, body, head_branch, base_branch, headers):
    """
    Pull Requestを作成する。
    """
    pr_url = f"https://api.github.com/repos/{github_repo}/pulls"
    payload_pr = {
        "title": title,
        "body": body,
        "head": head_branch,
        "base": base_branch
    }
    r_pr = requests.post(pr_url, json=payload_pr, headers=headers)
    if r_pr.status_code == 201:
        pr_data = r_pr.json()
        print(f"Created PR #{pr_data['number']} : {pr_data['html_url']}")
    else:
        print(f"PR creation failed: {r_pr.status_code} {r_pr.text}")

Slackへの通知処理

続いて、Slackへの通知処理を作成します。
UserIDの取得は、Slack APIを経由して取得しますが、通知自体はSnowflake Notifications利用してストアドから通知を行います。

実装例
import requests

SLACK_BOT_TOKEN = session.sql(f"""
    select get_slack_bot_usergeneric_secret_string()
""").collect()[0][0]

def get_slack_user_id_by_email(email):
    """
    dbtのyamlから登録したemailからslackのUserIDを取得
    """
    url = "https://slack.com/api/users.lookupByEmail"
    headers = {
        "Authorization": f"Bearer {SLACK_BOT_TOKEN}",
        "Content-Type": "application/json"
    }

    params = {"email": email}

    resp = requests.get(url, headers=headers, params=params)
    data = resp.json()
    if data.get("ok"):
        return data["user"]["id"]
    else:
        return None

def send_slack_notification(email, model):
    """
    対象ユーザーに該当モデルが削除されたことを通知
    """
    user_id = get_slack_user_id_by_email(email)
    session.sql(
        f"""
        call system$send_snowflake_notification(
            snowflake.notification.application_json(
                '{{
                    "text":"<@{user_id}> : あなたの作成した {model} は削除されました"}}'
            ),
            snowflake.notification.integration('slack_webhook_notification_integration')
        );
        """
    ).collect()

Snowflakeの該当テーブルを削除

最後にSnowflakeの該当テーブルを削除するための処理を作成します。

実装例
def delete_model_table(database, schema, table):
    session.sql(
        f"drop table {database}.{schema}.{table};"
    )
    print(f"table {database}.{schema}.{table} is dropped.")

削除対象をデータフレーム化

SQLセルとして実行してもよかったですが、特に理由はないですがデータフレームとして後続で使えるようにします。

実装例
sql = """
with tagged_tables AS (
  select
    ref.object_database,
    ref.object_schema,
    ref.object_name,
    ref.tag_name,
    ref.tag_value
  from snowflake.account_usage.tag_references ref
  WHERE ref.domain in ('TABLE', 'VIEW')
),
table_info AS (
  select
    t.table_catalog,
    t.table_schema,
    t.table_name,
    t.created AS created_at
  from information_schema.tables t
  where t.table_schema = 'RAMEN'
  and t.table_name like 'ADHOC%' -- アドホックテーブルは接頭にADHOCとつける運用
),
merged AS (
  select
    ti.table_catalog as table_database,
    ti.table_schema,
    ti.table_name,
    ti.created_at,
    tg.tag_name,
    tg.tag_value
  from table_info ti
  left join tagged_tables tg
    on tg.object_name = ti.table_name
    and tg.object_schema = ti.table_schema
    and tg.object_database = ti.table_catalog
),
final as (
  select
    table_database,
    table_schema,
    table_name,
    max(case when tag_name = 'PII' then TAG_VALUE else null end) as pii,
    max(case when tag_name = 'OWNER' then TAG_VALUE else null end) as owner,
    try_parse_json(max(case when tag_name = 'TAGS' then TAG_VALUE else null end)) as tags,
    max(case when tag_name = 'CREATED_AT' then try_to_timestamp_tz(TAG_VALUE) else null end) as created_at,
    max(case when tag_name = 'OWNER_EMAIL' then TAG_VALUE else null end) as owner_email
  from
    merged
  group by
    1, 2, 3
)
select *
from final
where created_at < dateadd(day, -30, current_timestamp())
"""

df = session.sql(sql).to_pandas()

実際に動かしてみよう

実際にお掃除してみましょう。

実装例
# 1. インストールトークン取得
installation_token = get_installation_access_token(
    GITHUB_APP_ID, GITHUB_PEM, GITHUB_INSTALLATION_ID)

# 2. ヘッダ
headers = {
    "Authorization": f"Bearer {installation_token}",
    "Accept": "application/vnd.github+json"
}

for index, row in df.iterrows():
    table_database = row["TABLE_DATABASE"].lower()
    table_schema = row["TABLE_SCHEMA"].lower()
    table_name = row["TABLE_NAME"].lower()
    owner = row["OWNER"].lower()
    owner_email = row["OWNER_EMAIL"].lower()
    
    # 3. 新ブランチ作成
    new_branch_name = f"feature/delete_{table_name}_models"
    base_sha = create_or_update_branch(GITHUB_REPO, BASE_BRANCH, new_branch_name, headers)
    
    # 4. ファイル削除
    files_to_delete = [
        f"models/marts/{table_name}.sql",
        f"models/marts/_{table_name}.yml"
    ]
    commit_message = f"remove {table_name} model"
    commit_sha = commit_files_deletion(
        GITHUB_REPO, new_branch_name, base_sha, files_to_delete, commit_message, headers)
    
    # 5. Pull Request作成
    pull_request_title = f"delete {table_name} model"
    pull_request_body = f"model: {table_name}\\nowner: {owner}"
    create_pull_request(
        GITHUB_REPO, pull_request_title, pull_request_body, new_branch_name, BASE_BRANCH, headers)
    
    # 6. テーブル削除
    delete_model_table(table_database, table_schema, table_name)

    # 7. 通知
    send_slack_notification(owner_email, table_name)

はい。
テーブルはなくなり、Slackへの通知も来てます。

GitHubにもPull Requestが来ています。

さいごに

いかがでしたか?今回はデータライフサイクルで一番面倒になりがちな削除を自動でできるようにしてみました。
Snowflake Notebooksのスケジュール機能を使えば毎日不要なモデルを削除できますし、もう少し改良すれば1週間前にアナウンスするなどもできるので、いい感じにマネジメントができそうです。

データモデルの削除の方法には色々な方法はあると思いますが、dbtを通して事業側にデータのオーナーシップを持ってもらう限りにおいては、これらの運用によってデータエンジニア側の負荷が少なくなることで、より正しいデータライフサイクルを回せるようになるかなと思っています。

それでは皆さん良いお年を!

脚注
  1. 正しい取得方法ご存知の方いたら、教えて下さい。 ↩︎

Snowflake Data Heroes

Discussion