[FLINK-35116] Bump operator sdk version to 4.8.3
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index d068062..918b407 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -304,7 +304,7 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.reconcile.parallelism</h5></td>
-            <td style="word-wrap: break-word;">200</td>
+            <td style="word-wrap: break-word;">50</td>
             <td>Integer</td>
             <td>The maximum number of threads running the reconciliation loop. Use -1 for infinite.</td>
         </tr>
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index aa053c2..ec30301 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -100,7 +100,7 @@
         </tr>
         <tr>
             <td><h5>kubernetes.operator.reconcile.parallelism</h5></td>
-            <td style="word-wrap: break-word;">200</td>
+            <td style="word-wrap: break-word;">50</td>
             <td>Integer</td>
             <td>The maximum number of threads running the reconciliation loop. Use -1 for infinite.</td>
         </tr>
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
index 658e25a..faf6024 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
@@ -93,7 +93,11 @@
                     // This field was removed from Kubernetes ObjectMeta v1 in 1.25 as it was unused
                     // for a long time. If set for any reason (very unlikely as it does nothing),
                     // the property will be dropped / ignored by the api server.
-                    if (!fieldPath.endsWith(".metadata.clusterName")) {
+                    if (!fieldPath.endsWith(".metadata.clusterName")
+                            // This claims field was removed in Kubernetes 1.28 as it was mistakenly
+                            // added in the first place. For more context please refer to
+                            // https://github.com/kubernetes/api/commit/8b14183
+                            && !fieldPath.contains(".volumeClaimTemplate.spec.resources.claims")) {
                         err(fieldPath + " has been removed");
                     }
                 } else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index a5846f5..0ecd7c8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -137,7 +137,7 @@
             overrider.withExecutorService(Executors.newCachedThreadPool());
         } else {
             LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
-            overrider.withExecutorService(Executors.newFixedThreadPool(parallelism));
+            overrider.withConcurrentReconciliationThreads(parallelism);
         }
 
         if (operatorConf.isJosdkMetricsEnabled()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 9fcdb6a..225bebd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -266,8 +266,7 @@
                 null,
                 conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_DURATION),
                 conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RENEW_DEADLINE),
-                conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RETRY_PERIOD),
-                null);
+                conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RETRY_PERIOD));
     }
 
     private static Optional<String> getEnv(String key) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 679aa57..47165f7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -205,9 +205,9 @@
 
         Class<? extends AbstractFlinkResource<?, ?>> resourceClass;
 
-        if (resourceGvk.kind.equals(FlinkDeployment.class.getSimpleName())) {
+        if (resourceGvk.getKind().equals(FlinkDeployment.class.getSimpleName())) {
             resourceClass = FlinkDeployment.class;
-        } else if (resourceGvk.kind.equals(FlinkSessionJob.class.getSimpleName())) {
+        } else if (resourceGvk.getKind().equals(FlinkSessionJob.class.getSimpleName())) {
             resourceClass = FlinkSessionJob.class;
         } else {
             return Optional.empty();
diff --git a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
index 1b7d232..bd46972 100644
--- a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
+++ b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
@@ -47,10 +47,10 @@
 - io.fabric8:kubernetes-model-scheduling:jar:6.8.1
 - io.fabric8:kubernetes-model-storageclass:jar:6.8.1
 - io.fabric8:zjsonpatch:jar:0.3.0
-- io.javaoperatorsdk:operator-framework-core:jar:4.4.4
-- io.javaoperatorsdk:operator-framework:jar:4.4.4
+- io.javaoperatorsdk:operator-framework-core:jar:4.8.3
+- io.javaoperatorsdk:operator-framework:jar:4.8.3
 - org.apache.commons:commons-compress:1.21
-- org.apache.commons:commons-lang3:3.13.0
+- org.apache.commons:commons-lang3:3.14.0
 - org.apache.commons:commons-math3:3.6.1
 - org.apache.commons:commons-text:jar:1.10.0
 - org.apache.logging.log4j:log4j-1.2-api:2.17.1
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
index 6455096..b60551a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
@@ -70,21 +70,10 @@
 
         var configService = testOperator.getOperator().getConfigurationService();
 
-        // Test parallelism being passed expectedly
+        // Test parallelism being passed
         var executorService = configService.getExecutorService();
         Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
         ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
-        for (int i = 0; i < testParallelism * 2; i++) {
-            threadPoolExecutor.execute(
-                    () -> {
-                        try {
-                            Thread.sleep(1000);
-                        } catch (InterruptedException e) {
-                            e.printStackTrace();
-                        }
-                    });
-        }
-        Assertions.assertEquals(threadPoolExecutor.getPoolSize(), testParallelism);
         Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(), testParallelism);
 
         // Test label selector being passed
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 2b2e23a..62e9498 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -51,6 +51,7 @@
 import io.fabric8.mockwebserver.utils.ResponseProvider;
 import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache;
 import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
 import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
 import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
@@ -497,5 +498,10 @@
         public ExecutorService getWorkflowExecutorService() {
             throw new UnsupportedOperationException("Not implemented");
         }
+
+        @Override
+        public IndexedResourceCache<T> getPrimaryCache() {
+            return null;
+        }
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 8e9683d..c4f24ee 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -1013,7 +1013,7 @@
                         namespace, deploymentName);
         String watchUrl =
                 String.format(
-                        "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
+                        "/apis/apps/v1/namespaces/%s/deployments?allowWatchBookmarks=true&fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&watch=true",
                         namespace, deploymentName);
 
         var flinkService = new TestingService(null);
diff --git a/pom.xml b/pom.xml
index 823dd03..a5a698d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
         <maven-javadoc-plugin.version>3.3.2</maven-javadoc-plugin.version>
         <git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
 
-        <operator.sdk.version>4.4.4</operator.sdk.version>
+        <operator.sdk.version>4.8.3</operator.sdk.version>
         <operator.sdk.webhook-framework.version>1.1.1</operator.sdk.webhook-framework.version>
 
         <fabric8.version>6.8.1</fabric8.version>