💬

オレオレDBT小技

に公開

macro を用意(macros/get_target.sql)

dbt が解決した target(=profiles.yml の出力)を取得

動くかどうかは知らん

{% macro get_target() %}
  {{ return({"database": target.database, "schema": target.schema}) }}
{% endmacro %}

Airflow の PythonOperator から実行して取得

動くかどうかは知らん

  • Airflow で DBT_PROFILES_DIR / DBT_TARGET / DBT_PROFILE を 環境変数や Airflow Variables で一元管理すると、DAG 側のコードは不変にできます。
  • 取得した schema は XCom に格納して、BashOperator / PythonOperator / KubernetesPodOperator などに渡すと便利。
import json, subprocess, os
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

DBT_PROFILES_DIR = os.getenv("DBT_PROFILES_DIR", "~/.dbt")   # 必要に応じて上書き
DBT_PROJECT_DIR  = os.getenv("DBT_PROJECT_DIR", "/opt/dbt/my_project")  # プロジェクト場所
DBT_PROFILE_NAME = os.getenv("DBT_PROFILE", "my_profile")   # dbt_project.yml の profile と一致
DBT_TARGET_NAME  = os.getenv("DBT_TARGET", None)            # 例: "prod" を明示したい時

def fetch_dbt_schema(**_):
    cmd = [
        "dbt", "run-operation", "get_target",
        "--project-dir", DBT_PROJECT_DIR,
        "--profiles-dir", DBT_PROFILES_DIR,
        "--profile", DBT_PROFILE_NAME,
        "--log-format", "json",
        "--quiet"
    ]
    if DBT_TARGET_NAME:
        cmd += ["--target", DBT_TARGET_NAME]

    # 実行
    out = subprocess.check_output(cmd, text=True)

    # dbt run-operation は最後に {"result": "..."} を JSON ログで出す
    # 複数行 JSON が混じるので最後の JSON 行を拾うのがコツ
    last_json = None
    for line in out.splitlines():
        line = line.strip()
        if line.startswith("{") and line.endswith("}"):
            try:
                last_json = json.loads(line)
            except json.JSONDecodeError:
                pass

    if not last_json or "result" not in last_json:
        raise RuntimeError("dbt run-operation の結果が取得できませんでした")

    # result は文字列 or オブジェクト。上の macro は dict を返すので素直に取得
    result = last_json["result"]
    # もし文字列化されていたらデコード
    if isinstance(result, str):
        result = json.loads(result)

    database = result.get("database")
    schema   = result.get("schema")
    print(f"Resolved: database={database}, schema={schema}")
    # ここで XCom に積む/環境変数に渡す/以降のタスクの引数に使う等
    return {"database": database, "schema": schema}

with DAG(
    dag_id="dbt_schema_probe",
    start_date=datetime(2025, 10, 1),
    schedule=None,
    catchup=False,
) as dag:
    t = PythonOperator(
        task_id="fetch_dbt_schema",
        python_callable=fetch_dbt_schema,
        do_xcom_push=True,
    )

Discussion