Watch & Learn

Debugwar Blog

Step in or Step over, this is a problem ...

Constructing a Flink HA cluster on Kubernetes

2023-05-23 @ UTC+0

As IT infrastructure has matured, management, orchestration, and computing frameworks have become everyday tools in IT operations and development. This article brings together two of the best of them: Kubernetes and Flink. Kubernetes (hereinafter referred to as k8s) is by now a very mature container orchestration and management framework, which makes it a natural platform for the other protagonist of this article, Flink.

As of this writing, the current stable release of Flink (1.17) documents a method for deployment on Kubernetes (Reference 1). However, if you use the files from the official appendix as-is, the cluster will not run correctly in its high-availability configuration. The author first tried to build a Session-mode high-availability (HA) cluster by following the official guide, but it kept failing with errors, so the author read the entire help document in detail. There are still a few pitfalls in it: every necessary explanation can be found somewhere, but the document takes too much for granted and is unfriendly to a first-time reader.

This article provides a ready-to-use set of k8s configuration and deployment files so that you can quickly build a Session-mode high-availability Flink cluster.

Configuration files

Note: if your file names do not follow the number_filename.yaml convention used in this article, make sure you still run kubectl apply -f <file> on the files in ascending numeric order as presented here.

01_namespace.yaml

This file creates the namespace that all of the following resources are deployed into.

    apiVersion: v1
    kind: Namespace
    metadata:
      name: ns-public-flink

02_rbac.yaml

This file creates the service account and role used by the cluster and grants the role its permissions. It corresponds to the following paragraph in the official document (Reference 1):

Moreover, you have to start the JobManager and TaskManager pods with a service account which has the permissions to create, edit, delete ConfigMaps. See how to configure service accounts for pods for more information.

This paragraph points out that create/edit/delete permissions are needed, but in the author's actual runs a lock permission also turned out to be missing from that list. The official document provides no sample permission-configuration file; its HA configuration simply references the account via serviceAccountName: flink-service-account and leaves the account setup to you.
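After applying the RBAC file below, you can check what the service account is actually permitted to do. This is a small verification sketch (an addition to the original files) using kubectl's impersonation support:

    # List everything the Flink service account may do in its namespace
    kubectl auth can-i --list \
      --as=system:serviceaccount:ns-public-flink:flink-service-account \
      -n ns-public-flink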

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: flink-service-account
      namespace: ns-public-flink
    ---
    kind: Role
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: create-edit-delete-privileges
      namespace: ns-public-flink
    rules:
      - apiGroups: [""]
        resources: ["endpoints"]
        verbs: ["create", "edit", "delete", "lock"]
    ---
    kind: RoleBinding
    apiVersion: rbac.authorization.k8s.io/v1
    metadata:
      name: bind-role-privileges
      namespace: ns-public-flink
    subjects:
      - kind: ServiceAccount
        name: flink-service-account
        namespace: ns-public-flink
    roleRef:
      kind: Role
      name: create-edit-delete-privileges
      apiGroup: rbac.authorization.k8s.io

03_configmap.yaml

In this file, the values of kubernetes.cluster-id and high-availability.storageDir differ from the official documentation.

In the official document, high-availability.storageDir is hdfs:///flink/recovery, but since this is a minimal deployment with no HDFS file system available, this article uses the local file system instead, i.e. /tmp/flink/recovery.

In addition, the official document leaves kubernetes.cluster-id as the placeholder <cluster-id>. You must replace it with an id of your own for the Flink cluster, which the official document does not point out explicitly. The value is also restricted: it may only consist of lowercase characters plus the - and . characters.

Note: when copying this configuration, make sure no value ends with a trailing space, especially the .name configuration items of log4j. Otherwise you will get an error such as Unable to locate plugin type for xxx (for example, Unable to locate plugin type for RollingFile <-- with an extra space at the end). The cause is that Log4j looks up the plugin name including the trailing space, so naturally it finds nothing.
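A quick way to catch such trailing whitespace before applying the file (a simple sketch, assuming the file name used in this article):

    # Print every line of the ConfigMap file that ends in whitespace
    grep -nE '[[:space:]]+$' 03_configmap.yaml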

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      namespace: ns-public-flink
      labels:
        app: flink-cluster
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
        kubernetes.cluster-id: bytehero
        high-availability: kubernetes
        high-availability.storageDir: /tmp/flink/recovery
        restart-strategy: fixed-delay
        restart-strategy.fixed-delay.attempts: 10
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender

        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO

        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name = org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO

        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size = 100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10

        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF

04_service.yaml

Please note: this configuration exposes port 31081 as the Flink Web UI port. Since the Flink Web UI has no built-in authentication, this port should not be exposed in a production environment. Instead, put a reverse proxy with some form of access control (for example, nginx + basic auth) in front of the internal port 8081.
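For ad-hoc access during testing you can also skip the NodePort entirely and tunnel to the ClusterIP service instead (a sketch, not part of the deployment files in this article):

    # Forward local port 8081 to the JobManager's web UI inside the cluster
    kubectl -n ns-public-flink port-forward service/flink-jobmanager 8081:8081
    # The UI is then reachable at http://localhost:8081 from this machine only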

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
      namespace: ns-public-flink
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink-cluster
        component: jobmanager
    ---
    kind: Service
    apiVersion: v1
    metadata:
      name: flink-webui
      namespace: ns-public-flink
    spec:
      ports:
      - name: http
        port: 8081
        protocol: TCP
        targetPort: 8081
        nodePort: 31081
      selector:
        app: flink-cluster
        component: jobmanager
      type: NodePort

05_jobmanager-session.yaml

The JobManager configuration of the Flink cluster, applicable to the Session high-availability mode.

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
      namespace: ns-public-flink
    spec:
      replicas: 1 # Set the value to greater than 1 to start standby JobManagers
      selector:
        matchLabels:
          app: flink-cluster
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink-cluster
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: apache/flink:1.16.0-scala_2.12
            env:
            - name: POD_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.podIP
            args: ["jobmanager", "$(POD_IP)"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          serviceAccountName: flink-service-account
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties

06_taskmanager-session.yaml

TaskManager configuration.

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
      namespace: ns-public-flink
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink-cluster
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink-cluster
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: apache/flink:1.16.0-scala_2.12
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
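With replicas: 2 here and taskmanager.numberOfTaskSlots: 2 in the ConfigMap, the cluster starts with four task slots in total. If you later need more slots, the deployment can be scaled without editing the file; a sketch:

    # Scale the TaskManager deployment to four replicas (eight slots)
    kubectl -n ns-public-flink scale deployment/flink-taskmanager --replicas=4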

If you named your files according to the number_filename.yaml convention above, you can deploy the entire Flink cluster to k8s with the following command:

    ls | sort | xargs -I {} kubectl apply -f {}

If you used different names, make sure you still apply the files in ascending numeric order as listed above.
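After the apply commands finish, a quick sanity check (standard kubectl; the exact output will vary with your environment):

    # All pods should reach Running; both services should be listed
    kubectl -n ns-public-flink get pods,services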

Result

After the cluster is deployed, the JobManager and TaskManager pods all reach the Running state.


Through the exposed port 31081 we can access the Flink Web UI, and jobs can be submitted from that page.
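Jobs can also be submitted from outside the cluster with the Flink command-line client by pointing it at the exposed REST port. A sketch, assuming a local Flink 1.16 distribution and that <node-ip> stands for the address of one of your k8s nodes:

    # Submit one of the example jobs shipped with the Flink distribution
    ./bin/flink run -m <node-ip>:31081 examples/streaming/TopSpeedWindowing.jar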


Error handling

Above, we built a high-availability Flink cluster on k8s whose configuration includes the following settings:

    kubernetes.cluster-id: bytehero
    high-availability: kubernetes

With these settings, Flink stores some of its persistent state in k8s ConfigMaps. Because the cluster runs in high-availability mode, job-related data is stored there as well (in the author's test environment there are also ConfigMaps with hash values in their names, left over from a non-HA Flink cluster set up earlier).
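You can inspect these ConfigMaps directly; a sketch, assuming the namespace used in this article:

    # The HA metadata lives in ConfigMaps named after the cluster-id
    kubectl -n ns-public-flink get configmaps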


If you delete the entire Flink cluster but leave these ConfigMaps behind, the JobManager will report errors like the following when you recreate the cluster:

    2023-05-24 04:22:30,753 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Retrieved job ids [00cb29fb2141a94b930dbe8ec8df0c3d, 1a7db87959722c7dfe49f18dadc07946, 3957c74dd3e6ac62989cbc001cda07be, 48e5da2d36e234fa67145da8d9d35d29, 648e72f5be8c820382479f01d5d1f1dc, 97aa9b6af5ba8ae7d44cc16e2646ba15, b9f891b343322417cd4e07392f5b75b9, dd3f75365a881e8dfae1fd58eb44222d] from KubernetesStateHandleStore{configMapName='bytehero-cluster-config-map'}
    2023-05-24 04:22:30,753 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Trying to recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
    2023-05-24 04:22:30,764 INFO  org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess [] - Stopping SessionDispatcherLeaderProcess.
    2023-05-24 04:22:30,767 INFO  org.apache.flink.runtime.jobmanager.DefaultJobGraphStore     [] - Stopping DefaultJobGraphStore.
    2023-05-24 04:22:30,769 INFO  org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Starting the resource manager.
    2023-05-24 04:22:30,768 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error occurred in the cluster entrypoint.
    java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
        at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) [?:?]
        at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) [?:?]
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(Unknown Source) [?:?]
        at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
        at java.lang.Thread.run(Unknown Source) [?:?]
    Caused by: org.apache.flink.util.FlinkRuntimeException: Could not recover job with job id 00cb29fb2141a94b930dbe8ec8df0c3d.
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:183) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
        ... 5 more
    Caused by: org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under jobGraph-00cb29fb2141a94b930dbe8ec8df0c3d. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
        at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:175) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:174) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
        ... 5 more
    Caused by: java.io.FileNotFoundException: /tmp/flink/recovery/default/submittedJobGrapheeca97687a07 (No such file or directory)
        at java.io.FileInputStream.open0(Native Method) ~[?:?]
        at java.io.FileInputStream.open(Unknown Source) ~[?:?]
        at java.io.FileInputStream.<init>(Unknown Source) ~[?:?]
        at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:69) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:66) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.jobmanager.DefaultJobGraphStore.recoverJobGraph(DefaultJobGraphStore.java:166) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.tryRecoverJob(SessionDispatcherLeaderProcess.java:174) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobs(SessionDispatcherLeaderProcess.java:150) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$recoverJobsIfRunning$2(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.AbstractDispatcherLeaderProcess.supplyUnsynchronizedIfRunning(AbstractDispatcherLeaderProcess.java:198) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.recoverJobsIfRunning(SessionDispatcherLeaderProcess.java:139) ~[flink-dist-1.16.0.jar:1.16.0]
        at org.apache.flink.runtime.dispatcher.runner.SessionDispatcherLeaderProcess.lambda$createDispatcherBasedOnRecoveredJobGraphsAndRecoveredDirtyJobResults$1(SessionDispatcherLeaderProcess.java:129) ~[flink-dist-1.16.0.jar:1.16.0]
        ... 5 more
    2023-05-24 04:22:30,797 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
    2023-05-24 04:22:30,811 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:6124
    2023-05-24 04:22:31,145 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-05-24 04:22:31,147 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    2023-05-24 04:22:31,147 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Shutting down remote daemon.
    2023-05-24 04:22:31,152 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remote daemon shut down; proceeding with flushing remote transports.
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.jboss.netty.util.internal.ByteBufferUtil (file:/tmp/flink-rpc-akka_6212fea2-2c9d-45d6-b762-0ebb7f7d60d5.jar) to method java.nio.DirectByteBuffer.cleaner()
    WARNING: Please consider reporting this to the maintainers of org.jboss.netty.util.internal.ByteBufferUtil
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    2023-05-24 04:22:31,177 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.
    2023-05-24 04:22:31,179 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator        [] - Remoting shut down.

The key point is this line:

    2023-05-24 04:22:30,753 INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Retrieved job ids [00cb29fb2141a94b930dbe8ec8df0c3d, 1a7db87959722c7dfe49f18dadc07946, 3957c74dd3e6ac62989cbc001cda07be, 48e5da2d36e234fa67145da8d9d35d29, 648e72f5be8c820382479f01d5d1f1dc, 97aa9b6af5ba8ae7d44cc16e2646ba15, b9f891b343322417cd4e07392f5b75b9, dd3f75365a881e8dfae1fd58eb44222d] from KubernetesStateHandleStore{configMapName='bytehero-cluster-config-map'}

configMapName='bytehero-cluster-config-map' indicates that the persisted job ids were read from the ConfigMap bytehero-cluster-config-map. Since our cluster is newly created, the jobs of the previous cluster cannot be recovered, and startup fails.

The solution is to delete this ConfigMap:

    kubectl -n ns-public-flink delete configmap bytehero-cluster-config-map

Note that the name of the ConfigMap to be deleted should be the same as the one in the error message.
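If you are not sure which ConfigMaps were left behind by the old cluster, you can filter the list by the cluster-id first; a sketch, assuming the cluster-id bytehero used in this article:

    # Show every ConfigMap whose name contains the cluster-id
    kubectl -n ns-public-flink get configmaps | grep bytehero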

Reference

  1. Flink 1.17 Kubernetes deployment documentation

