S3 CSV データを BigQuery Data Transfer Service (DTS) に転送する
BigQuery Data Transfer Service とは?
BigQuery Data Transfer Service (以下DTS)は
あらゆる SaaS系サービスやストレージサービスなどをデータソースとし、スケジュール設定で定期的にBigQueryにデータ転送を自動化するマネージドサービスです。
BigQuery Data Transfer Service とは
DTS と Datastream の違い
データ転送というと Datastream が思い浮かべるかもしれませんが、明確に違いがあります。
一言で言うと、
- Datastreamは「データベースの変更」をリアルタイム転送
- Data Transfer Serviceは「様々なサービスのデータ」を定期的にBigQueryへ転送
🔎 BigQuery Datastream:
Datastreamは、変更データキャプチャ(Change Data Capture, CDC) という技術を使って、元のデータベースに加えられた変更(INSERT、UPDATE、DELETE)を検出し、ほぼリアルタイムでBigQueryに反映させます。
🚚 BigQuery Data Transfer Service (DTS) :
Data Transfer Serviceは、様々な外部サービスやストレージからデータを定期的にBigQueryへ自動で取り込むためのマネージドサービスです。一度設定すれば、あとはスケジュール通りに自動でデータをロードしてくれます。
違いのまとめ
観点 | Datastream | BigQuery Data Transfer Service |
---|---|---|
目的 | RDBMSのリアルタイム分析基盤構築 | SaaSやクラウドストレージ等→BigQueryの定期バッチ取り込み |
ソース | Oracle / MySQL / PostgreSQL などの運用DBの変更ログ | Google広告, Google Analytics, YouTubeなどのSaaS、Amazon S3, Cloud Storageなどのクラウドストレージ |
実行タイミング | ほぼリアルタイム(ストリーミング)+初回バックフィル | CRON的スケジュール(毎時/毎日など)、バックフィルも可能 |
転送先 | BigQuery(直接書き込み)、Cloud Storage(→Dataflowで他宛先へ) | BigQueryのみ(外へは出せない) |
ユースケース | 最新の顧客注文データなど常に更新されるDBデータを即座に分析したい | 昨日の広告費用対効果を分析したい、Google広告やYouTubeアナリティクスから日次や月次のレポートデータを自動で取り込み |
上記の DTS を利用して
S3 → BigQuery にデータ転送を Terraform で設定
こちらを実装します。
0. 前提・要件
今回は Repro というサービスの CSVデータ を BigQuery に取り込む分析基盤を構築します。
Reproの顧客データは CSV として Amazon S3 に保管されているので、DTSを利用します。
ReproのS3には「イベント/プッシュ通知/アプリ内メッセージ」の 3つのCSVデータが存在します。
これらのデータを DTS と Terraform で 自動取り込みするまでの手順・設計・つまずきポイントを、実運用ベースでまとめました。
Reproと契約し、付与された情報は以下ですので、これを元に実装していきます。
-
取り込み元S3 : repro-s3-bucket (仮名)
-
データ構造
-
イベント:
event{project_id}/custom_event_histories/yyyyMMddHH/{project_id}_custom_event_history_{index}.{hash}.csv.gz
(毎時・ヘッダーなし) -
プッシュ通知:
push_notification_delivery_logs/{yyyyMMddHH}_push_notification_delivery_log.csv
(1日3回・24h 分) -
アプリ内メッセージ:
in_app_message_delivery_logs/{yyyyMMddHH}_in_app_message_delivery_log.csv
(同上)
-
-
AWSアクセスキー: 上記バケットの
s3:GetObject
/s3:ListBucket
の権限付与
1. DTS アーキテクチャ設計
DTSを設定するには、データ転送の宛先として指定する、テーブル定義とデータセットが必要になります。
Reproでは公式で CSVデータの形式を公開しています。
その情報をもとに データを取り込むためのテーブル設計・データセット・DTSの構築を行います。
イベントデータの場合
プッシュ通知、アプリ内メッセージ の場合
テーブル設計
データ転送先に指定するテーブル設計において考えること選択肢は2つあります。
- DTS で自動作成されるテーブルを利用する
- 固定テーブルを用意する
DTSの機能としてテーブルを自動作成することもできます。
しかし、自動で作成されるテーブルは、パーティション分割・型定義・クラスタリングなどの細かい設計ができません。
Reproのデータのように大量の顧客データ・厳密な型・パフォーマンス最適化(スキャン削減)を考えるなら、自動作成とは別に terraform google_bigquery_table
で固定のテーブルを定義しておく方が安全と判断しました。
パーティション & クラスタリングの方針
分析用のテーブル設計において、重要なのは パーティション と クラスタリング です。
パーティションとクラスタリングは分析時のクエリ実行時のムダをなくし、コスト・パフォーマンス効率を高める役割を果たします。
今回は 顧客データからユーザーの動向を分析したいので、以下のように設定します。
-
イベントデータ
tracked_at
を DAYパーティション
user_id
で クラスタリング -
プッシュ通知、アプリ内メッセージ
deliver_time
を DAY パーティション
user_id
で クラスタリング
上記のように設定することで、分析の絞り込みが “時刻” や “ユーザー単位” となり、分析にかかるコストが最適化されます。
データセットの作成
CSV を取得し、BigQueryに取り込むには必ずデータセットが必要になります。
データセットは設計したテーブルをひとまとめで管理しDTSの転送先として指定できます。
Terraform で google_bigquery_data_transfer_config
を作成するときに destination_dataset_id
に作成するデータセットを設定します。
これで S3 → BigQuery の転送ジョブが正常に作成・実行できるようになります。
DTSの構築
DTSは データ転送するための中心的な役割となります。
先に述べた通り、ReproのS3には「イベント/プッシュ通知/アプリ内メッセージ」の3つのCSVデータがありますので、DTSを3種類作成します。
3種のデータの各S3エンドポイントは以下で提供されているので、data_path
の指定は以下を指定します。
- イベント :
s3://repro-s3-bucket/${project_id}/custom_event_histories/{YYYYMMDDHH}/
- プッシュ通知 :
s3://repro-s3-bucket/${project_id}/push_notification_delivery_logs/
- アプリ内メッセージ :
s3://repro-s3-bucket/${project_id}/in_app_message_delivery_logs/
スケジュールは以下で設定します。
- イベントデータ:毎時
- プッシュ通知:毎日 JST 06:00
- アプリ内メッセージ:毎日 JST 06:00
その他のDTSの設定は以下の内容です。
- 保存期間:30日
- BigQuery 側は 1 データセットに 3 テーブルを固定名で運用
- CSV は UTF-8/LF/ヘッダーなし(イベント)、一部列は 空文字が入る
Terraform 実装
データセット
resource "google_bigquery_dataset" "repro_s3" {
dataset_id = "repro_s3"
friendly_name = "repro_s3"
description = "Dataset for Repro S3"
project = data.google_project.example.project_id
location = data.google_client_config.current.region
delete_contents_on_destroy = false
timeouts {}
}
テーブル
resource "google_bigquery_table" "repro_s3_tables" {
for_each = local.repro_tables
dataset_id = google_bigquery_dataset.repro_s3.dataset_id
table_id = each.value.table_id
deletion_protection = true
dynamic "time_partitioning" {
for_each = lookup(each.value, "time_partition", null) == null ? [] : [each.value.time_partition]
content {
type = time_partitioning.value.type
field = time_partitioning.value.field
}
}
clustering = lookup(each.value, "clustering", null)
schema = file(each.value.schema_file)
labels = {
source = "repro_s3"
pii = "false"
}
}
locals {
# テーブル定義
repro_tables = {
custom_event_history = {
table_id = "custom_event_history"
schema_file = "custom_event_history.json"
time_partition = {
type = "DAY"
field = "tracked_at"
}
clustering = ["user_id"]
}
push_notification_delivery_log = {
table_id = "push_notification_delivery_log"
schema_file = "push_notification_delivery_log.json"
time_partition = {
type = "DAY"
field = "deliver_time"
}
clustering = ["user_id"]
}
in_app_message_delivery_log = {
table_id = "in_app_message_delivery_log"
schema_file = "in_app_message_delivery_log.json"
time_partition = {
type = "DAY"
field = "deliver_time"
}
clustering = ["user_id"]
}
}
}
- 日次パーティション:
tracked_at
/deliver_time
を partition key に分割 - クラスタリング:
user_id
に指定
テーブルスキーマ
[
{ "name": "tracked_at", "type": "TIMESTAMP", "mode": "REQUIRED", "description": "トラッキング時刻 (UTC)" },
{ "name": "name", "type": "STRING", "mode": "REQUIRED", "description": "イベント名" },
{ "name": "properties", "type": "JSON", "mode": "NULLABLE", "description": "JSON 文字列のプロパティ" },
{ "name": "user_id", "type": "STRING", "mode": "NULLABLE", "description": "ユーザー ID" },
{ "name": "os", "type": "STRING", "mode": "NULLABLE", "description": "OS 名 (ios / android)" },
{ "name": "device_id", "type": "STRING", "mode": "NULLABLE", "description": "デバイス ID" },
{ "name": "deprecated", "type": "STRING", "mode": "NULLABLE", "description": "廃止済み列(常に空文字)" }
]
[
{ "name": "device_id", "type": "STRING", "mode": "REQUIRED", "description": "デバイス ID" },
{ "name": "user_id", "type": "STRING", "mode": "NULLABLE", "description": "ユーザー ID" },
{ "name": "campaign_id", "type": "STRING", "mode": "NULLABLE", "description": "キャンペーン ID" },
{ "name": "name", "type": "STRING", "mode": "NULLABLE", "description": "キャンペーン名" },
{ "name": "kind", "type": "STRING", "mode": "NULLABLE", "description": "種別 (dialog など)" },
{ "name": "action", "type": "STRING", "mode": "NULLABLE", "description": "display / cta_primary …" },
{ "name": "deliver_time", "type": "TIMESTAMP", "mode": "REQUIRED", "description": "配信日時 (UTC)" },
{ "name": "variant_number", "type": "STRING", "mode": "NULLABLE", "description": "AB テストのパターン番号" }
]
[
{ "name": "device_id", "type": "STRING", "mode": "REQUIRED", "description": "デバイス ID" },
{ "name": "user_id", "type": "STRING", "mode": "NULLABLE", "description": "ユーザー ID" },
{ "name": "campaign_id", "type": "STRING", "mode": "NULLABLE", "description": "キャンペーン ID" },
{ "name": "name", "type": "STRING", "mode": "NULLABLE", "description": "キャンペーン名" },
{ "name": "deliver_time", "type": "TIMESTAMP", "mode": "REQUIRED", "description": "配信日時 (UTC)" },
{ "name": "variant_number", "type": "STRING", "mode": "NULLABLE", "description": "AB テストのパターン番号" },
{ "name": "opened", "type": "STRING", "mode": "NULLABLE", "description": "Opened / NULL" },
{ "name": "message_title", "type": "STRING", "mode": "NULLABLE", "description": "通知タイトル" }
]
DTS
resource "google_bigquery_data_transfer_config" "repro" {
for_each = local.repro_s3_jobs
display_name = "Repro ${each.value.display_name} CSV"
data_source_id = "amazon_s3"
destination_dataset_id = google_bigquery_dataset.repro_s3.dataset_id
location = data.google_client_config.current.region
project = data.google_project.example.project_id
schedule = each.value.schedule
params = merge(
local.repro_dts_common,
{
data_path = each.value.data_path
destination_table_name_template = each.value.table_name_template
}
)
}
locals {
# DTS定義
# S3 path の部分は顧客によって違います
repro_s3_jobs = {
event = {
display_name = "Event"
data_path = "s3://repro-data-for-outer-production/7210/custom_event_histories/{run_time-1h|\"%Y%m%d%H\"}/*"
schedule = "every 60 minutes"
table_name_template = "custom_event_history"
}
push = {
display_name = "Push"
data_path = "s3://repro-s3-bucket/{project_id}/push_notification_delivery_logs/*"
schedule = "every day 21:00" # JST 06:00
table_name_template = "push_notification_delivery_log"
}
inapp = {
display_name = "InApp"
data_path = "s3://repro-s3-bucket/{project_id}/in_app_message_delivery_logs/*"
schedule = "every day 21:00" # JST 06:00
table_name_template = "in_app_message_delivery_log"
}
}
repro_dts_common = {
file_format = "CSV"
skip_leading_rows = "0"
write_disposition = "WRITE_APPEND"
allow_quoted_newlines = true
access_key_id = data.sops_file.secrets.data["repro_s3_aws_access_key"]
secret_access_key = data.sops_file.secrets.data["repro_s3_aws_secret_key"]
}
}
-
destination_table_name_template
を指定することで固定のテーブルが利用される
データ転送実施
BigQueryコンソールからデータ転送できているか確認します。
テーブルとデータセットの確認
以下のドキュメントから確認できます。
データセットを作成する
以下のテーブとデータセットが作成されています。
スキーマの定義も以下のように反映されています。
DTSの確認
以下のドキュメントから確認できます。
Amazon S3 データを BigQuery に読み込む
データ転送が実施され、特にエラーメッセージが表示されず ✅ がついていればOKです。
DTS作成以前のデータは バックフィルで取得可能なので、バックフィルも実行します。
つまずきポイント
実際に実装して、遭遇したエラーをまとめます。
No files found matching: "s3://...
データパスとして指定したS3のオブジェクトが探し出せないエラーです。
"No files found matching: "s3://...
の場合、data_path
の指定が誤っている可能性があります。
データパスの指定で、URIとオブジェクトの記述が誤っている
-
data_path
で指定するのはS3のURIまで。オブジェクトは/*
で指定します。 - どこまでが URIでどれがオブジェクトなのか。
aws s3api list-objects-v2
などで keyをよく確認しましょう。
S3パスに日時({YYYYMMDDHH})が入っている場合、時間のズレが生じる
- DTSがデータを取得しに行くタイミングの日時と、S3にファイルを出力するタイミングの日時でよズレが起きているかもしれません。
- 今回出力される {YYYYMMDDHH} ディレクトリはUTC基準なので、
run_time-1h
などしてオフセットするようにしました。
Error while reading data, error message: CSV table encountered too many errors, giving up. Rows: 0; errors: 100.
この類のエラーは
CSVファイルにヘッダー行(カラム名が記載された最初の行)が存在するにもかかわらず、DTSがそれをデータ行として読み込もうとすると、このように出ます。
今回はヘッダーがないので、
skip_leading_rows="0"
を明示しました。
実際に S3 からcsvデータをダウンロードしてみて、データにヘッダーがあるかないかを確認してみましょう。
Discussion