😎
Beamでは、Schema使うとBigQueryの書き込みが楽になるよ
BeamのSchemaとは Beamで扱うデータに抽象的な型を導入する機能です。BigQueryのスキーマとは違う概念です(※)。
BigQueryIOとの関わり
Schemaを使わない時にBigQueryIOで書き込む時は、
- 書き込みたいデータの準備
- 1.で用意したデータ(型はドメイン依存)をTableRowに変換
- TableSchemaを準備
- BigQueryIO.write
の段階を踏む必要があります。
1.で使用するデータ型にSchemaを指定にする事で、(退屈な)詰め替えの処理を省略することが出来ます。
Scheamを使わない場合
BigQueryTornadoesをもとにしています。
// 元データの入るクラス
public static class Tornado {
// AvroCoderに必要
public TornadoScheman() {}
public Integer getMonth() {
return month;
}
public Long getCount() {
return count;
}
private Integer month;
private Long count;
public Tornado (final Integer month, final Long count) {
this.month = month;
this.count = count;
}
}
// ダミーの入力データ
domainResult = p.apply(Create.of(
new Tornado(1, 1l),
new Tornado(2, 3l)
)).withCoder(AvroCoder.of(Tornado.class)));
// TableRowへの詰替(2)
final PCollection<TableRow>tableRowResult = domainResult.apply(ParDo.of(new DoFn<Tornado, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow()
.set("month", c.element().getMonth())
.set("tornado_count", c.element().getCount());
c.output(row);
}
}));
// TableScheamの準備(3)
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
// BigQueryIOで書き込み
tableRowResult.apply(
BigQueryIO.writeTableRows()
.withSchema(schema)
.to(options.getOutput())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
詰め替え処理(2の部分)は単純ですが、退屈なので誰かにやってほしいですよね。
Scheamを使う場合
追加するのは、
- 元データのクラス(Tornado)にスキーマ定義のアノテーション(DefaultSchemaとSchemaCreate)を追加(TornadoSchema)
- BigQueryIOでuseBeamSchemaを追加
だけです。また、Schemaを使うと
- Coderの推定
- TableScheamの推定
をしてくれるため、関連する箇所も削除出来ます(withCoderや空のコンストラクタ、 withSchemaの部分)。
// Scheamのあるクラスの定義
// アノテーション追加
@DefaultSchema(JavaBeanSchema.class)
public static class TornadoSchema {
public Integer getMonth() {
return month;
}
public Long getCount() {
return count;
}
private final Integer month;
private final Long count;
// アノテーション追加
@SchemaCreate
public TornadoSchema (final Integer month, final Long count) {
this.month = month;
this.count = count;
}
}
domainResult.apply(
BigQueryIO.<TornadoSchema>write()
// 追加
.useBeamSchema()
.to(options.getOutput())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
Schemaを使わない場合に比べ、フィールドの詰め替え部分がなくなり、スッキリしていますね。
注意
Create.ofでSchema使う場合は、pomにプラグインを指定する必要があります。
読み込みの時はどうよ?
調査中です…
読み込みの時も、GenericRecord(Avro)から入れ替える事になるので、何か楽でないかなーと思っています。
Discussion