BigQueryエコシステム完全ガイド: 関連サービスとユースケース
BigQueryは単なるデータウェアハウスではなく、データ分析基盤の中核として機能する包括的なエコシステムです。BigQuery単体でも強力ですが、関連サービスと組み合わせることで、データの取り込みから処理、分析、可視化までのエンドツーエンドのデータパイプラインを構築できます。このブログでは、BigQueryと連携する主要なサービスとそのユースケースを紹介します。
データ取り込み・転送系サービス
1. BigQuery Data Transfer Service (DTS)
概要:
スケジュールベースで自動的に外部データソースからBigQueryへデータを転送するサービス。
主な機能:
- Googleサービス(Google広告、YouTube、Play Storeなど)からのデータ自動取り込み
- Amazon S3やTeradata、Redshiftなど外部ソースからのデータ移行
- スケジュール設定と監視機能
ユースケース:
- マーケティングデータ統合: Google広告、キャンペーンマネージャー、YouTubeアナリティクスなどのデータを自動的にBigQueryに取り込み、クロスチャネル分析を実現
- レガシーデータウェアハウス移行: TeradataやRedshiftから定期的にデータを移行し、段階的な移行戦略を実施
-- DTS設定後のデータ分析例(複数のGoogle広告アカウントのデータを統合分析)
SELECT
account_name,
campaign_name,
SUM(impressions) AS total_impressions,
SUM(clicks) AS total_clicks,
SUM(cost) / 1000000 AS total_cost,
SUM(conversions) AS total_conversions
FROM
`project.google_ads.*`
WHERE
_DATA_DATE BETWEEN '2023-01-01' AND '2023-03-31'
GROUP BY
account_name, campaign_name
ORDER BY
total_cost DESC;
2. Dataflow
概要:
GCPのフルマネージドETL(抽出・変換・ロード)サービスで、ストリーミングと一括処理の両方に対応。
主な機能:
- Apache Beamベースの統一プログラミングモデル
- ストリーミングと一括処理を同じパイプラインでカバー
- 自動スケーリングと耐障害性
ユースケース:
- リアルタイムデータ処理: PubSubからのイベントストリームをリアルタイムで処理し、BigQueryに保存
- 複雑なETLパイプライン: 複数のソースからデータを取り込み、結合、変換し、BigQueryにロード
- データクレンジング: 取り込み前にデータを正規化、クレンジング、強化
// Dataflowパイプラインの例(Java)- PubSubからのメッセージ処理とBigQueryへの書き込み
Pipeline p = Pipeline.create(options);
p.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply("ParseJSON", ParDo.of(new DoFn<String, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
String json = c.element();
TableRow row = convertJsonToTableRow(json);
c.output(row);
}
}))
.apply("WriteToBigQuery", BigQueryIO.writeTableRows()
.to(options.getOutputTable())
.withSchema(schema)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
3. Cloud Data Fusion
概要:
GCPのフルマネージドCDAPサービスで、コード不要でデータ統合パイプラインを構築できる。
主な機能:
- 視覚的なインターフェースによるETLパイプライン構築
- 150以上の事前構築されたコネクタとトランスフォーメーション
- リアルタイム監視とメタデータ管理
ユースケース:
- コード不要のデータパイプライン: データエンジニアリングリソースが限られた組織での迅速なETL構築
- セルフサービスデータ準備: ビジネスアナリストが自らデータソースを接続し、BigQueryへデータを準備
- 複雑なデータ変換: ワンクリックでJoinや集計などの複雑な変換を実行
4. Dataprep by Trifacta
概要:
データクレンジングと準備のためのインテリジェントなクラウドサービス。
主な機能:
- インタラクティブなデータクレンジングと変換
- 機械学習によるデータパターン認識と変換提案
- ポイント&クリックでの操作とすぐに確認できるビジュアルフィードバック
ユースケース:
- データクレンジング: 不正確なデータや重複を検出し修正
- 非構造化データの構造化: CSVやJSONなどの半構造化データを構造化テーブルに変換
- 分析前準備: BigQueryでの分析に適した形式にデータを整形
データ処理・分析系サービス
5. BigQuery Scheduled Queries
概要:
SQLクエリを自動的に定期実行し、結果をテーブルに保存するBigQuery組み込み機能。
主な機能:
- クエリのスケジュール設定(時間、日次、週次、月次など)
- パラメータ化されたクエリのサポート
- 実行履歴と通知機能
ユースケース:
- 定期レポートの自動生成: 日次/週次/月次の集計レポートを自動作成
- インクリメンタルETL: 新しいデータだけを処理して結果テーブルを更新
- データメンテナンス: 古いデータの自動アーカイブや削除
-- 毎日午前2時に実行される日次集計クエリの例
-- BigQuery UIからスケジュール設定可能
CREATE OR REPLACE TABLE `project.dataset.daily_sales_summary`
AS
SELECT
DATE(transaction_timestamp) AS sale_date,
store_id,
product_category,
SUM(quantity) AS units_sold,
SUM(amount) AS total_sales
FROM
`project.dataset.transactions`
WHERE
DATE(transaction_timestamp) = DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY
sale_date, store_id, product_category;
6. BigQuery ML
概要:
BigQuery内でSQLを使用して機械学習モデルを作成、評価、使用できる機能。
主な機能:
- SQL文による機械学習モデル構築
- 線形回帰、ロジスティック回帰、k-means、時系列分析など複数のアルゴリズムをサポート
- AutoMLとの統合
ユースケース:
- 顧客行動予測: 過去の購買データから将来の購買可能性を予測
- 異常検知: 通常とは異なるパターンや不正行為を検出
- 需要予測: 時系列データに基づいて将来の需要を予測
- 顧客セグメンテーション: 行動や属性に基づいて顧客を自動的にグルーピング
-- BigQuery MLを使った顧客離脱予測モデルの例
CREATE OR REPLACE MODEL `project.dataset.customer_churn_model`
OPTIONS(
model_type='LOGISTIC_REG',
input_label_cols=['churned']
) AS
SELECT
IF(next_purchase_date IS NULL AND DATE_DIFF(CURRENT_DATE(), last_purchase_date, DAY) > 90, 1, 0) AS churned,
customer_age,
tenure_days,
avg_purchase_value,
purchase_frequency,
days_since_last_purchase,
support_contacts_count,
product_categories_count
FROM
`project.dataset.customer_features`;
-- モデルを使った予測
SELECT
customer_id,
ML.PREDICT(MODEL `project.dataset.customer_churn_model`,
(SELECT
customer_age,
tenure_days,
avg_purchase_value,
purchase_frequency,
days_since_last_purchase,
support_contacts_count,
product_categories_count
FROM
`project.dataset.customer_features`
WHERE
customer_id = c.customer_id)
).predicted_churned AS churn_probability
FROM
`project.dataset.customers` c
ORDER BY
churn_probability DESC
LIMIT 100;
7. Cloud Composer (Apache Airflow)
概要:
GCP上で動作するフルマネージドのApache Airflowサービスで、ワークフローのオーケストレーションを提供。
主な機能:
- Pythonベースのワークフロー定義
- 依存関係の管理と条件付き実行
- スケジュール設定と再試行メカニズム
- ワークフロー監視ダッシュボード
ユースケース:
- 複雑なデータパイプラインの管理: 複数のステップや依存関係を持つETLパイプラインの調整
- クロスサービスオーケストレーション: BigQueryに加えてDataflow、DataprocなどのGCPサービスを組み合わせた処理
- 条件付き処理: データの状態や品質に基づいた条件付きワークフロー実行
# Cloud Composer (Airflow) DAGの例 - BigQueryでの日次データ処理
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 4, 1),
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG('daily_sales_processing', default_args=default_args, schedule_interval='0 2 * * *') as dag:
load_new_data = GCSToBigQueryOperator(
task_id='load_new_data',
bucket='my-sales-data',
source_objects=['sales/{{ ds }}/*.csv'],
destination_project_dataset_table='project.dataset.raw_sales',
schema_fields=[...],
write_disposition='WRITE_APPEND',
)
transform_data = BigQueryExecuteQueryOperator(
task_id='transform_data',
sql="""
CREATE OR REPLACE TABLE `project.dataset.daily_sales_summary`
AS
SELECT
DATE(transaction_timestamp) AS sale_date,
store_id,
product_category,
SUM(quantity) AS units_sold,
SUM(amount) AS total_sales
FROM
`project.dataset.raw_sales`
WHERE
DATE(transaction_timestamp) = '{{ ds }}'
GROUP BY
sale_date, store_id, product_category
""",
use_legacy_sql=False,
)
load_new_data >> transform_data
データ可視化・BIツール系
8. Looker Studio (旧Data Studio)
概要:
BigQueryに直接接続できる無料のデータ可視化ツール。
主な機能:
- ドラッグ&ドロップによる直感的なダッシュボード作成
- BigQueryへの直接クエリ機能
- 広範な可視化形式(表、グラフ、地図など)
- レポート共有と埋め込み機能
ユースケース:
- セルフサービスダッシュボード: ビジネスユーザーが自らデータを視覚化
- KPIモニタリング: 主要指標をリアルタイムで監視するダッシュボード
- 定期レポート: 自動更新される日次/週次/月次レポートの配信
9. Looker
概要:
Googleが買収した高度なBI(ビジネスインテリジェンス)およびデータ分析プラットフォーム。
主な機能:
- LookML(Lookerのモデリング言語)によるデータモデリング
- 再利用可能なビジネスロジック定義
- 高度なダッシュボードとビジュアライゼーション
- アクションプラットフォーム(分析から直接アクションを起こせる)
ユースケース:
- データの民主化: 組織全体でのセルフサービス分析を実現
- 埋め込み分析: 社内アプリケーションやカスタマーポータルに分析を統合
- 高度なデータ分析: 複雑なビジネスロジックを持つ分析とダッシュボード
# LookMLモデルの例
view: sales {
sql_table_name: `project.dataset.sales` ;;
dimension: sale_id {
primary_key: yes
type: string
sql: ${TABLE}.sale_id ;;
}
dimension_group: transaction {
type: time
timeframes: [raw, date, week, month, quarter, year]
sql: ${TABLE}.transaction_timestamp ;;
}
dimension: product_category {
type: string
sql: ${TABLE}.product_category ;;
}
measure: total_sales {
type: sum
sql: ${TABLE}.amount ;;
value_format_name: usd
}
measure: average_sale_value {
type: average
sql: ${TABLE}.amount ;;
value_format_name: usd
}
}
10. Tableau
概要:
BigQueryに接続可能な業界リーディングのBIおよびデータ可視化ツール。
主な機能:
- 高度で柔軟な可視化機能
- ドラッグ&ドロップインターフェース
- 強力な分析機能(予測、統計、地理空間分析など)
- BigQueryとのネイティブ連携
ユースケース:
- エンタープライズBI: 大規模組織でのデータ可視化標準化
- 複雑な分析可視化: 多次元データの高度な視覚的分析
- インタラクティブダッシュボード: ユーザー操作に応じた動的なデータ表示
データガバナンス・セキュリティ系
11. Data Catalog
概要:
GCPのフルマネージドで拡張性の高いメタデータ管理サービス。
主な機能:
- データの自動カタログ化とタグ付け
- データ検索と発見機能
- ビジネスメタデータとテクニカルメタデータの両方をサポート
- データ系統(データリネージ)の可視化
ユースケース:
- メタデータ管理: データセットとテーブルの説明、所有者、品質情報などを管理
- セルフサービスデータ発見: 組織全体でデータを検索し活用
- コンプライアンス管理: 機密データの識別とタグ付け
12. Sensitive Data Protection
概要:
GCPの機密データ検出、分類、および匿名化サービス。
主な機能:
- 50以上の定義済み機密データタイプの検出
- カスタムタイプの定義機能
- マスキング、トークン化、擬似匿名化などの保護技術
- BigQueryとのネイティブ統合
ユースケース:
- 機密データ検出: BigQueryテーブル内のPII(個人を特定できる情報)の自動検出
- データ匿名化: 分析のためにデータを維持しながら機密情報を保護
- コンプライアンス支援: GDPRやHIPAAなどの規制への準拠をサポート
-- Sensitive Data Protectionで処理したデータの例(擬似コード)
-- 実際はAPIまたはコンソールから設定
SELECT
customer_id,
-- 元の電子メールをマスク
REGEXP_REPLACE(email, '(.*)@(.*)', 'xxxx@\\2') AS masked_email,
-- クレジットカード番号をトークン化
tokenize(credit_card_number) AS tokenized_cc,
purchase_amount
FROM
`project.dataset.customers`;
特殊用途サービス
13. BigQuery Omni
概要:
マルチクラウド分析を可能にするBigQueryの拡張機能で、AWS S3やAzure Blobに保存されたデータを分析できる。
主な機能:
- クラウド間でのデータ移動なしに分析
- 統一されたSQLインターフェース
- クロスクラウドのデータクエリ
- 各クラウドプロバイダのセキュリティモデルを維持
ユースケース:
- マルチクラウド分析: 異なるクラウドプロバイダにあるデータを統合分析
- データ主権対応: データ移動規制に対応しながらグローバル分析
- クラウド移行: クラウド間移行中の段階的アプローチをサポート
-- BigQuery Omniを使用したAWSデータの分析例
SELECT
date,
product_category,
SUM(revenue) AS total_revenue
FROM
`aws-region.dataset.sales_data`
WHERE
date BETWEEN '2023-01-01' AND '2023-03-31'
GROUP BY
date, product_category
ORDER BY
date, total_revenue DESC;
14. BigQuery BI Engine
概要:
BigQueryに高速なインメモリ分析エンジンを追加するサービス。
主な機能:
- 超高速なクエリレスポンス(サブ秒)
- BIツールとの統合の最適化
- 列指向の処理
- キャパシティベースの料金モデル
ユースケース:
- インタラクティブダッシュボード: レスポンスタイムが重要なダッシュボードの高速化
- アドホッククエリ: ビジネスユーザーのリアルタイム探索的分析
- 高頻度レポート: 頻繁にアクセスされるレポートのパフォーマンス最適化
-- BigQuery BI Engineで高速化されるクエリの例
-- (BI Engineはクエリ自体を変更せず、バックグラウンドでパフォーマンスを向上)
SELECT
product_category,
store_region,
DATE_TRUNC(sale_date, MONTH) AS month,
SUM(sales_amount) AS revenue
FROM
`project.dataset.sales`
WHERE
sale_date BETWEEN '2023-01-01' AND '2023-03-31'
GROUP BY
product_category, store_region, month
ORDER BY
month, revenue DESC;
15. BigQuery GIS
概要:
BigQuery内で地理空間データを分析するための機能群。
主な機能:
- 地理空間データ型(GEOGRAPHY)のサポート
- 地理空間関数(距離計算、交差判定など)
- 複雑な地理空間クエリの最適化
- 地理空間可視化との連携
ユースケース:
- 位置ベース分析: 店舗や顧客の位置データに基づく商圏分析
- 経路最適化: 配送ルートや移動パターンの分析
- 地理的クラスタリング: 位置に基づく顧客セグメンテーション
-- BigQuery GISを使った地理空間分析の例
-- 各店舗から5km圏内の顧客数を計算
SELECT
store_id,
store_name,
COUNT(*) AS customers_within_5km
FROM
`project.dataset.stores` stores,
`project.dataset.customers` customers
WHERE
ST_DISTANCE(stores.location, customers.home_location) <= 5000 -- 5000メートル = 5km
GROUP BY
store_id, store_name
ORDER BY
customers_within_5km DESC;
実際のユースケースによる統合例
ユースケース1: リアルタイムダッシュボード構築
シナリオ:
ECサイトの運営チームがウェブサイトやアプリからのユーザー行動データをリアルタイムで分析し、運用上の意思決定を行いたい。
ソリューション構成:
- データ収集: Google Analytics 4 → BigQuery DTSで自動データ転送
- ストリーミングデータ: Cloud Functions → Pub/Sub → Dataflow → BigQuery
- データ処理: BigQuery Scheduled Queriesで15分ごとに集計テーブル更新
- 高速クエリ: BigQuery BI Engineでダッシュボードクエリを高速化
- 可視化: Looker Studioでリアルタイムダッシュボード構築
メリット:
- データの収集から可視化までのエンドツーエンドのパイプライン
- バッチとストリーミングの両方のデータを統合
- 最小限の遅延で意思決定を支援
-- 15分ごとに実行される集計クエリ例
CREATE OR REPLACE TABLE `project.dataset.realtime_metrics`
AS
SELECT
DATETIME_TRUNC(event_timestamp, MINUTE) AS minute,
country,
device_category,
traffic_source.medium AS traffic_medium,
COUNT(DISTINCT user_pseudo_id) AS unique_users,
COUNT(DISTINCT session_id) AS sessions,
SUM(IF(event_name = 'purchase', 1, 0)) AS conversions,
SUM(IF(event_name = 'purchase', ecommerce.purchase_revenue, 0)) AS revenue
FROM
`project.dataset.events_*`,
UNNEST(items) AS item
WHERE
_TABLE_SUFFIX BETWEEN
FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
AND FORMAT_DATE('%Y%m%d', CURRENT_DATE())
AND event_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 4 HOUR)
GROUP BY
minute, country, device_category, traffic_medium
ORDER BY
minute DESC;
ユースケース2: 予測分析と自動化アクション
シナリオ:
小売企業が店舗ごとの商品需要を予測し、在庫管理システムと連動して自動的に発注を最適化したい。
ソリューション構成:
- データ統合: Cloud Data Fusionで販売データ、在庫データ、外部要因(天気、イベントなど)を統合
- 予測モデル構築: BigQuery MLで商品カテゴリ別・店舗別の需要予測モデルを構築
- 予測実行: Cloud Composerで日次バッチで需要予測を実行し結果をテーブルに保存
- アクション自動化: 予測結果に基づいてCloud FunctionsでAPIを呼び出し、発注システムに通知
- モニタリング: Lookerで予測精度と在庫レベルをモニタリング
メリット:
- 予測からアクションまでを自動化
- 複数のデータソースを組み合わせたハイブリッドモデル
- 継続的なモデル評価と改善サイクル
-- BigQuery MLで時系列予測モデルを作成
CREATE OR REPLACE MODEL `project.dataset.demand_forecast`
OPTIONS(
model_type='ARIMA_PLUS',
time_series_timestamp_col='date',
time_series_data_col='units_sold',
time_series_id_col='store_product_id',
horizon=14 -- 2週間先まで予測
) AS
SELECT
CONCAT(CAST(store_id AS STRING), '-', product_id) AS store_product_id,
date,
units_sold,
-- 外部要因
is_holiday,
is_weekend,
promo_discount_percent,
avg_temperature,
precipitation_mm
FROM
`project.dataset.sales_history`
WHERE
date BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 2 YEAR) AND CURRENT_DATE();
-- 予測を実行して結果テーブルに保存
CREATE OR REPLACE TABLE `project.dataset.demand_predictions`
AS
SELECT
forecast_timestamp AS date,
SPLIT(time_series_id, '-')[OFFSET(0)] AS store_id,
SPLIT(time_series_id, '-')[OFFSET(1)] AS product_id,
forecast_value AS predicted_units,
prediction_interval_lower_bound AS min_units,
prediction_interval_upper_bound AS max_units,
CURRENT_TIMESTAMP() AS prediction_generated_at
FROM
ML.FORECAST(MODEL `project.dataset.demand_forecast`,
STRUCT(14 AS horizon, 0.8 AS confidence_level));
ユースケース3: データ品質管理パイプライン
シナリオ:
金融機関が複数ソースからのデータを統合し、規制要件を満たすためにデータ品質を厳格に管理したい。
ソリューション構成:
- データ取り込み: Dataflowでソースシステムからデータを抽出し変換
- 品質チェック: Cloud Composerで一連のデータ品質チェックを実行
- 機密データ保護: Sensitive Data Protectionで個人情報を検出しマスキング
- メタデータ管理: Data Catalogでデータ系統とメタデータを追跡
- 監査レポート: Scheduled Queriesでデータ品質メトリクスを計算し、Lookerで可視化
メリット:
- エンドツーエンドのデータガバナンス
- 自動化された品質管理プロセス
- コンプライアンス要件への対応
# Cloud Composer (Airflow)での品質チェックDAG例
from airflow import DAG
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from datetime import datetime
default_args = {
'owner': 'data_quality_team',
'start_date': datetime(2023, 4, 1),
'email_on_failure': True,
}
with DAG('data_quality_checks', default_args=default_args, schedule_interval='0 3 * * *') as dag:
# 完全性チェック
completeness_check = BigQueryExecuteQueryOperator(
task_id='completeness_check',
sql="""
INSERT INTO `project.dataset.data_quality_results`
SELECT
'completeness' AS check_type,
'customer_data' AS table_name,
CURRENT_TIMESTAMP() AS check_time,
COUNTIF(customer_id IS NULL OR customer_name IS NULL OR email IS NULL) AS failed_records,
COUNT(*) AS total_records
Discussion