🐡
Vertex AI Pipelineからdbtを実行する
これはなにか
弊社ではVertex AI Pipelineを使用して、機械学習を用いた予測パイプラインを構築していますが、出力された予測結果をdbtでtransformしてレポーティングしてみたので、備忘録的に残します。
やりたいこと
Vertex AI Pipelineで予測結果をBigQueryに出力した後に、その出力結果をもとにdbtでtransformしてレポーティングに利用したい。Pipeline jobの終了をトリガーにしてdbtを実行することも考えたが、Pipeline jobの中でレポーティングまで行った方が設計が綺麗になりそうな気がしました。
全体の流れ
予測からレポーティングまでの処理は、以下のステップに基づいています:
-
Vertex AI Pipelineの予測
- モデルを使用してデータの予測を行い、その結果を取得します。
-
予測結果をBigQueryに出力
- 予測結果をBigQueryに出力します。
-
Vertex AI Pipelineからdbtを実行
- dbtを用いて、予測結果をもとにデータ変換を行い、その結果をレポーティングに利用します。
Vertex AI Pipelineからdbtを実行する処理
全体のコードは以下。
方針としては、Githubリポジトリからコードをcloneしてdbtを実行します。Cloneする際はGithub Appのトークンを使用しています。
run_reporting_pipeline.py
from kfp.dsl import component
@component(
packages_to_install=[
"requests",
"pyjwt[crypto]",
"gitpython",
# dbtライブラリなど
],
base_image="python:3.11",
)
def run_reporting_pipeline(
project_id: str,
secret_id: str,
github_repo: str,
dbt_target: str,
dbt_select_tag: str,
app_id: str,
installation_id: str,
) -> str:
import os
import subprocess
from datetime import datetime, timedelta
from urllib.parse import urlparse
import git
import jwt
import requests
from google.cloud import secretmanager
# Secret Managerから秘密鍵を取得
def get_secret_from_secret_manager(
project_id: str,
secret_id: str,
version: str = "latest",
) -> str:
secret_client = secretmanager.SecretManagerServiceClient()
name = f"projects/{project_id}/secrets/{secret_id}/versions/{version}"
response = secret_client.access_secret_version(request={"name": name})
return response.payload.data.decode("UTF-8")
def get_github_app_token(
project_id: str,
secret_id: str,
app_id: str,
installation_id: str,
) -> str:
# GitHub Appの秘密鍵を取得
private_key = get_secret_from_secret_manager(project_id, secret_id)
# JWTを生成
now = datetime.utcnow()
payload = {
"iat": now,
"exp": now + timedelta(minutes=10),
"iss": app_id,
}
encoded_jwt = jwt.encode(payload, private_key, algorithm="RS256")
# アクセストークンを取得
headers = {
"Authorization": f"Bearer {encoded_jwt}",
"Accept": "application/vnd.github.v3+json",
}
response = requests.post(
f"https://api.github.com/app/installations/{installation_id}/access_tokens",
headers=headers,
)
response.raise_for_status()
token = response.json()["token"]
return token
# GitHub Appのトークンを取得
github_token = get_github_app_token(project_id, secret_id, app_id, installation_id)
repo_path = "/tmp/dbt_repo"
parsed_url = urlparse(github_repo)
secure_repo_url = f"{parsed_url.scheme}://x-access-token:{github_token}@{parsed_url.netloc}{parsed_url.path}"
git.Repo.clone_from(secure_repo_url, repo_path, branch="main")
os.chdir(repo_path)
try:
deps_result = subprocess.run(
["dbt", "deps"],
check=True,
capture_output=True,
text=True,
)
run_result = subprocess.run(
[
"dbt",
"run",
"--target",
dbt_target,
"--select",
"tag:" + dbt_select_tag, # このパイプライン処理に関連のあるタグを指定
],
check=True,
capture_output=True,
text=True,
)
return run_result.stdout
except subprocess.CalledProcessError as e:
raise RuntimeError(
f"DBT command failed with return code {e.returncode}\n"
f"STDOUT:\n{e.stdout}\n\nSTDERR:\n{e.stderr}"
)
まとめ
開発時はクローンするブランチの差し替えなどを考える必要がありそう。カスタムジョブのstatusを見てトリガーするのはパッと調べた感じ難しそうだったのですが、もう少し調べたい。
Discussion