Building a RAG with dbt and BigQuery (Part 3: Infrastructure as Code)
Hello! Continuing from the previous posts, this article keeps building out our RAG.
By the end of the last post the core functionality was roughly in place, but can we really say we have "built" it?
Not quite: only once all of the surrounding infrastructure can be stood up from code can we honestly make that claim.
So this time we codify the infrastructure.
Today's scope
Concretely, we will:
- Move the preparation SQL we had been running by hand into dbt's on-run-start hook
- Make the infrastructure buildable with Terraform
- Run dbt on Cloud Run jobs by executing a Docker image
Steps
0. Prerequisites
"More prerequisites?", you might be thinking. Yes, a couple:
- Install Terraform
- Make sure Terraform running on your local machine can access the GCP project you want to build in
Something along the lines of the sketch below should get you sorted.
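A minimal sketch of that setup, assuming you authenticate via Application Default Credentials and that your-project-id is a placeholder for your own project:
# Check that Terraform is installed (any recent 1.x release is fine)
terraform -version
# Let local tools, including the Terraform Google provider, pick up your credentials
gcloud auth login
gcloud auth application-default login
gcloud config set project your-project-id   # placeholder: your GCP project ID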
1. Move the preparation SQL into dbt's on-run-start
To do that, create the following macros.
macros/on-run-start/pdf_parser.sql
{% macro pdf_parser() %}
CREATE OR REPLACE MODEL `{{ target.schema }}.pdf_parser`
REMOTE WITH CONNECTION `us.rag_sample_connection`
OPTIONS (remote_service_type = 'CLOUD_AI_VISION_V1')
;
{% endmacro %}
macros/on-run-start/source_object.sql
{% macro source_object() %}
CREATE OR REPLACE EXTERNAL TABLE `{{ target.schema }}.source_object`
WITH CONNECTION `us.rag_sample_connection`
OPTIONS (
object_metadata = 'SIMPLE',
uris = ['gs://{{ env_var('DOCUMENT_BUCKET_NAME') }}/*'],
metadata_cache_mode= 'AUTOMATIC',
max_staleness= INTERVAL 1 HOUR
);
{% endmacro %}
macros/on-run-start/text_generator.sql
{% macro text_generator() %}
CREATE OR REPLACE MODEL `{{ target.schema }}.text_generator`
REMOTE WITH CONNECTION `us.rag_sample_connection`
OPTIONS (
REMOTE_SERVICE_TYPE = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1',
ENDPOINT='text-bison-32k')
;
{% endmacro %}
macros/on-run-start/vector_maker.sql
{% macro vector_maker() %}
CREATE OR REPLACE MODEL `{{ target.schema }}.vector_maker`
REMOTE WITH CONNECTION `us.rag_sample_connection`
OPTIONS (
REMOTE_SERVICE_TYPE = 'CLOUD_AI_TEXT_EMBEDDING_MODEL_V1',
ENDPOINT='textembedding-gecko-multilingual');
{% endmacro %}
Then add the following to the project configuration:
dbt_project.yml
on-run-start:
- '{{pdf_parser()}}'
- '{{source_object()}}'
- '{{text_generator()}}'
- '{{vector_maker()}}'
And that's it. It is a little awkward that these statements run every time dbt runs, but compared with the benefit of automation that's a small price to pay... probably (and re-running them shouldn't incur any charges anyway).
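As a quick local sanity check, a sketch that assumes your dbt profile and GCP credentials are already configured and that your-document-bucket is a placeholder:
# The source_object macro reads this environment variable
export DOCUMENT_BUCKET_NAME="your-document-bucket"   # placeholder bucket name
# The four on-run-start hooks execute first, then the models run
dbt run --project-dir ./ --profiles-dir ./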
2. Codifying the infrastructure with Terraform
Now let's codify the infrastructure with Terraform in one go.
Create the following .tf files.
main.tf
terraform {
backend "gcs" {} # stateを管理するバケット名は起動時にコマンドラインから渡す
required_providers {
google = {
source = "hashicorp/google"
version = "> 5.0.0" # 恐れず上げていく
}
google-beta = {
source = "hashicorp/google-beta"
version = "> 5.0.0"
}
}
}
provider "google" {
region = "us-central1"
zone = "us-central1-a"
default_labels = {
managedby = "terraform"
}
}
provider "google-beta" {
region = "us-central1"
zone = "us-central1-a"
default_labels = {
managedby = "terraform"
}
}
bigquery.tf
locals {
connection-user-roles = [
{
role = "roles/serviceusage.serviceUsageConsumer"
}
]
}
resource "google_bigquery_dataset" "rag-sample" {
dataset_id = "rag_sample"
project = local.rag-project[var.target_env].name
location = "US"
max_time_travel_hours = "168"
access {
role = "OWNER"
special_group = "projectOwners"
}
access {
role = "READER"
special_group = "projectReaders"
}
access {
role = "WRITER"
special_group = "projectWriters"
}
access {
role = "WRITER"
user_by_email = google_service_account.app.email
}
}
resource "google_bigquery_connection" "rag_sample_connection" {
project = local.rag-project[var.target_env].name
connection_id = "rag_sample_connection"
location = "US"
description = "rag_sample_connection"
cloud_resource {}
}
resource "google_project_iam_member" "connection-user-roles" {
for_each = { for i in local.connection-user-roles : i.role => i }
project = local.rag-project[var.target_env].name
role = each.value.role
member = "serviceAccount:${google_bigquery_connection.rag_sample_connection.cloud_resource[0].service_account_id}"
}
We had been creating the connection by hand as part of the earlier prerequisites; here it is managed by Terraform instead.
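After terraform apply, you can inspect the connection, including the service account that the IAM bindings above refer to, with the bq CLI. A sketch, with your-project-id as a placeholder:
# Shows the connection and its auto-generated service account
bq show --connection "your-project-id.us.rag_sample_connection"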
cloud_run_jobs.tf
resource "google_cloud_run_v2_job" "bq-dbt-rag" {
project = local.rag-project[var.target_env].name
name = "bq-dbt-rag"
launch_stage = "GA"
location = "asia-northeast1"
template {
parallelism = 1
task_count = 1
template {
containers {
image = "asia-northeast1-docker.pkg.dev/${local.rag-project[var.target_env].name}/app/bq-dbt-rag:latest"
command = ["/bin/bash"]
args = ["-c", "bash dbt_scripts/dbt_build.sh"]
resources {
limits = {
cpu = "1"
memory = "1Gi"
}
}
env {
name = "DBT_TARGET_PATH"
value = "dbt_target"
}
env {
name = "ENV" # 実行環境
value = var.target_env
}
env {
name = "DEPLOY_PROJECT"
value = local.rag-project[var.target_env].name
}
env {
name = "DOCUMENT_BUCKET_NAME"
value = google_storage_bucket.rag-sample.name
}
}
timeout = "36000s" # 10h
service_account = google_service_account.app.email
execution_environment = "EXECUTION_ENVIRONMENT_GEN2"
max_retries = 0
}
}
}
common_variables.tf
variable "target_env" {
type = string
validation {
condition = var.target_env == "prod" || var.target_env == "dev"
error_message = "Invalid target_env"
}
}
locals {
rag-project = {
prod = {
name = "" # 動かす時に設定してください
default_region = "us-central1"
default_zone = "us-central1-a"
}
dev = {
name = "" # 動かす時に設定してください
default_region = "us-central1"
default_zone = "us-central1-a"
}
}
}
gcs.tf
locals {
rag-sample-bucket-name = "${local.rag-project[var.target_env].name}--rag-sample"
bucket-roles--app = [
{
role = "roles/storage.legacyObjectReader"
},
{
role = "roles/storage.legacyBucketReader"
}
]
bucket-roles--connection = [
{
role = "roles/storage.objectViewer"
},
{
role = "roles/storage.legacyBucketReader"
}
]
}
resource "google_storage_bucket" "rag-sample" {
name = local.rag-sample-bucket-name
project = local.rag-project[var.target_env].name
location = "US"
force_destroy = true
lifecycle_rule {
condition {
age = 3
}
action {
type = "Delete"
}
}
public_access_prevention = "enforced"
uniform_bucket_level_access = true
}
resource "google_storage_bucket_iam_member" "roles--app" {
for_each = { for i in local.bucket-roles--app : i.role => i }
bucket = google_storage_bucket.rag-sample.name
role = each.value.role
member = "serviceAccount:${google_service_account.app.email}"
}
resource "google_storage_bucket_iam_member" "roles--connection" {
for_each = { for i in local.bucket-roles--connection : i.role => i }
bucket = google_storage_bucket.rag-sample.name
role = each.value.role
member = "serviceAccount:${google_bigquery_connection.rag_sample_connection.cloud_resource[0].service_account_id}"
depends_on = [google_bigquery_connection.rag_sample_connection]
}
service_account.tf
locals {
app-roles = [
{
"role" : "roles/bigquery.jobUser"
},
{
"role" : "roles/bigquery.connectionAdmin"
}
]
}
resource "google_service_account" "app" {
account_id = "app-sa"
display_name = "app-sa"
project = local.rag-project[var.target_env].name
}
resource "google_project_iam_member" "app-sa-role" {
for_each = { for i in local.app-roles : i.role => i }
project = local.rag-project[var.target_env].name
role = each.value.role
member = "serviceAccount:${google_service_account.app.email}"
}
That's it for the Terraform code.
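Before the full init/plan/apply flow (shown in the last step), a cheap local syntax check is worth doing. A sketch that touches no real resources:
terraform fmt -recursive        # normalize formatting
terraform init -backend=false   # initialize providers without the GCS backend
terraform validate              # catch syntax and reference errors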
3. Have Cloud Run jobs run dbt
For running dbt you'd typically want one of:
- dbt Cloud
- Airflow (i.e. Cloud Composer)
- Cloud Run jobs
right?
So let's go with Cloud Run jobs, the easiest of the three to set up.
First, create the profile dbt should run with. Note that only a dev output is defined here, so if you set ENV=prod you will also need a matching prod output.
profiles.yml
bq_rag:
  outputs:
    dev:
      dataset: rag_sample
      job_execution_timeout_seconds: 3600
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: "{{ env_var('DEPLOY_PROJECT') }}"
      threads: 4
      type: bigquery
  target: "{{ env_var('ENV') }}"
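To sanity-check this profile locally, something like dbt debug works. A sketch that assumes the same environment variables the Cloud Run job sets, with placeholder values:
export ENV=dev
export DEPLOY_PROJECT="your-project-id"              # placeholder project ID
export DOCUMENT_BUCKET_NAME="your-document-bucket"   # placeholder, referenced by the on-run-start macros
dbt debug --project-dir ./ --profiles-dir ./         # verifies profile parsing and BigQuery connectivity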
Next, add the script we want to run inside Docker and the Python environment definition.
pyproject.toml
[tool.poetry]
name = "bq-dbt-rag-sample"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "3.10.13"
dbt-bigquery = "^1.7.4"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
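Locally you can confirm the dependencies resolve. A sketch using --no-root, since only the dbt-bigquery dependency is needed and the project is not itself a package:
poetry install --no-root    # install dbt-bigquery into the project virtualenv
poetry run dbt --version    # confirm dbt is available inside that virtualenv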
scripts/dbt_build.sh
#!/bin/bash
set -ue -o pipefail
readonly SCRIPT=${BASH_SOURCE[0]##*/}
################################################################################
# Error handling
################################################################################
function err() {
# Usage: trap 'err ${LINENO[0]} ${FUNCNAME[1]}' ERR
status=$?
lineno=$1
err_str="ERROR: [`date +'%Y-%m-%d %H:%M:%S'`] ${SCRIPT}: dbt returned non-zero exit status ${status} at line ${lineno}"
echo ${err_str}
}
################################################################################
# Main
################################################################################
trap 'err ${LINENO[0]}' ERR # on error
echo "`date +'%Y-%m-%d %H:%M:%S'` Started."
dbt build --project-dir ./ --profiles-dir ./ "$@" 2>&1
set -e
echo "`date +'%Y-%m-%d %H:%M:%S'` Finished."
Next, create the Docker image we want to run.
Dockerfile
FROM python:3.10.13-slim-bookworm as python-base
ENV PYTHONIOENCODING=utf-8 \
PYTHONUNBUFFERED=1 \
PYTHONDONTWRITEBYTECODE=1 \
PIP_NO_CACHE_DIR=off \
PIP_DISABLE_PIP_VERSION_CHECK=on \
PIP_DEFAULT_TIMEOUT=120 \
POETRY_HOME="/opt/poetry" \
POETRY_VIRTUALENVS_IN_PROJECT=true \
POETRY_NO_INTERACTION=1 \
PYSETUP_PATH="/opt/pysetup" \
VENV_PATH="/opt/pysetup/.venv"
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"
FROM python-base as builder-base
RUN apt-get update \
&& apt-get install --no-install-recommends -y \
curl \
build-essential
RUN curl -sSL https://install.python-poetry.org | python3 -
WORKDIR $PYSETUP_PATH
COPY ./pyproject.toml ./
# --no-root: install only the dependencies (dbt-bigquery); this project is not itself a package
RUN poetry install --no-root
FROM python-base as production
# Copy only the prebuilt virtualenv from the builder stage so the final image stays slim
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH
WORKDIR /app
ENV LANG=C.UTF-8 \
TZ='Asia/Tokyo'
COPY ./analyses /app/analyses
COPY ./profiles.yml /app/profiles.yml
COPY ./dbt_project.yml /app/dbt_project.yml
COPY ./macros /app/macros
COPY ./models /app/dbt_models
COPY ./scripts /app/scripts
COPY ./seeds /app/seeds
COPY ./snapshots /app/snapshots
COPY ./tests /app/tests
WORKDIR /app
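Before handing this to Cloud Build, a quick local build is an easy smoke test. A sketch with an arbitrary local tag:
docker build -t bq-dbt-rag:local .   # should produce the /app layout with the dbt project inside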
Next, write the Cloud Build configuration that builds the image and registers it in Artifact Registry. (This assumes an Artifact Registry repository named app already exists in asia-northeast1.)
cloudbuild.yaml
steps:
  # Build the container image that runs dbt
  - name: "gcr.io/cloud-builders/docker"
    args:
      [
        "build",
        "-t",
        "asia-northeast1-docker.pkg.dev/$_DEPLOY_PROJECT/app/bq-dbt-rag:latest",
        ".",
      ]
  - name: "gcr.io/cloud-builders/docker"
    args: ["push", "asia-northeast1-docker.pkg.dev/$_DEPLOY_PROJECT/app/bq-dbt-rag"]
images:
  - "asia-northeast1-docker.pkg.dev/$_DEPLOY_PROJECT/app/bq-dbt-rag:latest"
options:
  logging: CLOUD_LOGGING_ONLY
  dynamic_substitutions: true
Finally, write and run a script that kicks off Cloud Build, and then Terraform, from your local machine.
local.sh
# Build steps run from your local machine
export DEPLOY_PROJECT="your-project-id"          # your GCP project
export BACKEND_GCS_BUCKET="your-tfstate-bucket"  # bucket that stores the Terraform state
gcloud builds submit \
--region=asia-northeast1 \
--config cloudbuild.yaml \
--project=${DEPLOY_PROJECT} \
--substitutions _DEPLOY_PROJECT=${DEPLOY_PROJECT}
terraform init -backend-config="bucket=${BACKEND_GCS_BUCKET}"
terraform plan -var="target_env=dev"                 # or prod
terraform apply -auto-approve -var="target_env=dev"  # or prod
It looks like a lot, but once it's in place everything runs in no time.
With this, dbt runs on Cloud Run jobs.
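To trigger a run by hand and watch it finish, something like this should work (assuming the job was created by the Terraform above):
gcloud run jobs execute bq-dbt-rag \
  --region=asia-northeast1 \
  --project=${DEPLOY_PROJECT} \
  --wait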