📁

dbt Core を GCPのCloud Run JobsやBatchで実行する方法

2023/01/09に公開

はじめに

dbtを用いたデータ基盤運用がお盛んになったなと個人的に感じます。
よく記事で見かけるのが、DWH製品としてSnowFlake, そしてデータ変換にdbt Cloudでしょうか。
そして DWH製品としてSnowFlake, そしてデータ変換にdbt core を ECS Fargateに乗せてサーバレスにやる方法も最近記事で見ました。
あとは dbt core を AirFlow(Cloud ComposerMWAA含む)で実行する方法もしばしばみかけます。
しかしどれも自分には、あまりFitしませんでした。なぜならば求めている要件としては以下だからです

  • BigQueryの案件が多いのでBigQueryにクエリ投げれればOK
  • 1日1回のバッチ回せれば良い
  • スケジュール設定できればOK
  • dbtドキュメントもみたい
  • 無課金or微課金(ストレージ料やクエリスキャン料除く)ですませたい。

dbt Cloudはチームだと1人あたり5000円、、、高い。特にスタートアップだと割と痛い出費。なのでdbt Coreを使用することはほぼ確定。Airflow系はやりたいことに対してオーバーキルすぎる(し高い)。GCEに乗せる方法もあるが常時起動は無駄に課金されるし、毎日決まった時間帯のみ起動する方法もあるがイケてない。Cloud Buildで行う記事も見たが、無料枠は1日あたりたったの120分なので大容量データとたくさんのクエリがあると余裕で超えるので、CI/CDのみで使いたい。ECS Fargateも選択肢にあがるが、権限周りがめんどい。

そうです。BigQueryxdbt Coreをもっとサクッとやりたいのでした。
そんな中いい感じのサービスBatchがGAになりました。
そして、そういえばということでCloudRunJobsの存在を思い出しました。(Preview段階なのですっかり頭から忘れていました。普段のサービスはCloudRunで動かしているのに←)
どちらも、安い!

この2つのどちらかで実現できそうです。
BatchCloudRunJobsもコンテナイメージの指定と実行コマンドの指定だけで完了します!

CloudRunJobsを利用するには以下を気をつけましょう。

  • 最大タイムアウトが3600sなので、それ以内に処理が収まる
  • トリガーとしてはCloud Schedulerしか設定できない(cron形式)
  • preview版でも問題無い人
  • だけどもだけど無料枠がある!

一方、Batchを利用するには以下を気をつけましょう

  • タイムアウトない、動いている限りVM(ComputerEngine)が起動している → 動いている間は指定したスペックにしたがって課金される(終わったらVMは削除される) といっても基本dbtを実行するだけで重いタスクはBigQuery側に任せているので、弱々スペックでも問題ない。
  • 公式によるとスケジューラーおよびトリガーによる実行開始するにはWorkflowsを利用する必要がある(クライアントライブラリを用いて実行すれば、Cloud Functionsでも同様にいけそう)。なので、Batch単体では完結できなさそう

私の場合は、分析やレポート、ダッシュボードに用いるためのデータ基盤であればCloudRunjobs、一方主に機械学習のためのデータ基盤が含まれているならばBatchという選択になることが多いです。

あともう一つ、dbtのドキュメントをどうするかですが、
結論、GCSでホスティングしています。

ただし、ホスティングするにもコツがありまして、それは本記事後半で触れます。

これで、お金をかけずに、サクッと実行できる環境ができそうです。

補足

  • 足りない情報は適宜、時間在るときに追記します(←時間あるのか?)
  • そのうちサンプルのリポジトリを作ります(←だからそんな時間あるのか?)

BatchやCloudRunJobsを利用した dbt実行のためのリポジトリ準備

Project
|- dbt_transform(dbtのプロジェクト)
|  |- analyses
|  |- macros
|  |- models
|  |- seeds
|  |- snapshots
|  |- tests
|  |- dbt_project.yml
|  |- package.yml
|  |- profiles.yml
|
|- scripts(実行コマンドの準備)
|  |- copy_dbtdocs_to_gcs.sh
|  |- dbt_build.sh
|  |- dbt_snapshot.sh
|
|- src (今回はdbtドキュメントをGCSへ転送するpythonスクリプトのみ記載)
|  |- uitls
|  |   |- config.py
|  |   |- files.py
|  |   |- gcs.py
|  |
|  |- copy_dbtdocs_to_gcs.py
|
|- workflows(Batchの場合のみ)
|  |- daily.yaml
|
|- .python-version (pyenvによるバージョン指定)
|- cloudbuild.yaml
|- Dockerfile
|- pyproject.toml (poetryのファイル)
|- poetry.lock (poetryのファイル)

今回 pyenv(Pythonバージョン管理) + poetry(ライブラリ依存関係管理) でもろもろ準備しています。

それでは、いくつか抜粋してみてましょう。

実行コマンド(scripts配下)

dbt_build.sh
cd dbt_transform
dbt build --profiles-dir . "$@"
cd ..

dbt buildコマンドの実行です。dbt build とは dbt rundbt testを同時に行うことができるコマンドです。modelstestssnapshotsseedsの配下が対象となりrefの依存関係にしたがって順に実行されます。ただし、どこかテストで失敗した際は、以降の関係する処理はSKIPされます。
個人的にはデータレイク層に関するsnapshotはこちらのタイミングで行いたい場合があるので、引数で外すようにしたりもしています。上記シェルファイルの中身で、"$@"としているので、普段のdbtコマンドと同様に引数が設定でき、例えば

sh dbt_build.sh --select tag:daily

とか

sh dbt_build.sh --full-refresh

とか

sh dbt_build.sh --vars='{"date":"2099-01-01"}'

といった事ができます。


dbt_snapshot.sh
cd dbt_transform
dbt snapshot --profiles-dir . "$@"
cd ..

こちらも同様です。上述の通り個人的に使い分けをしたかったので作っています。


copy_dbtdocs_to_gcs.sh
python -m src.copy_dbtdocs_to_gcs

こちらは、dbtドキュメントをGCSへ転送するための実行コマンドです。BatchCloudRunJobsで実行するものではなく、後述しますがCloud Build内で実行するためのものです。

実行コマンドは以上です。

Dockerfile

次に、コンテナのイメージを準備するDockerfileを紹介します。マルチステージビルドの構成にしています。

Dockerfile
# 前半で requirements.txt を作成、後半でそれを用いて Python アプリケーションの載った Docker Image を作成
# poetry.lock を読むためには Poetry をインストールする必要がある一方、
# アプリケーションが載るコンテナには Poetry を入れておく必要がないため、これらを切り離したいという意図

# 前半
# Docker のキャッシュ戦略をうまく使うため、一番最初に Poetry をインストール。
# こうすることで、ビルドのたびに Poetry をインストールする手間が省ける。
# 次に、pyproject.toml と poetry.lock だけをコピーして、requirements.txt を生成

# pyenv localに合わせて3.9.13を指定していますが、ご自由に
FROM python:3.9.13-slim as builder

WORKDIR /app

RUN pip install 'poetry<1.2'

COPY pyproject.toml poetry.lock ./

RUN poetry export --without-hashes -f requirements.txt --output requirements.txt

# 後半
# できあがった requirements.txt を前半部分からコピーしてきた上で、
# pip コマンドを使ってそれらのパッケージを全てインストール。
# もしこれらのファイルに変更がない場合はキャッシュが使われて自動で次のレイヤのビルドが行われる。
# インストールが終わった後に、各種 Python スクリプトを転送して、その他コマンドなどを書いて完成
FROM python:3.9.13-slim

# 環境変数を引数で受け取り、環境変数へ突っ込む
# dbtのprofiles.ymlで環境毎の接続情報を"outputs"で定義し
# "target"で "{{ env_var('ENV') }}" と
# 指定すると環境によってプロジェクトまたはデータセットを変更できる
ARG env
ENV ENV=${env}

ENV PYTHONUNBUFFERED=1
ENV TZ='Asia/Tokyo'

WORKDIR /app

COPY --from=builder /app/requirements.txt .

RUN pip install -U pip
RUN pip install -r requirements.txt

COPY . /app

# dbtの追加パッケージをインストール
WORKDIR /app/dbt_transform
RUN dbt deps --profiles-dir .
WORKDIR /app

cloudbuild.yaml (Batchの場合)

次は、Cloud Buildを実行するための設定するためのyamlファイルを見ていきます。
まずはBatchを用いる場合

cloudbuild.yaml
steps:
  # dbtを実行するためのDockerfileのコンテナイメージのビルド
  - name: "gcr.io/cloud-builders/docker"
    args:
      [
        "build",
        "-t",
        "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA",
        "-t",
        "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest",
        "--build-arg",
        "env=$_ENV",
        ".",
      ]
  # GCRにpush
  - name: "gcr.io/cloud-builders/docker"
    args: ["push", "gcr.io/$PROJECT_ID/$_BATCH_NAME"]
  # Workflowsのデプロイ Batchへのjob定義&実行はWorkflowsファイル内で定義
  - name: "gcr.io/cloud-builders/gcloud"
    args:
      [
        "workflows",
        "deploy",
        "$_BATCH_NAME-daily",
        "--source",
        "workflows/daily.yaml",
      ]
  # dbtのdocs生成とGCSへのアップロード
  - name: "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
    entrypoint: "bash"
    args:
      - "-c"
      - |-
        echo "==========================================================="
        pwd
        cd dbt_transform
        echo "==========================================================="
        dbt deps --profiles-dir .
        wait
        if [[ $? == 1 ]]; then
        echo 'dbt deps is faild'
        exit 1
        fi
        dbt docs generate --profiles-dir .
        wait
        if [[ $? == 1 ]]; then
        echo 'dbt docs generate is faild'
        exit 1
        fi
        echo "==========================================================="
        cd ..
        pwd
        echo "==========================================================="
        sh scripts/copy_dbtdocs_to_gcs.sh
        if [[ $? == 1 ]]; then
        echo 'copy_dbtdocs_to_gcs.sh is faild'
        exit 1
        fi
    env:
      - ENV=$_ENV
images:
  - "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
  - "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest"
options:
  # Cloud BuildはデフォルトでCloud Storageにビルドログを保存しようとするが、
  # Cloud Loggingだけにするように指定
  logging: CLOUD_LOGGING_ONLY

ちなみにWorkflowsのファイルは以下です。snapshotとdailyのモデリングを順に実行するパターンの例です。

workflows/daily.yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - workflowId: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
          - region: "us-central1"
          - batchApi: "batch.googleapis.com/v1"
          - batchApiUrl: ${"https://" + batchApi + "/projects/" + projectId + "/locations/" + region + "/jobs"}
          - imageUri: ${"gcr.io/" + projectId + "/" +  text.replace_all(workflowId, "-daily", "") + ":latest"}
          - jobId: ${workflowId + string(int(sys.now()))}
    - logCreateBatchJob:
        call: sys.log
        args:
          data: ${"Creating and running the batch job " + jobId}
    - createAndRunBatchJob:
        call: http.post
        args:
          url: ${batchApiUrl}
          query:
            job_id: ${jobId}
          headers:
            Content-Type: application/json
          auth:
            type: OAuth2
          body:
            taskGroups:
              taskSpec:
                runnables:
                  - container:
                      imageUri: ${imageUri}
                      entrypoint: bash
                      commands:
                        - "-c"
                        - |-
                          sh scripts/dbt_snapshots.sh --select tag:snapshot
                  - container:
                      imageUri: ${imageUri}
                      entrypoint: bash
                      commands:
                        - "-c"
                        - |-
                          sh scripts/dbt_build.sh --select tag:daily
                computeResource:
                  cpuMilli: 1000
                  memoryMib: 500
              taskCount: 1
              parallelism: 1
            allocationPolicy:
              instances:
                - policy:
                    machineType: e2-micro
            logsPolicy:
              destination: CLOUD_LOGGING
        result: createAndRunBatchJobResponse
    - getJob:
        call: http.get
        args:
          url: ${batchApiUrl + "/" + jobId}
          auth:
            type: OAuth2
        result: getJobResult
    - logState:
        call: sys.log
        args:
          data: ${"Current job state " + getJobResult.body.status.state}
    - checkState:
        switch:
          - condition: ${getJobResult.body.status.state == "SUCCEEDED"}
            next: returnResult
          - condition: ${getJobResult.body.status.state == "FAILED"}
            next: failExecution
        next: sleep
    - sleep:
        call: sys.sleep
        args:
          seconds: 10
        next: getJob
    - failExecution:
        raise:
          message: ${"The underlying batch job " + jobId + " failed"}
    - returnResult:
        return:
          jobId: ${jobId}

やはりWorkflows内でBatchのjob設定&実行をする感じです。
Workflowsをかまさないといけないというのはちょっと手間ですね。
一方で以下のCloudRunJobsのときと比べると、トリガーとしてCloudScheduler以外にPub/Subによるメッセージングキューのトリガーや多様なEventArcにも対応しています。

cloudbuild.yaml(CloudRunJobsの場合)

そしてCloudRunJobsを定義するcloudbuild.yamlです。

cloudbuild.yaml
steps:
  # dbtを実行するためのDockerfileのコンテナイメージのビルド
  - name: "gcr.io/cloud-builders/docker"
    args:
      [
        "build",
        "-t",
        "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA",
        "-t",
        "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest",
        "--build-arg",
        "env=$_ENV",
        ".",
      ]
  # GCRにpush
  - name: "gcr.io/cloud-builders/docker"
    args: ["push", "gcr.io/$PROJECT_ID/$_BATCH_NAME"]
  # Cloud Run Jobsにデプロイ(定期的にデプロイする都合上updateにしているので事前にコンソールから同名同リージョンのjobを定義しておくと良いです)
  - name: "gcr.io/cloud-builders/gcloud"
    args:
      [
        "beta",
        "run",
        "jobs",
        "update",
        "$_BATCH_NAME",
        "--image",
        "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest",
        "--command",
        "bash",
        "--args",
        "-c,sh scripts/dbt_build.sh",
        "--region",
        "asia-northeast1",
        "--max-retries",
        "1",
        "--task-timeout",
        "3600s",
      ]
  # docsの生成とGCSへのアップロード
  - name: "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
    entrypoint: "bash"
    args:
      - "-c"
      - |-
        echo "==========================================================="
        pwd
        cd dbt_transform
        echo "==========================================================="
        dbt deps --profiles-dir .
        wait
        if [[ $? == 1 ]]; then
        echo 'dbt deps is faild'
        exit 1
        fi
        dbt docs generate --profiles-dir .
        wait
        if [[ $? == 1 ]]; then
        echo 'dbt docs generate is faild'
        exit 1
        fi
        echo "==========================================================="
        cd ..
        pwd
        echo "==========================================================="
        sh scripts/copy_dbtdocs_to_gcs.sh
        if [[ $? == 1 ]]; then
        echo 'copy_dbtdocs_to_gcs.sh is faild'
        exit 1
        fi
    env:
      - ENV=$_ENV
images:
  - "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
  - "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest"
options:
  # Cloud BuildはデフォルトでCloud Storageにビルドログを保存しようとするが、
  # Cloud Loggingだけにするように指定
  logging: CLOUD_LOGGING_ONLY

Batchよりもシンプルでした。
定期実行する場合はコンソールから トリガー設定でCloud Schedulerの設定をすれば出来ます。


今回の場合どちらのcloudubuild.yamlであっても、CloudBuildの環境変数に設定するのは以下の2つだけです。

_ENV = "環境名"
_BATCH_NAME="dbt名"

dbtのdocsをGCSでホスティング

最後に、両方のcloudbuild.yaml内にある、scripts/copy_dbtdocs_to_gcs.shについて記載します。
今回はbuild時にドキュメントを生成してホスティングするようにしています。なのでcloudbuild.yaml内でdbt docs generatescripts/copy_dbtdocs_to_gcs.shがあります。
scripts/copy_dbtdocs_to_gcs.shでは内部でsrc/copy_dbtdocs_to_gcs.pyを実行しています。内容は以下です。

src/copy_dbtdocs_to_gcs.py
import json

from src.utils import files, gcs
from src.utils.config import settings
# 上記は自作モジュールなので以下メモ
""" files.py
import json

def read_file(filepath: str) -> str:
    with open(filepath, 'r') as f:
        content = f.read()
    return content


def read_json(filepath: str) -> str:
    with open(filepath, 'r') as f:
        json_dict = json.loads(f.read())
    return json_dict


def write_text(filepath: str, content: str):
    with open(filepath, 'w') as f:
        f.write(content)
    return
"""
""" gcs.py
from google.cloud import storage


def to_gcs(bucket_name: str, filepath: str, upload_path: str):

    client = storage.Client()

    bucket = client.get_bucket(bucket_name)

    blob = bucket.blob(filepath)

    blob.upload_from_filename(upload_path)

"""
""" config.py
# PydanticのBaseSettingsを継承
class Settings(BaseSettings):
    env: str
    project_id: str = "gcp-project-id"
    dbt_docs_bucket: str = "bucket-name" + os.environ.get("ENV")

    dbt_target = {
        "dir_path": "dbt_transform/target/",
        "docs_files": ["index.html", "catalog.json", "manifest.json", "index2.html"],
    }

settings = Settings()
"""


def copy_dbtdocs_to_gcs():

    # jsonファイルをhtmlに埋め込んだverを作成 → index2.html
    search_str = 'o=[i("manifest","manifest.json"+t),i("catalog","catalog.json"+t)]'
    content_index = files.read_file(settings.dbt_target["dir_path"] + "index.html")
    json_manifest = files.read_json(settings.dbt_target["dir_path"] + "manifest.json")
    json_catalog = files.read_json(settings.dbt_target["dir_path"] + "catalog.json")
    new_str = (
        "o=[{label: 'manifest', data: "
        + json.dumps(json_manifest)
        + "},{label: 'catalog', data: "
        + json.dumps(json_catalog)
        + "}]"
    )
    new_content = content_index.replace(search_str, new_str)
    files.write_text(settings.dbt_target["dir_path"] + "index2.html", new_content)

    for filename in settings.dbt_target["docs_files"]:
        gcs.to_gcs(
            bucket_name=settings.dbt_docs_bucket,
            filepath=filename,
            upload_path=settings.dbt_target["dir_path"] + filename,
        )


if __name__ == "__main__":

    copy_dbtdocs_to_gcs()

なんか、みなさんからこんな声が聞こえました。
dbt docs generaateコマンドで生成される
index.html, catalog.json, manifest.jsonGCSに持っていくだけでよくね?」

私も最初はそう思いました。
しかし、基本的にそれは全公開(allUser閲覧権限開放)をしなくてはいけません。秘密情報満載のメタデータを公開するわけにはいきません。なのでこの方法は却下です。

一方で認証機能をつけて見れるようにする方法もありますが、やりたいことに対してオーバキルすぎます。

で、そこでひらめきました。一つのファイルならば公開設定にしなくとも普通のGCS閲覧権限だけで閲覧することがきます → 一つのファイルに結合すればいいじゃんか!
ということで、2つのjsonファイルの内容をindex.htmlに埋め込んでいるのが上記のPythonスクリプトというわけです。
ここではindex2.htmlという名前で出力しています。
これの認証済み URLをクリックすると閲覧できます

(他なにかよりよい方法があったら教えて下さい。)

まとめ

結構書きましたが、
要はコンテナイメージ化して、BatchCloudRunJobsを使えば安くすむよね
というお話でした。
どちらも、スポットでコマンド実行もできるので、いざというときに便利です。
とくに制限はいくつかありますがCloudRunJobsには無料枠があります。

上述の他にも私は以下実施しています。

  • ローカル実行用compose.ymlの作成(Docker Compose V2)、envでBQの権限付与しているサービスアカウントの指定
  • docker compose upで dbt Cloudに習って、作業者ごとにデータセット(スキーマ)を作る
  • 環境x層(レイク・ウェアハウス・マート・etc.)毎にデータセット(スキーマ)を自動で分ける
  • プルリクがあると CIでdbtのテストを実施するようにCloud Buildで設定
  • etc.

追々この辺も記事にできたらなと思います。

BatchCloudRunJobsを利用しなくともdbt用のコンテナイメージを作っておけば、AirflowVertexAIPipelineなど様々な箇所で呼び出しも可能になるので便利かと思います。

総括

あまり課金したくない + BigQuery を使う場合は、しばらくこれでいいのでは?と思っています。

ばいなら〜

Discussion