
As IT infrastructure has developed and matured, management, orchestration, and computing frameworks have become a routine part of daily IT operations and development. This article involves two of them: Kubernetes and Flink. Kubernetes (hereinafter k8s) is by now a very mature container orchestration and management framework, which makes it a natural deployment target for the other protagonist of this article, Flink.
As of this article's publication, the current stable Flink release (1.17) documents a way to deploy on Kubernetes (Reference 1). However, if you use the files from the official appendix as-is, the cluster will not run correctly in its high-availability configuration. The author initially tried to build a Session mode high-availability (HA) cluster by following the official guide, but it kept failing with errors, so the author read through the entire documentation in detail. It is fair to say the official documentation still has a few pitfalls: every explanation can eventually be found somewhere, but it is unforgiving if you take anything for granted.
This article provides a set of ready-to-use k8s configuration and deployment files so that you can quickly build a Session mode high-availability Flink cluster.
Configuration files
Note: if you do not name your files following the number_filename.yaml scheme used in this article, make sure you still run kubectl apply -f <file> in ascending numeric order as listed here.
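For example, assuming the manifests sit by themselves in the current directory and use the numeric prefixes from this article, a small loop (a sketch equivalent to the one-liner given at the end of the article) applies them in order:

```bash
# Apply all manifests in ascending numeric order (01_, 02_, ...)
for f in $(ls *.yaml | sort); do
  kubectl apply -f "$f"
done
```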
01_namespace.yaml
This file creates the namespace that the Flink services run in.
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ns-public-flink
```
02_rbac.yaml
This file creates the service account and role for the cluster and grants the role its permissions. It corresponds to the following paragraph in the official documentation (Reference 1):
Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps. See how to configure service accounts for pods for more information.
This paragraph calls out the need for create/edit/delete permissions, but in the author's experience a lock permission is missing from that list. Moreover, the official documentation provides no sample permission configuration file, yet its HA manifests simply reference the account via serviceAccountName: flink-service-account.
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: ns-public-flink
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: create-edit-delete-privileges
  namespace: ns-public-flink
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["create", "edit", "delete", "lock"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: bind-role-privileges
  namespace: ns-public-flink
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: ns-public-flink
roleRef:
  kind: Role
  name: create-edit-delete-privileges
  apiGroup: rbac.authorization.k8s.io
```
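After applying this file, you can sanity-check what the service account is actually allowed to do with ConfigMaps using kubectl auth can-i (a quick verification sketch; the namespace and account names are the ones defined above):

```bash
# Check whether the Flink service account may create ConfigMaps
kubectl auth can-i create configmaps \
  --as=system:serviceaccount:ns-public-flink:flink-service-account \
  -n ns-public-flink

# Check whether it may delete ConfigMaps
kubectl auth can-i delete configmaps \
  --as=system:serviceaccount:ns-public-flink:flink-service-account \
  -n ns-public-flink
```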
03_configmap.yaml
In this file, the kubernetes.cluster-id and high-availability.storageDir fields differ from the official documentation.
In the official document, high-availability.storageDir is hdfs:///flink/recovery, but since this is a minimal deployment with no HDFS file system available, this article changes it to the local file system, i.e., /tmp/flink/recovery.
In addition, the official document leaves kubernetes.cluster-id as the placeholder <cluster-id>; you must replace it with your own Flink cluster id, which the document does not point out explicitly. This field is also restricted: it may only contain lowercase letters plus the - and . characters.
Note: when copying this configuration, make sure no line ends with a trailing space, which is especially easy to introduce in the log4j appender name/type entries. Otherwise you will get an error such as Unable to locate plugin type for RollingFile  (note the extra space at the end): the value is read together with the space, so the plugin lookup naturally fails.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: ns-public-flink
  labels:
    app: flink-cluster
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    kubernetes.cluster-id: bytehero
    high-availability: kubernetes
    high-availability.storageDir: /tmp/flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
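A quick way to catch the trailing-space problem described above before it bites is to grep the manifest for lines ending in whitespace (a small sanity-check sketch, assuming the file is named 03_configmap.yaml as in this article):

```bash
# Flag any lines in the manifest that end with trailing whitespace
grep -nE '[[:space:]]+$' 03_configmap.yaml

# After applying, you can also view the ConfigMap as stored in the cluster
kubectl get configmap flink-config -n ns-public-flink -o yaml
```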
04_service.yaml
Please note that in this file, port 31081 is exposed as the port for the Flink Web UI. Since the Flink Web UI itself provides no authentication, this port should not be exposed in a real production environment. Instead, a reverse proxy with some form of access control (for example, nginx with basic auth) should proxy access to the internal port 8081 (for ad-hoc testing, see the port-forward sketch after the Service definitions below).
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: ns-public-flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink-cluster
    component: jobmanager
---
kind: Service
apiVersion: v1
metadata:
  name: flink-webui
  namespace: ns-public-flink
spec:
  ports:
    - name: http
      port: 8081
      protocol: TCP
      targetPort: 8081
      nodePort: 31081
  selector:
    app: flink-cluster
    component: jobmanager
  type: NodePort
```
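If you only need temporary access during testing (for example before a reverse proxy is in place), a port-forward to the ClusterIP service avoids exposing the NodePort at all; this is just a convenience sketch, not part of the deployment files:

```bash
# Confirm the NodePort mapping (8081 -> 31081)
kubectl get service flink-webui -n ns-public-flink

# Forward the JobManager web UI to the local machine (Ctrl-C to stop)
kubectl port-forward service/flink-jobmanager 8081:8081 -n ns-public-flink
# The Web UI is then reachable at http://localhost:8081
```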
05_jobmanager-session.yaml
The JobManager Deployment of the Flink cluster, for the Session high-availability mode.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: ns-public-flink
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink-cluster
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink-cluster
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: apache/flink:1.16.0-scala_2.12
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
          securityContext:
            runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
```
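Once applied, it is worth confirming that the JobManager came up cleanly before moving on (a simple verification sketch):

```bash
# Wait for the JobManager Deployment to become available
kubectl rollout status deployment/flink-jobmanager -n ns-public-flink

# Tail the JobManager logs to confirm it starts without errors
kubectl logs -f deployment/flink-jobmanager -n ns-public-flink
```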
06_taskmanager-session.yaml
The TaskManager Deployment of the Flink cluster.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: ns-public-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink-cluster
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink-cluster
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: apache/flink:1.16.0-scala_2.12
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
```
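After the TaskManagers start, a quick check that both replicas are running and registering with the JobManager can be done from the command line (a sketch using the labels defined above):

```bash
# List the TaskManager pods and confirm both replicas are Running
kubectl get pods -n ns-public-flink -l app=flink-cluster,component=taskmanager

# The TaskManager logs should show successful registration with the resource manager
kubectl logs -n ns-public-flink -l app=flink-cluster,component=taskmanager --tail=20
```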
If your files are named according to the number_filename.yaml scheme above, you can deploy the entire Flink cluster to k8s with the following command:
```bash
ls | sort | xargs -i kubectl apply -f {}
```
If you used a different naming scheme, make sure the files are still applied in ascending numeric order.
Result
After the cluster is deployed, the status is as follows:
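The deployed resources can be inspected with kubectl, for example:

```bash
# All pods, services and deployments in the Flink namespace
kubectl get all -n ns-public-flink
```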

Through the exposed port 31081 we can reach the Flink Web UI, and jobs can be submitted from this Web page:
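The REST API is served on the same port, so the cluster can also be checked from the command line (a quick sketch; replace <node-ip> with the address of any k8s node):

```bash
# Cluster overview: number of TaskManagers, available slots, running jobs, ...
curl -s http://<node-ip>:31081/overview

# List submitted jobs
curl -s http://<node-ip>:31081/jobs
```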

Error handling
Above, we built a high-availability Flink cluster on k8s, and the configuration includes the following settings:
```yaml
kubernetes.cluster-id: bytehero
high-availability: kubernetes
```
With these settings, Flink uses k8s ConfigMaps to persist some of its configuration. Because the cluster runs in high-availability mode, job-related metadata is stored there as well (in the author's test environment, the entries whose names contain hash values are ConfigMaps created by a previously deployed, non-persistent Flink cluster).
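You can list these ConfigMaps directly to see what the HA services have created in the namespace:

```bash
# HA-related ConfigMaps created by Flink, e.g. bytehero-cluster-config-map
kubectl get configmaps -n ns-public-flink
```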

If you delete the entire Flink cluster but do not delete these ConfigMaps, then when the cluster is recreated the JobManager will report errors like the following:
```
2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [00cb29fb2141a94b930dbe8ec8df0c3d, 1a7db87959722c7dfe49f18dadc07946, 3957c74dd3e6ac62989cbc001cda07be, 48e5da2d36e234fa67145da8d9d35d29, 648e72f5be8c820382479f01d5d1f1dc, 97aa9b6af5ba8ae7d44cc16e2646ba15, b9f891b343322417cd4e07392f5b75b9, dd3f75365a881e8dfae1fd58eb44222d] from KubernetesStateHandleStore{configMapName='bytehero-cluster-config-map'}
2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Trying to recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
2023-05-24 04:22:30,764 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2023-05-24 04:22:30,767 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2023-05-24 04:22:30,769 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Starting the resource manager.
2023-05-24 04:22:30,768 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:183) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 5 more
Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under jobGraph-00cb29fb2141a94b930dbe8ec8df0c3d. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
    at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:175) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:174) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 5 more
Caused by: java.io.FileNotFoundException: /tmp/flink/recovery/default/submittedJobGrapheeca97687a07 (No such file or directory)
    at java.io.FileInputStream.open0(Native Method) ~[?:?]
    at java.io.FileInputStream.open(Unknown Source) ~[?:?]
    at java.io.FileInputStream.<init>(Unknown Source) ~[?:?]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:66) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:166) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:174) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 5 more
2023-05-24 04:22:30,797 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-05-24 04:22:30,811 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2023-05-24 04:22:31,145 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-05-24 04:22:31,147 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-05-24 04:22:31,147 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-05-24 04:22:31,152 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_6212fea2-2c9d-45d6-b762-0ebb7f7d60d5.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2023-05-24 04:22:31,177 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2023-05-24 04:22:31,179 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
```
The key point is this line:
```
2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [00cb29fb2141a94b930dbe8ec8df0c3d, 1a7db87959722c7dfe49f18dadc07946, 3957c74dd3e6ac62989cbc001cda07be, 48e5da2d36e234fa67145da8d9d35d29, 648e72f5be8c820382479f01d5d1f1dc, 97aa9b6af5ba8ae7d44cc16e2646ba15, b9f891b343322417cd4e07392f5b75b9, dd3f75365a881e8dfae1fd58eb44222d] from KubernetesStateHandleStore{configMapName='bytehero-cluster-config-map'}
```
configMapName='bytehero-cluster-config-map' shows that the persisted job ids were read from bytehero-cluster-config-map, but since our cluster is newly created, the jobs of the previous cluster cannot be recovered, so the startup fails.
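Before deleting anything, you can confirm what the JobManager is trying to recover by dumping the ConfigMap named in that log line (an optional inspection step):

```bash
# Show the stale HA ConfigMap that still references the old jobs
kubectl get configmap bytehero-cluster-config-map -n ns-public-flink -o yaml
```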
The solution is to delete this ConfigMap:
```bash
kubectl delete configmaps bytehero-cluster-config-map -n ns-public-flink
```
Note that the name of the ConfigMap to be deleted should be the same as the one in the error message.
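If several HA ConfigMaps are left over, they can also be removed in one go with a label selector, as the Flink documentation suggests (a sketch assuming the standard labels that Flink's Kubernetes HA services attach to the ConfigMaps they create):

```bash
# Delete all HA ConfigMaps belonging to the cluster-id "bytehero"
kubectl delete configmaps -n ns-public-flink \
  --selector='app=bytehero,configmap-type=high-availability'
```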