🚚

S3 CSV データを BigQuery Data Transfer Service (DTS) に転送する

に公開

BigQuery Data Transfer Service とは?

BigQuery Data Transfer Service (以下DTS)は
あらゆる SaaS系サービスやストレージサービスなどをデータソースとし、スケジュール設定で定期的にBigQueryにデータ転送を自動化するマネージドサービスです。

BigQuery Data Transfer Service とは

DTS と Datastream の違い

データ転送というと Datastream が思い浮かべるかもしれませんが、明確に違いがあります。

一言で言うと、

  • Datastreamは「データベースの変更」をリアルタイム転送
  • Data Transfer Serviceは「様々なサービスのデータ」を定期的にBigQueryへ転送

🔎 BigQuery Datastream:
Datastreamは、変更データキャプチャ(Change Data Capture, CDC) という技術を使って、元のデータベースに加えられた変更(INSERT、UPDATE、DELETE)を検出し、ほぼリアルタイムでBigQueryに反映させます。

🚚 BigQuery Data Transfer Service (DTS) :
Data Transfer Serviceは、様々な外部サービスやストレージからデータを定期的にBigQueryへ自動で取り込むためのマネージドサービスです。一度設定すれば、あとはスケジュール通りに自動でデータをロードしてくれます。

違いのまとめ

観点 Datastream BigQuery Data Transfer Service
目的 RDBMSのリアルタイム分析基盤構築 SaaSやクラウドストレージ等→BigQueryの定期バッチ取り込み
ソース Oracle / MySQL / PostgreSQL などの運用DBの変更ログ Google広告, Google Analytics, YouTubeなどのSaaS、Amazon S3, Cloud Storageなどのクラウドストレージ
実行タイミング ほぼリアルタイム(ストリーミング)+初回バックフィル CRON的スケジュール(毎時/毎日など)、バックフィルも可能
転送先 BigQuery(直接書き込み)、Cloud Storage(→Dataflowで他宛先へ) BigQueryのみ(外へは出せない)
ユースケース 最新の顧客注文データなど常に更新されるDBデータを即座に分析したい 昨日の広告費用対効果を分析したい、Google広告やYouTubeアナリティクスから日次や月次のレポートデータを自動で取り込み

上記の DTS を利用して
S3 → BigQuery にデータ転送を Terraform で設定
こちらを実装します。

0. 前提・要件

今回は Repro というサービスの CSVデータ を BigQuery に取り込む分析基盤を構築します。
Reproの顧客データは CSV として Amazon S3 に保管されているので、DTSを利用します。

ReproのS3には「イベント/プッシュ通知/アプリ内メッセージ」の 3つのCSVデータが存在します。

これらのデータを DTS と Terraform で 自動取り込みするまでの手順・設計・つまずきポイントを、実運用ベースでまとめました。

Reproと契約し、付与された情報は以下ですので、これを元に実装していきます。

  • 取り込み元S3 : repro-s3-bucket (仮名)

  • データ構造

    • イベント:event{project_id}/custom_event_histories/yyyyMMddHH/{project_id}_custom_event_history_{index}.{hash}.csv.gz(毎時・ヘッダーなし)

    • プッシュ通知:push_notification_delivery_logs/{yyyyMMddHH}_push_notification_delivery_log.csv(1日3回・24h 分)

    • アプリ内メッセージ: in_app_message_delivery_logs/{yyyyMMddHH}_in_app_message_delivery_log.csv(同上)

  • AWSアクセスキー: 上記バケットの s3:GetObject / s3:ListBucketの権限付与

1. DTS アーキテクチャ設計

DTSを設定するには、データ転送の宛先として指定する、テーブル定義とデータセットが必要になります。
Reproでは公式で CSVデータの形式を公開しています。
その情報をもとに データを取り込むためのテーブル設計・データセット・DTSの構築を行います。

イベントデータの場合

イベントデータエクスポートの概要

プッシュ通知、アプリ内メッセージ の場合

キャンペーンデータエクスポート(β版)の概要

テーブル設計

データ転送先に指定するテーブル設計において考えること選択肢は2つあります。

  • DTS で自動作成されるテーブルを利用する
  • 固定テーブルを用意する

DTSの機能としてテーブルを自動作成することもできます。
しかし、自動で作成されるテーブルは、パーティション分割・型定義・クラスタリングなどの細かい設計ができません。

Reproのデータのように大量の顧客データ・厳密な型・パフォーマンス最適化(スキャン削減)を考えるなら、自動作成とは別に terraform google_bigquery_table で固定のテーブルを定義しておく方が安全と判断しました。

パーティション & クラスタリングの方針

分析用のテーブル設計において、重要なのは パーティション と クラスタリング です。

パーティションとクラスタリングは分析時のクエリ実行時のムダをなくし、コスト・パフォーマンス効率を高める役割を果たします。

今回は 顧客データからユーザーの動向を分析したいので、以下のように設定します。

  • イベントデータ
    tracked_at を DAYパーティション
    user_id で クラスタリング

  • プッシュ通知、アプリ内メッセージ
    deliver_time を DAY パーティション
    user_id で クラスタリング

上記のように設定することで、分析の絞り込みが “時刻” や “ユーザー単位” となり、分析にかかるコストが最適化されます。

データセットの作成

CSV を取得し、BigQueryに取り込むには必ずデータセットが必要になります。
データセットは設計したテーブルをひとまとめで管理しDTSの転送先として指定できます。

Terraform で google_bigquery_data_transfer_config を作成するときに destination_dataset_id に作成するデータセットを設定します。

これで S3 → BigQuery の転送ジョブが正常に作成・実行できるようになります。

DTSの構築

DTSは データ転送するための中心的な役割となります。
先に述べた通り、ReproのS3には「イベント/プッシュ通知/アプリ内メッセージ」の3つのCSVデータがありますので、DTSを3種類作成します。

3種のデータの各S3エンドポイントは以下で提供されているので、data_path の指定は以下を指定します。

  • イベント : s3://repro-s3-bucket/${project_id}/custom_event_histories/{YYYYMMDDHH}/
  • プッシュ通知 : s3://repro-s3-bucket/${project_id}/push_notification_delivery_logs/
  • アプリ内メッセージ : s3://repro-s3-bucket/${project_id}/in_app_message_delivery_logs/

スケジュールは以下で設定します。

  • イベントデータ:毎時
  • プッシュ通知:毎日 JST 06:00
  • アプリ内メッセージ:毎日 JST 06:00

その他のDTSの設定は以下の内容です。

  • 保存期間:30日
  • BigQuery 側は 1 データセットに 3 テーブルを固定名で運用
  • CSV は UTF-8/LF/ヘッダーなし(イベント)、一部列は 空文字が入る

Terraform 実装

データセット

resource "google_bigquery_dataset" "repro_s3" {
  dataset_id                 = "repro_s3"
  friendly_name              = "repro_s3"
  description                = "Dataset for Repro S3"
  project                    = data.google_project.example.project_id
  location                   = data.google_client_config.current.region
  delete_contents_on_destroy = false
  timeouts {}
}

テーブル

resource "google_bigquery_table" "repro_s3_tables" {
  for_each = local.repro_tables

  dataset_id          = google_bigquery_dataset.repro_s3.dataset_id
  table_id            = each.value.table_id
  deletion_protection = true
  dynamic "time_partitioning" {
    for_each = lookup(each.value, "time_partition", null) == null ? [] : [each.value.time_partition]
    content {
      type  = time_partitioning.value.type
      field = time_partitioning.value.field
    }
  }
  clustering = lookup(each.value, "clustering", null)
  schema     = file(each.value.schema_file)

  labels = {
    source = "repro_s3"
    pii    = "false"
  }
}

locals {
  # テーブル定義
  repro_tables = {
    custom_event_history = {
      table_id    = "custom_event_history"
      schema_file = "custom_event_history.json"
      time_partition = {
        type  = "DAY"
        field = "tracked_at"
      }
      clustering = ["user_id"]
    }

    push_notification_delivery_log = {
      table_id    = "push_notification_delivery_log"
      schema_file = "push_notification_delivery_log.json"
      time_partition = {
        type  = "DAY"
        field = "deliver_time"
      }
      clustering = ["user_id"]
    }

    in_app_message_delivery_log = {
      table_id    = "in_app_message_delivery_log"
      schema_file = "in_app_message_delivery_log.json"
      time_partition = {
        type  = "DAY"
        field = "deliver_time"
      }
      clustering = ["user_id"]
    }
  }
}
  • 日次パーティション:tracked_at / deliver_time を partition key に分割
  • クラスタリング:user_id に指定

テーブルスキーマ

custom_event_history.json
[
  { "name": "tracked_at", "type": "TIMESTAMP", "mode": "REQUIRED", "description": "トラッキング時刻 (UTC)" },
  { "name": "name",       "type": "STRING",    "mode": "REQUIRED", "description": "イベント名" },
  { "name": "properties", "type": "JSON",      "mode": "NULLABLE", "description": "JSON 文字列のプロパティ" },
  { "name": "user_id",    "type": "STRING",    "mode": "NULLABLE", "description": "ユーザー ID" },
  { "name": "os",         "type": "STRING",    "mode": "NULLABLE", "description": "OS 名 (ios / android)" },
  { "name": "device_id",  "type": "STRING",    "mode": "NULLABLE", "description": "デバイス ID" },
  { "name": "deprecated", "type": "STRING",    "mode": "NULLABLE", "description": "廃止済み列(常に空文字)" }
]
push_notification_delivery_log.json
[
  { "name": "device_id",      "type": "STRING",    "mode": "REQUIRED", "description": "デバイス ID" },
  { "name": "user_id",        "type": "STRING",    "mode": "NULLABLE", "description": "ユーザー ID" },
  { "name": "campaign_id",    "type": "STRING",    "mode": "NULLABLE", "description": "キャンペーン ID" },
  { "name": "name",           "type": "STRING",    "mode": "NULLABLE", "description": "キャンペーン名" },
  { "name": "kind",           "type": "STRING",    "mode": "NULLABLE", "description": "種別 (dialog など)" },
  { "name": "action",         "type": "STRING",    "mode": "NULLABLE", "description": "display / cta_primary …" },
  { "name": "deliver_time",   "type": "TIMESTAMP", "mode": "REQUIRED", "description": "配信日時 (UTC)" },
  { "name": "variant_number", "type": "STRING",    "mode": "NULLABLE", "description": "AB テストのパターン番号" }
]
in_app_message_delivery_log.json
[
  { "name": "device_id",      "type": "STRING",    "mode": "REQUIRED", "description": "デバイス ID" },
  { "name": "user_id",        "type": "STRING",    "mode": "NULLABLE", "description": "ユーザー ID" },
  { "name": "campaign_id",    "type": "STRING",    "mode": "NULLABLE", "description": "キャンペーン ID" },
  { "name": "name",           "type": "STRING",    "mode": "NULLABLE", "description": "キャンペーン名" },
  { "name": "deliver_time",   "type": "TIMESTAMP", "mode": "REQUIRED", "description": "配信日時 (UTC)" },
  { "name": "variant_number", "type": "STRING",    "mode": "NULLABLE", "description": "AB テストのパターン番号" },
  { "name": "opened",         "type": "STRING",    "mode": "NULLABLE", "description": "Opened / NULL" },
  { "name": "message_title",  "type": "STRING",    "mode": "NULLABLE", "description": "通知タイトル" }
]

DTS

resource "google_bigquery_data_transfer_config" "repro" {
  for_each = local.repro_s3_jobs

  display_name           = "Repro ${each.value.display_name} CSV"
  data_source_id         = "amazon_s3"
  destination_dataset_id = google_bigquery_dataset.repro_s3.dataset_id
  location               = data.google_client_config.current.region
  project                = data.google_project.example.project_id
  schedule               = each.value.schedule
  params = merge(
    local.repro_dts_common,
    {
      data_path                       = each.value.data_path
      destination_table_name_template = each.value.table_name_template
    }
  )
}

locals {
  # DTS定義
  # S3 path の部分は顧客によって違います
  repro_s3_jobs = {
    event = {
      display_name        = "Event"
      data_path           = "s3://repro-data-for-outer-production/7210/custom_event_histories/{run_time-1h|\"%Y%m%d%H\"}/*"
      schedule            = "every 60 minutes"
      table_name_template = "custom_event_history"
    }
    push = {
      display_name        = "Push"
      data_path           = "s3://repro-s3-bucket/{project_id}/push_notification_delivery_logs/*"
      schedule            = "every day 21:00" # JST 06:00
      table_name_template = "push_notification_delivery_log"
    }
    inapp = {
      display_name        = "InApp"
      data_path           = "s3://repro-s3-bucket/{project_id}/in_app_message_delivery_logs/*"
      schedule            = "every day 21:00" # JST 06:00
      table_name_template = "in_app_message_delivery_log"
    }
  }

  repro_dts_common = {
    file_format           = "CSV"
    skip_leading_rows     = "0"
    write_disposition     = "WRITE_APPEND"
    allow_quoted_newlines = true
    access_key_id         = data.sops_file.secrets.data["repro_s3_aws_access_key"]
    secret_access_key     = data.sops_file.secrets.data["repro_s3_aws_secret_key"]
  }
}
  • destination_table_name_template を指定することで固定のテーブルが利用される

データ転送実施

BigQueryコンソールからデータ転送できているか確認します。

テーブルとデータセットの確認

以下のドキュメントから確認できます。
データセットを作成する

以下のテーブとデータセットが作成されています。

スキーマの定義も以下のように反映されています。

DTSの確認

以下のドキュメントから確認できます。
Amazon S3 データを BigQuery に読み込む

データ転送が実施され、特にエラーメッセージが表示されず ✅ がついていればOKです。

DTS作成以前のデータは バックフィルで取得可能なので、バックフィルも実行します。

つまずきポイント

実際に実装して、遭遇したエラーをまとめます。

No files found matching: "s3://...

データパスとして指定したS3のオブジェクトが探し出せないエラーです。
"No files found matching: "s3://... の場合、data_path の指定が誤っている可能性があります。

データパスの指定で、URIとオブジェクトの記述が誤っている

  • data_path で指定するのはS3のURIまで。オブジェクトは /* で指定します。
  • どこまでが URIでどれがオブジェクトなのか。 aws s3api list-objects-v2 などで keyをよく確認しましょう。

S3パスに日時({YYYYMMDDHH})が入っている場合、時間のズレが生じる

  • DTSがデータを取得しに行くタイミングの日時と、S3にファイルを出力するタイミングの日時でよズレが起きているかもしれません。
  • 今回出力される {YYYYMMDDHH} ディレクトリはUTC基準なので、run_time-1h などしてオフセットするようにしました。

Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 0; errors: 100.

この類のエラーは
CSVファイルにヘッダー行(カラム名が記載された最初の行)が存在するにもかかわらず、DTSがそれをデータ行として読み込もうとすると、このように出ます。

今回はヘッダーがないので、
skip_leading_rows="0"
を明示しました。

実際に S3 からcsvデータをダウンロードしてみて、データにヘッダーがあるかないかを確認してみましょう。

Discussion