😎

Beamでは、Schema使うとBigQueryの書き込みが楽になるよ

2020/12/20に公開

BeamのSchemaとは

https://beam.apache.org/documentation/programming-guide/#schemas
Beamで扱うデータに抽象的な型を導入する機能です。BigQueryのスキーマとは違う概念です(※)。
変換するユーティリティはあります

BigQueryIOとの関わり

Schemaを使わない時にBigQueryIOで書き込む時は、

  1. 書き込みたいデータの準備
  2. 1.で用意したデータ(型はドメイン依存)をTableRowに変換
  3. TableSchemaを準備
  4. BigQueryIO.write

の段階を踏む必要があります。

1.で使用するデータ型にSchemaを指定にする事で、(退屈な)詰め替えの処理を省略することが出来ます。

Scheamを使わない場合

BigQueryTornadoesをもとにしています。

  // 元データの入るクラス
  public static class Tornado {
    // AvroCoderに必要
    public TornadoScheman() {}
    
    public Integer getMonth() {
      return month;
    }
    public Long getCount() {
      return count;
    }
    private Integer month;
    private Long count;
    public Tornado (final Integer month, final Long count) {
      this.month = month;
      this.count = count;
    }
  }
    // ダミーの入力データ
    domainResult = p.apply(Create.of(
            new Tornado(1, 1l),
            new Tornado(2, 3l)
    )).withCoder(AvroCoder.of(Tornado.class)));
     // TableRowへの詰替(2)
    final PCollection<TableRow>tableRowResult = domainResult.apply(ParDo.of(new DoFn<Tornado, TableRow>() {
      @ProcessElement
      public void processElement(ProcessContext c)  {
        TableRow row = new TableRow()
                .set("month", c.element().getMonth())
                .set("tornado_count", c.element().getCount());
        c.output(row);
      }
    }));
    // TableScheamの準備(3)
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    // BigQueryIOで書き込み
    tableRowResult.apply(
            BigQueryIO.writeTableRows()
                    .withSchema(schema)
                    .to(options.getOutput())
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

詰め替え処理(2の部分)は単純ですが、退屈なので誰かにやってほしいですよね。

Scheamを使う場合

追加するのは、

  1. 元データのクラス(Tornado)にスキーマ定義のアノテーション(DefaultSchemaとSchemaCreate)を追加(TornadoSchema)
  2. BigQueryIOでuseBeamSchemaを追加

だけです。また、Schemaを使うと

  • Coderの推定
  • TableScheamの推定

をしてくれるため、関連する箇所も削除出来ます(withCoderや空のコンストラクタ、 withSchemaの部分)。

  // Scheamのあるクラスの定義
  // アノテーション追加
  @DefaultSchema(JavaBeanSchema.class)
  public static class TornadoSchema {
    public Integer getMonth() {
      return month;
    }
    public Long getCount() {
      return count;
    }
    private final Integer month;
    private final Long count;
    // アノテーション追加
    @SchemaCreate
    public TornadoSchema (final Integer month, final Long count) {
      this.month = month;
      this.count = count;
    }
  }
    domainResult.apply(
            BigQueryIO.<TornadoSchema>write()
                    // 追加
		    .useBeamSchema()
                    .to(options.getOutput())
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

Schemaを使わない場合に比べ、フィールドの詰め替え部分がなくなり、スッキリしていますね。

注意

Create.ofでSchema使う場合は、pomにプラグインを指定する必要があります。

読み込みの時はどうよ?

調査中です…
読み込みの時も、GenericRecord(Avro)から入れ替える事になるので、何か楽でないかなーと思っています。

Discussion