💡
Apache Beam Javaの各モデルを理解する
はじめに
仕事で使いそうなため、キャッチアップのための自分への備忘録
単語とその意味だけでも理解できれば上々...といった初心者向け記事
Apache beamとは
quickstart
CloudDataflowを使って練習
各用語、モデルの役割
quickstartで使用したWordCount.javaを用いてまとめる。
Pipeline
指定したデータコレクションを処理する一連の流れ。
データのinputで始まり、outputで終わる
runWordCountが該当
static void runWordCount(WordCountOptions options) {
Pipeline p = Pipeline.create(options);
// Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
// static FormatAsTextFn() to the ParDo transform.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
.apply(new CountWords())
.apply(MapElements.via(new FormatAsTextFn()))
.apply("WriteCounts", TextIO.write().to(options.getOutput()));
p.run().waitUntilFinish();
}
Pipeline内で処理を書いても完結しそうだが、後述するPCollection
、PTransform
を使うのが一般的
PCollection
Pipelineで使用するデータセットで、複数要素として定義される。
PCollectionを数珠繋ぎにしていく事でPipelineを作成していく。
後述するPTransform
への橋渡しのイメージ。
public static class CountWords
extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
@Override
public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
// Convert lines of text into individual words.
PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
// Count the number of times each word occurs.
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
return wordCounts;
}
}
PTransform
データの加工を行う。
基本的に加工する場合のメインロジックはこちらで記述する。
戻り値はPCollectionを返す。
WordCount.javaでは
static class ExtractWordsFn extends DoFn<String, String> {
private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
private final Distribution lineLenDist =
Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
@ProcessElement
public void processElement(@Element String element, OutputReceiver<String> receiver) {
lineLenDist.update(element.length());
if (element.trim().isEmpty()) {
emptyLines.inc();
}
// Split the line into words.
String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);
// Output each word encountered into the output PCollection.
for (String word : words) {
if (!word.isEmpty()) {
receiver.output(word);
}
}
}
}
実行ビュー
WordCountの実行結果はこんな感じ
- ReadLines
2. inputデータの読み込み- gs://apache-beam-samples/shakespeare/kinglear.txtを利用
- https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L182
- CountWords
- 1つ目のPTransformの処理でInputの文字列から単語をリスト化
- 1.で生成したPCollection(単語のコレクション)を単語ごとにGroupBy
- Count.perElement()で文字列に対してGroupByできる(便利)
- MapElements
- MapElements.viaでKey、Valueをそれぞれ文字列に結合
- こちらも戻り値はPCollection
- https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L203C16-L203C32
- WriteCounts
- データをOutput
独自関数だったり、SDKのUtilを使用していたりで分かりづらい部分もあるが、
基本的にPCollectionをI/Oとして繋げていくという思想
感想
PCollectionを抑えておけば綺麗には書けそう。という浅い感想。
割とutil系のライブラリが充実していそうなので、単純な「データを加工する」といったユースケースは特に独自でLogicを組む必要はなさそうに感じる。
Dataflowで使う前提だがを行うためにGCEやGCS周りの権限をちゃんと設定してあげる必要はあるが、Cloudリソース間でのデータ共有、書き込みへのコード量は低そうだなと感じる。
Discussion