
Let's Build a RAG with dbt and BigQuery (Part 3: Building the Infrastructure with IaC)

Published 2024/02/21

Hello. Continuing from the previous post, this article keeps building out our RAG.

By the end of the last post the core functionality was roughly in place, but can we really say we have built it?
No. Only once all of the surrounding infrastructure can be stood up from code can we honestly say we built it.

So this time we turn the infrastructure setup into code.

Today's scope

Concretely, we will:

  1. Move the SQL preparation we had been running by hand onto dbt's on-run-start hook
  2. Make the infrastructure buildable with Terraform
  3. Run dbt as a Docker image on Cloud Run jobs

Steps

0. Prerequisites

You might be wondering whether there are still more prerequisites. There are:

  1. Install Terraform
  2. Make sure the project you want to build in is reachable from Terraform running on your local machine

https://dev.classmethod.jp/articles/accesse-google-cloud-with-terraform/
Something like this article should get you there.
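If you have not set this up before, here is a minimal sketch of one common way to do it, assuming you authenticate with Application Default Credentials through the gcloud CLI (the project ID below is a placeholder):

# Assumption: terraform authenticates via Application Default Credentials (ADC)
gcloud auth application-default login         # opens a browser and stores ADC locally
gcloud config set project <your-project-id>   # optional: set the default project for gcloud
terraform -version                            # confirm terraform is installed and on your PATH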

1. Move the preparation SQL into dbt's on-run-start

So we create the following macros.

macros/on-run-start/pdf_parser.sql
{% macro pdf_parser() %}

CREATE OR REPLACE MODEL `{{ target.schema }}.pdf_parser`
REMOTE WITH CONNECTION `us.rag_sample_connection`
OPTIONS (remote_service_type = 'CLOUD_AI_VISION_V1')
;

{% endmacro %}
macros/on-run-start/source_object.sql
{% macro source_object() %}

CREATE OR REPLACE EXTERNAL TABLE `{{ target.schema }}.source_object`
WITH CONNECTION `us.rag_sample_connection`
OPTIONS (
  object_metadata = 'SIMPLE',
  uris = ['gs://{{ env_var('DOCUMENT_BUCKET_NAME') }}/*'],
  metadata_cache_mode= 'AUTOMATIC',
  max_staleness= INTERVAL 1 HOUR
);

{% endmacro %}
macros/on-run-start/text_generator.sql
{% macro text_generator() %}

CREATE OR REPLACE MODEL `{{ target.schema }}.text_generator`
REMOTE WITH CONNECTION `us.rag_sample_connection`
OPTIONS (
  REMOTE_SERVICE_TYPE = 'CLOUD_AI_LARGE_LANGUAGE_MODEL_V1', 
  ENDPOINT='text-bison-32k')
;

{% endmacro %}
macros/on-run-start/vector_maker.sql
{% macro vector_maker() %}

CREATE OR REPLACE MODEL `{{ target.schema }}.vector_maker`
REMOTE WITH CONNECTION `us.rag_sample_connection`
OPTIONS (
    REMOTE_SERVICE_TYPE = 'CLOUD_AI_TEXT_EMBEDDING_MODEL_V1', 
    ENDPOINT='textembedding-gecko-multilingual');

{% endmacro %}

Then add the following to the project configuration.

dbt_project.yml
on-run-start:
  - '{{pdf_parser()}}'
  - '{{source_object()}}'
  - '{{text_generator()}}'
  - '{{vector_maker()}}'

And that's it. The SQL now runs every time dbt runs, which is slightly wasteful, but compared to the benefit of automation it hardly matters (and these statements should not incur charges anyway).
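If you want to check the hooks locally before containerizing anything, here is a minimal sketch, assuming you already have a working local dbt profile and the `us.rag_sample_connection` connection exists (the bucket name below is a placeholder):

# Assumption: a local dbt profile is configured and the BigQuery connection already exists
export DOCUMENT_BUCKET_NAME="<your-bucket-name>"   # referenced by the source_object macro
dbt build                                          # the four on-run-start macros run before anything else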

2. Codify the infrastructure with Terraform

Now let's codify the whole infrastructure with Terraform in one go.
Create the following .tf files.

main.tf
terraform {
  backend "gcs" {} # stateを管理するバケット名は起動時にコマンドラインから渡す

  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "> 5.0.0" # 恐れず上げていく
    }
    google-beta = {
      source  = "hashicorp/google-beta"
      version = "> 5.0.0"
    }
  }
}

provider "google" {
  region = "us-central1"
  zone   = "us-central1-a"
  default_labels = {
    managedby = "terraform"
  }
}

provider "google-beta" {
  region = "us-central1"
  zone   = "us-central1-a"
  default_labels = {
    managedby = "terraform"
  }
}

bigquery.tf
locals {
  connection-user-roles = [
    {
      role = "roles/serviceusage.serviceUsageConsumer"
    }
  ]
}

resource "google_bigquery_dataset" "rag-sample" {
  dataset_id            = "rag_sample"
  project               = local.rag-project[var.target_env].name
  location              = "US"
  max_time_travel_hours = "168"

  access {
    role          = "OWNER"
    special_group = "projectOwners"
  }

  access {
    role          = "READER"
    special_group = "projectReaders"
  }

  access {
    role          = "WRITER"
    special_group = "projectWriters"
  }

  access {
    role          = "WRITER"
    user_by_email = google_service_account.app.email
  }
}

resource "google_bigquery_connection" "rag_sample_connection" {
  project       = local.rag-project[var.target_env].name
  connection_id = "rag_sample_connection"
  location      = "US"
  description   = "rag_sample_connection"
  cloud_resource {}
}

resource "google_project_iam_member" "connection-user-roles" {
  for_each = { for i in local.connection-user-roles : i.role => i }
  project  = local.rag-project[var.target_env].name
  role     = each.value.role
  member   = "serviceAccount:${google_bigquery_connection.rag_sample_connection.cloud_resource[0].service_account_id}"
}

The connection used to be created during the manual preparation, but it is now managed by Terraform.
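If you want to confirm the connection and the service account it was issued after `terraform apply`, here is a hedged example with the bq CLI (the project ID is a placeholder):

# Show the connection created by terraform, including its service account
bq show --connection --location=US --project_id=<your-project-id> rag_sample_connection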

cloud_run_jobs.tf
resource "google_cloud_run_v2_job" "bq-dbt-rag" {
  project      = local.rag-project[var.target_env].name
  name         = "bq-dbt-rag"
  launch_stage = "GA"
  location     = "asia-northeast1"

  template {
    parallelism = 1
    task_count  = 1

    template {

      containers {
        image   = "asia-northeast1-docker.pkg.dev/${local.rag-project[var.target_env].name}/app/bq-dbt-rag:latest"
        command = ["/bin/bash"]
        args    = ["-c", "bash scripts/dbt_build.sh"]

        resources {
          limits = {
            cpu    = "1"
            memory = "1Gi"
          }
        }
        env {
          name  = "DBT_TARGET_PATH"
          value = "dbt_target"
        }
        env {
          name  = "ENV" # 実行環境
          value = var.target_env
        }
        env {
          name  = "DEPLOY_PROJECT"
          value = local.rag-project[var.target_env].name
        }
        env {
          name  = "DOCUMENT_BUCKET_NAME"
          value = google_storage_bucket.rag-sample.name
        }
      }

      timeout               = "36000s" # 10h
      service_account       = google_service_account.app.email
      execution_environment = "EXECUTION_ENVIRONMENT_GEN2"
      max_retries           = 0
    }
  }
}

common_variables.tf
variable "target_env" {
  type = string

  validation {
    condition     = var.target_env == "prod" || var.target_env == "dev"
    error_message = "Invalid target_env"
  }
}

locals {
  rag-project = {
    prod = {
      name           = "" # set this when you run it
      default_region = "us-central1"
      default_zone   = "us-central1-a"
    }
    dev = {
      name           = "" # set this when you run it
      default_region = "us-central1"
      default_zone   = "us-central1-a"
    }
  }
}
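target_env is the only required input variable. It has no default, so Terraform will prompt for it interactively unless you pass it explicitly; for non-interactive runs you can do, for example:

# Pass the environment explicitly instead of answering the interactive prompt
terraform plan  -var="target_env=dev"
terraform apply -var="target_env=dev" -auto-approve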

gcs.tf
locals {
  rag-sample-bucket-name = "${local.rag-project[var.target_env].name}--rag-sample"

  bucket-roles--app = [
    {
      role = "roles/storage.legacyObjectReader"
    },
    {
      role = "roles/storage.legacyBucketReader"
    }
  ]
  bucket-roles--connection = [
    {
      role = "roles/storage.objectViewer"
    },
    {
      role = "roles/storage.legacyBucketReader"
    }
  ]
}

resource "google_storage_bucket" "rag-sample" {
  name          = local.rag-sample-bucket-name
  project       = local.rag-project[var.target_env].name
  location      = "US"
  force_destroy = true

  lifecycle_rule {
    condition {
      age = 3
    }
    action {
      type = "Delete"
    }
  }

  public_access_prevention = "enforced"

  uniform_bucket_level_access = true
}

resource "google_storage_bucket_iam_member" "roles--app" {
  for_each = { for i in local.bucket-roles--app : i.role => i }
  bucket   = google_storage_bucket.rag-sample.name
  role     = each.value.role
  member   = "serviceAccount:${google_service_account.app.email}"
}

resource "google_storage_bucket_iam_member" "roles--connection" {
  for_each = { for i in local.bucket-roles--connection : i.role => i }
  bucket   = google_storage_bucket.rag-sample.name
  role     = each.value.role
  member   = "serviceAccount:${google_bigquery_connection.rag_sample_connection.cloud_resource[0].service_account_id}"

  depends_on = [google_bigquery_connection.rag_sample_connection]
}
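Once the bucket exists, the `source_object` external table picks up whatever lands under gs://<bucket>/*, so loading a document is just a copy (note the 3-day lifecycle rule above deletes objects afterwards). A small sketch, with the file and bucket names as placeholders:

# Upload a PDF so the source_object external table (and the PDF parser) can see it
gcloud storage cp ./sample.pdf "gs://<your-project>--rag-sample/"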

service_account.tf
locals {
  app-roles = [
    {
      "role" : "roles/bigquery.jobUser"
    },
    {
      "role" : "roles/bigquery.connectionAdmin"
    }
  ]
}

resource "google_service_account" "app" {
  account_id   = "app-sa"
  display_name = "app-sa"
  project      = local.rag-project[var.target_env].name
}

resource "google_project_iam_member" "app-sa-role" {
  for_each = { for i in local.app-roles : i.role => i }
  project  = local.rag-project[var.target_env].name
  role     = each.value.role
  member   = "serviceAccount:${google_service_account.app.email}"
}

That's all of the Terraform code.

3. Have Cloud Run jobs run dbt

We would like dbt to be executed by something like:

  1. dbt cloud
  2. airflow -> composer
  3. cloud run jobs

right?
So let's go with Cloud Run jobs, which is the easiest of these to set up.

First, create the profile that dbt will run with.

profiles.yml
bq_rag:
  outputs:
    dev:
      dataset: rag_sample
      job_execution_timeout_seconds: 3600
      job_retries: 1
      location: US
      method: oauth
      priority: interactive
      project: "{{ env_var('DEPLOY_PROJECT') }}"
      threads: 4
      type: bigquery
  target: "{{ env_var('ENV') }}"
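Only a dev output is defined here, so as written ENV needs to be dev. Before building the image you can sanity-check the profile locally; a small sketch, assuming you export ENV and DEPLOY_PROJECT the same way the Cloud Run job does:

# Assumption: ENV and DEPLOY_PROJECT mirror the env vars injected by the Cloud Run job
export ENV="dev"
export DEPLOY_PROJECT="<your-project-id>"
dbt debug --project-dir ./ --profiles-dir ./   # validates the profile and the BigQuery connection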

Next, add the script we want to run inside Docker and the Python environment settings.

pyproject.toml

[tool.poetry]
name = "bq-dbt-rag-sample"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "3.10.13"
dbt-bigquery = "^1.7.4"


[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

scripts/dbt_build.sh
#!/bin/bash
set -ue -o pipefail

readonly SCRIPT=${BASH_SOURCE[0]##*/}

################################################################################
# Error handling
################################################################################
function err() {
  # Usage: trap 'err ${LINENO[0]} ${FUNCNAME[1]}' ERR
  status=$?
  lineno=$1
  err_str="ERROR: [`date +'%Y-%m-%d %H:%M:%S'`] ${SCRIPT}: dbt returned non-zero exit status ${status} at line ${lineno}"
  echo ${err_str} 
}

################################################################################
# Main
################################################################################
trap 'err ${LINENO[0]}' ERR # on error

echo "`date +'%Y-%m-%d %H:%M:%S'` Started."

dbt build --project-dir ./ --profiles-dir ./ "$@" 2>&1

set -e

echo "`date +'%Y-%m-%d %H:%M:%S'` Finished."



Next, build the Docker image we want to run.

Dockerfile
FROM python:3.10.13-slim-bookworm as python-base

ENV PYTHONIOENCODING=utf-8 \
    PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=off \
    PIP_DISABLE_PIP_VERSION_CHECK=on \
    PIP_DEFAULT_TIMEOUT=120 \
    POETRY_HOME="/opt/poetry" \
    POETRY_VIRTUALENVS_IN_PROJECT=true \
    POETRY_NO_INTERACTION=1 \
    PYSETUP_PATH="/opt/pysetup" \
    VENV_PATH="/opt/pysetup/.venv" 
ENV PATH="$POETRY_HOME/bin:$VENV_PATH/bin:$PATH"


FROM python-base as builder-base

RUN apt-get update \
    && apt-get install --no-install-recommends -y \
    curl \
    build-essential
RUN curl -sSL https://install.python-poetry.org | python3 -
WORKDIR $PYSETUP_PATH
COPY ./pyproject.toml ./
RUN poetry install


FROM builder-base as production
COPY --from=builder-base $PYSETUP_PATH $PYSETUP_PATH
WORKDIR /app

ENV LANG=C.UTF-8 \
    TZ='Asia/Tokyo'

COPY ./analyses /app/analyses
COPY ./profiles.yml /app/profiles.yml
COPY ./dbt_project.yml /app/dbt_project.yml
COPY ./macros /app/macros
COPY ./models /app/dbt_models
COPY ./scripts /app/scripts
COPY ./seeds /app/seeds
COPY ./snapshots /app/snapshots
COPY ./tests /app/tests

WORKDIR /app

Next, write the Cloud Build config that registers the image in Artifact Registry (GAR).

cloudbuild.yaml
steps:
  # build the container image from the Dockerfile that runs dbt
  - name: "gcr.io/cloud-builders/docker"
    args:
      [
        "build",
        "-t",
        "asia-northeast1-docker.pkg.dev/$_DEPLOY_PROJECT/app/bq-dbt-rag:latest",
        ".",
      ]
  - name: "gcr.io/cloud-builders/docker"
    args: ["push", "asia-northeast1-docker.pkg.dev/$_DEPLOY_PROJECT/app/bq-dbt-rag"]
images:
  - "asia-northeast1-docker.pkg.dev/$_DEPLOY_PROJECT/app/bq-dbt-rag:latest"
options:
  logging: CLOUD_LOGGING_ONLY
  dynamic_substitutions: true

Finally, write a script that kicks off Cloud Build from your local machine, and run it.

local.sh
# build steps run locally

export DEPLOY_PROJECT="<your-project-id>"
export BACKEND_GCS_BUCKET="<bucket-that-stores-the-tfstate>"

gcloud builds submit \
  --region=asia-northeast1 \
  --config cloudbuild.yaml \
  --project=${DEPLOY_PROJECT} \
  --substitutions _DEPLOY_PROJECT=${DEPLOY_PROJECT}

terraform init -backend-config="bucket=${BACKEND_GCS_BUCKET}"

terraform plan

terraform apply -auto-approve

It looks like a lot, but once it's done it goes by in a flash.

With this, dbt now runs on Cloud Run jobs.
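To kick the job manually from the CLI and wait for it to finish, a minimal sketch (reusing the DEPLOY_PROJECT variable from local.sh):

# Run the dbt job once on Cloud Run jobs and wait for the execution to finish
gcloud run jobs execute bq-dbt-rag \
  --region=asia-northeast1 \
  --project=${DEPLOY_PROJECT} \
  --wait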
