🎃

Terraformで、Cloud Pub/Sub -> Cloud Functions -> BigQuery Jobの流れを作る

2024/08/30に公開

はじめに

  • メモ書き程度に残してあります。
  • 汚いのはご了承ください😓

環境

  • Terraform
  • Cloud Pub/Sub
  • Cloud Functions
  • BigQuery
  • Node.js

前提知識

Cloud Pub/Sub スキーマについて

https://cloud.google.com/pubsub/docs/schemas?hl=ja

  • Pub/Sub のトピックに流れるメッセージのスキーマは、Avro か プロトコルバッファという、フレームワークで定義する必要があるらしい
  • 今回は、見た目がJSONで扱いやすそうなAvroを使ってみる

Cloud Function経由で BigQuery のクエリを、動的に実行する方法

  • 最初は Cloud Functions から BigQuery Job を呼び出す方法を探していたが、結局見つからず
  • 結論、Cloud Function 内で BigQuery のクエリを定義・実行することに

BigQueryのクエリに変数を渡す方法

  • NAMED(@をつける) というのと、PARAMETERS というのがある
  • 今回は、NAMED を使ってみる
# ディレクトリ構成
.
├── main.tf
└── schemas
   └── schema.avsc
# main.tf
resource "google_pubsub_schema" "sample_pubsub_schema" {
  name       = "sample-pubsub-schema"
  type       = "AVRO"
  definition = file("schemas/schema.avsc")
}

resource "google_pubsub_topic" "sample" {
  name       = "sample"
  depends_on = [google_pubsub_schema.sample_pubsub_schema]
  schema_settings {
    schema   = google_pubsub_schema.sample_pubsub_schema.id
    encoding = "JSON"
  }
}

# schemas/schema.avsc(Avroの拡張子はavsc)

{
  "type": "record",
  "name": "Avro",
  "fields": [
    {
      "name": "userId",
      "type": "string"
    },
    {
      "name": "message",
      "type": "string"
    }
  ]
}

  • ここまで行ければ、あとはTerraformで反映すれば、スキーマ
$ terraform apply

# スキーマに合うメッセージを送れる
$ gcloud pubsub topics publish voter-point-aggregation --message='{"userId": "1", "message": "hello" }'
# messageIds:
# - 'XXX'

# もしスキーマに合わないデータを送信したら、こんなエラーが出る
$ gcloud pubsub topics publish voter-point-aggregation --message='hello'
#ERROR: (gcloud.pubsub.topics.publish) INVALID_ARGUMENT: Invalid data in message: Message failed schema validation.

Cloud Pub/Sub のメッセージをトリガーとして、Cloud Functions を実行する場合

  • データを受け取る方法は以下の通り
const functions = require('@google-cloud/functions-framework')

functions.cloudEvent('sample', async (event) => {
  // データは base64 で飛んでくるので、一旦文字列に変換する
  const decodedString = Buffer.from(event.data.message.data, 'base64').toString('utf-8')
  // 文字列をJSONに変換する
  const jsonData = JSON.parse(decodedString)

  // あとは JSON として扱える
  console.log('data', jsonData)
})
GitHubで編集を提案

Discussion