🦔

Dataflowジョブサイズには制限があるよ

2020/12/30に公開

処理するデータの量・サイズではなくジョブの定義のサイズの話です。

Dataflowジョブのサイズの制限

ジョブの定義は10MBまでという制限があります(※)。

※「Maximum size for a job creation request. Pipeline descriptions with a lot of steps and very verbose names may reach this limit.:10MB

試してみる

50個のBigQueryIOで、2つのデータをBigQueryに取り込みます。
Tornadoes Exampleを元にしています)

package com.google.rogue.not;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.schemas.*;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.List;

public class StarterPipeline {
  public interface Options extends PipelineOptions {
    @Description(
            "BigQuery table to write to, specified as "
                    + "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  @DefaultSchema(JavaBeanSchema.class)
  public static class TornadoSchema {
    public TornadoSchema() {
    }
    public Integer getMonth() {
      return month;
    }
    public Long getCount() {
      return count;
    }
    private Integer month;
    private Long count;
    @SchemaCreate
    public TornadoSchema (final Integer month, final Long count) {
      this.month = month;
      this.count = count;
    }
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);
    PCollection<TornadoSchema> domainResult;

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    domainResult = p.apply(Create.of(
            new TornadoSchema(1, 1l),
            new TornadoSchema(2, 3l)
    ));
    for (int i = 0; i < 20; i++) {
      domainResult.apply(
              BigQueryIO.<TornadoSchema>write()
                      .useBeamSchema()
                      .to(options.getOutput() + "_" + Integer.toString(i))
                      .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                      //.withNumFileShards(1)
                      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                      //.withTriggeringFrequency(Duration.standardDays(365))
                      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    }
    p.run().waitUntilFinish();
  }
}

結果

(Beam SDK 2.25で試しました)

DirectRunner

mvn compile exec:java -Dexec.mainClass=com.google.rogue.not.StarterPipeline  -Dexec.args="--output=プロジェクトID:データセット.output --runner=DirectRunner --project=プロジェクトID --region=asia-northeast1 --gcpTempLocation=gs://バケット/tmp --tempLocation=gs://バケット/tmp"

特にエラーは起きず、テーブルにデータが格納されました。

$ bq show プロジェクトID:データセット.output_45
  Last modified               Schema              Total Rows   Total Bytes   Expiration   Time Partitioning   Clustered Fields   Labels
 ----------------- ------------------------------ ------------ ------------- ------------ ------------------- ------------------ --------
  30 Dec 15:27:28   |- count: integer (required)   2            32
$ bq ls --format=json プロジェクトID:データセット | jq '. | length'
50

DataflowRunner

mvn compile exec:java -Dexec.mainClass=com.google.rogue.not.StarterPipeline -Dexec.args="--output=プロジェクトID:データセット.output --runner=DataflowRunner --project=プロジェクトID --region=asia-northeast1 --gcpTempLocation=gs://バケット/tmp --tempLocation=gs://バケット/tmp"

エラーが表示され、Dataflowへのデプロイも失敗しました。

[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project beamBigquery: An exception occured while executing the Java class. Failed to create a workflow job: The size of the serialized JSON representation of the pipeline exceeds the allowable limit. For more information, please see the documentation on job submission:
[ERROR] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#jobs: 400 Bad Request
[ERROR] {
[ERROR]   "code" : 400,
[ERROR]   "errors" : [ {
[ERROR]     "domain" : "global",
[ERROR]     "message" : "(7c63b9ed76acaa19): The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.",
[ERROR]     "reason" : "badRequest"
[ERROR]   } ],
[ERROR]   "message" : "(7c63b9ed76acaa19): The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.",
[ERROR]   "status" : "INVALID_ARGUMENT"
[ERROR] }

補足

  • BigQueryIOで試したのは、中(Composite Transform)で複雑な処理をしているためです(=ジョブのサイズも大きくなりそう)
  • 34個未満のBigQueryIOだとデプロイ出来ました

Workaround

BigQueryIOに関してはDynamic Destinationを使えば、(多くの場合)回避出来ると思います。

(BigQueryIOに限らない、一般的な解決策は無い…?)

Discussion