😊

Cloud Functions から BigQuery Storage Write API 使ってみた

アルダグラム Tech Blog2022/11/28に公開

こんにちは。

株式会社アルダグラムの内倉です。

今回は、掲題のとおり Cloud Functions + BigQuery Storage Write API 使ってみた話を書いてみたいと思います。

今回の経緯

↓ざっと、こんなかんじの要件がありました。

  • 大量のログを貯めつつ、ある程度検索も自由にやりたい
  • できるだけ、リアルタイムに近いかたちでクエリできるようにしたい

Dynamo や Elasticsearch 、KANNA でメインで利用しているRDSも、保存先候補としてあがりましたが

主に1つ目の「大量のログを貯める」「ある程度自由に検索したい」というとこから、BigQuery を使用することになりました。

構成

こんなかんじになりました。

一度、なんでも受け入れてくれる Firestore にデータを格納しておいて、onCreate トリガーで Bigquery へ書き込みます。

Firestore → BigQuery の組み合わせは、以前別のプロジェクトで採用していたメンバーもいたのですが「安定していておすすめ」とのことでした⭕

Cloud Functions も失敗することがあるので、リトライを気持ち多めに入れておくと安心です。

BigQuery へのデータ取り込み方法

今回、候補になったデータの取り込み方法がいくつかあります。

それぞれ、良いとこがありました。

  • バッチ読み込み
    • メリット:
      • 課金対象外
    • デメリット:
      • 取り込み速度は、なるはや(すぐクエリできないこともある)
      • 一日に実行できる上限がある
  • ストリーミング挿入
    • メリット:
      • 基本的にすぐクエリできるようになる
      • Firebase Extensions を利用すれば、ちょっとぽちぽちするだけで良い
    • デメリット:
      • ( Firebase Extensions だと)
        • パーティションキーが自由に設定できない
        • 1データがまるごと1カラムに入るので、クラスタリングキーが指定できない
  • BigQuery Storage Write API
    • メリット:
      • 基本的にすぐクエリできるようになる
      • パーティションキーも、クラスタリングキーも自由に指定できる
      • ストリーミングと比べて
        • 1つのストリームで1回しか commit できない(失敗したら、気軽に再試行できる)
        • 取り込みスループットが、3倍くらい早い
      • 公式の押している感
    • デメリット:
      • ちょっと、ドキュメントが少ない??

最初、Firebase Extensions を使おうと思っていましたが、クラスタリングキーが指定できないと厳しいので、BigQuery Storage Write API を使うことにしました。

実装してたときは、「ドキュメントないな〜」と思っていたので、BigQuery Storage Write API のデメリットにあげましたが、公式含めて充実してきてる気がします👀

実装

やっていきます💪

すでに Firebase のプロジェクトは作ってある前提です。

1. BigQuery 側の準備

BigQuery のコンソール で作業していきます。

  1. Firebase と同じ region を指定して、dataset を作成します。

  2. テーブルを作成します。今回は、sample_table を作ってみます。

    ユーザー操作を保存するテーブルです。

    CREATE TABLE
      [project id].[dataset id].sample_table
    (
      id STRING NOT NULL,
      user_uuid STRING NOT NULL,
      action_type INT64 NOT NULL,
      resource_uuids Array<STRING>,
      operated_at TIMESTAMP NOT NULL
    )
    PARTITION BY DATE(operated_at)
    CLUSTER BY operated_at, user_uuid;
    

    今回は、ゆくゆく「検索対象がかなり多くなる」という要件です。

    クエリするときに必ずパーティション指定して、検索対象を限定してほしいので、パーティションフィルターをつけておきます。

    (パーティションの指定なしでクエリするとエラーが出るようになります)

    ALTER TABLE [project id].[dataset id].sample_table
      SET OPTIONS (require_partition_filter = true);
    
  3. BigQuery Storage Write API を有効にしておきます。↓

    https://console.cloud.google.com/apis/enableflow?apiid=bigquerystorage.googleapis.com

2. Protocol Buffers の準備

BigQuery Storage Write API では gRPC ストリーミングを使用しているので、Protocol Buffers の準備をしていきます。

  1. まずはライブラリインストール

    npm i @grpc/grpc-js grpc-tools google-protobuf ts-protoc-gen google-gax
    

    ts-protoc-gen は、TypeScript の型定義を生成するライブラリなので、使わない人はなしで大丈夫です。

  2. proto ファイル作成

    最小だとこんなかんじかな?

    functions/src/proto/sample.proto
    syntax = "proto3";
    
    enum ActionType {
      ACTION_UNSPECIFIED = 0;
      ACTION_CREATE = 1;
      ACTION_UPDATE = 2;
      ACTION_DELETE = 3;
    }
    
    message SampleTable {
      string id = 1;
      string user_uuid = 2;
      ActionType action_type = 3;
      repeated string resource_uuids = 4;
      string operated_at = 5;
    }
    

    Protocol Buffers の Scalar Value Types には日付関連のやつがないので、今回は扱いやすさを考えて string にしています(フォーマットさえしっかり決めとけばいいかな!)。

    TimeStamp がよかったら、組み込み型で定義されてる google.protobuf.Timestamp でも⭕。

  3. コンパイル

    functions/package.json に追加しておきます。

    functions/package.json
    {
    	"scripts": {
    	"codegen": "grpc_tools_node_protoc -I ./src/proto --plugin=protoc-gen-ts=./node_modules/.bin/protoc-gen-ts --js_out=import_style=commonjs,binary:./src/proto/generated --grpc_out=grpc_js:./src/proto/generated --ts_out=service=grpc-node,mode=grpc-js:./src/proto/generated ./src/proto/*.proto"
    	}
    	...
    }
    

    npm run codegen すると、 functions/src/proto/generated 以下に、コードや ts の型ファイル一式ができます。

3. Cloud Functions トリガーの実装

やっと本題にたどり着きました。。。

  1. BigQuery Storage Write API を叩く共通処理の作成

    functions/src/lib/BigQuery.ts
    import * as functions from 'firebase-functions'
    
    const { BigQueryWriteClient } = require('@google-cloud/bigquery-storage').v1
    const storageClient = new BigQueryWriteClient()
    
    export type BigQueryResponse = {
      statusCode: number,
      XMessageId: string | null
    }
    
    export const sendLogByDefaultStream = async (
      tableName: string,
      data: any,
      protoDescriptor: any
    ) => {
      try {
        const serializedRows = []
        serializedRows.push(data)
    
        const protoData = {
          writerSchema: { protoDescriptor },
          rows: { serializedRows }
        }
        const projectId = process.env.GCP_PROJECT || process.env.GCLOUD_PROJECT
        const datasetId = functions.config()?.aldagram?.bigquery?.dataset_id_for_auditlog
    
        // ローカル等で、環境変数が取れなかったら何もしない
        if (!projectId || !datasetId) return
    
        const writeStream = `projects/${projectId}/datasets/${datasetId}/tables/${tableName}/_default`
        const request = {
          writeStream,
          protoRows: protoData
        }
    
        // NOTE: デフォルトストリームを使うので、append したら即 commit される
        const stream = await storageClient.appendRows()
        stream.on('data', (response: any) => {
          /* response 例
           * [error]
           * {
           *   rowErrors: [],
           *   updatedSchema: null,
           *   error: {
           *     details: [],
           *     code: 3,
           *     message: 'Rows must be specified. Entity: projects/xxx/_default'
           *   },
           *   response: 'error'
           * }
           *
           * [success]
           * {
           *   rowErrors: [],
           *   updatedSchema: null,
           *   appendResult: { offset: null },
           *   response: 'appendResult'
           * }
           */
    
          if (response.error) {
            console.error('error returnd in bigquery storage-write-api response', response.error)
            return
          }
          console.info('bigquery storage-write-api response success!!')
        })
        stream.on('error', (error: any) => {
          console.error('send bigquery storage-write-api failed', error)
        })
        stream.on('end', () => { /* API call completed */ })
    
        stream.write(request)
        stream.end()
    
      } catch (error) {
        console.error('An error occurred during bigquery storage-write-api transmission.', error)
        return
      }
    }
    

    通常の書き込み時は、ストリームを作成→好きなだけ append → commit という流れになりますが

    1件ずつ書き込む場合は、デフォルトストリームが使えます。

    ただし、デフォルトストリームでは1回限りのコミットが保証されないので、厳密に1回だけ書き込みたい場合は、自分でストリームを作成する必要があります。

  2. トリガー作成

    unctions/src/indexes/sampleTableTrigger/sampmeTableOnCreate.ts
    import * as functions from 'firebase-functions'
    import * as admin from 'firebase-admin'
    import * as st from '../../proto/generated/sampleTable_pb'
    import { sendLogByDefaultStream } from '../../lib/BigQuery'
    
    const fn = functions.region('asia-northeast1')
    const type = require('@google-cloud/bigquery-storage').protos.google.protobuf.FieldDescriptorProto.Type
    
    const LOG_PROTO_DESCRIPTOR = {
      name: 'sampleTable',
      field: [
        { 'name': 'id', 'number': 1, 'type': type.TYPE_STRING },
        { 'name': 'user_uuid', 'number': 2, 'type': type.TYPE_STRING },
        { 'name': 'action_type', 'number': 3, 'type': type.TYPE_INT64 },
        { 'name': 'resource_uuids', 'number': 4, 'type': type.TYPE_STRING },
        { 'name': 'operated_at', 'number': 5, 'type': type.TYPE_STRING }
      ]
    }
    
    export const sampleTableOnCreate = (firestore: admin.firestore.Firestore) =>
      fn.firestore
        .document(
          `sample/{logId}`
        )
        .onCreate(
          async (snap, context): Promise<void> => {
            const logId = context.params.logId as string
            const data = {
              ...snap.data()
            }
    
            const log = new st.SampleTable()
            log.setId(logId)
            log.setUserUuid(data.userUuid)
            log.setActionType(data.actionType)
            log.setResourceUuidsList(data.resourceUuids)
            log.setOperatedAt(data.operatedAt)
    
            await sendLogByDefaultStream(
              auditLogcollectionName,
              log.serializeBinary(),
              LOG_PROTO_DESCRIPTOR
            )
          }
        )
    
  3. トリガーを登録する

    functions/src/index.ts で、作ったトリガーを登録します。

    functions/src/index.ts
    import * as admin from 'firebase-admin'
    import { sampleTableOnCreate } from './sampleTableTrigger/sampmeTableOnCreate'
    
    admin.initializeApp()
    const firestore = admin.firestore()
    
    exports.sampleTableOnCreate = sampleTableOnCreate(firestore)
    

これで、Firestore の sampleTable にデータが作成されたタイミングで、BigQuery の同テーブルにも書き込みが行えるようになりました。

弊社で利用しているものは、まだリリースしたばかりですが、これからデータ量が増えて、色々と活用していくのが楽しみです🥳

アルダグラム Tech Blog

株式会社アルダグラムのTech Blogです。 世界中のノンデスクワーク業界における現場の生産性アップを実現するプロジェクト管理アプリ「KANNA」を開発しています。 採用情報はこちら https://herp.careers/v1/aldagram0508/

Discussion

ログインするとコメントできます