
As IT infrastructure of all kinds has grown and matured, management, orchestration, and compute frameworks are now used heavily in day-to-day IT operations and development. This article is about two of the front-runners: Kubernetes and Flink. Kubernetes (k8s below), a framework for container orchestration and management, has become very mature, which also makes it convenient to integrate the other protagonist of this article, Flink, with k8s.
As of this writing, the official documentation for the stable Flink release (1.17) describes how to deploy Flink on Kubernetes [1]. However, if you use the files from the official Appendix as-is, a cluster configured for high availability will not run. I first tried to build a highly available (HA) Session-mode cluster by following the official guide, but it kept failing, so I read the whole document carefully. The official documentation does contain a few pitfalls: the necessary explanations are all there somewhere, but they are quite unfriendly to anyone who just wants to copy, paste, and go.
This article provides a set of ready-to-use k8s configuration and deployment files so you can quickly stand up a highly available Flink cluster in Session mode.
Configuration files
Note: if your files are not named following this article's <number>_<filename>.yaml pattern, make sure you run kubectl apply -f <file> in the ascending numeric order used in this article.
01_namespace.yaml
This file creates the namespace for the service.
```yaml
apiVersion: v1
kind: Namespace
metadata:
  name: ns-public-flink
```
02_rbac.yaml
This file creates the basic role for the service and grants it the required permissions. It relates to the following passage from the official documentation [1]:
Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps. See how to configure service accounts for pods for more information.
This passage makes it clear that create/edit/delete permissions are needed, but in my experience a lock permission is required as well. Moreover, the official documentation does not provide a sample permission configuration file, yet its HA configuration files directly reference the account via serviceAccountName: flink-service-account.
```yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: flink-service-account
  namespace: ns-public-flink
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: create-edit-delete-privileges
  namespace: ns-public-flink
rules:
  - apiGroups: [""]
    resources: ["endpoints"]
    verbs: ["create", "edit", "delete", "lock"]
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: bind-role-privileges
  namespace: ns-public-flink
subjects:
  - kind: ServiceAccount
    name: flink-service-account
    namespace: ns-public-flink
roleRef:
  kind: Role
  name: create-edit-delete-privileges
  apiGroup: rbac.authorization.k8s.io
```
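Since the HA data ends up in ConfigMaps (as the quoted passage says, and as the Error handling section below shows), it is worth confirming that the service account can actually manage ConfigMaps in the namespace before deploying anything. This is a plain kubectl permission check, not anything Flink-specific; if it answers no, extend the Role's resources and verbs accordingly:

```bash
# Ask the API server whether the Flink service account may create ConfigMaps
# in the ns-public-flink namespace (repeat with "get", "update", "delete", ...).
kubectl auth can-i create configmaps \
  --as=system:serviceaccount:ns-public-flink:flink-service-account \
  -n ns-public-flink
```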
03_configmap.yaml
In this section the kubernetes.cluster-id and high-availability.storageDir fields differ from the official documentation.
In the official documentation high-availability.storageDir is hdfs:///flink/recovery, but since this is a minimal deployment with no HDFS available, it has been changed here to the local file system, i.e. /tmp/flink/recovery.
Also, the official documentation gives kubernetes.cluster-id as <cluster-id>; you are expected to replace it with your own Flink cluster id, but the documentation never says so explicitly. The field also has a restriction: it may only consist of lowercase letters and the '-' and '.' characters.
Note: when copying this configuration, and in particular the log4j .name entries, make sure the lines do not end with a space; otherwise you will get errors of the form Unable to locate plugin type for xxx (for example Unable to locate plugin type for RollingFile  <-- note the trailing space). The cause is that log4j then looks up a plugin whose name includes the trailing space, which of course cannot be found.
```yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: ns-public-flink
  labels:
    app: flink-cluster
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2
    kubernetes.cluster-id: bytehero
    high-availability: kubernetes
    high-availability.storageDir: /tmp/flink/recovery
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender
    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO
    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name = org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO
    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size = 100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10
    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
```
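To guard against the trailing-space problem described in the note above, a quick check of the file before applying it can save a round of debugging. The one-liner below simply flags any line that ends in whitespace (the file name assumes this article's naming convention):

```bash
# Print every line of the ConfigMap manifest that ends with a space or tab
grep -nE '[[:space:]]+$' 03_configmap.yaml
```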
04_service.yaml
Note: the configuration in this section exposes port 31081 as the Flink Web UI port. Since the Flink Web UI provides no authentication of its own, this port should not be exposed in a production environment; instead, put the internal port 8081 behind a reverse proxy with some form of access control (for example nginx + basic auth); a hedged sketch of this approach follows the Service manifests below.
```yaml
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: ns-public-flink
spec:
  type: ClusterIP
  ports:
    - name: rpc
      port: 6123
    - name: blob-server
      port: 6124
    - name: webui
      port: 8081
  selector:
    app: flink-cluster
    component: jobmanager
---
kind: Service
apiVersion: v1
metadata:
  name: flink-webui
  namespace: ns-public-flink
spec:
  ports:
    - name: http
      port: 8081
      protocol: TCP
      targetPort: 8081
      nodePort: 31081
  selector:
    app: flink-cluster
    component: jobmanager
  type: NodePort
```
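As noted above, the NodePort is only meant for convenience. One way to put the Web UI behind a reverse proxy with basic auth is sketched here, assuming the ingress-nginx controller is installed in the cluster and that a Secret named flink-webui-auth containing an htpasswd file exists in the namespace; both the host name and the Secret are hypothetical placeholders, not part of the manifests above:

```yaml
# Hedged sketch: front the Web UI with ingress-nginx + basic auth instead of a NodePort.
# Assumes: ingress-nginx controller installed; Secret "flink-webui-auth" created from an
# htpasswd file, e.g. kubectl create secret generic flink-webui-auth --from-file=auth -n ns-public-flink
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: flink-webui-ingress
  namespace: ns-public-flink
  annotations:
    nginx.ingress.kubernetes.io/auth-type: basic
    nginx.ingress.kubernetes.io/auth-secret: flink-webui-auth
    nginx.ingress.kubernetes.io/auth-realm: "Flink Web UI"
spec:
  ingressClassName: nginx
  rules:
    - host: flink.example.com   # hypothetical host name
      http:
        paths:
          - path: /
            pathType: Prefix
            backend:
              service:
                name: flink-jobmanager   # proxies to the internal port 8081
                port:
                  number: 8081
```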
05_jobmanager-session.yaml
The JobManager configuration for the Flink cluster, for the highly available Session mode.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: ns-public-flink
spec:
  replicas: 1 # Set the value to greater than 1 to start standby JobManagers
  selector:
    matchLabels:
      app: flink-cluster
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink-cluster
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: apache/flink:1.16.0-scala_2.12
          env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
          args: ["jobmanager", "$(POD_IP)"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
      serviceAccountName: flink-service-account
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
```
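After applying this Deployment, one way to sanity-check that the Kubernetes HA services came up and leader election succeeded is to look at the JobManager log. This is only a hint; the exact wording of the log messages varies between Flink versions:

```bash
# Dump the JobManager log and look for leader-election related messages
kubectl -n ns-public-flink logs deploy/flink-jobmanager | grep -i leader
```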
06_taskmanager-session.yaml
The TaskManager configuration file.
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: ns-public-flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink-cluster
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink-cluster
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: apache/flink:1.16.0-scala_2.12
          args: ["taskmanager"]
          ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
          livenessProbe:
            tcpSocket:
              port: 6122
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
```
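The replicas: 2 above is just a starting point. Since the TaskManagers are a plain Deployment, you can add task slots later by scaling it, without touching the other manifests, for example:

```bash
# Scale the TaskManager Deployment; each replica contributes taskmanager.numberOfTaskSlots slots
kubectl -n ns-public-flink scale deployment flink-taskmanager --replicas=3
```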
If your files are named following the <number>_<filename>.yaml pattern above, you can deploy the whole Flink cluster to k8s with the following command:
```bash
ls | sort | xargs -i kubectl apply -f {}
```
If your naming does not follow that pattern, make sure you apply the files in ascending numeric order.
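Equivalently, you can apply the files one by one in the order in which they appear in this article:

```bash
kubectl apply -f 01_namespace.yaml
kubectl apply -f 02_rbac.yaml
kubectl apply -f 03_configmap.yaml
kubectl apply -f 04_service.yaml
kubectl apply -f 05_jobmanager-session.yaml
kubectl apply -f 06_taskmanager-session.yaml
```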
Results
After the cluster has been deployed, its status looks like this:

Through the exposed port 31081 we can access the Flink Web UI and submit jobs from this page:

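If you prefer the command line, the same NodePort also serves Flink's REST API, so the cluster can be checked without a browser; <node-ip> below is a placeholder for the address of any cluster node:

```bash
# List the Flink pods
kubectl -n ns-public-flink get pods
# Query the cluster overview (TaskManagers, slots, jobs) through the REST API behind the Web UI
curl http://<node-ip>:31081/overview
```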
Error handling
Above we built a highly available Flink cluster on k8s, and its configuration included the following settings:
```yaml
kubernetes.cluster-id: bytehero
high-availability: kubernetes
```
With these settings Flink uses k8s ConfigMaps to store some persistent state, and because the cluster runs in high-availability mode, job data is stored there as well (in my test environment, the ConfigMaps whose names contain a hash were created by an earlier, non-persistent Flink cluster):

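You can list these ConfigMaps directly; in this setup they live in the ns-public-flink namespace:

```bash
kubectl -n ns-public-flink get configmaps
```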
If you delete the entire Flink cluster but not these ConfigMaps, the jobmanager will report errors like the following when the Flink cluster is re-created:
```
2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [00cb29fb2141a94b930dbe8ec8df0c3d, 1a7db87959722c7dfe49f18dadc07946, 3957c74dd3e6ac62989cbc001cda07be, 48e5da2d36e234fa67145da8d9d35d29, 648e72f5be8c820382479f01d5d1f1dc, 97aa9b6af5ba8ae7d44cc16e2646ba15, b9f891b343322417cd4e07392f5b75b9, dd3f75365a881e8dfae1fd58eb44222d] from KubernetesStateHandleStore{configMapName='bytehero-cluster-config-map'}
2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Trying to recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
2023-05-24 04:22:30,764 INFO org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
2023-05-24 04:22:30,767 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Stopping DefaultJobGraphStore.
2023-05-24 04:22:30,769 INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Starting the resource manager.
2023-05-24 04:22:30,768 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
    at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) [?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
    at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:183) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 5 more
Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under jobGraph-00cb29fb2141a94b930dbe8ec8df0c3d. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
    at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:175) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:174) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 5 more
Caused by: java.io.FileNotFoundException: /tmp/flink/recovery/default/submittedJobGrapheeca97687a07 (No such file or directory)
    at java.io.FileInputStream.open0(Native Method) ~[?:?]
    at java.io.FileInputStream.open(Unknown Source) ~[?:?]
    at java.io.FileInputStream.<init>(Unknown Source) ~[?:?]
    at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:66) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:166) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:174) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 5 more
2023-05-24 04:22:30,797 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-05-24 04:22:30,811 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:6124
2023-05-24 04:22:31,145 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-05-24 04:22:31,147 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-05-24 04:22:31,147 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-05-24 04:22:31,152 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_6212fea2-2c9d-45d6-b762-0ebb7f7d60d5.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2023-05-24 04:22:31,177 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2023-05-24 04:22:31,179 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
```
The key line is this one:
```
2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [00cb29fb2141a94b930dbe8ec8df0c3d, 1a7db87959722c7dfe49f18dadc07946, 3957c74dd3e6ac62989cbc001cda07be, 48e5da2d36e234fa67145da8d9d35d29, 648e72f5be8c820382479f01d5d1f1dc, 97aa9b6af5ba8ae7d44cc16e2646ba15, b9f891b343322417cd4e07392f5b75b9, dd3f75365a881e8dfae1fd58eb44222d] from KubernetesStateHandleStore{configMapName='bytehero-cluster-config-map'}
```
configMapName='bytehero-cluster-config-map' shows that the persisted job ids were read from bytehero-cluster-config-map; but since our cluster has just been created, the jobs of the previous cluster cannot be recovered, which causes the error.
The fix is simply to delete that ConfigMap:
```bash
kubectl -n ns-public-flink delete configmaps bytehero-cluster-config-map
```
Make sure the name of the ConfigMap you delete matches the one shown in the error message.