途中に改行があるCSVをBeam/Dataflowで読む

4 min読了の目安(約4200字TECH技術記事

TL;DR

ContextualTextIOのおかげで、Apache Beam/Cloud Dataflowで改行を含むCSVが楽に読めるようになったよ。

途中に改行があるCSV

RFC 4180有志の方の日本語訳)準拠のCSVでは、(ダブルクォートで囲むことで)改行(CRLF)をフィールドに含むことが出来ます。

例えば下のCSVは、「aaa」「b\r\nbb」「ccc」からなるレコードと、「zzz」「yyy」「xxx」からなるレコードで構成されています。

"aaa","b CRLF
bb","ccc" CRLF
zzz,yyy,xxx

Beam/DataflowのTextIO

Beamでテキストファイルを読み書きする時は、組み込みIOのTextIOを使うのが一般的だと思います。

ただし、このIOは改行(※)で分割された行をPCollectionとして返すので、RFC 4180のような改行を跨いだデータを扱うのは難しいです。

※正確にはwithDelimiterで指定した文字

ContextualTextIO

Beam 2.26でリリースされたContextualTextIOは、ファイル中のオフセットなど「Context」を含めて取得することが出来るため、RFC 4180の改行を跨いだデータを処理することが出来ます。

使い方

基本的な使い方はTextIOと同じですが、いくつか補足です:

  • TextIOはStringのPCollectionを返しますが、ContextualTextIOはRow(※)のPCollectionを返します
    • スキーマはRecordWithMetadata
    • データ本体はword.getValue("value")で取得(wordはRow型のエレメント)
  • RFC 4180の形式を読むには、withHasMultilineCSVRecordsにtrueを設定します
  • 標準のBeam SDKに加え、beam-sdks-java-io-contextualtextioの依存が必要

※ Rowについては、Beam Programming GuideのSchemaの章を読んでね

注意点

  • Beam 2.26時点でContextualTextIOSourceはExperimentalです
  • TextIO使うよりもパーフォマンス悪くなるかもです(※)

※ Javadocの「When using ContextualTextIO.Read.withHasMultilineCSVRecords(Boolean), a single reader will be used to process the file, rather than multiple readers which can read from different offsets. For a large file this can result in lower performance. 」

試してみる

コード

package com.google.rogue.not;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.beam.sdk.io.contextualtextio.ContextualTextIO;

public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

  public interface Options extends PipelineOptions {
    @Validation.Required
    String getInput();
    void setInput(String value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);
    PCollection<Row> lines = p.apply(ContextualTextIO.read().from(options.getInput()).withHasMultilineCSVRecords(true));
    lines.apply(ParDo.of(new DoFn<Row, String>() {
      @ProcessElement
      public void processElement(@Element Row word, OutputReceiver<String> out) {
        LOG.info(word.getValue("value"));
      }
    }));
    p.run();
  }
}

入力のファイル

first,line,"a
after line"
second,line,hoge

コンパイル

 mvn compile exec:java -Dexec.mainClass=com.google.rogue.not.StarterPipeline  -Dexec.args="--runner=DirectRunner --input=入力ファイル"

結果

「first,line,"a
after line"」が一つのログ(「情報:」)として出ている事に注目してください。

12月 31, 2020 10:10:37 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: first,line,"a
after line"
12月 31, 2020 10:10:37 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: second,line,hoge

TextIOの場合の結果

ちなみにTextIOを(デフォルトの設定で)使うと下のような結果になります。
「"a
after line"」が分割されていますね。

12月 31, 2020 10:53:07 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: after line"
12月 31, 2020 10:53:07 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: first,line,"a
12月 31, 2020 10:53:07 午前 com.google.rogue.not.StarterPipeline$1 processElement
情報: second,line,hoge