🤼

BigQuery Data TransferとS3の連携をTerraformで実装する

2022/10/13に公開

背景

ゴリゴリ系エンジニアの開祖、pageoです。
アプリケーションのインフラをAWS上でホストし、データ分析基盤をGCPでホストするマルチクラウド構成が流行っていますね。(Azureは知らん)
自分も最近AWSからGCPへデータ連携する機構の実装を担当したので、備忘録的に実装方法を整理したいと思います。

前提

今回実装するのは以下です。

S3 Bucket内にあるファイル(Parquet形式)を、Big Query Data Transfer Service(以後、DTSと呼ぶ)により定期的にBigQuery Tableに連携するデータ連携システムを実装します。

Githubにサンプルコードを置いているので、全ての実装を確認したい人は参照ください(スター頂けるとpageo喜びます)
https://github.com/page-o/data-transfer-sample

構成

ディレクトリ構成は以下になります。


+- data-transfer-sample/
    |
    +---- shared/  
    |       +---- main.tf
    |       +---- provider.tf
    |       +---- variables.tf
    |       +---- modules/
    |       |      +---- s3/
    |       |            +----- bucket.tf
    |       |            +----- object.tf
    |       |            +----- outputs.tf
    |       |            +----- variables.tf
    |       |      +---- dts/
    |       |            +----- dts.tf
    |       |            +----- outputs.tf
    |       |            +----- variables.tf
    +---- env/
    |       +---- sample/
    |       |      +----- backend.tf
    |       |      +----- locals.tf
    |       |      +----- terraform.tfvars  Create by Yourself
    |       |      +----- main.tf           Symbolic link to /shared/main.tf
    |       |      +----- provider.tf       Symbolic link to /shared/provider.tf
    |       |      +----- variables.tf      Symbolic link to /shared/variables.tf
    +---- templates/
    |       +---- bucket/
    |              +----- hoge.parquet  Sample Empty File
    |              +----- fuga.parquet  Sample Empty File
    +---- backend/
            +---- terraform_backend.yaml

DRY原則に基づくディレクトリ構成にしています。
Terraformにおけるディレクトリ構成のベストプラクティスについては、以下の記事で詳しく説明しているので参照ください。(ハート頂けるとpageo喜びます)
https://zenn.dev/pageo/articles/d90d89e2168061

実装

まず/env/sample/backend.tfなのですが、
今回実装するシステムはAWSとGCPを使用するマルチクラウドなので、required_providersにはawsgoogleのProviderに関する情報を記載します

/env/sample/backend.tf
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "~> 4.0"
    }
    google = {
      source  = "hashicorp/google"
      version = "~> 4.0"
    }
  }

  backend "s3" {
    bucket         = "sample-tfstate-bucket"
    key            = "sample.tfstate"
    dynamodb_table = "sample-tfstate-table"
    region         = "ap-northeast-1"
  }
}


S3の実装についてはWeb上に実装例が豊富だと思うので、次にDTSについて説明します
(Githubに実装例を挙げているので、S3の実装が気になる方はこちらを参照ください)
まず1~5行目の部分ですが、/shared/provider.tfでDefault Providerをawsに指定しているので、GCPのサービスであるDTSのmoduleではproviderをgoogleで上書きしています。

次に本題のgoogle_bigquery_data_transfer_configresourceの部分(24~44行目)についてです。
data_source_idパラメータなのですが、公式APIドキュメントを漁ってもS3をデータソースとする場合の記載例がありませんでした。(AWSではこんなことありえないんだけどなあ、、)
ここは他の方が既に解決されているみたいで、amazon_s3とするのが正解みたいです。

paramsパラメータの記載方法ですが、ここもTerraform公式ドキュメントなどに実装例がないので、GCP公式ドキュメントやコンソールの情報を見つつ、先人たちのコード例を参考に実装します
先人たちに感謝感謝

/shared/modules/dts/dts.tf
provider "google" {
  credentials = file(var.credential_file_path)
  project     = var.gcp_project_id
  region      = var.gcp_region
}

resource "google_bigquery_dataset" "main" {
  dataset_id    = var.dataset.name
  friendly_name = var.dataset.name
  description   = var.dataset.description
  location      = var.gcp_region
}

resource "google_bigquery_table" "main" {
  count = length(var.tables)

  dataset_id          = google_bigquery_dataset.main.dataset_id
  table_id            = element(var.tables.*.name, count.index)
  friendly_name       = element(var.tables.*.name, count.index)
  description         = element(var.tables.*.description, count.index)
  deletion_protection = false
}

resource "google_bigquery_data_transfer_config" "main" {
  count = length(var.data_transfers)

  display_name           = element(var.data_transfers.*.name, count.index)
  location               = var.gcp_region
  data_source_id         = "amazon_s3"
  schedule               = var.transfer_schedule
  destination_dataset_id = google_bigquery_dataset.main.dataset_id
  params = {
    destination_table_name_template = element(var.data_transfers.*.table_name, count.index)
    data_path                       = element(var.data_transfers.*.data_path, count.index)
    access_key_id                   = var.access_key_id
    secret_access_key               = var.secret_access_key
    file_format                     = "PARQUET"
    write_disposition               = "WRITE_TRUNCATE"
    max_bad_records                 = 0
  }
  email_preferences {
    enable_failure_email = true
  }
}

一応/shared/modules/dts/variables.tf/shared/main.tfも載せときます

/shared/modules/dts/variables.tf
variable "credential_file_path" {
  type = string
}

variable "gcp_project_id" {
  type = string
}

variable "gcp_region" {
  type    = string
  default = "asia-northeast1"
}

variable "access_key_id" {
  type = string
}

variable "secret_access_key" {
  type = string
}

variable "dataset" {
  type = object({
    name        = string
    description = string
  })
}

variable "tables" {
  type = list(object({
    name        = string
    description = string
  }))
}

variable "data_transfers" {
  type = list(object({
    name       = string
    table_name = string
    data_path  = string
  }))
}

variable "transfer_schedule" {
  type = string
}
/shared/main.tf
module "bucket" {
  source = "../../shared/modules/s3"

  bucket = {
    name = "${local.env}-${local.project}-bucket"
  }
  objects = [
    {
      key    = local.object_key.hoge
      source = local.object_source.hoge
    },
    {
      key    = local.object_key.fuga
      source = local.object_source.fuga
    }
  ]
}

module "dts" {
  source = "../../shared/modules/dts"

  credential_file_path = sensitive(local.credential_file_path)
  gcp_project_id       = sensitive(local.gcp_project_id)
  access_key_id        = sensitive(local.access_key_id)
  secret_access_key    = sensitive(local.secret_access_key)
  dataset = {
    name        = "${local.env}-${local.project}-dataset"
    description = "${local.env} ${local.project} Dataset"
  }
  tables = [
    {
      name        = "hoge"
      description = "hoge table"
    },
    {
      name        = "fuga"
      description = "fuga table"
    }
  ]
  data_transfers = [
    {
      name       = "hoge-dts"
      table_name = "hoge"
      data_path  = "s3://${module.bucket.name}/${local.object_key.hoge}"
    },
    {
      name       = "fuga-dts"
      table_name = "fuga"
      data_path  = "s3://${module.bucket.name}/${local.object_key.fuga}"
    }
  ]
  transfer_schedule = local.transfer_schedule
}

最後に

何気にGCPリソースをTerraformで実装するのは初だったのですが、APIドキュメントや各サービスの公式情報が少なすぎてかなり驚いています。
AWSが如何にドキュメント類をしっかり整備しているのかが、GCPを通して理解できました。

今回の学び「AWS偉大

https://github.com/page-o/data-transfer-sample

参考

https://medium.com/@edonosotti/importing-big-data-in-bigquery-with-data-transfer-service-and-terraform-dc55fdfdc821
https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/bigquery_data_transfer_config
https://qiita.com/gooos/items/c0ceda6c277dbaa42887
https://zenn.dev/marusho/scraps/a02a03e2286882

Discussion