🔄

WorkflowsでFirestoreからBigQueryへのデータパイプラインを自動化してみた

に公開

Firestore のデータを BigQuery で分析したいケースはよくあります。本記事では、Google Cloud Workflows を使って、Firestore から BigQuery へのデータパイプラインを自動化する方法を解説します。

はじめに

Firestore のデータを BigQuery で分析したいケースはよくあります。
手動でエクスポート・インポートするのは面倒ですし、ミスも起きやすいです。

本記事では、Google Cloud Workflows を使って、Firestore から BigQuery へのデータパイプラインを自動化する方法を解説します。
一度 Workflow を作成すれば、コマンド一つで実行できるようになります。

全体像

[Workflows]
    ├─ 1. Firestoreエクスポート実行
    ├─ 2. エクスポート完了を待機
    └─ 3. BigQueryテーブル作成/更新

前提条件

本記事の手順を実行するには、Google Cloud プロジェクトの作成と gcloud CLI のインストールが完了している必要があります。また、Firestore データベースにデータが存在していることを前提としています。

実装手順

必要な API を有効化

# プロジェクトIDを設定
export PROJECT_ID="your-project-id"
gcloud config set project $PROJECT_ID

# 必要なAPIを一括で有効化
gcloud services enable \
  firestore.googleapis.com \
  bigquery.googleapis.com \
  workflows.googleapis.com \
  storage.googleapis.com \
  logging.googleapis.com

# 有効化の確認
gcloud services list --enabled --filter="name:(firestore|bigquery|workflows|storage|logging)"

GCS バケットの作成

エクスポートデータを一時保存するバケットを作成します。

# バケット名(グローバルにユニーク)
export BUCKET_NAME="${PROJECT_ID}-firestore-export"

# バケット作成
gsutil mb -l asia-northeast1 gs://${BUCKET_NAME}

# ライフサイクルポリシー設定(30日後に自動削除)
cat > lifecycle.json <<EOF
{
  "lifecycle": {
    "rule": [{
      "action": {"type": "Delete"},
      "condition": {"age": 30}
    }]
  }
}
EOF

gsutil lifecycle set lifecycle.json gs://${BUCKET_NAME}

BigQuery データセットの作成

# データセット作成
bq mk \
  --location=asia-northeast1 \
  --dataset \
  --description="Firestore export data" \
  firestore_export

サービスアカウントの作成と権限設定

# サービスアカウント作成
gcloud iam service-accounts create firestore-export-sa \
  --display-name="Firestore Export Service Account"

# 必要な権限を付与
export SA_EMAIL="firestore-export-sa@${PROJECT_ID}.iam.gserviceaccount.com"

# Firestoreエクスポート権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/datastore.importExportAdmin"

# GCS権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/storage.admin"

# BigQuery権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/bigquery.dataEditor"

# BigQuery権限(ジョブ作成)
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/bigquery.jobUser"

# ログ書き込み権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/logging.logWriter"

# Workflows実行権限
gcloud projects add-iam-policy-binding $PROJECT_ID \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/workflows.invoker"

Workflows の定義ファイル作成

firestore-to-bigquery.yaml を作成します。

# firestore-to-bigquery.yaml
main:
  params: [args]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - bucket_name: ${project_id + "-firestore-export"}
          - export_prefix: ${"firestore-export-" + text.split(string(sys.now()), "T")[0]}
          - dataset_id: "firestore_export"
          - collections: ["Users", "Products", "Orders"]

    - export_firestore:
        call: googleapis.firestore.v1.projects.databases.exportDocuments
        args:
          name: ${"projects/" + project_id + "/databases/(default)"}
          body:
            outputUriPrefix: ${"gs://" + bucket_name + "/" + export_prefix}
            collectionIds: ${collections}
        result: export_operation

    - wait_for_export:
        call: googleapis.firestore.v1.projects.databases.operations.get
        args:
          name: ${export_operation.name}
        result: export_result

    - process_collections:
        for:
          value: collection_name
          in: ${collections}
          steps:
            - prepare_load_info:
                assign:
                  - table_name: ${collection_name}
                  - gcs_uri: ${"gs://" + bucket_name + "/" + export_prefix + "/all_namespaces/kind_" + collection_name + "/all_namespaces_kind_" + collection_name + ".export_metadata"}
                  - job_created: false
                  - job_id: ""

            - load_to_bigquery:
                try:
                  call: googleapis.bigquery.v2.jobs.insert
                  args:
                    projectId: ${project_id}
                    body:
                      configuration:
                        load:
                          sourceUris:
                            - ${gcs_uri}
                          destinationTable:
                            projectId: ${project_id}
                            datasetId: ${dataset_id}
                            tableId: ${table_name}
                          sourceFormat: "DATASTORE_BACKUP"
                          writeDisposition: "WRITE_TRUNCATE"
                          autodetect: true
                  result: load_job
                except:
                  as: e
                  steps:
                    - set_job_failed:
                        assign:
                          - job_created: false

            - check_and_set_job_id:
                switch:
                  - condition: ${job_created == false}
                    assign:
                      - job_created: false
                  - condition: ${load_job != null and "jobReference" in load_job and "jobId" in load_job.jobReference}
                    assign:
                      - job_created: true
                      - job_id: ${load_job.jobReference.jobId}

            - wait_for_load:
                switch:
                  - condition: ${job_created == true}
                    try:
                      call: googleapis.bigquery.v2.jobs.get
                      args:
                        projectId: ${project_id}
                        jobId: ${job_id}
                      result: load_result
                    except:
                      as: e
                      assign:
                        - load_result: null

# 既存のサブルーチンはそのまま維持
export_in_progress:
  params: [e]
  steps:
    - check:
        return: ${not("done" in e.body) or e.body.done == false}

load_in_progress:
  params: [e]
  steps:
    - check:
        return: ${e.body.status.state != "DONE"}

Workflows のデプロイ

gcloud workflows deploy firestore-to-bigquery \
  --source=firestore-to-bigquery.yaml \
  --location=asia-northeast1 \
  --service-account=${SA_EMAIL}

Workflow の実行

手動実行

# Workflowを実行
gcloud workflows run firestore-to-bigquery \
  --location=asia-northeast1

BigQuery でのデータ確認

export PROJECT_ID="your-project-id"
export DATASET_ID="firestore_export"

エクスポートが完了したら、BigQuery でデータを確認します。

テーブル一覧の確認

bq ls ${PROJECT_ID}:${DATASET_ID}

# 詳細表示
bq ls --format=pretty ${PROJECT_ID}:${DATASET_ID}

トラブルシューティング

エクスポートが失敗する場合

Firestore のエクスポートが失敗する場合、サービスアカウントの権限が不足している可能性があります。roles/datastore.importExportAdmin ロールが正しく付与されているか確認してください。

BigQuery へのロードが失敗する場合

BigQuery へのロードが失敗する場合、GCS バケットへのアクセス権限または BigQuery のデータセットへの書き込み権限を確認してください。また、データセットが存在することも確認が必要です。

参考資料

Discussion