途中に改行があるCSVをBeam/Dataflowで読む
TL;DR
ContextualTextIOのおかげで、Apache Beam/Cloud Dataflowで改行を含むCSVが楽に読めるようになったよ。
途中に改行があるCSV
RFC 4180(有志の方の日本語訳)準拠のCSVでは、(ダブルクォートで囲むことで)改行(CRLF)をフィールドに含むことが出来ます。
例えば下のCSVは、「aaa」「b\r\nbb」「ccc」からなるレコードと、「zzz」「yyy」「xxx」からなるレコードで構成されています。
"aaa","b CRLF
bb","ccc" CRLF
zzz,yyy,xxx
Beam/DataflowのTextIO
Beamでテキストファイルを読み書きする時は、組み込みIOのTextIOを使うのが一般的だと思います。
ただし、このIOは改行(※)で分割された行をPCollectionとして返すので、RFC 4180のような改行を跨いだデータを扱うのは難しいです。
※正確にはwithDelimiterで指定した文字
ContextualTextIO
Beam 2.26でリリースされたContextualTextIOは、ファイル中のオフセットなど「Context」を含めて取得することが出来るため、RFC 4180の改行を跨いだデータを処理することが出来ます。
使い方
基本的な使い方はTextIOと同じですが、いくつか補足です:
- TextIOはStringのPCollectionを返しますが、ContextualTextIOはRow(※)のPCollectionを返します
- スキーマはRecordWithMetadata
- データ本体はword.getValue("value")で取得(wordはRow型のエレメント)
- RFC 4180の形式を読むには、withHasMultilineCSVRecordsにtrueを設定します
- 標準のBeam SDKに加え、beam-sdks-java-io-contextualtextioの依存が必要
※ Rowについては、Beam Programming GuideのSchemaの章を読んでね
注意点
- Beam 2.26時点でContextualTextIOSourceはExperimentalです
- TextIO使うよりもパーフォマンス悪くなるかもです(※)
※ Javadocの「When using ContextualTextIO.Read.withHasMultilineCSVRecords(Boolean), a single reader will be used to process the file, rather than multiple readers which can read from different offsets. For a large file this can result in lower performance. 」
試してみる
コード
package com.google.rogue.not;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.beam.sdk.io.contextualtextio.ContextualTextIO;
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public interface Options extends PipelineOptions {
@Validation.Required
String getInput();
void setInput(String value);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<Row> lines = p.apply(ContextualTextIO.read().from(options.getInput()).withHasMultilineCSVRecords(true));
lines.apply(ParDo.of(new DoFn<Row, String>() {
@ProcessElement
public void processElement(@Element Row word, OutputReceiver<String> out) {
LOG.info(word.getValue("value"));
}
}));
p.run();
}
}
入力のファイル
first,line,"a
after line"
second,line,hoge
コンパイル
mvn compile exec:java -Dexec.mainClass=com.google.rogue.not.StarterPipeline -Dexec.args="--runner=DirectRunner --input=入力ファイル"
結果
「first,line,"a
after line"」が一つのログ(「情報:」)として出ている事に注目してください。
12月 31, 2020 10:10:37 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: first,line,"a
after line"
12月 31, 2020 10:10:37 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: second,line,hoge
TextIOの場合の結果
ちなみにTextIOを(デフォルトの設定で)使うと下のような結果になります。
「"a
after line"」が分割されていますね。
12月 31, 2020 10:53:07 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: after line"
12月 31, 2020 10:53:07 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: first,line,"a
12月 31, 2020 10:53:07 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: second,line,hoge
Discussion