🐿️

🐿️🐿️🐿️Apache Flink触ってみた🐿️🐿️🐿️(State編)

2022/12/02に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#2です。

🐿️🐿️🐿️Apache Flink触ってみた🐿️🐿️🐿️の続きです。Flinkの目玉機能(多分)のStateを試してみました。

Stateとは

ストリーミング処理の中で、何らかの状態を保存する仕組みです。

公式ドキュメント曰く

some operations remember information across multiple events (for example window operators). These operations are called stateful.

Apache Flinkでは、複数のイベントに跨って状態を保存する処理を「stateful」、保存する状態を「state」と呼びます。
例えば、flink-trainingのrides-and-faresでは、二種類のデータ(乗車情報と料金情報)を紐づけるために、Stateを利用し先に受け取ったデータを保存しています。

他のストリーミングエンジンでは、Spark(Strcutred Streaming)だとStateStore、Cloud Dataflow・Apache BeamだとState APIが対応する機能だと思います。

色々なState

Stateの種類としては、

  • KeyedState
  • BroadcastState
  • OperatorState

三種類があります。それぞれ、Stateを紐づける単位が違いまして、

  • KeyedStateはkeyBy(SQLで言うところのGROUP BY)で分類されたキー・Operator単位
  • BroadcastStateは複数のOperator単位
  • OperatorStateはOperator単位

でStateを持ちます。

また、Stateのバックエンドとして

  • JVMのヒープ(HashMapStateBackend)
  • RocksDB(EmbeddedRocksDBStateBackend)

の二種類がバンドルされています。
なお、TaskManager障害のための永続化は、Checkpoint、Savepointで別に指定します。

Stateを使うプログラムの例

前回使ったSocketWindowWordCountを少し変えて、Stateを使うようにしてみます。

元々のプログラムでは、

  1. 入力をスペースで分割し、WordWithCountを出力
  2. 文字列毎にまとめる(keyBy)
  3. 5秒単位のウィンドウに分割
  4. 回数を数える

の処理を行います。少し変更して

  1. 入力をスペースで分割し、Stringを出力
  2. 文字列毎にまとめる(keyBy)
  3. RichMapFunctionを継承したクラス(Counter)を呼び出し
    • MapFunctionをベースに、処理の開始前のセットアップ(open)・クローズ(close)できるようにしたクラスです。Stateを使う場合はopenにStateの初期化を記述するためにこちらを使う必要があります
  4. Stateを使い、Counterの中で呼び出し回数を数える

ことにします。

文字列毎の回数を数えるStateを持つCounterクラスを下のように定義します。

	public static class Counter extends RichMapFunction<String, WordWithCount> {
		private transient ValueState<Integer> counter;

		@Override
		public void open(Configuration config)  throws Exception {
			ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
					 "counter", Integer.class
			);
			counter = getRuntimeContext().getState(descriptor);
		}

		@Override
		public WordWithCount map(String value) throws Exception {
			if (counter.value() == null) {
				counter.update(1);
			} else {
				counter.update(counter.value() + 1);

			}
			return new WordWithCount(value, counter.value());
		}
	}

コード読んでいただくと何となく雰囲気わかると思いますが、

  • インスタンス変数として、State(counter)を定義
  • openメソッドでStateの情報(名前・型)を持つStateDescriptorを定義
    • open自体はStateのためだけの機能ではなく、RichMapFunction(の親クラスRichFunction)で定義されている、初期化処理を定義するメソッドです
  • mapメソッドで、
    • 呼ばれる毎にState(counter)をカウントアップ
    • WordWithCountインスタンスを出力

しています。

Counterの呼び出し部分は、

  • flatMapでWordWithCountではなくStringを出力するように
  • window+reduceではなく先ほどのCounterクラスをmapで呼び出すように

元々の実装から変更します。

		DataStream<WordWithCount> windowCounts =
						text.flatMap(
								(FlatMapFunction<String, String>)
										(value, out) -> {
											for (String word : value.split("\\s")) {
												// WordWithCountではなくStringを出力するように
												out.collect(word);
											}
										},
								TypeInformation.of(String.class)
						 )
						.keyBy(value -> value)
						// reduce+windowではなく、
						.map(new Counter())
						.returns(WordWithCount.class);

全体です

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package flinkStart;
import org.apache.flink.api.common.functions.FlatMapFunction;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.configuration.Configuration;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket. The easiest way to
 * try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
 *
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 *
 * <p>and run this example with the hostname and the port as arguments.
 */
public class DataStreamJob {

	public static void main(String[] args) throws Exception {

		// the host and the port to connect to
		final String hostname;
		final int port;
		try {
			final ParameterTool params = ParameterTool.fromArgs(args);
			hostname = params.has("hostname") ? params.get("hostname") : "localhost";
			port = params.getInt("port");
		} catch (Exception e) {
			System.err.println(
					"No port specified. Please run 'SocketWindowWordCount "
							+ "--hostname <hostname> --port <port>', where hostname (localhost by default) "
							+ "and port is the address of the text server");
			System.err.println(
					"To start a simple text server, run 'netcat -l <port>' and "
							+ "type the input text into the command line");
			return;
		}

		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream(hostname, port, "\n");

		// parse the data, group it, window it, and aggregate the counts
		// DataStream<WordWithCount> windowCounts =
		DataStream<WordWithCount> windowCounts =
						text.flatMap(
								(FlatMapFunction<String, String>)
										(value, out) -> {
											for (String word : value.split("\\s")) {
												// WordWithCountではなくStringを出力するように
												out.collect(word);
											}
										},
								TypeInformation.of(String.class)
						 )
						.keyBy(value -> value)
						// reduce+windowではなく、
						.map(new Counter())
						.returns(WordWithCount.class);
		windowCounts.print();
		env.execute("Socket Window WordCount");
	}

	/** Data type for words with count. */
	public static class WordWithCount {

		public String word;
		public long count;

		@SuppressWarnings("unused")
		public WordWithCount() {}

		public WordWithCount(String word, long count) {
			this.word = word;
			this.count = count;
		}

		@Override
		public String toString() {
			return word + " : " + count;
		}
	}


	public static class Counter extends RichMapFunction<String, WordWithCount> {
		private transient ValueState<Integer> counter;

		@Override
		public void open(Configuration config)  throws Exception {
			ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
					 "counter", Integer.class
			);
			counter = getRuntimeContext().getState(descriptor);
		}

		@Override
		public WordWithCount map(String value) throws Exception {
			if (counter.value() == null) {
				counter.update(1);
			} else {
				counter.update(counter.value() + 1);

			}
			return new WordWithCount(value, counter.value());
		}
	}
}

ビルドと実行は前回と同様に行えます。

# ビルド
mvn clean package

# 入力を与えるnetcatの起動
nc -lk 9000
# クラスターの起動
 ./bin/start-cluster.sh
# ジョブのサブミット
./bin/flink run  ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

netcatのプロンプトに入力します

state test
hoge test

それっぽいログが出力されます

# ログのファイル名は環境で変わるので、grepやls -lで適当にアタリをつけてください
tail log/flink-notrogue-taskexecutor-7-DESKTOP-6QTOO0L.out
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_e84cfd49-0463-470d-9df8-d767b17e09b1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
state : 1
test : 1
hoge : 1
test : 2

Checkpoint

概要

前の章のプログラム・設定でも動きはしますが、StateはTaskManagerのメモリにのみ存在する状態なため、TaskManagerのホストやJVMがクラッシュするとStateは失われます。
これを回避するために、FlinkにはStateを定期的にストレージに保存するCheckpointという仕組みがあります。

試す

以下の手順でCheckpointの挙動を確認します。

  1. Checkpointを有効にしてジョブを起動
  2. 適当なメッセージをnetcatに渡す
  3. ジョブを(ungracefulに)止める
  4. ジョブをチェックポイントから実行
  5. (2)と共通する単語を含むメッセージをnetcatに渡し、Checkpoint前のStateが復元されていることを見る

まずは、三つの設定を指定してジョブを実行します。

  • Checkpointを保存する間隔(execution.checkpointing.interval)
  • Checkpointの保存方法(state.checkpoint-storage)
  • Checkpointを保存するディレクトリ(state.checkpoints.dir)
# クラスター・netcatは前章に引き続き実行の状態で実行
# ディレクトリの部分は適当に変えてください
./bin/flink run -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

netcatに適当な入力を渡します

before checkpoint
I love checkpoint

ログにそれっぽい出力が追加されています。

 tail log/flink-notrogue-taskexecutor-7-DESKTOP-6QTOO0L.out
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_42cd5112-92df-45a5-86ab-97b0f97124fc.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
before : 1
checkpoint : 1
I : 1
love : 1
checkpoint : 2

指定したディレクトリにCheckpointのデータが保存されています

# bb0fc866871ad93fdd49b869973326c9の部分はflink runで表示されるJob IDです
# 試す際はflink runの結果(やWebUI)で表示されるJob IDに合わせてください
tree checkpoints/bb0fc866871ad93fdd49b869973326c9
checkpoints/bb0fc866871ad93fdd49b869973326c9
├── chk-1
│   └── _metadata
├── shared
└── taskowned

3 directories, 1 file

TaskManagerの停止

# JobManager・TaskManagerをkill -9で止めます
pgrep -f taskmanager | xargs -I{} kill -9 {}

killの結果、TaskManagerは管理画面から消え、JobはRestartingの状態になります。
次に、TaskManagerを再度起動します。

./bin/taskmanager.sh start

Jobの再起動を見届け、netcatに再度入力を入れます。

after checkpoint

ログを見るとcheckpointが3になっています(TaskManager止める前に二回、止めた後に一回)。なお、「before」や「I love」は入力がないため出力さていません(State自体は保存さているはず)。

tail log/flink-notrogue-taskexecutor-8-DESKTOP-6QTOO0L.out
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_eed4396b-ed4e-4772-96f1-73b0dbda3fc3.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
after : 1
checkpoint : 3

TaskManager・JobManagerの停止

先ほどはTaskManagerだけを停止しました。この場合、JobManagerは生きていたため、ジョブ自体はは生きており、自動で再実行しました。
より大きな障害を想定して、今度はJobManagerも止めて(=ジョブも止まる)、ジョブを手動で再実行する場合を試してみます。

pgrep -f taskmanager | xargs -I{} kill -9 {}
pgrep -f jobmanager | xargs -I{} kill -9 {}
# Flink関係のプロセスは死んでいることを確認
ps aux | grep flink
notrogue 31490  0.0  0.0   8164   656 pts/4    S+   18:47   0:00 grep --color=auto flink
# 再度クラスターを起動
./bin/start-cluster.sh

JobManagerが死んだため、動いているジョブが無い状態のはずです。

 ./bin/flink list
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Waiting for response...
No running jobs.
No scheduled jobs.

試しにCheckpointなしで実行してみます。

./bin/flink run  ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

netcatに入力します。

without checkpoint

ログには回数1として表示されます。Stateが保存されているなら、checkpointは3になるはずです(Checkpoint前に二回入力しているので)。

without : 1
checkpoint : 1

今度はCheckpointの復元を指定して実行します。

# Job ID
# ディレクトリ、checkpoints下のジョブIDとCheckpointは適当に合わせ変更してください
./bin/flink run  -s 'file:///home/notrogue/project/flink/flink-1.16.0/checkpoints/bb0fc866871ad93fdd49b869973326c9/chk-20/' -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/ flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

再度、メッセージをnetcatに入力します。

with checkpoint

今度は「checkpoint」の回数が3になっています

with : 1
checkpoint : 3

補足:Checkpointの設定

なお、上ではFlink CLIのオプションでチェックポイントの間隔・ディレクトリを指定していますが、flink-conf.yamlでも設定可能です。

execution.checkpointing.interval: 2min
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///home/notrogue/project/flink/flink-1.16.0/checkpoints
./bin/stop-cluster.sh
./bin/start-cluster.sh
# ジョブのサブミット
./bin/flink run ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

この例ではファイルへの保存を指定していますが、JobManagerのメモリへの保存やカスタマイズして保存先の指定が可能です
今回はローカルファイルシステムを指定して試しましたが、(権限準備すれば)HDFSやS3も可能っぽいです(例:S3のドキュメントへの保存)。

Savepoint

Checkpointでは一定間隔(execution.checkpointing.interval)毎にStateを保存しますが、任意のタイミングで保存するSavepointという機能もあるので試してみます。

まずは、Savepointの設定を有効にしてジョブを起動します。

./bin/flink run  -s 'file:///home/notrogue/project/flink/flink-1.16.0/checkpoints/bb0fc866871ad93fdd49b869973326c9/chk-20/' -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/ flink/flink-1.16.0/checkpoints/'  ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

netcat

with checkpoint
before savepoint

ログ

 tail log/flink-notrogue-taskexecutor-11-DESKTOP-6QTOO0L.out
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_7552c222-6b96-483a-abab-e5743ef8450b.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
with : 1
checkpoint : 3
with : 1
checkpoint : 3
before : 2
savepoint : 1

Savepointをとってみます

 ./bin/flink savepoint  bin/flink  savepoint 9119a923147d251aff09ac071ad99b1f 'file:///home/notrogue/project/flink/flink-1.16.0/savepoints/
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Triggering savepoint for job 9119a923147d251aff09ac071ad99b1f.
Waiting for response...
Savepoint completed. Path: file:/home/notrogue/project/flink/flink-1.16.0/savepoints/savepoint-9119a9-0acd3dd286fd
You can resume your program from this savepoint with the run command.

tree savepoints/savepoint-9119a9-0acd3dd286fd
savepoints/savepoint-9119a9-0acd3dd286fd
└── _metadata

動いているジョブを止めて、Savepointから再実行してみます

# ジョブを停止
./bin/flink cancel 9119a923147d251aff09ac071ad99b1f
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Cancelling job 9119a923147d251aff09ac071ad99b1f.
Cancelled job 9119a923147d251aff09ac071ad99b1f.
# Savepointからジョブを起動
./bin/flink run  -s 'file:///home/notrogue/project/flink/flink-1.16.0/savepoints/savepoint-9119a9-0acd3dd286fd' -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/ flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar  --hostname localhost --port 9000

SavepointからStateが復活していることを確認するために、netcatに適当に入力します。

after savepoint

Savepoint前に与えた入力を加味して、「savepoint」が二回とログに出力されます。

after : 1
savepoint : 2

その他の機能

ドキュメント見て、面白そうと思った機能です(試そうとしましたが力尽きました)。

State Processor API

これまでの例では、Flinkアプリケーションで保存したStateを、同じFlinkアプリケーション(同じOperator・Stateを持つアプリケーション)で読み込んでいました。

State Processor APIを使うことで、別のFlinkアプリケーションで読み書きすることが出来ます。

これにより、

  • 動いてるFlinkジョブのStateの状態の確認
  • 初期状態(Bootstrap)のためのState書き込み
  • 壊れてStateの修復

などが出来るそうです。

Schema Evolution

これまでの例では、Stateを書き込みに使った型(クラス)と読み込みに使った型(クラス)が同じでした。

Schema Evolutionにより、読み込みに使う型(クラス)に元の型(クラス)からのフィールドの削除や・追加などが、(いくつかの条件付きで)行えるようです。

削除

Stateは消さない限り残ります。特にKeyedStateでは、入力データの増加に応じて保存するStateも増えていきます。

TTLtimerなどでStateの削除を行えます。

align・unalign

Opereator間でStateの一貫性を保つ(align)、あるいは、レイテンシーを優先する(unalign)かのコントールができるっぽいです

https://flink.apache.org/2020/10/15/from-aligned-to-unaligned-checkpoints-part-1.html

Discussion