🐿️

🐿️🐿️🐿️Apache Flink触ってみた(Kubernetes編)🐿️🐿️🐿️

2022/12/04に公開

気にはなってるけど触ってないビッグデータ系のツール・サービスを触る Advent Calendar 2022の#4です。

の続きです。

環境

  • Ubuntu 20.04 (Windows10のWSL2上)
  • minikube kubernetes-version v1.23.8

概要

以前の記事では、Flinkクラスタ(JobMangaer、TaskManager…)をローカルのJVMで動かしました。

Kubernetes上でFlinkを動かす仕組みはいくつかあり、例えば、

などの方法があります。
(Kubernetes Operatorは他二つと少しレイヤーが違い、Kubernetes Operatorの場合は他Standlaone Mode・Native Modeのどちらかと組み合わせて使います)

三つの方法は、実装の「Kubernetesっぽさ」が違い、

  • Standalone Modeでは、各コンポーネント(JobManager、Taskamanager)のPodそれぞれが動き続けます
    • 非KubernetesのクラスタをそのままKubernetesに持ってきた感じ
  • Native Modeでは、Flink Job毎にTask ManagerのPodが立ち上がり、Jobの終了に伴いそのPodも終わります
  • Kubernetes Operatorでは、所望の状態になるように監視・対応をするOperatorが常駐し、必要なコンポーネントを管理します

正直なところ使い分けがわかっていないですが、ドキュメントでは

といった記載があります。

試してみる

準備

minikube使いました(他のKubernetes環境でも(多分)動くはず)。

minikube start --kubernetes-version=v1.23.8

また、Flinkのコマンドを使いますので適当なバージョンをダウンロード・解凍しておきます(1.16を私は使いました)。

Standalone Mode

手順としては、

  1. Kubernetesクラスタの準備
  2. ConfigMap、JobManager・TaskManagerのDeploymentをapplyし、Flinkクラスタを起動
  3. ローカルからFlink Jobをデプロイするための、Port forwardingの設定
  4. Flink Jobのサブミット

を行います。

# minikubeを使う時の注意書きにあるコマンド
minikube ssh 'sudo ip link set docker0 promisc on'

FlinkのWebページにある設定ファイルを保存しapplyします。

kubectl create -f flink-configuration-configmap.yaml
kubectl create -f jobmanager-service.yaml
kubectl create -f jobmanager-session-deployment-non-ha.yaml
kubectl create -f taskmanager-session-deployment.yaml

JobManager・TaskManagerのPodが立ち上がっているはずです。

kubectl get pod
NAME                                 READY   STATUS    RESTARTS   AGE
flink-jobmanager-5c6b6c6b7d-9j2f2    1/1     Running   0          17s
flink-taskmanager-76cb7df9d6-gwst5   1/1     Running   0          14s
flink-taskmanager-76cb7df9d6-j9fjh   1/1     Running   0          14s

kubectl get service
NAME               TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)                      AGE
flink-jobmanager   ClusterIP   10.96.123.162   <none>        6123/TCP,6124/TCP,8081/TCP   22s
kubernetes         ClusterIP   10.96.0.1       <none>        443/TCP                      7d4h

JobManagerへのPort forwardingの設定をします。

kubectl port-forward $(kubectl get pods --selector component=jobmanager -o jsonpath="{.items[*].metadata.name}") 8081:8081

Jobをサブミットします。

../flink-1.16.0/bin/flink run -m localhost:8081 ../flink-1.16.0/examples/streaming/WordCount.jar

WebUI(http://localhost:8081)からJobの実行が確認できるはずです。

また、kubectl logsでTask ManagerのPodを見るとJobのログも確認できます。

# TaskManagerのPodは適当に変えてください
kubectl logs flink-taskmanager-76cb7df9d6-gwst5 | tail
(my,1)
(sins,1)
(remember,1)
(d,4)
2022-11-23 02:51:51,616 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - counter -> Sink: print-sink (1/1)#0 (bf3d928db08d2e4deb9b8c7a7c67a746_90bea66de1c231edf33913ecd54406c1_0_0) switched from RUNNING to FINISHED.
2022-11-23 02:51:51,616 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for counter -> Sink: print-sink (1/1)#0 (bf3d928db08d2e4deb9b8c7a7c67a746_90bea66de1c231edf33913ecd54406c1_0_0).
2022-11-23 02:51:51,617 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task counter -> Sink: print-sink (1/1)#0 bf3d928db08d2e4deb9b8c7a7c67a746_90bea66de1c231edf33913ecd54406c1_0_0.
2022-11-23 02:51:51,653 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=192.000mb (201326587 bytes), taskOffHeapMemory=0 bytes, managedMemory=256.000mb (268435460 bytes), networkMemory=64.000mb (67108865 bytes)}, allocationId: 256d3849b57236cf254b9627be4db473, jobId: a174b8c2c3294e206462bfb03b8db2f5).
2022-11-23 02:51:51,713 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job a174b8c2c3294e206462bfb03b8db2f5 from job leader monitoring.
2022-11-23 02:51:51,714 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job a174b8c2c3294e206462bfb03b8db2f5.

Native Mode

Native ModeでFlinkクラスターを動かすには、

  1. 権限の設定
  2. Flinkクラスタの起動
  3. ローカルからFlink Jobをデプロイするための、Port forwardingの設定
  4. /etc/hostsの設定
  5. Flink Jobのサブミット

を行います。

まずは、default service accountの設定を行います。

kubectl create clusterrolebinding flink-role-binding-default --clusterrole=edit --serviceaccount=default:default

クラスタの起動(kubernetes-session.shはFlinkのパッケージに含まれます)します。

../flink-1.16.0/bin/kubernetes-session.sh -Dkubernetes.cluster-id=my-first-flink-cluster

JobManagerのPodが立ち上がっているはずです。Session Modeとは違いTask Managerは起動していません。

kubectl get pod
NAME                                      READY   STATUS    RESTARTS   AGE
my-first-flink-cluster-595c9d8ff5-pdwb5   1/1     Running   0          20m

kubectl get svc
NAME                          TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
kubernetes                    ClusterIP   10.96.0.1       <none>        443/TCP             7d6h
my-first-flink-cluster        ClusterIP   None            <none>        6123/TCP,6124/TCP   21m
my-first-flink-cluster-rest   ClusterIP   10.100.126.98   <none>        8081/TCP            21m

Port forwardingの設定

kubectl port-forward service/my-first-flink-cluster-rest 8081

/etc/hostsも設定します。なお、設定しないでジョブをサブミットするとCaused by: java.net.UnknownHostException: my-first-flink-cluster-rest.default: Name or service not known のようなエラーがでます。

sudo echo '127.0.0.1 my-first-flink-cluster-rest.default' >> /etc/hosts

ジョブのサブミット

../flink-1.16.0/bin/flink run --target kubernetes-session -Dkubernetes.cluster-id=my-first-flink-cluster ../flink-1.16.0/examples/streaming/WordCount.jar

WebUI(http://localhost:8081)からJobの実行が確認できるはずです。

Standalone Modeと違い、Task ManagerのPodはジョブの始まりに合わせて起動し、終わると終了します。Task ManagerのPodが生きている間であれば、ジョブのログを確認することもできます。
(本番ではfluentdとか使いませう)

# ジョブ開始から、ジョブが終了して少しの間はTask Managerのpodが見えます
kubectl get pod
NAME                                      READY   STATUS    RESTARTS   AGE
my-first-flink-cluster-595c9d8ff5-pdwb5   1/1     Running   0          25m
my-first-flink-cluster-taskmanager-1-1    1/1     Running   0          23s

# ジョブのログの確認
kubectl logs my-first-flink-cluster-taskmanager-1-1 | tail
(orisons,1)
(be,4)
(all,2)
(my,1)
(sins,1)
(remember,1)
(d,4)
2022-11-23 00:22:22,020 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - counter -> Sink: print-sink (1/1)#0 (d62975ecc002eea2b526ddfcf3eaf0ab_90bea66de1c231edf33913ecd54406c1_0_0) switched from RUNNING to FINISHED.
2022-11-23 00:22:22,020 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for counter -> Sink: print-sink (1/1)#0 (d62975ecc002eea2b526ddfcf3eaf0ab_90bea66de1c231edf33913ecd54406c1_0_0).
2022-11-23 00:22:22,020 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task counter -> Sink: print-sink (1/1)#0 d62975ecc002eea2b526ddfcf3eaf0ab_90bea66de1c231edf33913ecd54406c1_0_0.
2022-11-23 00:22:22,033 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=1, taskHeapMemory=384.000mb (402653174 bytes), taskOffHeapMemory=0 bytes, managedMemory=512.000mb (536870920 bytes), networkMemory=128.000mb (134217730 bytes)}, allocationId: ee871e23f964f52ca9cd576fc745359a, jobId: cbad95afe6b0e2c9dd3a459c322d1442).
2022-11-23 00:22:22,034 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job cbad95afe6b0e2c9dd3a459c322d1442 from job leader monitoring.
2022-11-23 00:22:22,035 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor  

Kubernetes Operator

Kubernetes OperatorでFlinkクラスターを動かすには、

  1. Helmチャートのインストール
  2. 実行したjarを含むコンテナイメージの作成
    • (NFSとかObject Storageに置いて実行することも可能だと思います)
  3. FlinkDeployment(Flinkのジョブに対応するCRD)をapply
  4. ローカからアクセスできるようにPort forwardingの設定

を行います。
なお、前述のStandalone Mode・Native Modeと、Kubernetes Operatorはレイヤーが異なる話で、Kubernetes Operatorを使う場合は、Standalone Mode・Native Modeを選択して使います。

まずは、helmチャートをインストールします。

kubectl create -f https://github.com/jetstack/cert-manager/releases/download/v1.8.2/cert-manager.yaml
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator

実行したjarを含むコンテナイメージを作成します。

Dockerfileを作成します。WordCount.jarはFlinkのbinary releaseに含まれるサンプルのjarです。

FROM flink:1.16
ENV FLINK_PLUGINS_DIR=/opt/flink/plugins
COPY WordCount.jar $FLINK_PLUGINS_DIR/hadoop-fs/

コンテナイメージをビルドします。

# minikubeで動かす場合は eval $(minikube docker-env)(build前)かminikube image load flink-with-word-count:v.0.0.1(build後)
docker image build -t flink-with-word-count:v.0.0.1 .

FlinkDeployment(Flinkのジョブに対応するCRD)を作成します。

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: basic-example
spec:
  image: flink-with-word-count:v.0.0.1
  imagePullPolicy: Never
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
  serviceAccount: flink
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
  job:
    jarURI: local:////opt/flink/plugins/hadoop-fs/WordCount.jar
    parallelism: 2
  • jarURIの部分はローカルではなく、imageに指定しているコンテナイメージの中でのパスです。
    • 適切に権限を設定すればS3やHDFSもいけるはず(試していないですが)
  • 設定項目はこちらのドキュメントに説明あります

作成したFlinkDeployment(Flinkのジョブに対応するCRD)ファイルをapplyします。

kubectl apply -f operator-test.yml 

Web UIを確認するためのPort Forwardingします。

kubectl port-forward svc/basic-example-rest 8081

FlinkDeploymentに対応するPod(TaskManagerとJobManager)が起動しています。

kubectl get pods
NAME                                         READY   STATUS    RESTARTS   AGE
basic-example-77d7f6ff6c-pbbxr               1/1     Running   0          22s
basic-example-taskmanager-1-1                1/1     Running   0          14s
flink-kubernetes-operator-6d8fcfc479-cc5jn   2/2     Running   0          7h46m

結果の確認

 kubectl logs basic-example-taskmanager-1-1 | tail
2> (nymph,1)
2> (in,3)
2> (thy,1)
2022-11-23 12:52:05,518 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - counter -> Sink: print-sink (2/2)#0 (89071096c538eb058622b6b1a39c5bde) switched from RUNNING to FINISHED.
2022-11-23 12:52:05,519 INFO  org.apache.flink.runtime.taskmanager.Task                    [] - Freeing task resources for counter -> Sink: print-sink (2/2)#0 (89071096c538eb058622b6b1a39c5bde).
2022-11-23 12:52:05,519 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Un-registering task and sending final execution state FINISHED to JobManager for task counter -> Sink: print-sink (2/2)#0 89071096c538eb058622b6b1a39c5bde.
2022-11-23 12:52:05,540 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.5, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 7165269bab9e883b6543d555a15cf2cc, jobId: 598fcfb12a26370bf48f13a2028ed09b).
2022-11-23 12:52:05,542 INFO  org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:1, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.5, taskHeapMemory=268.800mb (281857222 bytes), taskOffHeapMemory=0 bytes, managedMemory=317.440mb (332859969 bytes), networkMemory=79.360mb (83214992 bytes)}, allocationId: 5feefa2316c2034d7be8fc71e07efbd8, jobId: 598fcfb12a26370bf48f13a2028ed09b).
2022-11-23 12:52:05,543 INFO  org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job 598fcfb12a26370bf48f13a2028ed09b from job leader monitoring.
2022-11-23 12:52:05,543 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Close JobManager connection for job 598fcfb12a26370bf48f13a2028ed09b.

なお、デフォルトではnative modeなため、Task ManagerのPodはジョブが終わると消えます(JobManagerのPodは残る)。

Discussion