🚀

Export BigQuery to Firestore (Reverse ETL) を試してみる

2023/12/02に公開

tl;dr;

  • Reverse ETL は、ETL の逆側、つまり Data Warehouse から データ転送元(本記事では Firestore)にデータを連携する処理
  • 2023/12/1 現在、 Bigtable / Firestore が BigQuery からの Reverse ETL に対応していて、本記事では Firestore での手順を解説
  • Firebase の Extensions の仕組みで提供されており、設定記載 + one click で Reverse ETL(Extract, Transform and Load) 処理を実現できる

はじめに

こんにちは。Google Cloud でパートナーエンジニアをやっている Sho です。
この記事は Google Cloud Advent Calendar 2023 の 12/2 の記事です。

突然ですが、BigQuery で統計処理したデータをアプリケーションから利用したいと思ったことはないでしょうか。BigQuery には各種 API が提供されており、クライアントライブラリも存在するので、これらを利用すれば上記の処理を行うことは可能です。ですが、いざ実装してみると以下の点が気になるかもしれません。

  • クエリスキャン量あるいはスロット利用料で従量課金されるため、大規模なデータのスキャンの場合は課金料が気になる
  • BigQuery へのクエリを都度実行する場合、(比較的高速とはいっても)データベースから取得するよりは時間がかかる

上記の問題を解決するための方法として、 BigQuery BI Engineを利用してスキャン費用や取得速度を高速化する(Zennでの活用事例 )という方法もあります。ですが、この記事では、今年 GA された「データベースから BigQuery で処理したデータを扱う」手段を試してみます。

Bigtable の Reverse ETL は公式ブログでも触れられていますので、興味がある方はご一読ください。 Bigtable、 Firestore ともに Reverse ETL 機能は GA されていますが、Firestore については Firebase Extensions での提供になります(Extensions 機能自体はベータ機能となります)。

Firebase Extensions とは?

Firebase は知っているけど Extensions って?という方は多いのではないでしょうか。
Firebase には Hosting、Authentication、Analytics、Storage、Firestore そして Crashlytics など数多くの機能が存在しています。
Firebase を使ってどのようなことが実現できるのか?については公式ページFirebase solution portalに詳しいですが、単体の機能を使っていただくケースと、複数の機能を組み合わせて使っていただくケースに大別できます(このあたりは Google Cloud と同じですね)。これらの主に後者について、拡張機能として Firebase に導入を行い機能を利用できる、というものが、Firebase Extension になります。どんな拡張機能があるか、については是非以下のページからラインナップをご覧ください。

本記事は Firebase Extensions 自体の紹介記事ではないため詳細な説明は省きますが、Cloud AI/ML プロダクトとの結合やメッセージング処理、認証処理など多くの拡張がございます。作成元も Firebase / Google Cloud だけではなく、3rd party のものもございます。
今回ご紹介する BigQuery への Reverse ETL 処理はこの拡張のうちの一つとなります(拡張紹介ページ)。

2023 年 11 月 15 日〜 16 日にかけて開催された Google Cloud Next Tokyo 2023 でもFirebase Extensions で構築する生成 AI チャットボット アプリ体験 というブースで Firebase Extensions についてご説明がありました。この記事を読んでくださっている皆様にも是非今後活用していっていただきたい、と思っています。

Export BigQuery to Firestore の導入

前置きが長くなってしまいましたが、百聞は一見に如かず。早速試してみましょう。
Google アカウントが存在していて、Google Cloud コンソールにログイン出来ている状態を想定しています。

[1] API の有効化、Firestore の作成

※ Firestore データベースが存在する場合は Skip 可能です。
事前に Google Cloud Firestore API を有効にしておきましょう。

また、Firestore を作成していない場合は作成しておきます。

CREATE DATABASE から、SELECT NATIVE MODE を選択します。
Location は設定後変更できません。今回は nam5 (United States) を選択しておきます。

https://console.cloud.google.com/firestore/databases を開き、以下のような表示になっていれば、(default) のデータベースが無事作成されています。本手順は終了です。

[2] BigQuery データの用意

※ 利用可能なBigQuery のテーブル、データセットが作成済みの場合は Skip 可能です。
今回は BigQuery からのデータ転送になるため、予めデータを用意しておきます。適切なデータをお持ちの方はこの処理をスキップきます。

データセットを作成します、ここでは、advent_2023 という名称で作成しました。リージョンは US Multi-region としてください。

BigQuery の public dataset を利用します。

クエリした結果を save results で、advent_2023 以下に保存しましょう。ここでは citibike_stations という名称で保存しました。

クエリで上記の保存したテーブルにアクセスできることを確認したら、この手順は完了です。

[3] Firebase プロジェクトの作成

※ Firebase プロジェクト作成済みの場合は Skip 可能です。
https://console.firebase.google.com/ にアクセスして、Google Cloud のプロジェクト名を指定しましょう。(古い Google Cloud のアイコン)が表示されていれば OK です。

課金を有効にしておく必要がありますので、 Blaze プランになっていることを確認してください。無料版ですと機能が制限されており、本拡張はご利用いただけません。

Analytics は有効無効どちらでも大丈夫です(そのまま Continue)。

Google Analytics も Default のアカウントを指定して OK です。

Add Firebase ボタンを押したら、プロジェクトが作成されるのを待ちます。

作成が完了し、以下の様な画面に遷移したら、プロジェクト作成は完了です。

[4] Extensions のインストール

それでは、Extensions を導入していきます。
Build の Extensions から、先程ご紹介した Extensions 一覧が確認できる画面に遷移します。

Explore Extesions Hub を押す。

対象ページから Export BigQuey to Firestore を検索します。

すると製品紹介ページに遷移します。
早速拡張を導入しましょう。"Install in Firebase Console" ボタンを押します。

すると、インストール先のプロジェクトを選択する画面に遷移するので、さきほど作成したプロジェクトを選択しましょう。

選択できたら、次は設定を進めていきます。

[4-1] Review billing and usage

本拡張を導入することで、どのくらい費用がかかるかを説明した項目です。
以下が実際の説明文章になりますが、要点をかいつまんで記載すると、以下になります。
また、ここに記載されていない料金(BigQuery へのアクセス、Firestore への書き込みや読み込みで発生)も課金される可能性がありますので、大規模なデータやワークロードで利用を検討している場合は、開発環境で利用量の目安を確認しておくことをおすすめします。

  • 3rd party の API を実行すると別途料金がかかる可能性があるよ
  • Firebase のリソースの課金は(この拡張の機能を全く使わないとしても)月 1 セント程度掛かる可能性があるよ
  • 各種サービスには無料枠があるから、それを超えた分だけ課金されます
  • Cloud Tasks については別途料金を確認してね
Making requests to third-party APIs may incur costs from the third-party service.
You will be charged a small amount (typically around $0.01/month) for the Firebase resources required by this extension (even if it is not used), in addition to any charges associated with its usage. However, you'll only be charged for usage that exceeds Firebase's no-cost tier for those services. Learn more details in the Firebase pricing page.
Cloud Tasks pricing is based on billable operations per month. Learn more details in the Google Cloud Tasks Pricing page.

[4-2] Review APIs enabled and resources created

続いて、有効化が必要な API を確認します。Firestore API は本記事冒頭で有効化しましたが、その他のAPI については拡張のインストール時に実施してくれます。

本記事執筆時点で記載されている API リストは以下です。

  • BigQuery API
    • BigQuery のクエリを実行するのに必要
  • BigQuery Data Transfer API
    • データ転送をスケジュールするために必要
  • Cloud Tasks API
    • TaskQueue が実行時、キューをプロビジョニングするのに必要

本拡張インストール時に、以下のリソースが Google Cloud プロジェクトに作成されます。

  • Cloud Functions
  • Cloud Tasks

また、Artifact registry も有効化が必要となっていました。Enable ボタンを押して有効化しておきましょう。

[4-3] Review access granted to this extension

次に、本処理で利用するリソースに与える権限を確認します。

説明文に記載のある通り、 Firebase Extensions はサービスアカウントを利用して、拡張機能で定義されているワークロードを実行する、とあります。

Firebase Extensions use service accounts, with roles specified by the extension's developer, to access your project and resources

本記事掲載時の記載リストは以下です。確認したら、Next を押しましょう。

  • Cloud Datastore User
    • BigQuery で実行したクエリの結果を Firestore に書き込むために必要
  • BigQuery Admin
    • Transfer Config を作成、及び BigQuery のDestination table にクエリするために必要
  • Pub/Sub Admin
    • BigQuery Data Transfer から、Pub/Sub Topic へ通知を行うために必要
  • Cloud Tasks Enqueuer
    • Cloud Tasks にキューを追加するために必要

[4-4] Configure extension

最後に、Extentions を実行する上で必要な設定をしていきます。
※入力値の説明については記事執筆時のものであり、変更される可能性があります。

項目 説明 Required?
Cloud Function location Cloud Function をデプロイするロケーションです。 デフォルトでOK(Iowa, us-central1)
BigQuery Dataset Location BigQuery のデータセットのローケーション デフォルトでOK(US Multi-Region)
DisplayName 本拡張の表示名。変更不可で、この名前が"BigQuery Data Transfer / Scheduled Query の表示名として利用されます 必須
Dataset ID データセットのID、後述しますが Firestore に書き込まれる結果を格納するための領域になります。 必須
Destination Table Name 上記のテーブル名になります 必須
Query String Firestore に書き込みたい内容を取得するクエリを記載します。このクエリの結果が Destination Table Name に格納されます。 必須
Partition Field パーティションフィールドを使いたい場合に指定します(未検証) 任意(省略可)
Schedule 何分単位でクエリを実行するかを指定します。頻度が高いほどデータの鮮度があがりますが、コストも増大します。 every 15 minutes
Firestore Collection 上記で実行された結果を格納する Firestore のトップレベルのコレクション名です。デフォルト値でOKです。 必須
Pub Sub Topic スケジュールクエリが実行完了した際に通知先として利用されるトピック名です 必須

高度な設定(VPC、Docker リポジトリ、タイムアウト秒)もありますが、今回は何も設定せずに進めてみます。

クエリについては、本記事の手順に沿って進めた方は、以下を指定してみてください。

SELECT
	station_id,
	name,
	capacity
FROM
	`<Google Cloud プロジェクト名>.<データセット名>.<テーブル名>`
LIMIT 100

今回の手順では以下の設定値で進めることにします。

すべて完了したら、Install Extension を押してしばらくまちます。

インストールが完了すると、以下のように表示されます。

[5] 無事インストールができたことの確認

上記の画面から遷移し、 Extension の詳細を見てみましょう。

BigQuery の Data Transfer に “Display Name” で先程指定した Display Name のジョブが登録されていることを確認してください。

ジョブが存在しない場合、(成功したように見えるが)何らかの処理作成処理が失敗している可能性があります。
拡張の APIs and resources から、 Cloud Functions のエラー等を確認すると原因究明の助けになります。

エラーが出ず Cloud Functions の処理が完了していればインストール完了です!お疲れ様でした🍺

実際に格納されたデータを確認する

さて、データは本当に Firestore へ格納されているのでしょうか?

Firestore は、Google Cloud コンソール上で格納されているデータを確認することができます。コンソールにアクセスしてみます。
すると、コレクション部分に transferConfigs というコレクションが作成されていることがわかります。

改めてドキュメント を確認すると、Get started の下部に具体的にどのような構造で保存されているのかが記載されています。
これによると、transferConfig/<ConfigのID>/runs/<Transfer実行ID>/latest 以下に実行に際しての最新のメタデータが存在し、保存されたデータがある、と記載がありました。

上記に記載された通りになっているか、実際にデータベースの中身を覗いてみます。
まず、transferConfigs という root collection ができているのがわかります。
transferConfig コレクション以下には、UUID のような ID 値を持つドキュメントが 1 つあります。こちらの中身を見てみると、実行に際してのメタデータが保存されていました。

該当ドキュメントには runs というサブコレクションがあります。Firestore はドキュメント以下にコレクションを持つことができ、階層構造を作ることができるのでしたね。
latest 及び UUID形式の ID を持つドキュメントがありました。

それぞれの中身を見てみます。

ドキュメントID 中身
657743bf-0000-2c49-87d3-d4f547ffb178
65b52c6f-0000-2aa5-8517-001a11483394
latest

latest には、output がなく、他のドキュメントには output があり、latest には latestRunId というカラムがありました。そして、latestRunId には 657743bf-0000-2c49-87d3-d4f547ffb178 が指定されており、実行時間を見ても該当のドキュメントが最新のようです。
つまり、runs.latest.latestRunId を取得した上で runs.latest.<latestRunId>.output にアクセスすれば、常に最新のデータにアクセスできることがわかりました。runs 以下のドキュメントはジョブが実行するたびに増えていきますので、15分のインターバルの場合は 15 分に 1件ずつ増えていくことになります。

まとめると、コレクションとドキュメントの構造は、以下の図のように表現することができます。

クライアントライブラリを用いたデータの取得

コンソールからデータが格納されていることを確認できました。
ここでは、クライアントライブラリコード (Python) からの取得を試してみましょう。Cloud Shell 上でお手軽に確認していただけます。

まず、データが格納されている Firestore のコレクションを取得します。以下のいずれかの方法で確認してください。
(1) BigQuery Data Transfer から確認
Configulation の Resource name

(2) Firestore console から確認

上記の ID を控えておきます。では、コードを書いていきましょう。

# 仮想環境の用意
python3 -m venv fstore
source fstore/bin/activate
pip install google-cloud-firestore

ソースコード

from google.cloud import firestore

# ルートコレクションの名称を指定します。デフォルトであれば transferConfig
root_collection_name = "transferConfigs"
# 上記の手順で取得したIDを指定します。
root_document_id = "656b63df-0000-2ec1-9337-3c286d397632"

client = firestore.Client()
# transferConfigs 以下の IDは、BigQueryのData Transfer を参照
doc_ref = client.collection(root_collection_name).document(root_document_id)
# latestRunIdを取得
latest_run_id = doc_ref.collection("runs").document("latest").get().to_dict()['latestRunId']
# latestRunIdから、結果をリストで取得
list_ref = doc_ref.collection("runs").document(latest_run_id).collection("output")
# 10件を取得
query = list_ref.limit(10)
for entry in query.stream():
	print(entry.to_dict())

早速実行してみます。無事結果が取得できました。

上記はいずれも文字列でしたので、様々な型のデータでも試してみます。

結果は以下のようになりました。

対応表にしてみるとこんな感じになります。
概ね想定通りの結果になりました。JSON 型は string として出力されていました。map として保存したい場合は、RECORD型(tuple)で保存しておくと良さそうです。

BigQueryの型 Firestoreの型
INTEGER / FLOAT number
DATE / TIMESTAMP timestamp
STRING / JSON string
RECORD map
REPEATED array

これで、BigQuery に保存されたデータを Firestore で参照することができるようになりました。Firestore 上で扱うことで簡単なクエリができたり、ドキュメント単位のキャッシュを効かせたりと用途が広がると思いますので、気になった方は是非お試しください。

(おまけ)作成されたリソース

本 Extension を導入することで、Google Cloud にどのようなリソースが作られているのかを見ていきましょう。

作成されたリソースを簡単に示した図です。Firebase から BigQuery に伸びている線は、Scheduled Query (Data Transfer) が動作していることを示しています。

BigQuery Data Transfer

拡張作成時に設定したインターバル間隔でクエリが実行されます。結果、以下が行われます。

  • (設定した)Pub/Sub Topic への Notification
  • Destination table への書き込み
    • Write Preference に WRITE_TRUNCATE が指定されているため、同じ宛先テーブル名を指定された場合は結果が上書きされます。
  • デフォルトでは宛先テーブル名に run_time %H%M%S が指定されるようです。そのため、中間テーブルには全てのヒストリーが残るわけではないことに注意です。
    • ここは(設定を更新することで)Edit 可能なので、日付を入れることですべてのヒストリーを残すことも可能ですが、拡張のサポートの範囲を超える内容となるためご注意ください。

Pub/Sub Topic

トピック ext-bigquery-firestore-export-processMessages が作成されていて、subscriber が存在することがわかります。

Delivery type は Push とあるので、Topic にメッセージが到達したらすぐにそのメッセージを受け取る設定になっているようです。

グラフを見ると、15 分おきにメッセージを受け取っていることがわかりますね。

Cloud Functions

2 つの関数が作成されています。

processMessages は、上記の Pub/Sub トピックがトリガーになっています。BigQuery のScheduled Query が終了された後、トリガーされて実行されるということがわかります。

upsertTransferConfig は、メトリクスを見ても定期的に実行されているわけではないようで、名称の通り、設定変更時に実行されているものと推測できます。

Cloud Tasks

upsertTransferConfig という Task が作成されています。こちらも設定変更時の処理と推測できます。

設定の変更

本拡張は、設定情報を一部変更することができるようになっています。過去に作成されたアーティファクトは削除されないため、データの不整合等には注意する必要があります。

変更可能なのはBigQuery Datasetのリージョン、宛先テーブル、ルートコレクション名、クエリ、スケジュールなどのようです。
例えば、ルートコレクションとクエリを少し変更してみましょう。

# before
transferConfigs

SELECT station_id, name, capacity FROM `advent-2023-sho.advent_2023.citibike_stations` LIMIT 100

# after
transferConfigsV2

SELECT station_id, name, capacity,last_reported FROM `advent-2023-sho.advent_2023.citibike_stations` LIMIT 100

情報を変更して Save を押すと、画面上部が以下のように変化します。設定の変更に数分かかるようです。

コンソールを確認してみたところ、Cloud Functions の削除処理が行われていたのは確認したのですが、その他のリソース(以下)削除されていませんでした。

  • Pub/Sub Topic
  • Cloud Tasks
  • Scheduled Query

BigQuery Data Transfers が増えている

transferConfigV2 ができている

last_reported が追加されている

拡張のアンインストール

拡張のアンインストールは、Firebaseコンソールの拡張最下部に小さくボタンがあります(見えにくい)。

アンインストールを押すと、以下のウィンドウが出るので、 Uninstall extension を押すと処理が実行されます。ウィンドウに記載があるように、Cloud Functions 及びサービスアカウントが削除されるとあります。

実際に実行してみると、作成時と同じように処理が始まりました。

処理が終わると、先程インストールしていた拡張がなくなったことを確認できました。

まとめ

この記事では、Reverse ETL for firestore の実行を試しつつ、どのようなコンポーネントが作成されているかを見てみました。
ここまで読んでいただいた方は既にお気づきになっているかもしれませんが、BigQuery から Firestore で ETL を行う新しいソリューションが出来たというわけではなく、既存のコンポーネントを使った ETL 処理をワンクリック(に近い形)で生成できる、という機能となります。いわゆるリファレンスアーキテクチャをデプロイできる、というイメージを持ちました。

拡張自体の所感でいうと、統計処理データをアプリケーションから処理できるようにする、といった要望はあるものの多くは自前での実装になりがちなので、省コストでデータベースから統計処理データを扱えるのは便利だな、と感じました。皆様もこれを機に Firestore へ触れていただき、統計情報をアプリケーションから扱ってみてはいかがでしょうか?

長文にも関わらず、最後までお読みいただき、誠にありがとうございました。

付録: Firestore の課金

Firestore の利用にかかる料金は、以下のようになります。データの容量に関する課金はリージョンにより異なります。

項目 説明 補足
容量に対する課金 Storage: $0.18 GB/day US Multi-region 1 GB まで無料
データベースの操作に対する課金 Entity Read: $0.06 / 10 万回
Entity Write: $0.18 / 10 万回
Entity delete: $0.02 / 10 万回
無料枠あり
読み取り(5万回)
書き込み(2万回)
削除(2万回)

App Engine の Billing Status から、Firestore の容量利用状況やオペレーションの回数を確認できます。

Google Cloud Japan

Discussion