😇
data transfer configでシャーディングしつつbigqueryに投入する
背景
s3からbigqueryにパーティション切りながらデータ投入できるかなーと思っていろいろ調べていたら
最終的にシャーディングテーブルとして投入できたのでメモしておきます
なお、シャーディング・パーティショニングについては以下の記事をどうぞ
Caveat
- terraformを抜粋してを貼り付けるので不完全です
概要
- S3からbigqueryへeltする
- 取り込みデータは s3://hoge/table_yyyy-mm-dd/ 日毎に分けられている
- bigquery_data_transfer_configを利用する
- cloudworkflowsで全体を管理する
- data_transfer_configではシャーディングを切りながら投入できなかった
- ので、prehookとしてemptyテーブルを作成している
内容
シャーディングの形式でテーブルをあらかじめ作る
なお、_20000202の部分が重要です、これでシャーディングテーブルとして認識されます
resource "google_bigquery_table" "this" {
dataset_id = google_bigquery_dataset.raw.dataset_id
# シャーディング用
table_id = "data_20000202"
deletion_protection = false
schema = file("${path.module}/bigquery_schema/DB_NAME/TABLE_NAME.json")
}
data transfer configでrun_timeを利用して取り込み元と投入先のテーブルを動的に指定する。
resource "google_bigquery_data_transfer_config" "this" {
display_name = "this"
location = var.gcp_region
data_refresh_window_days = 0
data_source_id = "amazon_s3"
schedule = "every sunday 20:00" # UTC
destination_dataset_id = google_bigquery_dataset.this.dataset_id
disabled = false
params = {
access_key_id = var.aws_access_key_id
destination_table_name_template = "${google_bigquery_table.this.table_id}_{run_time+9h|\"%Y%m%d\"}"
data_path = "s3://BUCKET_NAME/DB_NAME/TABLE_NAME_{run_time+9h|\"%Y%m%d\"}/*"
file_format = "CSV"
write_disposition = "WRITE_APPEND"
skip_leading_rows = 0
}
sensitive_params {
secret_access_key = var.aws_secret_access_key
}
project = data.google_project.project.project_id
# project = var.gcp_project_id
# service_account_name = google_service_account.service_account.email
timeouts {}
}
あとはworkflowsを使ってシャーディングテーブルを先に生成してからtransferを実行するyamlを書いて
main:
steps:
- generateTableName:
assign:
- tableName: $${text.replace_all(text.split(time.format(sys.now(), "Asia/Tokyo"), "T")[0], "-", "")}
- logging:
call: sys.log
args:
text: $${tableName}
severity: INFO
- create_table:
call: googleapis.bigquery.v2.tables.insert
args:
datasetId: ${datasetId}
projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
body:
tableReference:
datasetId: ${datasetId}
projectId: $${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
tableId: $${"TABLE_NAME_" + tableName}
schema:
fields:
${schema_fields}
- start_run:
call: googleapis.bigquerydatatransfer.v1.projects.locations.transferConfigs.startManualRuns
args:
parent: "${bigquery_data_transfer_name}"
body:
requestedRunTime: $${time.format(sys.now(), "Asia/Tokyo")}
result: runsResp
- return_value:
return: $${runsResp}
リソースを作れば終わりです
resource "google_workflows_workflow" "this" {
name = "this"
project = var.gcp_project_id
region = var.gcp_region
description = "A sample workflow"
service_account = google_service_account.sample.id
source_contents = join("", [
templatefile(
"${path.module}/workflows/main.yaml",
{
bigquery_data_transfer_name = google_bigquery_data_transfer_config.this.name,
datasetId = google_bigquery_dataset.this.dataset_id,
schema_fields = replace(yamlencode(jsondecode(file("${path.module}/bigquery_schema/DB_NAME/TABLE_NAME.json"))), "\n", "\n ")
}
),
])
}
参考
Discussion