😊

Cloud Functions から BigQuery Storage Write API 使ってみた

こんにちは。

株式会社アルダグラムの内倉です。

今回は、掲題のとおり Cloud Functions + BigQuery Storage Write API 使ってみた話を書いてみたいと思います。

今回の経緯

↓ざっと、こんなかんじの要件がありました。

  • 大量のログを貯めつつ、ある程度検索も自由にやりたい
  • できるだけ、リアルタイムに近いかたちでクエリできるようにしたい

Dynamo や Elasticsearch 、KANNA でメインで利用しているRDSも、保存先候補としてあがりましたが

主に1つ目の「大量のログを貯める」「ある程度自由に検索したい」というとこから、BigQuery を使用することになりました。

構成

こんなかんじになりました。

一度、なんでも受け入れてくれる Firestore にデータを格納しておいて、onCreate トリガーで Bigquery へ書き込みます。

Firestore → BigQuery の組み合わせは、以前別のプロジェクトで採用していたメンバーもいたのですが「安定していておすすめ」とのことでした⭕

Cloud Functions も失敗することがあるので、リトライを気持ち多めに入れておくと安心です。

BigQuery へのデータ取り込み方法

今回、候補になったデータの取り込み方法がいくつかあります。

それぞれ、良いとこがありました。

  • バッチ読み込み
    • メリット:
      • 課金対象外
    • デメリット:
      • 取り込み速度は、なるはや(すぐクエリできないこともある)
      • 一日に実行できる上限がある
  • ストリーミング挿入
    • メリット:
      • 基本的にすぐクエリできるようになる
      • Firebase Extensions を利用すれば、ちょっとぽちぽちするだけで良い
    • デメリット:
      • ( Firebase Extensions だと)
        • パーティションキーが自由に設定できない
        • 1データがまるごと1カラムに入るので、クラスタリングキーが指定できない
  • BigQuery Storage Write API
    • メリット:
      • 基本的にすぐクエリできるようになる
      • パーティションキーも、クラスタリングキーも自由に指定できる
      • ストリーミングと比べて
        • 1つのストリームで1回しか commit できない(失敗したら、気軽に再試行できる)
        • 取り込みスループットが、3倍くらい早い
      • 公式の押している感
    • デメリット:
      • ちょっと、ドキュメントが少ない??

最初、Firebase Extensions を使おうと思っていましたが、クラスタリングキーが指定できないと厳しいので、BigQuery Storage Write API を使うことにしました。

実装してたときは、「ドキュメントないな〜」と思っていたので、BigQuery Storage Write API のデメリットにあげましたが、公式含めて充実してきてる気がします👀

実装

やっていきます💪

すでに Firebase のプロジェクトは作ってある前提です。

1. BigQuery 側の準備

BigQuery のコンソール で作業していきます。

  1. Firebase と同じ region を指定して、dataset を作成します。

  2. テーブルを作成します。今回は、sample_table を作ってみます。

    ユーザー操作を保存するテーブルです。

    CREATE TABLE
      [project id].[dataset id].sample_table
    (
      id STRING NOT NULL,
      user_uuid STRING NOT NULL,
      action_type INT64 NOT NULL,
      resource_uuids Array<STRING>,
      operated_at TIMESTAMP NOT NULL
    )
    PARTITION BY DATE(operated_at)
    CLUSTER BY operated_at, user_uuid;
    

    今回は、ゆくゆく「検索対象がかなり多くなる」という要件です。

    クエリするときに必ずパーティション指定して、検索対象を限定してほしいので、パーティションフィルターをつけておきます。

    (パーティションの指定なしでクエリするとエラーが出るようになります)

    ALTER TABLE [project id].[dataset id].sample_table
      SET OPTIONS (require_partition_filter = true);
    
  3. BigQuery Storage Write API を有効にしておきます。↓

    https://console.cloud.google.com/apis/enableflow?apiid=bigquerystorage.googleapis.com

2. Protocol Buffers の準備

BigQuery Storage Write API では gRPC ストリーミングを使用しているので、Protocol Buffers の準備をしていきます。

  1. まずはライブラリインストール

    npm i @grpc/grpc-js grpc-tools google-protobuf ts-protoc-gen google-gax
    

    ts-protoc-gen は、TypeScript の型定義を生成するライブラリなので、使わない人はなしで大丈夫です。

  2. proto ファイル作成

    最小だとこんなかんじかな?

    functions/src/proto/sample.proto
    syntax = "proto3";
    
    enum ActionType {
      ACTION_UNSPECIFIED = 0;
      ACTION_CREATE = 1;
      ACTION_UPDATE = 2;
      ACTION_DELETE = 3;
    }
    
    message SampleTable {
      string id = 1;
      string user_uuid = 2;
      ActionType action_type = 3;
      repeated string resource_uuids = 4;
      string operated_at = 5;
    }
    

    Protocol Buffers の Scalar Value Types には日付関連のやつがないので、今回は扱いやすさを考えて string にしています(フォーマットさえしっかり決めとけばいいかな!)。

    TimeStamp がよかったら、組み込み型で定義されてる google.protobuf.Timestamp でも⭕。

  3. コンパイル

    functions/package.json に追加しておきます。

    functions/package.json
    {
    	"scripts": {
    	"codegen": "grpc_tools_node_protoc -I ./src/proto --plugin=protoc-gen-ts=./node_modules/.bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/proto/generated --grpc_out=grpc_js:./src/proto/generated --ts_out=service=grpc-node,mode=grpc-js:./src/proto/generated ./src/proto/*.proto"
    	}
    	...
    }
    

    npm run codegen すると、 functions/src/proto/generated 以下に、コードや ts の型ファイル一式ができます。

3. Cloud Functions トリガーの実装

やっと本題にたどり着きました。。。

  1. BigQuery Storage Write API を叩く共通処理の作成

    functions/src/lib/BigQuery.ts
    import * as functions from 'firebase-functions'
    
    const { BigQueryWriteClient } = require('@google-cloud/bigquery-storage').v1
    const storageClient = new BigQueryWriteClient()
    
    export type BigQueryResponse = {
      statusCode: number,
      XMessageId: string | null
    }
    
    export const sendLogByDefaultStream = async (
      tableName: string,
      data: any,
      protoDescriptor: any
    ) => {
      try {
        const serializedRows = []
        serializedRows.push(data)
    
        const protoData = {
          writerSchema: { protoDescriptor },
          rows: { serializedRows }
        }
        const projectId = process.env.GCP_PROJECT || process.env.GCLOUD_PROJECT
        const datasetId = functions.config()?.aldagram?.bigquery?.dataset_id_for_auditlog
    
        // ローカル等で、環境変数が取れなかったら何もしない
        if (!projectId || !datasetId) return
    
        const writeStream = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}/_default`
        const request = {
          writeStream,
          protoRows: protoData
        }
    
        // NOTE: デフォルトストリームを使うので、append したら即 commit される
        const stream = await storageClient.appendRows()
        stream.on('data', (response: any) => {
          /* response 例
           * [error]
           * {
           *   rowErrors: [],
           *   updatedSchema: null,
           *   error: {
           *     details: [],
           *     code: 3,
           *     message: 'Rows must be specified. Entity: projects/xxx/_default'
           *   },
           *   response: 'error'
           * }
           *
           * [success]
           * {
           *   rowErrors: [],
           *   updatedSchema: null,
           *   appendResult: { offset: null },
           *   response: 'appendResult'
           * }
           */
    
          if (response.error) {
            console.error('error returnd in bigquery storage-write-api response', response.error)
            return
          }
          console.info('bigquery storage-write-api response success!!')
        })
        stream.on('error', (error: any) => {
          console.error('send bigquery storage-write-api failed', error)
        })
        stream.on('end', () => { /* API call completed */ })
    
        stream.write(request)
        stream.end()
    
      } catch (error) {
        console.error('An error occurred during bigquery storage-write-api transmission.', error)
        return
      }
    }
    

    通常の書き込み時は、ストリームを作成→好きなだけ append → commit という流れになりますが

    1件ずつ書き込む場合は、デフォルトストリームが使えます。

    ただし、デフォルトストリームでは1回限りのコミットが保証されないので、厳密に1回だけ書き込みたい場合は、自分でストリームを作成する必要があります。

  2. トリガー作成

    unctions/src/indexes/sampleTableTrigger/sampmeTableOnCreate.ts
    import * as functions from 'firebase-functions'
    import * as admin from 'firebase-admin'
    import * as st from '../../proto/generated/sampleTable_pb'
    import { sendLogByDefaultStream } from '../../lib/BigQuery'
    
    const fn = functions.region('asia-northeast1')
    const type = require('@google-cloud/bigquery-storage').protos.google.protobuf.FieldDescriptorProto.Type
    
    const LOG_PROTO_DESCRIPTOR = {
      name: 'sampleTable',
      field: [
        { 'name': 'id', 'number': 1, 'type': type.TYPE_STRING },
        { 'name': 'user_uuid', 'number': 2, 'type': type.TYPE_STRING },
        { 'name': 'action_type', 'number': 3, 'type': type.TYPE_INT64 },
        { 'name': 'resource_uuids', 'number': 4, 'type': type.TYPE_STRING },
        { 'name': 'operated_at', 'number': 5, 'type': type.TYPE_STRING }
      ]
    }
    
    export const sampleTableOnCreate = (firestore: admin.firestore.Firestore) =>
      fn.firestore
        .document(
          `sample/{logId}`
        )
        .onCreate(
          async (snap, context): Promise<void> => {
            const logId = context.params.logId as string
            const data = {
              ...snap.data()
            }
    
            const log = new st.SampleTable()
            log.setId(logId)
            log.setUserUuid(data.userUuid)
            log.setActionType(data.actionType)
            log.setResourceUuidsList(data.resourceUuids)
            log.setOperatedAt(data.operatedAt)
    
            await sendLogByDefaultStream(
              auditLogcollectionName,
              log.serializeBinary(),
              LOG_PROTO_DESCRIPTOR
            )
          }
        )
    
  3. トリガーを登録する

    functions/src/index.ts で、作ったトリガーを登録します。

    functions/src/index.ts
    import * as admin from 'firebase-admin'
    import { sampleTableOnCreate } from './sampleTableTrigger/sampmeTableOnCreate'
    
    admin.initializeApp()
    const firestore = admin.firestore()
    
    exports.sampleTableOnCreate = sampleTableOnCreate(firestore)
    

これで、Firestore の sampleTable にデータが作成されたタイミングで、BigQuery の同テーブルにも書き込みが行えるようになりました。

弊社で利用しているものは、まだリリースしたばかりですが、これからデータ量が増えて、色々と活用していくのが楽しみです🥳

アルダグラム Tech Blog

Discussion