[FLINK-35116] Bump operator sdk version to 4.8.3
diff --git a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index d068062..918b407 100644
--- a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -304,7 +304,7 @@
</tr>
<tr>
<td><h5>kubernetes.operator.reconcile.parallelism</h5></td>
- <td style="word-wrap: break-word;">200</td>
+ <td style="word-wrap: break-word;">50</td>
<td>Integer</td>
<td>The maximum number of threads running the reconciliation loop. Use -1 for infinite.</td>
</tr>
diff --git a/docs/layouts/shortcodes/generated/system_section.html b/docs/layouts/shortcodes/generated/system_section.html
index aa053c2..ec30301 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -100,7 +100,7 @@
</tr>
<tr>
<td><h5>kubernetes.operator.reconcile.parallelism</h5></td>
- <td style="word-wrap: break-word;">200</td>
+ <td style="word-wrap: break-word;">50</td>
<td>Integer</td>
<td>The maximum number of threads running the reconciliation loop. Use -1 for infinite.</td>
</tr>
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
index 658e25a..faf6024 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/validation/CrdCompatibilityChecker.java
@@ -93,7 +93,11 @@
// This field was removed from Kubernetes ObjectMeta v1 in 1.25 as it was unused
// for a long time. If set for any reason (very unlikely as it does nothing),
// the property will be dropped / ignored by the api server.
- if (!fieldPath.endsWith(".metadata.clusterName")) {
+ if (!fieldPath.endsWith(".metadata.clusterName")
+ // This claims field was removed in Kubernetes 1.28 as it was mistakenly
+ // added in the first place. For more context please refer to
+ // https://github.com/kubernetes/api/commit/8b14183
+ && !fieldPath.contains(".volumeClaimTemplate.spec.resources.claims")) {
err(fieldPath + " has been removed");
}
} else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index a5846f5..0ecd7c8 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -137,7 +137,7 @@
overrider.withExecutorService(Executors.newCachedThreadPool());
} else {
LOG.info("Configuring operator with {} reconciliation threads.", parallelism);
- overrider.withExecutorService(Executors.newFixedThreadPool(parallelism));
+ overrider.withConcurrentReconciliationThreads(parallelism);
}
if (operatorConf.isJosdkMetricsEnabled()) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 9fcdb6a..225bebd 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -266,8 +266,7 @@
null,
conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_LEASE_DURATION),
conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RENEW_DEADLINE),
- conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RETRY_PERIOD),
- null);
+ conf.get(KubernetesOperatorConfigOptions.OPERATOR_LEADER_ELECTION_RETRY_PERIOD));
}
private static Optional<String> getEnv(String key) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
index 679aa57..47165f7 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/metrics/OperatorJosdkMetrics.java
@@ -205,9 +205,9 @@
Class<? extends AbstractFlinkResource<?, ?>> resourceClass;
- if (resourceGvk.kind.equals(FlinkDeployment.class.getSimpleName())) {
+ if (resourceGvk.getKind().equals(FlinkDeployment.class.getSimpleName())) {
resourceClass = FlinkDeployment.class;
- } else if (resourceGvk.kind.equals(FlinkSessionJob.class.getSimpleName())) {
+ } else if (resourceGvk.getKind().equals(FlinkSessionJob.class.getSimpleName())) {
resourceClass = FlinkSessionJob.class;
} else {
return Optional.empty();
diff --git a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
index 1b7d232..bd46972 100644
--- a/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
+++ b/flink-kubernetes-operator/src/main/resources/META-INF/NOTICE
@@ -47,10 +47,10 @@
- io.fabric8:kubernetes-model-scheduling:jar:6.8.1
- io.fabric8:kubernetes-model-storageclass:jar:6.8.1
- io.fabric8:zjsonpatch:jar:0.3.0
-- io.javaoperatorsdk:operator-framework-core:jar:4.4.4
-- io.javaoperatorsdk:operator-framework:jar:4.4.4
+- io.javaoperatorsdk:operator-framework-core:jar:4.8.3
+- io.javaoperatorsdk:operator-framework:jar:4.8.3
- org.apache.commons:commons-compress:1.21
-- org.apache.commons:commons-lang3:3.13.0
+- org.apache.commons:commons-lang3:3.14.0
- org.apache.commons:commons-math3:3.6.1
- org.apache.commons:commons-text:jar:1.10.0
- org.apache.logging.log4j:log4j-1.2-api:2.17.1
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
index 6455096..b60551a 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/FlinkOperatorTest.java
@@ -70,21 +70,10 @@
var configService = testOperator.getOperator().getConfigurationService();
- // Test parallelism being passed expectedly
+ // Test parallelism being passed
var executorService = configService.getExecutorService();
Assertions.assertInstanceOf(ThreadPoolExecutor.class, executorService);
ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
- for (int i = 0; i < testParallelism * 2; i++) {
- threadPoolExecutor.execute(
- () -> {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- }
- Assertions.assertEquals(threadPoolExecutor.getPoolSize(), testParallelism);
Assertions.assertEquals(threadPoolExecutor.getMaximumPoolSize(), testParallelism);
// Test label selector being passed
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index 2b2e23a..62e9498 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -51,6 +51,7 @@
import io.fabric8.mockwebserver.utils.ResponseProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.IndexedResourceCache;
import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
@@ -497,5 +498,10 @@
public ExecutorService getWorkflowExecutorService() {
throw new UnsupportedOperationException("Not implemented");
}
+
+ @Override
+ public IndexedResourceCache<T> getPrimaryCache() {
+ return null;
+ }
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index 8e9683d..c4f24ee 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -1013,7 +1013,7 @@
namespace, deploymentName);
String watchUrl =
String.format(
- "/apis/apps/v1/namespaces/%s/deployments?fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&allowWatchBookmarks=true&watch=true",
+ "/apis/apps/v1/namespaces/%s/deployments?allowWatchBookmarks=true&fieldSelector=metadata.name%%3D%s&timeoutSeconds=600&watch=true",
namespace, deploymentName);
var flinkService = new TestingService(null);
diff --git a/pom.xml b/pom.xml
index 823dd03..a5a698d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -75,7 +75,7 @@
<maven-javadoc-plugin.version>3.3.2</maven-javadoc-plugin.version>
<git-commit-id-maven-plugin.version>5.0.0</git-commit-id-maven-plugin.version>
- <operator.sdk.version>4.4.4</operator.sdk.version>
+ <operator.sdk.version>4.8.3</operator.sdk.version>
<operator.sdk.webhook-framework.version>1.1.1</operator.sdk.webhook-framework.version>
<fabric8.version>6.8.1</fabric8.version>