💡

Apache Beam Javaの各モデルを理解する

2024/05/06に公開

はじめに

仕事で使いそうなため、キャッチアップのための自分への備忘録
単語とその意味だけでも理解できれば上々...といった初心者向け記事

Apache beamとは

https://cloud.google.com/dataflow/docs/concepts/beam-programming-model?hl=ja

quickstart

CloudDataflowを使って練習
https://console.cloud.google.com/welcome?walkthrough_id=dataflow--quickstart-beam--quickstart-beam-java&_ga=2.187371307.-2009087995.1687056820&project=workspace-422408

WordCount.java

各用語、モデルの役割

quickstartで使用したWordCount.javaを用いてまとめる。

Pipeline

指定したデータコレクションを処理する一連の流れ。
データのinputで始まり、outputで終わる

runWordCountが該当

  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);

    // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
    // static FormatAsTextFn() to the ParDo transform.
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));

    p.run().waitUntilFinish();
  }

Pipeline内で処理を書いても完結しそうだが、後述するPCollectionPTransformを使うのが一般的

PCollection

Pipelineで使用するデータセットで、複数要素として定義される。
PCollectionを数珠繋ぎにしていく事でPipelineを作成していく。
後述するPTransformへの橋渡しのイメージ。

CountWords.class

  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

      return wordCounts;
    }
  }

PTransform

データの加工を行う。
基本的に加工する場合のメインロジックはこちらで記述する。
戻り値はPCollectionを返す。

WordCount.javaでは

  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines = Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }

      // Split the line into words.
      String[] words = element.split(ExampleUtils.TOKENIZER_PATTERN, -1);

      // Output each word encountered into the output PCollection.
      for (String word : words) {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      }
    }
  }

実行ビュー

WordCountの実行結果はこんな感じ

  1. ReadLines
    2. inputデータの読み込み
    1. gs://apache-beam-samples/shakespeare/kinglear.txtを利用
    2. https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L182
  2. CountWords
    1. 1つ目のPTransformの処理でInputの文字列から単語をリスト化
    2. 1.で生成したPCollection(単語のコレクション)を単語ごとにGroupBy
      1. Count.perElement()で文字列に対してGroupByできる(便利)
  3. MapElements
    1. MapElements.viaでKey、Valueをそれぞれ文字列に結合
    2. こちらも戻り値はPCollection
    3. https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L203C16-L203C32
  4. WriteCounts
    1. データをOutput

独自関数だったり、SDKのUtilを使用していたりで分かりづらい部分もあるが、
基本的にPCollectionをI/Oとして繋げていくという思想

感想

PCollectionを抑えておけば綺麗には書けそう。という浅い感想。
割とutil系のライブラリが充実していそうなので、単純な「データを加工する」といったユースケースは特に独自でLogicを組む必要はなさそうに感じる。
Dataflowで使う前提だがを行うためにGCEやGCS周りの権限をちゃんと設定してあげる必要はあるが、Cloudリソース間でのデータ共有、書き込みへのコード量は低そうだなと感じる。

Discussion