😇

data transfer configでシャーディングしつつbigqueryに投入する

2023/10/28に公開

背景

s3からbigqueryにパーティション切りながらデータ投入できるかなーと思っていろいろ調べていたら
最終的にシャーディングテーブルとして投入できたのでメモしておきます

なお、シャーディング・パーティショニングについては以下の記事をどうぞ
https://qiita.com/elyunim26/items/d5e989e6cb6415ca8431

Caveat

  • terraformを抜粋してを貼り付けるので不完全です

概要

  1. S3からbigqueryへeltする
    1. 取り込みデータは s3://hoge/table_yyyy-mm-dd/ 日毎に分けられている
  2. bigquery_data_transfer_configを利用する
  3. cloudworkflowsで全体を管理する
    1. data_transfer_configではシャーディングを切りながら投入できなかった
    2. ので、prehookとしてemptyテーブルを作成している

内容

シャーディングの形式でテーブルをあらかじめ作る
なお、_20000202の部分が重要です、これでシャーディングテーブルとして認識されます

resource "google_bigquery_table" "this" {
  dataset_id = google_bigquery_dataset.raw.dataset_id
  # シャーディング用
  table_id            = "data_20000202"
  deletion_protection = false
  schema              = file("${path.module}/bigquery_schema/DB_NAME/TABLE_NAME.json")
}

data transfer configでrun_timeを利用して取り込み元と投入先のテーブルを動的に指定する。

resource "google_bigquery_data_transfer_config" "this" {
  display_name             = "this"
  location                 = var.gcp_region
  data_refresh_window_days = 0
  data_source_id           = "amazon_s3"
  schedule                 = "every sunday 20:00" # UTC
  destination_dataset_id   = google_bigquery_dataset.this.dataset_id
  disabled                 = false
  params = {
    access_key_id                   = var.aws_access_key_id
    destination_table_name_template = "${google_bigquery_table.this.table_id}_{run_time+9h|\"%Y%m%d\"}"
    data_path                       = "s3://BUCKET_NAME/DB_NAME/TABLE_NAME_{run_time+9h|\"%Y%m%d\"}/*"
    file_format                     = "CSV"
    write_disposition               = "WRITE_APPEND"
    skip_leading_rows               = 0
  }
  sensitive_params {
    secret_access_key = var.aws_secret_access_key
  }
  project = data.google_project.project.project_id
  # project = var.gcp_project_id

  # service_account_name = google_service_account.service_account.email
  timeouts {}
}

あとはworkflowsを使ってシャーディングテーブルを先に生成してからtransferを実行するyamlを書いて

main:
  steps:
    - generateTableName:
        assign:
          - tableName: $${text.replace_all(text.split(time.format(sys.now(), "Asia/Tokyo"), "T")[0], "-", "")}

    - logging:
        call: sys.log
        args:
          text: $${tableName}
          severity: INFO

    - create_table:
        call: googleapis.bigquery.v2.tables.insert
        args:
          datasetId: ${datasetId}
          projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          body:
            tableReference:
              datasetId: ${datasetId}
              projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
              tableId: $${"TABLE_NAME_" + tableName}
            schema:
              fields:
                ${schema_fields}

    - start_run:
        call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
        args:
          parent: "${bigquery_data_transfer_name}"
          body:
            requestedRunTime: $${time.format(sys.now(), "Asia/Tokyo")}
        result: runsResp

    - return_value:
        return: $${runsResp}

リソースを作れば終わりです

resource "google_workflows_workflow" "this" {
  name        = "this"
  project     = var.gcp_project_id
  region      = var.gcp_region
  description = "A sample workflow"
  service_account = google_service_account.sample.id


  source_contents = join("", [
    templatefile(
      "${path.module}/workflows/main.yaml",
      {
        bigquery_data_transfer_name = google_bigquery_data_transfer_config.this.name,
        datasetId                   = google_bigquery_dataset.this.dataset_id,
        schema_fields               = replace(yamlencode(jsondecode(file("${path.module}/bigquery_schema/DB_NAME/TABLE_NAME.json"))), "\n", "\n                ")
      }
    ),
  ])
}

参考

https://swfz.hatenablog.com/entry/2021/12/06/191258

https://stackoverflow.com/questions/70194094/how-to-create-sharded-table-in-gcp-bigquery

https://cloud.google.com/bigquery/docs/partitioned-tables#dt_partition_shard

https://tech.asahi.co.jp/posts/20230320-e358

Discussion