🐿️🐿️🐿️Apache Flink触ってみた🐿️🐿️🐿️(State編)
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#2です。
🐿️🐿️🐿️Apache Flink触ってみた🐿️🐿️🐿️の続きです。Flinkの目玉機能(多分)のStateを試してみました。
Stateとは
ストリーミング処理の中で、何らかの状態を保存する仕組みです。
some operations remember information across multiple events (for example window operators). These operations are called stateful.
Apache Flinkでは、複数のイベントに跨って状態を保存する処理を「stateful」、保存する状態を「state」と呼びます。
例えば、flink-trainingのrides-and-faresでは、二種類のデータ(乗車情報と料金情報)を紐づけるために、Stateを利用し先に受け取ったデータを保存しています。
他のストリーミングエンジンでは、Spark(Strcutred Streaming)だとStateStore、Cloud Dataflow・Apache BeamだとState APIが対応する機能だと思います。
色々なState
Stateの種類としては、
- KeyedState
- BroadcastState
- OperatorState
の三種類があります。それぞれ、Stateを紐づける単位が違いまして、
- KeyedStateはkeyBy(SQLで言うところのGROUP BY)で分類されたキー・Operator単位
- BroadcastStateは複数のOperator単位
- OperatorStateはOperator単位
でStateを持ちます。
また、Stateのバックエンドとしては
- JVMのヒープ(HashMapStateBackend)
- RocksDB(EmbeddedRocksDBStateBackend)
の二種類がバンドルされています。
なお、TaskManager障害のための永続化は、Checkpoint、Savepointで別に指定します。
Stateを使うプログラムの例
前回使ったSocketWindowWordCountを少し変えて、Stateを使うようにしてみます。
元々のプログラムでは、
- 入力をスペースで分割し、WordWithCountを出力
- 文字列毎にまとめる(keyBy)
- 5秒単位のウィンドウに分割
- 回数を数える
の処理を行います。少し変更して
- 入力をスペースで分割し、Stringを出力
- 文字列毎にまとめる(keyBy)
-
RichMapFunctionを継承したクラス(Counter)を呼び出し
- MapFunctionをベースに、処理の開始前のセットアップ(open)・クローズ(close)できるようにしたクラスです。Stateを使う場合はopenにStateの初期化を記述するためにこちらを使う必要があります
- Stateを使い、Counterの中で呼び出し回数を数える
ことにします。
文字列毎の回数を数えるStateを持つCounterクラスを下のように定義します。
public static class Counter extends RichMapFunction<String, WordWithCount> {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
"counter", Integer.class
);
counter = getRuntimeContext().getState(descriptor);
}
@Override
public WordWithCount map(String value) throws Exception {
if (counter.value() == null) {
counter.update(1);
} else {
counter.update(counter.value() + 1);
}
return new WordWithCount(value, counter.value());
}
}
コード読んでいただくと何となく雰囲気わかると思いますが、
- インスタンス変数として、State(counter)を定義
- openメソッドでStateの情報(名前・型)を持つStateDescriptorを定義
- open自体はStateのためだけの機能ではなく、RichMapFunction(の親クラスRichFunction)で定義されている、初期化処理を定義するメソッドです
- mapメソッドで、
- 呼ばれる毎にState(counter)をカウントアップ
- WordWithCountインスタンスを出力
しています。
Counterの呼び出し部分は、
- flatMapでWordWithCountではなくStringを出力するように
- window+reduceではなく先ほどのCounterクラスをmapで呼び出すように
元々の実装から変更します。
DataStream<WordWithCount> windowCounts =
text.flatMap(
(FlatMapFunction<String, String>)
(value, out) -> {
for (String word : value.split("\\s")) {
// WordWithCountではなくStringを出力するように
out.collect(word);
}
},
TypeInformation.of(String.class)
)
.keyBy(value -> value)
// reduce+windowではなく、
.map(new Counter())
.returns(WordWithCount.class);
全体です
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flinkStart;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
/**
* Implements a streaming windowed version of the "WordCount" program.
*
* <p>This program connects to a server socket and reads strings from the socket. The easiest way to
* try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
*
* <pre>
* nc -l 12345 on Linux or nc -l -p 12345 on Windows
* </pre>
*
* <p>and run this example with the hostname and the port as arguments.
*/
public class DataStreamJob {
public static void main(String[] args) throws Exception {
// the host and the port to connect to
final String hostname;
final int port;
try {
final ParameterTool params = ParameterTool.fromArgs(args);
hostname = params.has("hostname") ? params.get("hostname") : "localhost";
port = params.getInt("port");
} catch (Exception e) {
System.err.println(
"No port specified. Please run 'SocketWindowWordCount "
+ "--hostname <hostname> --port <port>', where hostname (localhost by default) "
+ "and port is the address of the text server");
System.err.println(
"To start a simple text server, run 'netcat -l <port>' and "
+ "type the input text into the command line");
return;
}
// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
// parse the data, group it, window it, and aggregate the counts
// DataStream<WordWithCount> windowCounts =
DataStream<WordWithCount> windowCounts =
text.flatMap(
(FlatMapFunction<String, String>)
(value, out) -> {
for (String word : value.split("\\s")) {
// WordWithCountではなくStringを出力するように
out.collect(word);
}
},
TypeInformation.of(String.class)
)
.keyBy(value -> value)
// reduce+windowではなく、
.map(new Counter())
.returns(WordWithCount.class);
windowCounts.print();
env.execute("Socket Window WordCount");
}
/** Data type for words with count. */
public static class WordWithCount {
public String word;
public long count;
@SuppressWarnings("unused")
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
public static class Counter extends RichMapFunction<String, WordWithCount> {
private transient ValueState<Integer> counter;
@Override
public void open(Configuration config) throws Exception {
ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>(
"counter", Integer.class
);
counter = getRuntimeContext().getState(descriptor);
}
@Override
public WordWithCount map(String value) throws Exception {
if (counter.value() == null) {
counter.update(1);
} else {
counter.update(counter.value() + 1);
}
return new WordWithCount(value, counter.value());
}
}
}
ビルドと実行は前回と同様に行えます。
# ビルド
mvn clean package
# 入力を与えるnetcatの起動
nc -lk 9000
# クラスターの起動
./bin/start-cluster.sh
# ジョブのサブミット
./bin/flink run ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
netcatのプロンプトに入力します
state test
hoge test
それっぽいログが出力されます
# ログのファイル名は環境で変わるので、grepやls -lで適当にアタリをつけてください
tail log/flink-notrogue-taskexecutor-7-DESKTOP-6QTOO0L.out
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_e84cfd49-0463-470d-9df8-d767b17e09b1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
state : 1
test : 1
hoge : 1
test : 2
Checkpoint
概要
前の章のプログラム・設定でも動きはしますが、StateはTaskManagerのメモリにのみ存在する状態なため、TaskManagerのホストやJVMがクラッシュするとStateは失われます。
これを回避するために、FlinkにはStateを定期的にストレージに保存するCheckpointという仕組みがあります。
試す
以下の手順でCheckpointの挙動を確認します。
- Checkpointを有効にしてジョブを起動
- 適当なメッセージをnetcatに渡す
- ジョブを(ungracefulに)止める
- ジョブをチェックポイントから実行
- (2)と共通する単語を含むメッセージをnetcatに渡し、Checkpoint前のStateが復元されていることを見る
まずは、三つの設定を指定してジョブを実行します。
- Checkpointを保存する間隔(execution.checkpointing.interval)
- Checkpointの保存方法(state.checkpoint-storage)
- Checkpointを保存するディレクトリ(state.checkpoints.dir)
# クラスター・netcatは前章に引き続き実行の状態で実行
# ディレクトリの部分は適当に変えてください
./bin/flink run -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
netcatに適当な入力を渡します
before checkpoint
I love checkpoint
ログにそれっぽい出力が追加されています。
tail log/flink-notrogue-taskexecutor-7-DESKTOP-6QTOO0L.out
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_42cd5112-92df-45a5-86ab-97b0f97124fc.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
before : 1
checkpoint : 1
I : 1
love : 1
checkpoint : 2
指定したディレクトリにCheckpointのデータが保存されています
# bb0fc866871ad93fdd49b869973326c9の部分はflink runで表示されるJob IDです
# 試す際はflink runの結果(やWebUI)で表示されるJob IDに合わせてください
tree checkpoints/bb0fc866871ad93fdd49b869973326c9
checkpoints/bb0fc866871ad93fdd49b869973326c9
├── chk-1
│ └── _metadata
├── shared
└── taskowned
3 directories, 1 file
TaskManagerの停止
# JobManager・TaskManagerをkill -9で止めます
pgrep -f taskmanager | xargs -I{} kill -9 {}
killの結果、TaskManagerは管理画面から消え、JobはRestartingの状態になります。
次に、TaskManagerを再度起動します。
./bin/taskmanager.sh start
Jobの再起動を見届け、netcatに再度入力を入れます。
after checkpoint
ログを見るとcheckpointが3になっています(TaskManager止める前に二回、止めた後に一回)。なお、「before」や「I love」は入力がないため出力さていません(State自体は保存さているはず)。
tail log/flink-notrogue-taskexecutor-8-DESKTOP-6QTOO0L.out
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_eed4396b-ed4e-4772-96f1-73b0dbda3fc3.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
after : 1
checkpoint : 3
TaskManager・JobManagerの停止
先ほどはTaskManagerだけを停止しました。この場合、JobManagerは生きていたため、ジョブ自体はは生きており、自動で再実行しました。
より大きな障害を想定して、今度はJobManagerも止めて(=ジョブも止まる)、ジョブを手動で再実行する場合を試してみます。
pgrep -f taskmanager | xargs -I{} kill -9 {}
pgrep -f jobmanager | xargs -I{} kill -9 {}
# Flink関係のプロセスは死んでいることを確認
ps aux | grep flink
notrogue 31490 0.0 0.0 8164 656 pts/4 S+ 18:47 0:00 grep --color=auto flink
# 再度クラスターを起動
./bin/start-cluster.sh
JobManagerが死んだため、動いているジョブが無い状態のはずです。
./bin/flink list
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Waiting for response...
No running jobs.
No scheduled jobs.
試しにCheckpointなしで実行してみます。
./bin/flink run ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
netcatに入力します。
without checkpoint
ログには回数1として表示されます。Stateが保存されているなら、checkpointは3になるはずです(Checkpoint前に二回入力しているので)。
without : 1
checkpoint : 1
今度はCheckpointの復元を指定して実行します。
# Job ID
# ディレクトリ、checkpoints下のジョブIDとCheckpointは適当に合わせ変更してください
./bin/flink run -s 'file:///home/notrogue/project/flink/flink-1.16.0/checkpoints/bb0fc866871ad93fdd49b869973326c9/chk-20/' -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/ flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
再度、メッセージをnetcatに入力します。
with checkpoint
今度は「checkpoint」の回数が3になっています
with : 1
checkpoint : 3
補足:Checkpointの設定
なお、上ではFlink CLIのオプションでチェックポイントの間隔・ディレクトリを指定していますが、flink-conf.yamlでも設定可能です。
execution.checkpointing.interval: 2min
state.checkpoint-storage: filesystem
state.checkpoints.dir: file:///home/notrogue/project/flink/flink-1.16.0/checkpoints
./bin/stop-cluster.sh
./bin/start-cluster.sh
# ジョブのサブミット
./bin/flink run ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
この例ではファイルへの保存を指定していますが、JobManagerのメモリへの保存やカスタマイズして保存先の指定が可能です。
今回はローカルファイルシステムを指定して試しましたが、(権限準備すれば)HDFSやS3も可能っぽいです(例:S3のドキュメントへの保存)。
Savepoint
Checkpointでは一定間隔(execution.checkpointing.interval)毎にStateを保存しますが、任意のタイミングで保存するSavepointという機能もあるので試してみます。
まずは、Savepointの設定を有効にしてジョブを起動します。
./bin/flink run -s 'file:///home/notrogue/project/flink/flink-1.16.0/checkpoints/bb0fc866871ad93fdd49b869973326c9/chk-20/' -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/ flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
netcat
with checkpoint
before savepoint
ログ
tail log/flink-notrogue-taskexecutor-11-DESKTOP-6QTOO0L.out
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_7552c222-6b96-483a-abab-e5743ef8450b.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
with : 1
checkpoint : 3
with : 1
checkpoint : 3
before : 2
savepoint : 1
Savepointをとってみます
./bin/flink savepoint bin/flink savepoint 9119a923147d251aff09ac071ad99b1f 'file:///home/notrogue/project/flink/flink-1.16.0/savepoints/
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Triggering savepoint for job 9119a923147d251aff09ac071ad99b1f.
Waiting for response...
Savepoint completed. Path: file:/home/notrogue/project/flink/flink-1.16.0/savepoints/savepoint-9119a9-0acd3dd286fd
You can resume your program from this savepoint with the run command.
tree savepoints/savepoint-9119a9-0acd3dd286fd
savepoints/savepoint-9119a9-0acd3dd286fd
└── _metadata
動いているジョブを止めて、Savepointから再実行してみます
# ジョブを停止
./bin/flink cancel 9119a923147d251aff09ac071ad99b1f
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Cancelling job 9119a923147d251aff09ac071ad99b1f.
Cancelled job 9119a923147d251aff09ac071ad99b1f.
# Savepointからジョブを起動
./bin/flink run -s 'file:///home/notrogue/project/flink/flink-1.16.0/savepoints/savepoint-9119a9-0acd3dd286fd' -Dexecution.checkpointing.interval='2min' -Dstate.checkpoint-storage='filesystem' -Dstate.checkpoints.dir='file:///home/notrogue/project/ flink/flink-1.16.0/checkpoints/' ../fromscratch/flinkStart/target/flinkStart-0.1.jar --hostname localhost --port 9000
SavepointからStateが復活していることを確認するために、netcatに適当に入力します。
after savepoint
Savepoint前に与えた入力を加味して、「savepoint」が二回とログに出力されます。
after : 1
savepoint : 2
その他の機能
ドキュメント見て、面白そうと思った機能です(試そうとしましたが力尽きました)。
State Processor API
これまでの例では、Flinkアプリケーションで保存したStateを、同じFlinkアプリケーション(同じOperator・Stateを持つアプリケーション)で読み込んでいました。
State Processor APIを使うことで、別のFlinkアプリケーションで読み書きすることが出来ます。
これにより、
- 動いてるFlinkジョブのStateの状態の確認
- 初期状態(Bootstrap)のためのState書き込み
- 壊れてStateの修復
などが出来るそうです。
Schema Evolution
これまでの例では、Stateを書き込みに使った型(クラス)と読み込みに使った型(クラス)が同じでした。
Schema Evolutionにより、読み込みに使う型(クラス)に元の型(クラス)からのフィールドの削除や・追加などが、(いくつかの条件付きで)行えるようです。
削除
Stateは消さない限り残ります。特にKeyedStateでは、入力データの増加に応じて保存するStateも増えていきます。
align・unalign
Opereator間でStateの一貫性を保つ(align)、あるいは、レイテンシーを優先する(unalign)かのコントールができるっぽいです
Discussion