Beam Starter Projects のご紹介

2024/04/03に公開

はじめに

こんにちは、クラウドエース データソリューション部所属の伊藤です。

Google Cloud では大規模データの処理の選択肢としていくつかのサービスがあります。その中でも Dataflow は、データのバッチ処理だけでなく、ストリーミング処理(リアルタイム処理)も行うことができるので、とても重宝されます。弊社が携わる案件においても、データの抽出、加工、格納の順番で処理を行うアーキテクチャの際に Dataflow がよく使用されます(これはいわゆる ETL(Extract, Transform, Load)と呼ばれる処理です)。

さて、Dataflow では Apache Beam というフレームワークを使ってコーディングをしていきます。
ユースケースがハマる場合は、Dataflow のテンプレートなどを利用して、極力コーディングを行わないのが良いと思いますが、それでも処理の複雑さから自力で実装を行う必要が出てくることもあります。

筆者は Apache Beam のコードを書き始めようとするとき、毎回プロジェクトの設定や構成を考えるのが面倒でなかなかコーディングに入れないということがあります。これは初心者の方ならなおさらだと思います。

そんな方たちのために、Beam Starter Projects では基本的なプロジェクトの設定や構成をすべて用意してくれています。
今回はそんな Beam Starter Project のご紹介をしたいと思います。

Beam Starter Projects とは

Apache Beam の公式サイトでは以下のページで Beam Starter Projects について紹介されています。
https://beam.apache.org/blog/beam-starter-projects/

Beam Starter Projects は、Apache Beam のプロジェクトを始めるためのテンプレートプロジェクトです。
Java、Python、Go、Kotlin、Scala に対応しており、それぞれの言語に合わせたテンプレートプロジェクトが用意されています。

また、各言語のクイックスタートの内容はすべて Beam Starter Projects を使っています。
https://beam.apache.org/get-started/

GitHub リポジトリに含まれている内容としては言語によって異なりますが、基本的には以下の内容が含まれています。

  • テキストの入力を受け取り、print で出力するプログラムのソースファイル(Hello World 的なパイプライン)
  • 上記のプログラムをテストするためのソースファイル
  • 依存関係の管理を行うためのファイル(Maven の pom.xml など)
  • GitHub Actions のワークフロー
  • dependabot の設定ファイル
  • .gitignore ファイル

その他ライセンスのファイルなども含まれています。

今回は Java と Python のプロジェクトを少し詳しく見ていくとともに、Java プロジェクトを少し改造してみたいと思います。

Java プロジェクトの場合

Java のプロジェクトで主に含まれている内容は以下の通りです。

  • メインのソースファイル
  • テストのソースファイル
  • Maven の pom.xml
  • Gradle の build.gradle
  • sbt の build.sbt
  • GitHub Actions のワークフロー

Java プロジェクトの場合は、Gradle と Maven と sbt の設定ファイルが用意されています。ですので、ご自身のもしくはプロジェクトのビルドツールに合わせて、不要なものは削除して使っていきます。
package のパスなども適宜変更して利用していくと良いでしょう。

個人的には Java プロジェクトは、準備が面倒なのでありがたみを感じます。

Python プロジェクトの場合

Python のプロジェクトで含まれている内容は以下の通りです。

  • メインのソースファイル
  • テストのソースファイル
  • requirements.txt
  • setup.py

Python プロジェクトの場合は、requirements.txt と setup.py が用意されています。Poetry や Pipenv などのツールを使っている場合は、別途用意する必要がありますが、Apache Beam の公式ドキュメントや Dataflow の公式 では基本的に requirements.txt をもとにした案内になっているので、そのまま使っていくのでも問題ないでしょう。

Java プロジェクトを少し改造してみる

ここでは、Java プロジェクトを少し改造してみたいと思います。
最初にそのままのコードの実行と、Dataflow でジョブを動かすところまでをやってみたいと思います。

最初に、 Beam のブログでも案内されている通りのコマンドを実行していきます。

git clone https://github.com/apache/beam-starter-java
cd beam-starter-java

# Install Java and Gradle with sdkman.
curl -s "https://get.sdkman.io" | bash
sdk install java 11.0.22-tem
sdk install gradle

# To run the pipeline.
gradle run

# To run the tests.
gradle test

このとき、Java のバージョン指定のために sdkman を使っています。
執筆時点では、ブログに記載の SDK のバージョンが利用不可だったので、最新のバージョンを指定しています。
sdkman のインストール方法はこちらを参考にしてください。

さて、サンプルコードの実行結果を確認しましょう。
例えば gradle run の結果は以下になります。

Starting a Gradle Daemon (subsequent builds will be faster)

> Task :run
Hello
World!
My input text

BUILD SUCCESSFUL in 30s
2 actionable tasks: 2 executed

次に、Dataflow でジョブを動かしてみたいと思います。
Dataflow でジョブを動かすためには、依存関係の修正が必要です。
Maven を使う場合はこちらを参考にしてください。
今回は Beam Starter Projects のおすすめに従い Gradle を使っていきます。
上記 Dataflow のページに加えてWordCount quickstart for Javaのページを参考にして変更していきます。

まずは、不要なpom.xmlとbuild.sbtを削除します。
その後、build.gradle を以下のように修正します。

--- a/build.gradle
+++ b/build.gradle
@@ -12,6 +12,9 @@ plugins {

 repositories {
     mavenCentral()
+    maven {
+        url "https://packages.confluent.io/maven/"
+    }
 }

 application {
@@ -27,6 +30,7 @@ dependencies {
     // App dependencies.
     implementation "org.apache.beam:beam-sdks-java-core:2.53.0"
     implementation "org.apache.beam:beam-runners-direct-java:2.53.0"
+    implementation "org.apache.beam:beam-runners-google-cloud-dataflow-java:2.53.0"
     implementation "org.slf4j:slf4j-jdk14:1.7.32"

WordCount quickstart for Java ページだと build.gradle.kts になっていますが、Beam Starter Projects では build.gradle を使っていますので、Groovy の記述にしています。

ソースコードは特に変更する必要はありません。
テンプレートで利用されている Pipeline Options は Dataflow 起動時にも使える Options クラスとなっていますので、そのまま使っていくことができます。

	public interface Options extends StreamingOptions {
		@Description("Input text to print.")
		@Default.String("My input text")
		String getInputText();

		void setInputText(String value);
	}

最低限必要なコマンドライン引数の実行コマンドは以下になります(適宜引数は追加できます)。

gradle run --args="\
  --runner=DataflowRunner \
  --project=<PROJECT ID> \
  --region=<REGION \
  --tempLocation=gs://<BUCKET NAME>/tmp/"

実行結果は以下になります。
Dataflow の起動と終了のオーバーヘッドがあるので5分くらいかかります。

> Task :run
4月 02, 2024 2:32:54 午後 org.apache.beam.runners.dataflow.options.DataflowPipelineOptions$StagingLocationFactory create
情報: No stagingLocation provided, falling back to gcpTempLocation
4月 02, 2024 2:32:54 午後 org.apache.beam.runners.dataflow.DataflowRunner fromOptions
情報: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 194 files. Enable logging at DEBUG level to see which files will be staged.
4月 02, 2024 2:32:55 午後 org.apache.beam.runners.dataflow.DataflowRunner run
情報: Executing pipeline on the Dataflow Service, which will have billing implications related to Google Compute Engine usage and other Google Cloud Services.

...省略...

4月 02, 2024 2:35:39 午後 org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
情報: 2024-04-02T05:35:38.864Z: Stopping worker pool...
4月 02, 2024 2:38:04 午後 org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
情報: 2024-04-02T05:38:04.527Z: Autoscaling: Resized worker pool from 1 to 0.
4月 02, 2024 2:38:04 午後 org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
情報: 2024-04-02T05:38:04.561Z: Worker pool stopped.
4月 02, 2024 2:38:11 午後 org.apache.beam.runners.dataflow.DataflowPipelineJob logTerminalState
情報: Job 2024-04-01_22_33_15-4172068346610238807 finished with status DONE.

BUILD SUCCESSFUL in 5m 20s
2 actionable tasks: 1 executed, 1 up-to-date

指定したプロジェクトの Google Cloud のコンソール画面を確認すると Dataflow のジョブ実行が無事に完了されていることが確認できます。
dataflow job

まとめ

Beam Starter Projects は、Apache Beam のプロジェクトを始める際にとても便利なテンプレートプロジェクトです。
Beam Starter Projects を使うことで、プロジェクトの設定や構成を考えることなく、すぐにコーディングを始めることができます。
ただし、フォルダ構成はサンプルっぽい名称となっているので、必要に応じてフォルダ名やパッケージ名を変更してください。
ビルド用の設定ファイルなどは GitHub のサンプルと本記事を参考にしながら適宜変更していくと良いでしょう。

Discussion