🦔
Dataflowジョブサイズには制限があるよ
処理するデータの量・サイズではなく、ジョブの定義のサイズの話です。
Dataflowジョブのサイズの制限
ジョブの定義は10MBまでという制限があります(※)。
試してみる
50個のBigQueryIOで、2つのデータをBigQueryに取り込みます。
(Tornadoes Exampleを元にしています)
package com.google.rogue.not;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.schemas.*;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PCollection;
import java.util.ArrayList;
import java.util.List;
public class StarterPipeline {
public interface Options extends PipelineOptions {
@Description(
"BigQuery table to write to, specified as "
+ "<project_id>:<dataset_id>.<table_id>. The dataset must already exist.")
@Validation.Required
String getOutput();
void setOutput(String value);
}
@DefaultSchema(JavaBeanSchema.class)
public static class TornadoSchema {
public TornadoSchema() {
}
public Integer getMonth() {
return month;
}
public Long getCount() {
return count;
}
private Integer month;
private Long count;
@SchemaCreate
public TornadoSchema (final Integer month, final Long count) {
this.month = month;
this.count = count;
}
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<TornadoSchema> domainResult;
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("tornado_count").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
domainResult = p.apply(Create.of(
new TornadoSchema(1, 1l),
new TornadoSchema(2, 3l)
));
for (int i = 0; i < 20; i++) {
domainResult.apply(
BigQueryIO.<TornadoSchema>write()
.useBeamSchema()
.to(options.getOutput() + "_" + Integer.toString(i))
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
//.withNumFileShards(1)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
//.withTriggeringFrequency(Duration.standardDays(365))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
}
p.run().waitUntilFinish();
}
}
結果
(Beam SDK 2.25で試しました)
DirectRunner
mvn compile exec:java -Dexec.mainClass=com.google.rogue.not.StarterPipeline -Dexec.args="--output=プロジェクトID:データセット.output --runner=DirectRunner --project=プロジェクトID --region=asia-northeast1 --gcpTempLocation=gs://バケット/tmp --tempLocation=gs://バケット/tmp"
特にエラーは起きず、テーブルにデータが格納されました。
$ bq show プロジェクトID:データセット.output_45
Last modified Schema Total Rows Total Bytes Expiration Time Partitioning Clustered Fields Labels
----------------- ------------------------------ ------------ ------------- ------------ ------------------- ------------------ --------
30 Dec 15:27:28 |- count: integer (required) 2 32
$ bq ls --format=json プロジェクトID:データセット | jq '. | length'
50
DataflowRunner
mvn compile exec:java -Dexec.mainClass=com.google.rogue.not.StarterPipeline -Dexec.args="--output=プロジェクトID:データセット.output --runner=DataflowRunner --project=プロジェクトID --region=asia-northeast1 --gcpTempLocation=gs://バケット/tmp --tempLocation=gs://バケット/tmp"
エラーが表示され、Dataflowへのデプロイも失敗しました。
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project beamBigquery: An exception occured while executing the Java class. Failed to create a workflow job: The size of the serialized JSON representation of the pipeline exceeds the allowable limit. For more information, please see the documentation on job submission:
[ERROR] https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#jobs: 400 Bad Request
[ERROR] {
[ERROR] "code" : 400,
[ERROR] "errors" : [ {
[ERROR] "domain" : "global",
[ERROR] "message" : "(7c63b9ed76acaa19): The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.",
[ERROR] "reason" : "badRequest"
[ERROR] } ],
[ERROR] "message" : "(7c63b9ed76acaa19): The job graph is too large. Please try again with a smaller job graph, or split your job into two or more smaller jobs.",
[ERROR] "status" : "INVALID_ARGUMENT"
[ERROR] }
補足
- BigQueryIOで試したのは、中(Composite Transform)で複雑な処理をしているためです(=ジョブのサイズも大きくなりそう)
- 34個未満のBigQueryIOだとデプロイ出来ました
Workaround
BigQueryIOに関してはDynamic Destinationを使えば、(多くの場合)回避出来ると思います。
(BigQueryIOに限らない、一般的な解決策は無い…?)
Discussion