dbt Core を GCPのCloud Run JobsやBatchで実行する方法
はじめに
dbtを用いたデータ基盤運用がお盛んになったなと個人的に感じます。
よく記事で見かけるのが、DWH製品としてSnowFlake
, そしてデータ変換にdbt Cloud
でしょうか。
そして DWH製品としてSnowFlake
, そしてデータ変換にdbt core
を ECS Fargateに乗せてサーバレスにやる方法も最近記事で見ました。
あとは dbt core
を AirFlow
(Cloud Composer
やMWAA
含む)で実行する方法もしばしばみかけます。
しかしどれも自分には、あまりFitしませんでした。なぜならば求めている要件としては以下だからです
-
BigQuery
の案件が多いのでBigQueryにクエリ投げれればOK - 1日1回のバッチ回せれば良い
- スケジュール設定できればOK
- dbtドキュメントもみたい
- 無課金or微課金(ストレージ料やクエリスキャン料除く)ですませたい。
dbt Cloud
はチームだと1人あたり5000円、、、高い。特にスタートアップだと割と痛い出費。なのでdbt Core
を使用することはほぼ確定。Airflow
系はやりたいことに対してオーバーキルすぎる(し高い)。GCE
に乗せる方法もあるが常時起動は無駄に課金されるし、毎日決まった時間帯のみ起動する方法もあるがイケてない。Cloud Build
で行う記事も見たが、無料枠は1日あたりたったの120分なので大容量データとたくさんのクエリがあると余裕で超えるので、CI/CDのみで使いたい。ECS Fargate
も選択肢にあがるが、権限周りがめんどい。
そうです。BigQuery
xdbt Core
をもっとサクッとやりたいのでした。
そんな中いい感じのサービスBatch
がGAになりました。
そして、そういえばということでCloudRunJobs
の存在を思い出しました。(Preview段階なのですっかり頭から忘れていました。普段のサービスはCloudRun
で動かしているのに←)
どちらも、安い!
この2つのどちらかで実現できそうです。
Batch
もCloudRunJobs
もコンテナイメージの指定と実行コマンドの指定だけで完了します!
CloudRunJobs
を利用するには以下を気をつけましょう。
- 最大タイムアウトが3600sなので、それ以内に処理が収まる
- トリガーとしては
Cloud Scheduler
しか設定できない(cron形式) - preview版でも問題無い人
- だけどもだけど無料枠がある!
一方、Batch
を利用するには以下を気をつけましょう
- タイムアウトない、動いている限り
VM
(ComputerEngine
)が起動している → 動いている間は指定したスペックにしたがって課金される(終わったらVM
は削除される) といっても基本dbtを実行するだけで重いタスクはBigQuery
側に任せているので、弱々スペックでも問題ない。 - 公式によるとスケジューラーおよびトリガーによる実行開始するには
Workflows
を利用する必要がある(クライアントライブラリを用いて実行すれば、Cloud Functions
でも同様にいけそう)。なので、Batch
単体では完結できなさそう
私の場合は、分析やレポート、ダッシュボードに用いるためのデータ基盤であればCloudRunjobs
、一方主に機械学習のためのデータ基盤が含まれているならばBatch
という選択になることが多いです。
あともう一つ、dbtのドキュメントをどうするかですが、
結論、GCS
でホスティングしています。
ただし、ホスティングするにもコツがありまして、それは本記事後半で触れます。
これで、お金をかけずに、サクッと実行できる環境ができそうです。
補足
- 足りない情報は適宜、時間在るときに追記します(←時間あるのか?)
- そのうちサンプルのリポジトリを作ります(←だからそんな時間あるのか?)
BatchやCloudRunJobsを利用した dbt実行のためのリポジトリ準備
Project
|- dbt_transform(dbtのプロジェクト)
| |- analyses
| |- macros
| |- models
| |- seeds
| |- snapshots
| |- tests
| |- dbt_project.yml
| |- package.yml
| |- profiles.yml
|
|- scripts(実行コマンドの準備)
| |- copy_dbtdocs_to_gcs.sh
| |- dbt_build.sh
| |- dbt_snapshot.sh
|
|- src (今回はdbtドキュメントをGCSへ転送するpythonスクリプトのみ記載)
| |- uitls
| | |- config.py
| | |- files.py
| | |- gcs.py
| |
| |- copy_dbtdocs_to_gcs.py
|
|- workflows(Batchの場合のみ)
| |- daily.yaml
|
|- .python-version (pyenvによるバージョン指定)
|- cloudbuild.yaml
|- Dockerfile
|- pyproject.toml (poetryのファイル)
|- poetry.lock (poetryのファイル)
今回 pyenv
(Pythonバージョン管理) + poetry
(ライブラリ依存関係管理) でもろもろ準備しています。
それでは、いくつか抜粋してみてましょう。
実行コマンド(scripts配下)
cd dbt_transform
dbt build --profiles-dir . "$@"
cd ..
dbt build
コマンドの実行です。dbt build
とは dbt run
と dbt test
を同時に行うことができるコマンドです。models
、tests
、snapshots
、seeds
の配下が対象となりref
の依存関係にしたがって順に実行されます。ただし、どこかテストで失敗した際は、以降の関係する処理はSKIPされます。
個人的にはデータレイク層に関するsnapshotはこちらのタイミングで行いたい場合があるので、引数で外すようにしたりもしています。上記シェルファイルの中身で、"$@"
としているので、普段のdbtコマンドと同様に引数が設定でき、例えば
sh dbt_build.sh --select tag:daily
とか
sh dbt_build.sh --full-refresh
とか
sh dbt_build.sh --vars='{"date":"2099-01-01"}'
といった事ができます。
cd dbt_transform
dbt snapshot --profiles-dir . "$@"
cd ..
こちらも同様です。上述の通り個人的に使い分けをしたかったので作っています。
python -m src.copy_dbtdocs_to_gcs
こちらは、dbtドキュメントをGCSへ転送するための実行コマンドです。Batch
やCloudRunJobs
で実行するものではなく、後述しますがCloud Build
内で実行するためのものです。
実行コマンドは以上です。
Dockerfile
次に、コンテナのイメージを準備するDockerfileを紹介します。マルチステージビルドの構成にしています。
# 前半で requirements.txt を作成、後半でそれを用いて Python アプリケーションの載った Docker Image を作成
# poetry.lock を読むためには Poetry をインストールする必要がある一方、
# アプリケーションが載るコンテナには Poetry を入れておく必要がないため、これらを切り離したいという意図
# 前半
# Docker のキャッシュ戦略をうまく使うため、一番最初に Poetry をインストール。
# こうすることで、ビルドのたびに Poetry をインストールする手間が省ける。
# 次に、pyproject.toml と poetry.lock だけをコピーして、requirements.txt を生成
# pyenv localに合わせて3.9.13を指定していますが、ご自由に
FROM python:3.9.13-slim as builder
WORKDIR /app
RUN pip install 'poetry<1.2'
COPY pyproject.toml poetry.lock ./
RUN poetry export --without-hashes -f requirements.txt --output requirements.txt
# 後半
# できあがった requirements.txt を前半部分からコピーしてきた上で、
# pip コマンドを使ってそれらのパッケージを全てインストール。
# もしこれらのファイルに変更がない場合はキャッシュが使われて自動で次のレイヤのビルドが行われる。
# インストールが終わった後に、各種 Python スクリプトを転送して、その他コマンドなどを書いて完成
FROM python:3.9.13-slim
# 環境変数を引数で受け取り、環境変数へ突っ込む
# dbtのprofiles.ymlで環境毎の接続情報を"outputs"で定義し
# "target"で "{{ env_var('ENV') }}" と
# 指定すると環境によってプロジェクトまたはデータセットを変更できる
ARG env
ENV ENV=${env}
ENV PYTHONUNBUFFERED=1
ENV TZ='Asia/Tokyo'
WORKDIR /app
COPY /app/requirements.txt .
RUN pip install -U pip
RUN pip install -r requirements.txt
COPY . /app
# dbtの追加パッケージをインストール
WORKDIR /app/dbt_transform
RUN dbt deps --profiles-dir .
WORKDIR /app
cloudbuild.yaml (Batchの場合)
次は、Cloud Build
を実行するための設定するためのyamlファイルを見ていきます。
まずはBatchを用いる場合
steps:
# dbtを実行するためのDockerfileのコンテナイメージのビルド
- name: "gcr.io/cloud-builders/docker"
args:
[
"build",
"-t",
"gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA",
"-t",
"gcr.io/$PROJECT_ID/$_BATCH_NAME:latest",
"--build-arg",
"env=$_ENV",
".",
]
# GCRにpush
- name: "gcr.io/cloud-builders/docker"
args: ["push", "gcr.io/$PROJECT_ID/$_BATCH_NAME"]
# Workflowsのデプロイ Batchへのjob定義&実行はWorkflowsファイル内で定義
- name: "gcr.io/cloud-builders/gcloud"
args:
[
"workflows",
"deploy",
"$_BATCH_NAME-daily",
"--source",
"workflows/daily.yaml",
]
# dbtのdocs生成とGCSへのアップロード
- name: "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
entrypoint: "bash"
args:
- "-c"
- |-
echo "==========================================================="
pwd
cd dbt_transform
echo "==========================================================="
dbt deps --profiles-dir .
wait
if [[ $? == 1 ]]; then
echo 'dbt deps is faild'
exit 1
fi
dbt docs generate --profiles-dir .
wait
if [[ $? == 1 ]]; then
echo 'dbt docs generate is faild'
exit 1
fi
echo "==========================================================="
cd ..
pwd
echo "==========================================================="
sh scripts/copy_dbtdocs_to_gcs.sh
if [[ $? == 1 ]]; then
echo 'copy_dbtdocs_to_gcs.sh is faild'
exit 1
fi
env:
- ENV=$_ENV
images:
- "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
- "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest"
options:
# Cloud BuildはデフォルトでCloud Storageにビルドログを保存しようとするが、
# Cloud Loggingだけにするように指定
logging: CLOUD_LOGGING_ONLY
ちなみにWorkflowsのファイルは以下です。snapshotとdailyのモデリングを順に実行するパターンの例です。
main:
params: [args]
steps:
- init:
assign:
- projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- workflowId: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
- region: "us-central1"
- batchApi: "batch.googleapis.com/v1"
- batchApiUrl: ${"https://" + batchApi + "/projects/" + projectId + "/locations/" + region + "/jobs"}
- imageUri: ${"gcr.io/" + projectId + "/" + text.replace_all(workflowId, "-daily", "") + ":latest"}
- jobId: ${workflowId + string(int(sys.now()))}
- logCreateBatchJob:
call: sys.log
args:
data: ${"Creating and running the batch job " + jobId}
- createAndRunBatchJob:
call: http.post
args:
url: ${batchApiUrl}
query:
job_id: ${jobId}
headers:
Content-Type: application/json
auth:
type: OAuth2
body:
taskGroups:
taskSpec:
runnables:
- container:
imageUri: ${imageUri}
entrypoint: bash
commands:
- "-c"
- |-
sh scripts/dbt_snapshots.sh --select tag:snapshot
- container:
imageUri: ${imageUri}
entrypoint: bash
commands:
- "-c"
- |-
sh scripts/dbt_build.sh --select tag:daily
computeResource:
cpuMilli: 1000
memoryMib: 500
taskCount: 1
parallelism: 1
allocationPolicy:
instances:
- policy:
machineType: e2-micro
logsPolicy:
destination: CLOUD_LOGGING
result: createAndRunBatchJobResponse
- getJob:
call: http.get
args:
url: ${batchApiUrl + "/" + jobId}
auth:
type: OAuth2
result: getJobResult
- logState:
call: sys.log
args:
data: ${"Current job state " + getJobResult.body.status.state}
- checkState:
switch:
- condition: ${getJobResult.body.status.state == "SUCCEEDED"}
next: returnResult
- condition: ${getJobResult.body.status.state == "FAILED"}
next: failExecution
next: sleep
- sleep:
call: sys.sleep
args:
seconds: 10
next: getJob
- failExecution:
raise:
message: ${"The underlying batch job " + jobId + " failed"}
- returnResult:
return:
jobId: ${jobId}
やはりWorkflows
内でBatch
のjob設定&実行をする感じです。
Workflows
をかまさないといけないというのはちょっと手間ですね。
一方で以下のCloudRunJobs
のときと比べると、トリガーとしてCloudScheduler
以外にPub/Sub
によるメッセージングキューのトリガーや多様なEventArc
にも対応しています。
cloudbuild.yaml(CloudRunJobsの場合)
そしてCloudRunJobs
を定義するcloudbuild.yaml
です。
steps:
# dbtを実行するためのDockerfileのコンテナイメージのビルド
- name: "gcr.io/cloud-builders/docker"
args:
[
"build",
"-t",
"gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA",
"-t",
"gcr.io/$PROJECT_ID/$_BATCH_NAME:latest",
"--build-arg",
"env=$_ENV",
".",
]
# GCRにpush
- name: "gcr.io/cloud-builders/docker"
args: ["push", "gcr.io/$PROJECT_ID/$_BATCH_NAME"]
# Cloud Run Jobsにデプロイ(定期的にデプロイする都合上updateにしているので事前にコンソールから同名同リージョンのjobを定義しておくと良いです)
- name: "gcr.io/cloud-builders/gcloud"
args:
[
"beta",
"run",
"jobs",
"update",
"$_BATCH_NAME",
"--image",
"gcr.io/$PROJECT_ID/$_BATCH_NAME:latest",
"--command",
"bash",
"--args",
"-c,sh scripts/dbt_build.sh",
"--region",
"asia-northeast1",
"--max-retries",
"1",
"--task-timeout",
"3600s",
]
# docsの生成とGCSへのアップロード
- name: "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
entrypoint: "bash"
args:
- "-c"
- |-
echo "==========================================================="
pwd
cd dbt_transform
echo "==========================================================="
dbt deps --profiles-dir .
wait
if [[ $? == 1 ]]; then
echo 'dbt deps is faild'
exit 1
fi
dbt docs generate --profiles-dir .
wait
if [[ $? == 1 ]]; then
echo 'dbt docs generate is faild'
exit 1
fi
echo "==========================================================="
cd ..
pwd
echo "==========================================================="
sh scripts/copy_dbtdocs_to_gcs.sh
if [[ $? == 1 ]]; then
echo 'copy_dbtdocs_to_gcs.sh is faild'
exit 1
fi
env:
- ENV=$_ENV
images:
- "gcr.io/$PROJECT_ID/$_BATCH_NAME:$COMMIT_SHA"
- "gcr.io/$PROJECT_ID/$_BATCH_NAME:latest"
options:
# Cloud BuildはデフォルトでCloud Storageにビルドログを保存しようとするが、
# Cloud Loggingだけにするように指定
logging: CLOUD_LOGGING_ONLY
Batchよりもシンプルでした。
定期実行する場合はコンソールから トリガー設定でCloud Scheduler
の設定をすれば出来ます。
今回の場合どちらのcloudubuild.yaml
であっても、CloudBuildの環境変数に設定するのは以下の2つだけです。
_ENV = "環境名"
_BATCH_NAME="dbt名"
dbtのdocsをGCSでホスティング
最後に、両方のcloudbuild.yaml
内にある、scripts/copy_dbtdocs_to_gcs.sh
について記載します。
今回はbuild時にドキュメントを生成してホスティングするようにしています。なのでcloudbuild.yaml
内でdbt docs generate
とscripts/copy_dbtdocs_to_gcs.sh
があります。
scripts/copy_dbtdocs_to_gcs.sh
では内部でsrc/copy_dbtdocs_to_gcs.py
を実行しています。内容は以下です。
import json
from src.utils import files, gcs
from src.utils.config import settings
# 上記は自作モジュールなので以下メモ
""" files.py
import json
def read_file(filepath: str) -> str:
with open(filepath, 'r') as f:
content = f.read()
return content
def read_json(filepath: str) -> str:
with open(filepath, 'r') as f:
json_dict = json.loads(f.read())
return json_dict
def write_text(filepath: str, content: str):
with open(filepath, 'w') as f:
f.write(content)
return
"""
""" gcs.py
from google.cloud import storage
def to_gcs(bucket_name: str, filepath: str, upload_path: str):
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(filepath)
blob.upload_from_filename(upload_path)
"""
""" config.py
# PydanticのBaseSettingsを継承
class Settings(BaseSettings):
env: str
project_id: str = "gcp-project-id"
dbt_docs_bucket: str = "bucket-name" + os.environ.get("ENV")
dbt_target = {
"dir_path": "dbt_transform/target/",
"docs_files": ["index.html", "catalog.json", "manifest.json", "index2.html"],
}
settings = Settings()
"""
def copy_dbtdocs_to_gcs():
# jsonファイルをhtmlに埋め込んだverを作成 → index2.html
search_str = 'o=[i("manifest","manifest.json"+t),i("catalog","catalog.json"+t)]'
content_index = files.read_file(settings.dbt_target["dir_path"] + "index.html")
json_manifest = files.read_json(settings.dbt_target["dir_path"] + "manifest.json")
json_catalog = files.read_json(settings.dbt_target["dir_path"] + "catalog.json")
new_str = (
"o=[{label: 'manifest', data: "
+ json.dumps(json_manifest)
+ "},{label: 'catalog', data: "
+ json.dumps(json_catalog)
+ "}]"
)
new_content = content_index.replace(search_str, new_str)
files.write_text(settings.dbt_target["dir_path"] + "index2.html", new_content)
for filename in settings.dbt_target["docs_files"]:
gcs.to_gcs(
bucket_name=settings.dbt_docs_bucket,
filepath=filename,
upload_path=settings.dbt_target["dir_path"] + filename,
)
if __name__ == "__main__":
copy_dbtdocs_to_gcs()
なんか、みなさんからこんな声が聞こえました。
「dbt docs generaate
コマンドで生成される
index.html
, catalog.json
, manifest.json
をGCS
に持っていくだけでよくね?」
私も最初はそう思いました。
しかし、基本的にそれは全公開(allUser閲覧権限開放)をしなくてはいけません。秘密情報満載のメタデータを公開するわけにはいきません。なのでこの方法は却下です。
一方で認証機能をつけて見れるようにする方法もありますが、やりたいことに対してオーバキルすぎます。
で、そこでひらめきました。一つのファイルならば公開設定にしなくとも普通のGCS閲覧権限だけで閲覧することがきます → 一つのファイルに結合すればいいじゃんか!
ということで、2つのjsonファイルの内容をindex.html
に埋め込んでいるのが上記のPythonスクリプトというわけです。
ここではindex2.html
という名前で出力しています。
これの認証済み URL
をクリックすると閲覧できます
(他なにかよりよい方法があったら教えて下さい。)
まとめ
結構書きましたが、
要はコンテナイメージ化して、Batch
かCloudRunJobs
を使えば安くすむよね
というお話でした。
どちらも、スポットでコマンド実行もできるので、いざというときに便利です。
とくに制限はいくつかありますがCloudRunJobs
には無料枠があります。
上述の他にも私は以下実施しています。
- ローカル実行用compose.ymlの作成(Docker Compose V2)、envでBQの権限付与しているサービスアカウントの指定
-
docker compose up
で dbt Cloudに習って、作業者ごとにデータセット(スキーマ)を作る - 環境x層(レイク・ウェアハウス・マート・etc.)毎にデータセット(スキーマ)を自動で分ける
- プルリクがあると CIでdbtのテストを実施するように
Cloud Build
で設定 - etc.
追々この辺も記事にできたらなと思います。
Batch
やCloudRunJobs
を利用しなくともdbt用のコンテナイメージを作っておけば、Airflow
やVertexAIPipeline
など様々な箇所で呼び出しも可能になるので便利かと思います。
総括
あまり課金したくない + BigQuery
を使う場合は、しばらくこれでいいのでは?と思っています。
ばいなら〜
Discussion