💬
オレオレDBT小技
macro を用意(macros/get_target.sql)
dbt が解決した target(=profiles.yml の出力)を取得
動くかどうかは知らん
{% macro get_target() %}
{{ return({"database": target.database, "schema": target.schema}) }}
{% endmacro %}
Airflow の PythonOperator から実行して取得
動くかどうかは知らん
- Airflow で DBT_PROFILES_DIR / DBT_TARGET / DBT_PROFILE を 環境変数や Airflow Variables で一元管理すると、DAG 側のコードは不変にできます。
- 取得した schema は XCom に格納して、BashOperator / PythonOperator / KubernetesPodOperator などに渡すと便利。
import json, subprocess, os
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
DBT_PROFILES_DIR = os.getenv("DBT_PROFILES_DIR", "~/.dbt") # 必要に応じて上書き
DBT_PROJECT_DIR = os.getenv("DBT_PROJECT_DIR", "/opt/dbt/my_project") # プロジェクト場所
DBT_PROFILE_NAME = os.getenv("DBT_PROFILE", "my_profile") # dbt_project.yml の profile と一致
DBT_TARGET_NAME = os.getenv("DBT_TARGET", None) # 例: "prod" を明示したい時
def fetch_dbt_schema(**_):
cmd = [
"dbt", "run-operation", "get_target",
"--project-dir", DBT_PROJECT_DIR,
"--profiles-dir", DBT_PROFILES_DIR,
"--profile", DBT_PROFILE_NAME,
"--log-format", "json",
"--quiet"
]
if DBT_TARGET_NAME:
cmd += ["--target", DBT_TARGET_NAME]
# 実行
out = subprocess.check_output(cmd, text=True)
# dbt run-operation は最後に {"result": "..."} を JSON ログで出す
# 複数行 JSON が混じるので最後の JSON 行を拾うのがコツ
last_json = None
for line in out.splitlines():
line = line.strip()
if line.startswith("{") and line.endswith("}"):
try:
last_json = json.loads(line)
except json.JSONDecodeError:
pass
if not last_json or "result" not in last_json:
raise RuntimeError("dbt run-operation の結果が取得できませんでした")
# result は文字列 or オブジェクト。上の macro は dict を返すので素直に取得
result = last_json["result"]
# もし文字列化されていたらデコード
if isinstance(result, str):
result = json.loads(result)
database = result.get("database")
schema = result.get("schema")
print(f"Resolved: database={database}, schema={schema}")
# ここで XCom に積む/環境変数に渡す/以降のタスクの引数に使う等
return {"database": database, "schema": schema}
with DAG(
dag_id="dbt_schema_probe",
start_date=datetime(2025, 10, 1),
schedule=None,
catchup=False,
) as dag:
t = PythonOperator(
task_id="fetch_dbt_schema",
python_callable=fetch_dbt_schema,
do_xcom_push=True,
)
Discussion