🙌
Workflows だけで完結するBigQuery -> Slack通知と定期実行をTerraform管理で
はじめに
- 従来、BigQuery でクエリを発行し、何らかのSlack通知を行う場合は、GASなどを使ったケースが多かったと思います。
- 今回は、Google Cloud の Workflows BigQueryコネクタ と Cloud Scheduler のみで「BigQuery にクエリ発行して結果をスラック通知」を行います。
- これによって、通知の設定が Terraform で完結することになり、スプレッドシートなどより保守が容易になると思います。
- 想定ユースケースとしては、Airflow のようなオーケストレーターを使っていない環境だけど、クエリ発行と通知がしたい、です。
処理の流れ
- Cloud Schedulerが Workflows の Workflow をキック
- Workflows の BigQuery コネクタが BigQuery にクエリを発行して結果取得
- クエリの結果を元に、Slack Webhook用のbodyを構成して、Slack通知
やること
- Slack Webhook Url の取得
- Workflows の設定を記述
- Cloud Scheduler で定期設定
- Terraform で反映
Slack Webhook Url の取得と Secret Manager への設定
- webhook url の取得は公式サイトを参照してください
- webhook url を取得したら、secret manager に格納します
Workflows の設定
- 以下は workflows.yaml の sample です(workflow.yaml.tmpl)
- Terraform の template にしているので、${project_id} などの変数化しています。
- このWorkflowsで実現していることは、対象テーブルのcreated_at をチェックして、1週間以上古くなっていたらmentionをつけて通知します。
- yaml ファイルの基本的な書き方は以下を参照してください。
main:
steps:
- get_slack_webhook:
call: googleapis.secretmanager.v1.projects.secrets.versions.access
args:
name: "projects/${project_id}/secrets/webhook-url/versions/latest"
result: slack_secret
- parse_webhook_url:
assign:
- webhook_url: $${text.decode(base64.decode(slack_secret.payload.data))}
- execute_bigquery:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: "${project_id}"
body:
query: |
SELECT
MAX(created_at) as max_created_at,
MAX(created_at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) as is_within_7_days
FROM `${project_id}.foobar_dataset.foobar_table`
useLegacySql: false
location: "asia-northeast1"
result: query_result
- prepare_values:
assign:
- max_date: $${query_result.rows[0].f[0].v}
- within_7_days_str: $${query_result.rows[0].f[1].v}
- is_up_to_date: $${within_7_days_str == "true"}
- color: $${if(is_up_to_date, "good", "danger")}
- mention: $${if(is_up_to_date, "", "<!channel> ")}
- send_slack:
call: http.post
args:
url: $${webhook_url}
headers:
Content-Type: application/json
body:
text: $${mention}
attachments:
- color: $${color}
title: "BigQuery Check"
fields:
- title: "max created_at"
value: $${max_date}
short: true
- title: "Within 7 Days"
value: $${within_7_days_str}
short: true
result: slackResponse
少し補足
googleapis.bigquery.v2.jobs.query の戻り値が以下のような形式のため、vとかfがいます
{
"rows": [
{ "f": [ {"v": "2025-10-20"}, {"v": "true"} ] }
]
}
terraformのテンプレート変数から、workflowsに変数のまま(${...}のまま)渡したいので$$のように重ねている部分がある
- prepare_values:
assign:
- max_date: $${query_result.rows[0].f[0].v}
Terraform 設定
先程のworkflow.yaml.tmpl を読み込む設定は以下
resource "google_workflows_workflow" "bigquery_data_check" {
name = "bigquery-check"
project = var.project_id
region = "asia-northeast1"
description = "BigQuery Data Check"
service_account = google_service_account.workflows_user.email
source_contents = templatefile("${path.module}/workflows/workflow.yaml.tmpl", {
project_id = var.project_id
})
}
ほか必要な設定は以下
resource "google_cloud_scheduler_job" "weekly_bigquery_check" {
name = "weekly-bigquery-check"
project = var.project_id
region = "asia-northeast1"
description = "BigQuery Data Check"
schedule = "0 10 * * 1"
time_zone = "Asia/Tokyo"
attempt_deadline = "320s"
http_target {
http_method = "POST"
uri = "https://workflowexecutions.googleapis.com/v1/projects/${var.project_id}/locations/asia-northeast1/workflows/${google_workflows_workflow.bigquery_data_check.name}/executions"
oauth_token {
service_account_email = google_service_account.workflows_user.email
}
}
}
resource "google_service_account" "workflows_user" {
account_id = "workflows-user"
display_name = "workflows_user"
}
resource "google_project_iam_binding" "bigquery_job_user" {
project = var.project_id
role = "roles/bigquery.jobUser"
members = [
"serviceAccount:${google_service_account.workflows_user.email}",
]
}
resource "google_project_iam_binding" "bigquery_data_viewer" {
project = var.project_id
role = "roles/bigquery.dataViewer"
members = [
"serviceAccount:${google_service_account.workflows_user.email}",
]
}
resource "google_project_iam_binding" "logging_config_writer" {
project = var.project_id
role = "roles/logging.configWriter"
members = [
"serviceAccount:${google_service_account.workflows_user.email}",
]
}
resource "google_project_iam_binding" "logging_writer" {
project = var.project_id
role = "roles/logging.logWriter"
members = [
"serviceAccount:${google_service_account.workflows_user.email}",
]
}
resource "google_project_iam_binding" "secret_accessor" {
project = var.project_id
role = "roles/secretmanager.secretAccessor"
members = [
"serviceAccount:${google_service_account.workflows_user.email}",
]
}
resource "google_project_iam_binding" "workflows_editor" {
project = var.project_id
role = "roles/workflows.editor"
members = [
"serviceAccount:${google_service_account.workflows_user.email}",
]
}
Discussion