💽

PubSubから、一つのDataflowジョブで複数のBigQueryテーブルにデータを流す方法

2021/12/21に公開

課題

分析ログをCloudRun等のサービスからBigQueryに流すときに、王道なのがPubSubとDataflowを使う方法です。
このとき、素直な方法で実装すると、一つのBigQueryテーブルにたいして一つのPubSubトピックとDataflowジョブが必要になってしまいます。
素直な方法はこちら

PubSubのトピックがいくつ発生しようとまぁ問題ないんですが、Dataflowジョブはそうはいきません。Dataflowは、一つのジョブに対して最低一つのGCEインスタンスを生成します。
GCEインスタンスのタイプはオプションで選択できますが、デフォルトのn1-standard-4だと、月に1万円程度のコストがかかり、それが分析用のテーブル一つにつき一つ生成されてはたまったものではありません。

そこで、用意されたテンプレートは利用せず、自分でパイプラインを定義します。

パイプラインの定義

実装

パイプラインの定義をするのに最適な言語は、現状ではJava一択ですが、何かの意地でKotlinで書いています。

Pipeline.kt
package dataflow

import org.apache.beam.sdk.coders.ByteArrayCoder
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.Pipeline
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import com.google.api.services.bigquery.model.TableReference
import com.google.api.services.bigquery.model.TableSchema
import com.google.api.services.bigquery.model.TableRow
import java.io.IOException
import java.io.PipedInputStream
import java.io.PipedOutputStream

class Destinations : DynamicDestinations<PubsubMessage, String>() {
    override fun getDestination(element: ValueInSingleWindow<PubsubMessage>): String {
        return element.value!!.getAttribute("tableName")!!
    }

    override fun getTable(destination: String): TableDestination {
        return TableDestination(TableReference().setDatasetId("log").setTableId(destination), "Table for initial " + destination)
    }
    
    override fun getSchema(destination: String): TableSchema {
        return tableNameToSchema(destination)
    }
}

class FormatFunction : SerializableFunction<PubsubMessage, TableRow> {
    override fun apply(input: PubsubMessage): TableRow {
        try {
            val inputStream = PipedInputStream()
            val outputStream = PipedOutputStream(inputStream)
            ByteArrayCoder.of().encode(input.payload, outputStream)
            return TableRowJsonCoder.of().decode(inputStream)
        } catch (e: IOException) {
            throw RuntimeException("Failed converting to TableRow with an error:" + e.message)
        }
    }
}

fun main(args: Array<String>) {
    val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(DataflowPipelineOptions::class.java)

    val subscription = "projects/${options.project}/subscriptions/analytics-log"
    val read = PubsubIO.readMessagesWithAttributes().fromSubscription(subscription)
    val write = BigQueryIO.write<PubsubMessage>()
        .to(Destinations())
        .withFormatFunction(FormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)

    val pipeline = Pipeline.create(options)
    pipeline.apply(read).apply(write)
    pipeline.run()
}
TableNameToSchema.kt
package dataflow;

import com.google.api.services.bigquery.model.TableFieldSchema
import com.google.api.services.bigquery.model.TableSchema

fun tableNameToSchema(tableName: String): TableSchema {
    val schema = TableSchema();
    schema.fields = when (tableName) {
        "view" -> listOf(
            TableFieldSchema().setName("t").setType("TIMESTAMP").setMode("REQUIRED"),
	    TableFieldSchema().setName("d").setType("DATE").setMode("REQUIRED"),
            TableFieldSchema().setName("path").setType("STRING").setMode("REQUIRED"),
        )
        "playVideo" -> listOf(
            TableFieldSchema().setName("t").setType("TIMESTAMP").setMode("REQUIRED"),
            TableFieldSchema().setName("d").setType("DATE").setMode("REQUIRED"),
	    TableFieldSchema().setName("videoId").setType("STRING").setMode("REQUIRED"),
        )
        else -> throw RuntimeException("unknown table name:" + tableName)
    }
    return schema;
}

解説

基本

素晴らしいことに、 DynamicDestinations というクラスが公式で用意されています。
これは、入力された値から書き込み先のテーブルの定義を返すために存在する、今回の目的そのままのクラスです。
今回は、 PubSubMessage のattributeを取得し、その値をそのままテーブル名として使えるようにしています。

データの変換

その後 FormatFunction によって、入力された PubSubMessageTableRow に変換しています。
その際にポイントとなるのが、 ByteArrayCoderTableRowJsonCoder です。
もちろん、これらのクラスを使わなくても変換できればなんでも良いのですが、公式のテンプレートでも使われているので安心です。
気をつけなければならないのは、TableRowJsonCoderに食わせるストリームをペイロードから生成する必要がありますが、その際にKotlinやJavaに備わっている方法で変換してはならないということです。

Kotlinでは

input.payload.inputStream() // ダメ!

と書くだけで簡単にストリームを生成できてしまいますが、この方法だと TableRowJsonCoder はエラーを吐いてしまいます。

Cannot construct instance of `com.google.api.services.bigquery.model.TableRow` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('t') at [Source: (String)""t":"2021-12-16T03:15:59+00:00","d":"2021-12-16""; line: 1, column: 1] org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187) 

実装を追ってみたところ、どうやら TableRowJsonCoder は1バイト目を見てそのデータが何のデータなのかを判定するような実装が入っているようで、それに対応するきちんとしたストリームを作るためには、ByteArrayCoderを使わねばならないようです。

PipedInputStream / PipedOutputStream を使って無理矢理やっていますが、もしかしたらもっと良い方法があるかもしれません。

Discussion