🐿️

🐿️🐿️🐿️Apache Flink触ってみた🐿️🐿️🐿️

2022/12/01に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#1です。

この記事では、

  • Apache Flinkの概要の説明
  • ローカルで動かす方法の説明(クラスタの起動とアプリケーションコードのコンパイル)

を記載します。StateやReactive Modeなどの面白い機能は(たぶん)別に紹介します。

Apache Flinkとは

公式ページ曰く

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.

です。つまり、

  • ストリーミング(入力データがどんどん続く)、バッチ(入力データに終わりがある)の両方を処理でき
  • 一般的なクラスター環境で動き
  • スケールする

分散処理エンジン・フレームワークらしいです。

同じカテゴリーの製品に(おそらく)Apache SparkとかCloud Dataflowなどがあります。公式ページやStream Processing with Apache Flinkを見た感じ、

  • 状態を保持するState
  • Exactly-once・event time processingでの、低レイテンシーでのストリーミング処理

をアピールしている気がします(Sparkでも可能な話もありますが、機能が豊富だったり先んじて実行していたり)。

学習リソース

などがあります(私が知っている限り邦訳・日本語の書籍や講座はありません)。
また、Flink特化ではないですがStreaming SystemsもStateやWatermarkなどの概念の参考になるかもしれません。

Stream Processing with Apache Flinkは創始者(の一人)が著者であり、また、本としてまとまった形になっている点(とくにwatermarkやcheckpointといった概念)はメリットです。
ただ、出版が2019年と少し古く、(読むのであれば)公式ドキュメントと並べながら読んだ方が良いと思います。この本のApache Flinkのバージョンは1.7、2022/11時点のstableは1.16で、例えばその間に以下のような変更がリリースされています。

事例

事例のページでは

  • Alibaba
    • Flinkのサポートを提供しているververicaの親会社でもあります
  • Capital One
  • Uber

などの企業が紹介されています。日系企業では

などで利用されているようです。

また、

といったエンタープライズサポート・マネージド環境があります(それぞれのカバー範囲は不明)。

アーキテクチャ

公式ドキュメントのFlink Architectureページの説明がわかりやすいと思います。

  • アプリケーションを管理するJobManager
  • 実際のタスクを実行するTaskManager
    • 他のシステムだとworkerとも呼ばれるやつ
  • ジョブをJobManagerに登録するClient

の大きく三つのコンポーネントがあります。デプロイは、

の三種類の方法のいずれかで行います。この記事では、まずはstandalneで動かしてみます。

ローカルでクラスタを動かす

first stepsというドキュメントに従ってコマンドを実行することで、

  • ローカルでFlinkクラスタの起動
  • 事前ビルドされたjarをクラスタにサブミットし実行
  • WebUI・ログの確認

を行うことができます。

基本指示通りに実行すれば大丈夫だと思いますが、

  • FlinkクラスタはWSLのLinux上で実行
  • Web UIはWindows側からアクセスする

時は、conf/flink-conf.yamlのrest.bind-addressを0.0.0.0(もしくはWSL2のIPアドレス)にする必要があります。
(デフォルトではlocalhost。Windowsから見たWSL上のLinuxは別マシン扱いなので、このままではアクセスできません)

grep rest.bind-address conf/flink-conf.yaml
rest.bind-address: 0.0.0.0

うまく設定・ジョブのサブミットできると、localhost:8081(flink-conf.yamlのrest.port)で下のような管理画面が確認できます。

プログラムをコンパイルして実行する

先ほどの例(first steps)では事前用意されたjarを実行しました。今度はソースコードからビルドして実行してみます。

自前でビルドの設定・コードを書いてももちろん良いのですが、flink-quick-start-javaというMaven archetypeが用意されており、Mavenの設定(pom.xml)とFlinkコードのひな形を用意できます。

# groupIdとかartifactIdは適当に変えてください
 mvn archetype:generate -D archetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.16.0 -DgroupId=rogue.not -DartifactId=flinkStart -Dversion=0.1 -Dpackage=flinkStart -DinteractiveMode=false

Mavenのpom.xml、log4j2の設定ファイル、ひな形のファイルが作成されます

$ tree flinkStart/
flinkStart/
├── pom.xml
└── src
    └── main
        ├── java
        │   └── flinkStart
        │       └── DataStreamJob.java
        └── resources
            └── log4j2.properties
cd flinkStart/

DataStreamJob.javaには処理が記載されていない(getExecutionEnvironmentとexecuteだけ)ので、Flinkのリポジトリにあるサンプルコードから処理をコピーします(パッケージ(flinkStart)とクラス名(DataStreamJob)はarchetype:generate作成のまま)。

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package flinkStart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket. The easiest way to
 * try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
 *
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 *
 * <p>and run this example with the hostname and the port as arguments.
 */
public class DataStreamJob {

	public static void main(String[] args) throws Exception {

		// the host and the port to connect to
		final String hostname;
		final int port;
		try {
			final ParameterTool params = ParameterTool.fromArgs(args);
			hostname = params.has("hostname") ? params.get("hostname") : "localhost";
			port = params.getInt("port");
		} catch (Exception e) {
			System.err.println(
					"No port specified. Please run 'SocketWindowWordCount "
							+ "--hostname <hostname> --port <port>', where hostname (localhost by default) "
							+ "and port is the address of the text server");
			System.err.println(
					"To start a simple text server, run 'netcat -l <port>' and "
							+ "type the input text into the command line");
			return;
		}

		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream(hostname, port, "\n");

		// parse the data, group it, window it, and aggregate the counts
		DataStream<WordWithCount> windowCounts =
				text.flatMap(
								(FlatMapFunction<String, WordWithCount>)
										(value, out) -> {
											for (String word : value.split("\\s")) {
												out.collect(new WordWithCount(word, 1L));
											}
										},
								Types.POJO(WordWithCount.class))
						.keyBy(value -> value.word)
						.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
						.reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
						.returns(WordWithCount.class);

		// print the results with a single thread, rather than in parallel
		windowCounts.print().setParallelism(1);

		env.execute("Socket Window WordCount");
	}

	// ------------------------------------------------------------------------

	/** Data type for words with count. */
	public static class WordWithCount {

		public String word;
		public long count;

		@SuppressWarnings("unused")
		public WordWithCount() {}

		public WordWithCount(String word, long count) {
			this.word = word;
			this.count = count;
		}

		@Override
		public String toString() {
			return word + " : " + count;
		}
	}
}

このプログラムでは、

します。

ビルドしてみましょう

mvn clean package
ls -l target/flinkStart-0.1.jar
-rw-r--r-- 1 notrogue notrogue 4840 Nov 19 15:46 target/flinkStart-0.1.jar

クラスタで動かす

それではジョブを起動してみますが、その前に入力を渡すためのサーバ(ここではnetcat)を起動する必要があります。

nc -lk 9000

先ほど(first steps)のクラスタでジョブを起動してみます

# ../fromscratch/は今回ビルドしたディレクトリです。環境に合わせて適当に変えてください
./bin/flink run ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

netcatから適当に入力を渡します

(ncのプロンプトに入力)
hoge hoge
buzz buzz buzz
hoge aa

TaskManagerのログを見るとそれっぽいログがでています

tail log/flink-notrogue-taskexecutor-4-DESKTOP-6QTOO0L.out
hoge : 3
aa : 1
buzz : 3

独立して動かす

前述の手順でビルドしたjarを単体(クラスにsubmitではなく)動かすとエラーが表示されます。

java -jar  ./target/flinkStart-0.1.jar --hostname localhost --port 9000
Error: Unable to initialize main class flinkStart.DataStreamJob
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/datastream/DataStream

これはpom.xmlで必要なパッケージのscopeがprovidedに設定されており、実行時に必要なライブラリ(flink-javaとか)がjarに同梱さていないためです。
scopeをcompileにします。
(ただし、クラスタで動かすときには不要なのでプロダクション用にビルドする時は非推奨

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<!-- provided->compileに -->
			<scope>compile</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<!-- provided->compileに -->
			<scope>compile</scope>
		</dependency>

また、log4j2関係のパッケージも除外されているので、この除外も外します(除外したままだと、やはりエラーがでます)。

							<artifactSet>
								<excludes>
									<exclude>org.apache.flink:flink-shaded-force-shading</exclude>
									<exclude>com.google.code.findbugs:jsr305</exclude>
									<!-- コメントアウトを追加 -->
<!--									<exclude>org.slf4j:*</exclude>-->
<!--									<exclude>org.apache.logging.log4j:*</exclude>-->
								</excludes>
							</artifactSet>
# ビルドしなおして
mvn clean package
# netcat(クラスタとポートを変更)
nc -lk 9001

java -jar  ./target/flinkStart-0.1.jar --hostname localhost --port 9001

netcatのプロンプトでまた適当に入力します

standalone
standalone complex

起動したジョブの標準出力にログがでるはずです。

standalone : 2
complex : 1

Discussion