🔄
WorkflowsでFirestoreからBigQueryへのデータパイプラインを自動化してみた
Firestore のデータを BigQuery で分析したいケースはよくあります。本記事では、Google Cloud Workflows を使って、Firestore から BigQuery へのデータパイプラインを自動化する方法を解説します。
はじめに
Firestore のデータを BigQuery で分析したいケースはよくあります。
手動でエクスポート・インポートするのは面倒ですし、ミスも起きやすいです。
本記事では、Google Cloud Workflows を使って、Firestore から BigQuery へのデータパイプラインを自動化する方法を解説します。
一度 Workflow を作成すれば、コマンド一つで実行できるようになります。
全体像
[Workflows]
├─ 1. Firestoreエクスポート実行
├─ 2. エクスポート完了を待機
└─ 3. BigQueryテーブル作成/更新
前提条件
本記事の手順を実行するには、Google Cloud プロジェクトの作成と gcloud CLI のインストールが完了している必要があります。また、Firestore データベースにデータが存在していることを前提としています。
実装手順
必要な API を有効化
# プロジェクトIDを設定
export PROJECT_ID="your-project-id"
gcloud config set project $PROJECT_ID
# 必要なAPIを一括で有効化
gcloud services enable \
firestore.googleapis.com \
bigquery.googleapis.com \
workflows.googleapis.com \
storage.googleapis.com \
logging.googleapis.com
# 有効化の確認
gcloud services list --enabled --filter="name:(firestore|bigquery|workflows|storage|logging)"
GCS バケットの作成
エクスポートデータを一時保存するバケットを作成します。
# バケット名(グローバルにユニーク)
export BUCKET_NAME="${PROJECT_ID}-firestore-export"
# バケット作成
gsutil mb -l asia-northeast1 gs://${BUCKET_NAME}
# ライフサイクルポリシー設定(30日後に自動削除)
cat > lifecycle.json <<EOF
{
"lifecycle": {
"rule": [{
"action": {"type": "Delete"},
"condition": {"age": 30}
}]
}
}
EOF
gsutil lifecycle set lifecycle.json gs://${BUCKET_NAME}
BigQuery データセットの作成
# データセット作成
bq mk \
--location=asia-northeast1 \
--dataset \
--description="Firestore export data" \
firestore_export
サービスアカウントの作成と権限設定
# サービスアカウント作成
gcloud iam service-accounts create firestore-export-sa \
--display-name="Firestore Export Service Account"
# 必要な権限を付与
export SA_EMAIL="firestore-export-sa@${PROJECT_ID}.iam.gserviceaccount.com"
# Firestoreエクスポート権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/datastore.importExportAdmin"
# GCS権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/storage.admin"
# BigQuery権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/bigquery.dataEditor"
# BigQuery権限(ジョブ作成)
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/bigquery.jobUser"
# ログ書き込み権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/logging.logWriter"
# Workflows実行権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:${SA_EMAIL}" \
--role="roles/workflows.invoker"
Workflows の定義ファイル作成
firestore-to-bigquery.yaml を作成します。
# firestore-to-bigquery.yaml
main:
params: [args]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- bucket_name: ${project_id + "-firestore-export"}
- export_prefix: ${"firestore-export-" + text.split(string(sys.now()), "T")[0]}
- dataset_id: "firestore_export"
- collections: ["Users", "Products", "Orders"]
- export_firestore:
call: googleapis.firestore.v1.projects.databases.exportDocuments
args:
name: ${"projects/" + project_id + "/databases/(default)"}
body:
outputUriPrefix: ${"gs://" + bucket_name + "/" + export_prefix}
collectionIds: ${collections}
result: export_operation
- wait_for_export:
call: googleapis.firestore.v1.projects.databases.operations.get
args:
name: ${export_operation.name}
result: export_result
- process_collections:
for:
value: collection_name
in: ${collections}
steps:
- prepare_load_info:
assign:
- table_name: ${collection_name}
- gcs_uri: ${"gs://" + bucket_name + "/" + export_prefix + "/all_namespaces/kind_" + collection_name + "/all_namespaces_kind_" + collection_name + ".export_metadata"}
- job_created: false
- job_id: ""
- load_to_bigquery:
try:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${project_id}
body:
configuration:
load:
sourceUris:
- ${gcs_uri}
destinationTable:
projectId: ${project_id}
datasetId: ${dataset_id}
tableId: ${table_name}
sourceFormat: "DATASTORE_BACKUP"
writeDisposition: "WRITE_TRUNCATE"
autodetect: true
result: load_job
except:
as: e
steps:
- set_job_failed:
assign:
- job_created: false
- check_and_set_job_id:
switch:
- condition: ${job_created == false}
assign:
- job_created: false
- condition: ${load_job != null and "jobReference" in load_job and "jobId" in load_job.jobReference}
assign:
- job_created: true
- job_id: ${load_job.jobReference.jobId}
- wait_for_load:
switch:
- condition: ${job_created == true}
try:
call: googleapis.bigquery.v2.jobs.get
args:
projectId: ${project_id}
jobId: ${job_id}
result: load_result
except:
as: e
assign:
- load_result: null
# 既存のサブルーチンはそのまま維持
export_in_progress:
params: [e]
steps:
- check:
return: ${not("done" in e.body) or e.body.done == false}
load_in_progress:
params: [e]
steps:
- check:
return: ${e.body.status.state != "DONE"}
Workflows のデプロイ
gcloud workflows deploy firestore-to-bigquery \
--source=firestore-to-bigquery.yaml \
--location=asia-northeast1 \
--service-account=${SA_EMAIL}
Workflow の実行
手動実行
# Workflowを実行
gcloud workflows run firestore-to-bigquery \
--location=asia-northeast1
BigQuery でのデータ確認
export PROJECT_ID="your-project-id"
export DATASET_ID="firestore_export"
エクスポートが完了したら、BigQuery でデータを確認します。
テーブル一覧の確認
bq ls ${PROJECT_ID}:${DATASET_ID}
# 詳細表示
bq ls --format=pretty ${PROJECT_ID}:${DATASET_ID}
トラブルシューティング
エクスポートが失敗する場合
Firestore のエクスポートが失敗する場合、サービスアカウントの権限が不足している可能性があります。roles/datastore.importExportAdmin ロールが正しく付与されているか確認してください。
BigQuery へのロードが失敗する場合
BigQuery へのロードが失敗する場合、GCS バケットへのアクセス権限または BigQuery のデータセットへの書き込み権限を確認してください。また、データセットが存在することも確認が必要です。
参考資料
- 参考 : Workflows 公式ドキュメント
- 参考 : Firestore Export/Import
- 参考 : BigQuery Data Loading
Discussion