PubSubから、一つのDataflowジョブで複数のBigQueryテーブルにデータを流す方法
課題
分析ログをCloudRun等のサービスからBigQueryに流すときに、王道なのがPubSubとDataflowを使う方法です。
このとき、素直な方法で実装すると、一つのBigQueryテーブルにたいして一つのPubSubトピックとDataflowジョブが必要になってしまいます。
素直な方法はこちら
PubSubのトピックがいくつ発生しようとまぁ問題ないんですが、Dataflowジョブはそうはいきません。Dataflowは、一つのジョブに対して最低一つのGCEインスタンスを生成します。
GCEインスタンスのタイプはオプションで選択できますが、デフォルトのn1-standard-4だと、月に1万円程度のコストがかかり、それが分析用のテーブル一つにつき一つ生成されてはたまったものではありません。
そこで、用意されたテンプレートは利用せず、自分でパイプラインを定義します。
パイプラインの定義
実装
パイプラインの定義をするのに最適な言語は、現状ではJava一択ですが、何かの意地でKotlinで書いています。
package dataflow
import org.apache.beam.sdk.coders.ByteArrayCoder
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO
import org.apache.beam.sdk.values.ValueInSingleWindow
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.Pipeline
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions
import com.google.api.services.bigquery.model.TableReference
import com.google.api.services.bigquery.model.TableSchema
import com.google.api.services.bigquery.model.TableRow
import java.io.IOException
import java.io.PipedInputStream
import java.io.PipedOutputStream
class Destinations : DynamicDestinations<PubsubMessage, String>() {
override fun getDestination(element: ValueInSingleWindow<PubsubMessage>): String {
return element.value!!.getAttribute("tableName")!!
}
override fun getTable(destination: String): TableDestination {
return TableDestination(TableReference().setDatasetId("log").setTableId(destination), "Table for initial " + destination)
}
override fun getSchema(destination: String): TableSchema {
return tableNameToSchema(destination)
}
}
class FormatFunction : SerializableFunction<PubsubMessage, TableRow> {
override fun apply(input: PubsubMessage): TableRow {
try {
val inputStream = PipedInputStream()
val outputStream = PipedOutputStream(inputStream)
ByteArrayCoder.of().encode(input.payload, outputStream)
return TableRowJsonCoder.of().decode(inputStream)
} catch (e: IOException) {
throw RuntimeException("Failed converting to TableRow with an error:" + e.message)
}
}
}
fun main(args: Array<String>) {
val options = PipelineOptionsFactory.fromArgs(*args).withValidation().`as`(DataflowPipelineOptions::class.java)
val subscription = "projects/${options.project}/subscriptions/analytics-log"
val read = PubsubIO.readMessagesWithAttributes().fromSubscription(subscription)
val write = BigQueryIO.write<PubsubMessage>()
.to(Destinations())
.withFormatFunction(FormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
val pipeline = Pipeline.create(options)
pipeline.apply(read).apply(write)
pipeline.run()
}
package dataflow;
import com.google.api.services.bigquery.model.TableFieldSchema
import com.google.api.services.bigquery.model.TableSchema
fun tableNameToSchema(tableName: String): TableSchema {
val schema = TableSchema();
schema.fields = when (tableName) {
"view" -> listOf(
TableFieldSchema().setName("t").setType("TIMESTAMP").setMode("REQUIRED"),
TableFieldSchema().setName("d").setType("DATE").setMode("REQUIRED"),
TableFieldSchema().setName("path").setType("STRING").setMode("REQUIRED"),
)
"playVideo" -> listOf(
TableFieldSchema().setName("t").setType("TIMESTAMP").setMode("REQUIRED"),
TableFieldSchema().setName("d").setType("DATE").setMode("REQUIRED"),
TableFieldSchema().setName("videoId").setType("STRING").setMode("REQUIRED"),
)
else -> throw RuntimeException("unknown table name:" + tableName)
}
return schema;
}
解説
基本
素晴らしいことに、 DynamicDestinations
というクラスが公式で用意されています。
これは、入力された値から書き込み先のテーブルの定義を返すために存在する、今回の目的そのままのクラスです。
今回は、 PubSubMessage
のattributeを取得し、その値をそのままテーブル名として使えるようにしています。
データの変換
その後 FormatFunction
によって、入力された PubSubMessage
を TableRow
に変換しています。
その際にポイントとなるのが、 ByteArrayCoder
と TableRowJsonCoder
です。
もちろん、これらのクラスを使わなくても変換できればなんでも良いのですが、公式のテンプレートでも使われているので安心です。
気をつけなければならないのは、TableRowJsonCoder
に食わせるストリームをペイロードから生成する必要がありますが、その際にKotlinやJavaに備わっている方法で変換してはならないということです。
Kotlinでは
input.payload.inputStream() // ダメ!
と書くだけで簡単にストリームを生成できてしまいますが、この方法だと TableRowJsonCoder
はエラーを吐いてしまいます。
Cannot construct instance of `com.google.api.services.bigquery.model.TableRow` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('t') at [Source: (String)""t":"2021-12-16T03:15:59+00:00","d":"2021-12-16""; line: 1, column: 1] org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
実装を追ってみたところ、どうやら TableRowJsonCoder
は1バイト目を見てそのデータが何のデータなのかを判定するような実装が入っているようで、それに対応するきちんとしたストリームを作るためには、ByteArrayCoder
を使わねばならないようです。
PipedInputStream
/ PipedOutputStream
を使って無理矢理やっていますが、もしかしたらもっと良い方法があるかもしれません。
Discussion