Cloud Functions から BigQuery Storage Write API 使ってみた
こんにちは。
株式会社アルダグラムの内倉です。
今回は、掲題のとおり Cloud Functions + BigQuery Storage Write API 使ってみた話を書いてみたいと思います。
今回の経緯
↓ざっと、こんなかんじの要件がありました。
- 大量のログを貯めつつ、ある程度検索も自由にやりたい
- できるだけ、リアルタイムに近いかたちでクエリできるようにしたい
Dynamo や Elasticsearch 、KANNA でメインで利用しているRDSも、保存先候補としてあがりましたが
主に1つ目の「大量のログを貯める」「ある程度自由に検索したい」というとこから、BigQuery を使用することになりました。
構成
こんなかんじになりました。
一度、なんでも受け入れてくれる Firestore にデータを格納しておいて、onCreate トリガーで Bigquery へ書き込みます。
Firestore → BigQuery の組み合わせは、以前別のプロジェクトで採用していたメンバーもいたのですが「安定していておすすめ」とのことでした⭕
Cloud Functions も失敗することがあるので、リトライを気持ち多めに入れておくと安心です。
BigQuery へのデータ取り込み方法
今回、候補になったデータの取り込み方法がいくつかあります。
それぞれ、良いとこがありました。
- バッチ読み込み
- メリット:
- 課金対象外
- デメリット:
- 取り込み速度は、なるはや(すぐクエリできないこともある)
- 一日に実行できる上限がある
- メリット:
- ストリーミング挿入
- メリット:
- 基本的にすぐクエリできるようになる
- Firebase Extensions を利用すれば、ちょっとぽちぽちするだけで良い
- デメリット:
- ( Firebase Extensions だと)
- パーティションキーが自由に設定できない
- 1データがまるごと1カラムに入るので、クラスタリングキーが指定できない
- ( Firebase Extensions だと)
- メリット:
- BigQuery Storage Write API
- メリット:
- 基本的にすぐクエリできるようになる
- パーティションキーも、クラスタリングキーも自由に指定できる
- ストリーミングと比べて
- 1つのストリームで1回しか commit できない(失敗したら、気軽に再試行できる)
- 取り込みスループットが、3倍くらい早い
- 公式の押している感
- デメリット:
- ちょっと、ドキュメントが少ない??
- メリット:
最初、Firebase Extensions を使おうと思っていましたが、クラスタリングキーが指定できないと厳しいので、BigQuery Storage Write API を使うことにしました。
実装してたときは、「ドキュメントないな〜」と思っていたので、BigQuery Storage Write API のデメリットにあげましたが、公式含めて充実してきてる気がします👀
実装
やっていきます💪
すでに Firebase のプロジェクトは作ってある前提です。
1. BigQuery 側の準備
BigQuery のコンソール で作業していきます。
-
Firebase と同じ region を指定して、dataset を作成します。
-
テーブルを作成します。今回は、
sample_table
を作ってみます。ユーザー操作を保存するテーブルです。
CREATE TABLE [project id].[dataset id].sample_table ( id STRING NOT NULL, user_uuid STRING NOT NULL, action_type INT64 NOT NULL, resource_uuids Array<STRING>, operated_at TIMESTAMP NOT NULL ) PARTITION BY DATE(operated_at) CLUSTER BY operated_at, user_uuid;
今回は、ゆくゆく「検索対象がかなり多くなる」という要件です。
クエリするときに必ずパーティション指定して、検索対象を限定してほしいので、パーティションフィルターをつけておきます。
(パーティションの指定なしでクエリするとエラーが出るようになります)
ALTER TABLE [project id].[dataset id].sample_table SET OPTIONS (require_partition_filter = true);
-
BigQuery Storage Write API を有効にしておきます。↓
https://console.cloud.google.com/apis/enableflow?apiid=bigquerystorage.googleapis.com
2. Protocol Buffers の準備
BigQuery Storage Write API では gRPC ストリーミングを使用しているので、Protocol Buffers の準備をしていきます。
-
まずはライブラリインストール
npm i @grpc/grpc-js grpc-tools google-protobuf ts-protoc-gen google-gax
ts-protoc-gen
は、TypeScript の型定義を生成するライブラリなので、使わない人はなしで大丈夫です。 -
proto ファイル作成
最小だとこんなかんじかな?
functions/src/proto/sample.protosyntax = "proto3"; enum ActionType { ACTION_UNSPECIFIED = 0; ACTION_CREATE = 1; ACTION_UPDATE = 2; ACTION_DELETE = 3; } message SampleTable { string id = 1; string user_uuid = 2; ActionType action_type = 3; repeated string resource_uuids = 4; string operated_at = 5; }
Protocol Buffers の Scalar Value Types には日付関連のやつがないので、今回は扱いやすさを考えて string にしています(フォーマットさえしっかり決めとけばいいかな!)。
TimeStamp がよかったら、組み込み型で定義されてる
google.protobuf.Timestamp
でも⭕。 -
コンパイル
functions/package.json
に追加しておきます。functions/package.json{ "scripts": { "codegen": "grpc_tools_node_protoc -I ./src/proto --plugin=protoc-gen-ts=./node_modules/.bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/proto/generated --grpc_out=grpc_js:./src/proto/generated --ts_out=service=grpc-node,mode=grpc-js:./src/proto/generated ./src/proto/*.proto" } ... }
npm run codegen
すると、functions/src/proto/generated
以下に、コードや ts の型ファイル一式ができます。
3. Cloud Functions トリガーの実装
やっと本題にたどり着きました。。。
-
BigQuery Storage Write API を叩く共通処理の作成
functions/src/lib/BigQuery.tsimport * as functions from 'firebase-functions' const { BigQueryWriteClient } = require('@google-cloud/bigquery-storage').v1 const storageClient = new BigQueryWriteClient() export type BigQueryResponse = { statusCode: number, XMessageId: string | null } export const sendLogByDefaultStream = async ( tableName: string, data: any, protoDescriptor: any ) => { try { const serializedRows = [] serializedRows.push(data) const protoData = { writerSchema: { protoDescriptor }, rows: { serializedRows } } const projectId = process.env.GCP_PROJECT || process.env.GCLOUD_PROJECT const datasetId = functions.config()?.aldagram?.bigquery?.dataset_id_for_auditlog // ローカル等で、環境変数が取れなかったら何もしない if (!projectId || !datasetId) return const writeStream = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}/_default` const request = { writeStream, protoRows: protoData } // NOTE: デフォルトストリームを使うので、append したら即 commit される const stream = await storageClient.appendRows() stream.on('data', (response: any) => { /* response 例 * [error] * { * rowErrors: [], * updatedSchema: null, * error: { * details: [], * code: 3, * message: 'Rows must be specified. Entity: projects/xxx/_default' * }, * response: 'error' * } * * [success] * { * rowErrors: [], * updatedSchema: null, * appendResult: { offset: null }, * response: 'appendResult' * } */ if (response.error) { console.error('error returnd in bigquery storage-write-api response', response.error) return } console.info('bigquery storage-write-api response success!!') }) stream.on('error', (error: any) => { console.error('send bigquery storage-write-api failed', error) }) stream.on('end', () => { /* API call completed */ }) stream.write(request) stream.end() } catch (error) { console.error('An error occurred during bigquery storage-write-api transmission.', error) return } }
通常の書き込み時は、ストリームを作成→好きなだけ append → commit という流れになりますが
1件ずつ書き込む場合は、デフォルトストリームが使えます。
ただし、デフォルトストリームでは1回限りのコミットが保証されないので、厳密に1回だけ書き込みたい場合は、自分でストリームを作成する必要があります。
-
トリガー作成
unctions/src/indexes/sampleTableTrigger/sampmeTableOnCreate.tsimport * as functions from 'firebase-functions' import * as admin from 'firebase-admin' import * as st from '../../proto/generated/sampleTable_pb' import { sendLogByDefaultStream } from '../../lib/BigQuery' const fn = functions.region('asia-northeast1') const type = require('@google-cloud/bigquery-storage').protos.google.protobuf.FieldDescriptorProto.Type const LOG_PROTO_DESCRIPTOR = { name: 'sampleTable', field: [ { 'name': 'id', 'number': 1, 'type': type.TYPE_STRING }, { 'name': 'user_uuid', 'number': 2, 'type': type.TYPE_STRING }, { 'name': 'action_type', 'number': 3, 'type': type.TYPE_INT64 }, { 'name': 'resource_uuids', 'number': 4, 'type': type.TYPE_STRING }, { 'name': 'operated_at', 'number': 5, 'type': type.TYPE_STRING } ] } export const sampleTableOnCreate = (firestore: admin.firestore.Firestore) => fn.firestore .document( `sample/{logId}` ) .onCreate( async (snap, context): Promise<void> => { const logId = context.params.logId as string const data = { ...snap.data() } const log = new st.SampleTable() log.setId(logId) log.setUserUuid(data.userUuid) log.setActionType(data.actionType) log.setResourceUuidsList(data.resourceUuids) log.setOperatedAt(data.operatedAt) await sendLogByDefaultStream( auditLogcollectionName, log.serializeBinary(), LOG_PROTO_DESCRIPTOR ) } )
-
トリガーを登録する
functions/src/index.ts
で、作ったトリガーを登録します。functions/src/index.tsimport * as admin from 'firebase-admin' import { sampleTableOnCreate } from './sampleTableTrigger/sampmeTableOnCreate' admin.initializeApp() const firestore = admin.firestore() exports.sampleTableOnCreate = sampleTableOnCreate(firestore)
これで、Firestore の sampleTable にデータが作成されたタイミングで、BigQuery の同テーブルにも書き込みが行えるようになりました。
弊社で利用しているものは、まだリリースしたばかりですが、これからデータ量が増えて、色々と活用していくのが楽しみです🥳
株式会社アルダグラムのTech Blogです。 世界中のノンデスクワーク業界における現場の生産性アップを実現する現場DXサービス「KANNA」を開発しています。 採用情報はこちら herp.careers/v1/aldagram0508/
Discussion