🐿️🐿️🐿️Apache Flink触ってみた(Kubernetes編)🐿️🐿️🐿️
気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#4です。
の続きです。
環境
- Ubuntu 20.04 (Windows10のWSL2上)
- minikube kubernetes-version v1.23.8
概要
以前の記事では、Flinkクラスタ(JobMangaer、TaskManager…)をローカルのJVMで動かしました。
Kubernetes上でFlinkを動かす仕組みはいくつかあり、例えば、
などの方法があります。
(Kubernetes Operatorは他二つと少しレイヤーが違い、Kubernetes Operatorの場合は他Standlaone Mode・Native Modeのどちらかと組み合わせて使います)
三つの方法は、実装の「Kubernetesっぽさ」が違い、
- Standalone Modeでは、各コンポーネント(JobManager、Taskamanager)のPodそれぞれが動き続けます
- 非KubernetesのクラスタをそのままKubernetesに持ってきた感じ
- Native Modeでは、Flink Job毎にTask ManagerのPodが立ち上がり、Jobの終了に伴いそのPodも終わります
- Kubernetes Operatorでは、所望の状態になるように監視・対応をするOperatorが常駐し、必要なコンポーネントを管理します
正直なところ使い分けがわかっていないですが、ドキュメントでは
- 一般論としては、StandaloneよりNativeを推奨
-
we generally recommend new users to deploy Flink on Kubernetes using native Kubernetes deployments
-
- ただし、一部の機能(具体的にはReactive Mode)はStandalone Modeのみで動きます
といった記載があります。
試してみる
準備
minikube使いました(他のKubernetes環境でも(多分)動くはず)。
minikube start --kubernetes-version=v1.23.8
また、Flinkのコマンドを使いますので適当なバージョンをダウンロード・解凍しておきます(1.16を私は使いました)。
Standalone Mode
手順としては、
- Kubernetesクラスタの準備
- ConfigMap、JobManager・TaskManagerのDeploymentをapplyし、Flinkクラスタを起動
- ひな形はFlinkのWebページにあります
- ローカルからFlink Jobをデプロイするための、Port forwardingの設定
- Flink Jobのサブミット
を行います。
# minikubeを使う時の注意書きにあるコマンド
minikube ssh 'sudo ip link set docker0 promisc on'
FlinkのWebページにある設定ファイルを保存しapplyします。
kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-session-deployment-non-ha.yaml
kubectl create -f taskmanager-session-deployment.yaml
JobManager・TaskManagerのPodが立ち上がっているはずです。
kubectl get pod
NAME READY STATUS RESTARTS AGE
flink-jobmanager-5c6b6c6b7d-9j2f2 1/1 Running 0 17s
flink-taskmanager-76cb7df9d6-gwst5 1/1 Running 0 14s
flink-taskmanager-76cb7df9d6-j9fjh 1/1 Running 0 14s
kubectl get service
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
flink-jobmanager ClusterIP 10.96.123.162 <none> 6123/TCP,6124/TCP,8081/TCP 22s
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 7d4h
JobManagerへのPort forwardingの設定をします。
kubectl port-forward $(kubectl get pods --selector component=jobmanager -o jsonpath="{.items[*].metadata.name}") 8081:8081
Jobをサブミットします。
../flink-1.16.0/bin/flink run -m localhost:8081 ../flink-1.16.0/examples/streaming/WordCount.jar
WebUI(http://localhost:8081)からJobの実行が確認できるはずです。
また、kubectl logsでTask ManagerのPodを見るとJobのログも確認できます。
# TaskManagerのPodは適当に変えてください
kubectl logs flink-taskmanager-76cb7df9d6-gwst5 | tail
(my,1)
(sins,1)
(remember,1)
(d,4)
2022-11-23 02:51:51,616 INFO org.apache.flink.runtime.taskmanager.Task [] - counter -> Sink: print-sink (1/1)#0 (bf3d928db08d2e4deb9b8c7a7c67a746_90bea66de1c231edf33913ecd54406c1_0_0) switched from RUNNING to FINISHED.
2022-11-23 02:51:51,616 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for counter -> Sink: print-sink (1/1)#0 (bf3d928db08d2e4deb9b8c7a7c67a746_90bea66de1c231edf33913ecd54406c1_0_0).
2022-11-23 02:51:51,617 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task counter -> Sink: print-sink (1/1)#0 bf3d928db08d2e4deb9b8c7a7c67a746_90bea66de1c231edf33913ecd54406c1_0_0.
2022-11-23 02:51:51,653 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=192.000mb (201326587 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=64.000mb (67108865 bytes)}, allocationId: 256d3849b57236cf254b9627be4db473, jobId: a174b8c2c3294e206462bfb03b8db2f5).
2022-11-23 02:51:51,713 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job a174b8c2c3294e206462bfb03b8db2f5 from job leader monitoring.
2022-11-23 02:51:51,714 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job a174b8c2c3294e206462bfb03b8db2f5.
Native Mode
Native ModeでFlinkクラスターを動かすには、
- 権限の設定
- Flinkクラスタの起動
- ローカルからFlink Jobをデプロイするための、Port forwardingの設定
- /etc/hostsの設定
- Flink Jobのサブミット
を行います。
まずは、default service accountの設定を行います。
kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default
クラスタの起動(kubernetes-session.shはFlinkのパッケージに含まれます)します。
../flink-1.16.0/bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster
JobManagerのPodが立ち上がっているはずです。Session Modeとは違いTask Managerは起動していません。
kubectl get pod
NAME READY STATUS RESTARTS AGE
my-first-flink-cluster-595c9d8ff5-pdwb5 1/1 Running 0 20m
kubectl get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
kubernetes ClusterIP 10.96.0.1 <none> 443/TCP 7d6h
my-first-flink-cluster ClusterIP None <none> 6123/TCP,6124/TCP 21m
my-first-flink-cluster-rest ClusterIP 10.100.126.98 <none> 8081/TCP 21m
Port forwardingの設定
kubectl port-forward service/my-first-flink-cluster-rest 8081
/etc/hostsも設定します。なお、設定しないでジョブをサブミットするとCaused by: java.net.UnknownHostException: my-first-flink-cluster-rest.default: Name or service not known
のようなエラーがでます。
sudo echo '127.0.0.1 my-first-flink-cluster-rest.default' >> /etc/hosts
ジョブのサブミット
../flink-1.16.0/bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster ../flink-1.16.0/examples/streaming/WordCount.jar
WebUI(http://localhost:8081)からJobの実行が確認できるはずです。
Standalone Modeと違い、Task ManagerのPodはジョブの始まりに合わせて起動し、終わると終了します。Task ManagerのPodが生きている間であれば、ジョブのログを確認することもできます。
(本番ではfluentdとか使いませう)
# ジョブ開始から、ジョブが終了して少しの間はTask Managerのpodが見えます
kubectl get pod
NAME READY STATUS RESTARTS AGE
my-first-flink-cluster-595c9d8ff5-pdwb5 1/1 Running 0 25m
my-first-flink-cluster-taskmanager-1-1 1/1 Running 0 23s
# ジョブのログの確認
kubectl logs my-first-flink-cluster-taskmanager-1-1 | tail
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
2022-11-23 00:22:22,020 INFO org.apache.flink.runtime.taskmanager.Task [] - counter -> Sink: print-sink (1/1)#0 (d62975ecc002eea2b526ddfcf3eaf0ab_90bea66de1c231edf33913ecd54406c1_0_0) switched from RUNNING to FINISHED.
2022-11-23 00:22:22,020 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for counter -> Sink: print-sink (1/1)#0 (d62975ecc002eea2b526ddfcf3eaf0ab_90bea66de1c231edf33913ecd54406c1_0_0).
2022-11-23 00:22:22,020 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task counter -> Sink: print-sink (1/1)#0 d62975ecc002eea2b526ddfcf3eaf0ab_90bea66de1c231edf33913ecd54406c1_0_0.
2022-11-23 00:22:22,033 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: ee871e23f964f52ca9cd576fc745359a, jobId: cbad95afe6b0e2c9dd3a459c322d1442).
2022-11-23 00:22:22,034 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job cbad95afe6b0e2c9dd3a459c322d1442 from job leader monitoring.
2022-11-23 00:22:22,035 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor
Kubernetes Operator
Kubernetes OperatorでFlinkクラスターを動かすには、
- Helmチャートのインストール
- 実行したjarを含むコンテナイメージの作成
- (NFSとかObject Storageに置いて実行することも可能だと思います)
- FlinkDeployment(Flinkのジョブに対応するCRD)をapply
- ローカからアクセスできるようにPort forwardingの設定
を行います。
なお、前述のStandalone Mode・Native Modeと、Kubernetes Operatorはレイヤーが異なる話で、Kubernetes Operatorを使う場合は、Standalone Mode・Native Modeを選択して使います。
まずは、helmチャートをインストールします。
kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
実行したjarを含むコンテナイメージを作成します。
Dockerfileを作成します。WordCount.jarはFlinkのbinary releaseに含まれるサンプルのjarです。
FROM flink:1.16
ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
COPY WordCount.jar $FLINK_PLUGINS_DIR/hadoop-fs/
コンテナイメージをビルドします。
# minikubeで動かす場合は eval $(minikube docker-env)(build前)かminikube image load flink-with-word-count:v.0.0.1(build後)
docker image build -t flink-with-word-count:v.0.0.1 .
FlinkDeployment(Flinkのジョブに対応するCRD)を作成します。
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
name: basic-example
spec:
image: flink-with-word-count:v.0.0.1
imagePullPolicy: Never
flinkVersion: v1_16
flinkConfiguration:
taskmanager.numberOfTaskSlots: "2"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 1
taskManager:
resource:
memory: "2048m"
cpu: 1
job:
jarURI: local:////opt/flink/plugins/hadoop-fs/WordCount.jar
parallelism: 2
- jarURIの部分はローカルではなく、imageに指定しているコンテナイメージの中でのパスです。
- 適切に権限を設定すればS3やHDFSもいけるはず(試していないですが)
- 設定項目はこちらのドキュメントに説明あります
作成したFlinkDeployment(Flinkのジョブに対応するCRD)ファイルをapplyします。
kubectl apply -f operator-test.yml
Web UIを確認するためのPort Forwardingします。
kubectl port-forward svc/basic-example-rest 8081
FlinkDeploymentに対応するPod(TaskManagerとJobManager)が起動しています。
kubectl get pods
NAME READY STATUS RESTARTS AGE
basic-example-77d7f6ff6c-pbbxr 1/1 Running 0 22s
basic-example-taskmanager-1-1 1/1 Running 0 14s
flink-kubernetes-operator-6d8fcfc479-cc5jn 2/2 Running 0 7h46m
結果の確認
kubectl logs basic-example-taskmanager-1-1 | tail
2> (nymph,1)
2> (in,3)
2> (thy,1)
2022-11-23 12:52:05,518 INFO org.apache.flink.runtime.taskmanager.Task [] - counter -> Sink: print-sink (2/2)#0 (89071096c538eb058622b6b1a39c5bde) switched from RUNNING to FINISHED.
2022-11-23 12:52:05,519 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for counter -> Sink: print-sink (2/2)#0 (89071096c538eb058622b6b1a39c5bde).
2022-11-23 12:52:05,519 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task counter -> Sink: print-sink (2/2)#0 89071096c538eb058622b6b1a39c5bde.
2022-11-23 12:52:05,540 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.5, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 7165269bab9e883b6543d555a15cf2cc, jobId: 598fcfb12a26370bf48f13a2028ed09b).
2022-11-23 12:52:05,542 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.5, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 5feefa2316c2034d7be8fc71e07efbd8, jobId: 598fcfb12a26370bf48f13a2028ed09b).
2022-11-23 12:52:05,543 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 598fcfb12a26370bf48f13a2028ed09b from job leader monitoring.
2022-11-23 12:52:05,543 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job 598fcfb12a26370bf48f13a2028ed09b.
なお、デフォルトではnative modeなため、Task ManagerのPodはジョブが終わると消えます(JobManagerのPodは残る)。
Discussion