🐡

Vertex AI Pipelineからdbtを実行する

2025/01/25に公開

これはなにか

弊社ではVertex AI Pipelineを使用して、機械学習を用いた予測パイプラインを構築していますが、出力された予測結果をdbtでtransformしてレポーティングしてみたので、備忘録的に残します。

やりたいこと

Vertex AI Pipelineで予測結果をBigQueryに出力した後に、その出力結果をもとにdbtでtransformしてレポーティングに利用したい。Pipeline jobの終了をトリガーにしてdbtを実行することも考えたが、Pipeline jobの中でレポーティングまで行った方が設計が綺麗になりそうな気がしました。

全体の流れ

予測からレポーティングまでの処理は、以下のステップに基づいています:

  1. Vertex AI Pipelineの予測

    • モデルを使用してデータの予測を行い、その結果を取得します。
  2. 予測結果をBigQueryに出力

    • 予測結果をBigQueryに出力します。
  3. Vertex AI Pipelineからdbtを実行

    • dbtを用いて、予測結果をもとにデータ変換を行い、その結果をレポーティングに利用します。

Vertex AI Pipelineからdbtを実行する処理

全体のコードは以下。
方針としては、Githubリポジトリからコードをcloneしてdbtを実行します。Cloneする際はGithub Appのトークンを使用しています。

run_reporting_pipeline.py
from kfp.dsl import component

@component(
    packages_to_install=[
        "requests",
        "pyjwt[crypto]",
        "gitpython",
        # dbtライブラリなど
    ],
    base_image="python:3.11",
)
def run_reporting_pipeline(
    project_id: str,
    secret_id: str,
    github_repo: str,
    dbt_target: str,
    dbt_select_tag: str,
    app_id: str,
    installation_id: str,
) -> str:
    import os
    import subprocess
    from datetime import datetime, timedelta
    from urllib.parse import urlparse

    import git
    import jwt
    import requests
    from google.cloud import secretmanager

    # Secret Managerから秘密鍵を取得
    def get_secret_from_secret_manager(
        project_id: str,
        secret_id: str,
        version: str = "latest",
    ) -> str:
        secret_client = secretmanager.SecretManagerServiceClient()
        name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
        response = secret_client.access_secret_version(request={"name": name})
        return response.payload.data.decode("UTF-8")

    def get_github_app_token(
        project_id: str,
        secret_id: str,
        app_id: str,
        installation_id: str,
    ) -> str:
        # GitHub Appの秘密鍵を取得
        private_key = get_secret_from_secret_manager(project_id, secret_id)

        # JWTを生成
        now = datetime.utcnow()
        payload = {
            "iat": now,
            "exp": now + timedelta(minutes=10),
            "iss": app_id,
        }
        encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256")

        # アクセストークンを取得
        headers = {
            "Authorization": f"Bearer {encoded_jwt}",
            "Accept": "application/vnd.github.v3+json",
        }
        response = requests.post(
            f"https://api.github.com/app/installations/{installation_id}/access_tokens",
            headers=headers,
        )
        response.raise_for_status()
        token = response.json()["token"]
        return token

    # GitHub Appのトークンを取得
    github_token = get_github_app_token(project_id, secret_id, app_id, installation_id)

    repo_path = "/tmp/dbt_repo"

    parsed_url = urlparse(github_repo)
    secure_repo_url = f"{parsed_url.scheme}://x-access-token:{github_token}@{parsed_url.netloc}{parsed_url.path}"

    git.Repo.clone_from(secure_repo_url, repo_path, branch="main")

    os.chdir(repo_path)

    try:
        deps_result = subprocess.run(
            ["dbt", "deps"],
            check=True,
            capture_output=True,
            text=True,
        )

        run_result = subprocess.run(
            [
                "dbt",
                "run",
                "--target",
                dbt_target,
                "--select",
                "tag:" + dbt_select_tag, # このパイプライン処理に関連のあるタグを指定
            ],
            check=True,
            capture_output=True,
            text=True,
        )

        return run_result.stdout
    except subprocess.CalledProcessError as e:
        raise RuntimeError(
            f"DBT command failed with return code {e.returncode}\n"
            f"STDOUT:\n{e.stdout}\n\nSTDERR:\n{e.stderr}"
        )

まとめ

開発時はクローンするブランチの差し替えなどを考える必要がありそう。カスタムジョブのstatusを見てトリガーするのはパッと調べた感じ難しそうだったのですが、もう少し調べたい。

Discussion