[FLINK-30406] Detect when jobmanager never started
diff --git a/e2e-tests/test_application_kubernetes_ha.sh b/e2e-tests/test_application_kubernetes_ha.sh
index 3cafd0a..3c1a4d8 100755
--- a/e2e-tests/test_application_kubernetes_ha.sh
+++ b/e2e-tests/test_application_kubernetes_ha.sh
@@ -47,7 +47,7 @@
wait_for_status flinkdep/flink-example-statemachine '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status flinkdep/flink-example-statemachine '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
-check_operator_log_for_errors '|grep -v "REST service in session cluster is bad now"' || exit 1
+check_operator_log_for_errors '|grep -v "REST service in session cluster timed out"' || exit 1
echo "Successfully run the Flink Kubernetes application HA test"
diff --git a/e2e-tests/test_sessionjob_kubernetes_ha.sh b/e2e-tests/test_sessionjob_kubernetes_ha.sh
index b768d43..37c8c37 100755
--- a/e2e-tests/test_sessionjob_kubernetes_ha.sh
+++ b/e2e-tests/test_sessionjob_kubernetes_ha.sh
@@ -48,7 +48,7 @@
wait_for_status $SESSION_CLUSTER_IDENTIFIER '.status.jobManagerDeploymentStatus' READY ${TIMEOUT} || exit 1
wait_for_status $SESSION_JOB_IDENTIFIER '.status.jobStatus.state' RUNNING ${TIMEOUT} || exit 1
-check_operator_log_for_errors '|grep -v "REST service in session cluster is bad now"' || exit 1
+check_operator_log_for_errors '|grep -v "REST service in session cluster timed out"' || exit 1
echo "Successfully run the Flink Session Job HA test"
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
index 41279e1..df4b61c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/SessionObserver.java
@@ -51,7 +51,7 @@
rs.markReconciledSpecAsStable();
}
} catch (Exception e) {
- logger.error("REST service in session cluster is bad now", e);
+ logger.error("REST service in session cluster timed out", e);
if (e instanceof TimeoutException) {
// check for problems with the underlying deployment
observeJmDeployment(deployment, context, observerContext.getDeployedConfig());
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 94d154a..411b58f 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -27,9 +27,12 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
+import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
+import org.apache.flink.kubernetes.operator.api.status.Savepoint;
+import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.autoscaler.JobAutoScaler;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -99,7 +102,7 @@
}
@Override
- public final void reconcile(CR cr, Context<?> ctx) throws Exception {
+ public void reconcile(CR cr, Context<?> ctx) throws Exception {
var spec = cr.getSpec();
var deployConfig = getDeployConfig(cr.getMetadata(), spec, ctx);
var status = cr.getStatus();
@@ -115,12 +118,7 @@
// No further logic is required at this point.
if (reconciliationStatus.isBeforeFirstDeployment()) {
LOG.info("Deploying for the first time");
-
- // Before we try to submit the job we record the current spec in the status so we can
- // handle subsequent deployment and status update errors
- ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
- statusRecorder.patchAndCacheStatus(cr);
-
+ updateStatusBeforeFirstDeployment(cr, spec, deployConfig, status);
deploy(
cr,
spec,
@@ -189,6 +187,36 @@
}
/**
+ * Update the status before the first deployment. We have to record the upgrade mode based on
+ * the initial savepoint path provided, and record the to-be-deployed spec in the status.
+ *
+ * @param cr Related flink resource
+ * @param spec Spec to be deployed
+ * @param deployConfig Deploy configuration
+ * @param status Resource status
+ */
+ private void updateStatusBeforeFirstDeployment(
+ CR cr, SPEC spec, Configuration deployConfig, STATUS status) {
+ if (spec.getJob() != null) {
+ var initialUpgradeMode = UpgradeMode.STATELESS;
+ var initialSp = spec.getJob().getInitialSavepointPath();
+
+ if (initialSp != null) {
+ status.getJobStatus()
+ .getSavepointInfo()
+ .setLastSavepoint(Savepoint.of(initialSp, SavepointTriggerType.UNKNOWN));
+ initialUpgradeMode = UpgradeMode.SAVEPOINT;
+ }
+
+ spec.getJob().setUpgradeMode(initialUpgradeMode);
+ }
+ ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, deployConfig);
+ // Before we try to submit the job we record the current spec in the status so we can
+ // handle subsequent deployment and status update errors
+ statusRecorder.patchAndCacheStatus(cr);
+ }
+
+ /**
* Get Flink configuration object for deploying the given spec using {@link #deploy}.
*
* @param meta ObjectMeta of the related resource.
@@ -260,7 +288,7 @@
CR cr, Context<?> context, Configuration observeConfig) throws Exception;
@Override
- public final DeleteControl cleanup(CR resource, Context<?> context) {
+ public DeleteControl cleanup(CR resource, Context<?> context) {
resourceScaler.cleanup(resource);
return cleanupInternal(resource, context);
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
index 33b2841..5140f14 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.java
@@ -102,7 +102,7 @@
LOG.info("Upgrading/Restarting running job, suspending first...");
}
Optional<UpgradeMode> availableUpgradeMode =
- getAvailableUpgradeMode(resource, deployConfig, observeConfig);
+ getAvailableUpgradeMode(resource, ctx, deployConfig, observeConfig);
if (availableUpgradeMode.isEmpty()) {
return;
}
@@ -122,7 +122,14 @@
ReconciliationUtils.updateStatusForDeployedSpec(resource, deployConfig);
}
}
+
if (currentJobState == JobState.SUSPENDED && desiredJobState == JobState.RUNNING) {
+ // We inherit the upgrade mode unless stateless upgrade requested
+ if (currentDeploySpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+ currentDeploySpec
+ .getJob()
+ .setUpgradeMode(lastReconciledSpec.getJob().getUpgradeMode());
+ }
// We record the target spec into an upgrading state before deploying
ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, deployConfig);
statusRecorder.patchAndCacheStatus(resource);
@@ -141,7 +148,7 @@
}
protected Optional<UpgradeMode> getAvailableUpgradeMode(
- CR resource, Configuration deployConfig, Configuration observeConfig) {
+ CR resource, Context<?> ctx, Configuration deployConfig, Configuration observeConfig) {
var status = resource.getStatus();
var upgradeMode = resource.getSpec().getJob().getUpgradeMode();
@@ -150,7 +157,9 @@
return Optional.of(UpgradeMode.STATELESS);
}
- if (ReconciliationUtils.isJobInTerminalState(status)) {
+ var flinkService = getFlinkService(resource, ctx);
+ if (ReconciliationUtils.isJobInTerminalState(status)
+ && !flinkService.isHaMetadataAvailable(observeConfig)) {
LOG.info(
"Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
return Optional.of(UpgradeMode.SAVEPOINT);
@@ -203,6 +212,7 @@
throws Exception {
var reconciliationStatus = resource.getStatus().getReconciliationStatus();
var rollbackSpec = reconciliationStatus.deserializeLastStableSpec();
+ rollbackSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
UpgradeMode upgradeMode = resource.getSpec().getJob().getUpgradeMode();
@@ -249,6 +259,7 @@
throws Exception {
LOG.info("Resubmitting Flink job...");
SPEC specToRecover = ReconciliationUtils.getDeployedSpec(deployment);
+ specToRecover.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
restoreJob(
deployment,
specToRecover,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
index 54463dc..918ec0e 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.java
@@ -18,6 +18,7 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
@@ -94,11 +95,14 @@
@Override
protected Optional<UpgradeMode> getAvailableUpgradeMode(
- FlinkDeployment deployment, Configuration deployConfig, Configuration observeConfig) {
+ FlinkDeployment deployment,
+ Context<?> ctx,
+ Configuration deployConfig,
+ Configuration observeConfig) {
var status = deployment.getStatus();
var availableUpgradeMode =
- super.getAvailableUpgradeMode(deployment, deployConfig, observeConfig);
+ super.getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
if (availableUpgradeMode.isPresent()) {
return availableUpgradeMode;
@@ -112,20 +116,27 @@
&& !flinkVersionChanged(
ReconciliationUtils.getDeployedSpec(deployment), deployment.getSpec())) {
- if (!flinkService.isHaMetadataAvailable(deployConfig)) {
- if (deployment.getStatus().getReconciliationStatus().getLastStableSpec() == null) {
- // initial deployment failure, reset to allow for spec change to proceed
- return resetOnMissingStableSpec(deployment, deployConfig);
- }
- } else {
+ if (flinkService.isHaMetadataAvailable(deployConfig)) {
LOG.info(
"Job is not running but HA metadata is available for last state restore, ready for upgrade");
return Optional.of(UpgradeMode.LAST_STATE);
}
}
- if (status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING
- || status.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
+ var jmDeployStatus = status.getJobManagerDeploymentStatus();
+ if (jmDeployStatus != JobManagerDeploymentStatus.MISSING
+ && status.getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getUpgradeMode()
+ != UpgradeMode.LAST_STATE
+ && FlinkUtils.jmPodNeverStarted(ctx)) {
+ deleteJmThatNeverStarted(deployment, deployConfig);
+ return getAvailableUpgradeMode(deployment, ctx, deployConfig, observeConfig);
+ }
+
+ if (jmDeployStatus == JobManagerDeploymentStatus.MISSING
+ || jmDeployStatus == JobManagerDeploymentStatus.ERROR) {
throw new RecoveryFailureException(
"JobManager deployment is missing and HA data is not available to make stateful upgrades. "
+ "It is possible that the job has finished or terminally failed, or the configmaps have been deleted. "
@@ -138,21 +149,12 @@
return Optional.empty();
}
- private Optional<UpgradeMode> resetOnMissingStableSpec(
- FlinkDeployment deployment, Configuration deployConfig) {
- // initial deployment failure, reset to allow for spec change to proceed
+ private void deleteJmThatNeverStarted(FlinkDeployment deployment, Configuration deployConfig) {
+ deployment.getStatus().getJobStatus().setState(JobStatus.FAILED.name());
flinkService.deleteClusterDeployment(
deployment.getMetadata(), deployment.getStatus(), false);
flinkService.waitForClusterShutdown(deployConfig);
- if (!flinkService.isHaMetadataAvailable(deployConfig)) {
- LOG.info("Job never entered stable state. Resetting status for initial deploy");
- ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(deployment);
- return Optional.empty();
- } else {
- // proceed with upgrade if deployment succeeded between check and delete
- LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
- return Optional.of(UpgradeMode.LAST_STATE);
- }
+ LOG.info("Deleted jobmanager deployment that never started.");
}
@Override
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
index 221d055..572845c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/FlinkUtils.java
@@ -38,7 +38,10 @@
import io.fabric8.kubernetes.api.model.ConfigMapList;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,6 +49,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
/** Flink Utility methods used by the operator. */
public class FlinkUtils {
@@ -221,4 +225,35 @@
return new JobID(
Preconditions.checkNotNull(uid).hashCode(), Preconditions.checkNotNull(generation));
}
+
+ /**
+ * Check if the jobmanager pod has never successfully started. This is an important check to
+ * determine whether it is possible that the job has started and taken any checkpoints that we
+ * are unaware of.
+ *
+ * <p>The way we check this is by using the availability condition transition timestamp. If the
+ * deployment never transitioned out of the unavailable state, we can assume that the JM never
+ * started.
+ *
+ * @param context Resource context
+ * @return True only if we are sure that the jobmanager pod never started
+ */
+ public static boolean jmPodNeverStarted(Context<?> context) {
+ Optional<Deployment> depOpt = context.getSecondaryResource(Deployment.class);
+ if (depOpt.isPresent()) {
+ Deployment deployment = depOpt.get();
+ for (DeploymentCondition condition : deployment.getStatus().getConditions()) {
+ if (condition.getType().equals("Available")) {
+ var createTs = deployment.getMetadata().getCreationTimestamp();
+ if ("False".equals(condition.getStatus())
+ && createTs.equals(condition.getLastTransitionTime())) {
+ return true;
+ }
+ }
+ }
+ }
+
+ // If unsure, return false to be on the safe side
+ return false;
+ }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
index aac61d8..1d99f25 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestUtils.java
@@ -57,6 +57,7 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -94,15 +95,27 @@
}
public static Deployment createDeployment(boolean ready) {
- DeploymentStatus status = new DeploymentStatus();
+ String nowTs = Instant.now().toString();
+ var status = new DeploymentStatus();
status.setAvailableReplicas(ready ? 1 : 0);
status.setReplicas(1);
+ var availableCondition = new DeploymentCondition();
+ availableCondition.setType("Available");
+ availableCondition.setStatus(ready ? "True" : "False");
+ availableCondition.setLastTransitionTime(nowTs);
+ status.setConditions(List.of(availableCondition));
+
DeploymentSpec spec = new DeploymentSpec();
spec.setReplicas(1);
+
+ var meta = new ObjectMeta();
+ meta.setCreationTimestamp(nowTs);
+
Deployment deployment = new Deployment();
- deployment.setMetadata(new ObjectMeta());
+ deployment.setMetadata(meta);
deployment.setSpec(spec);
deployment.setStatus(status);
+
return deployment;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
new file mode 100644
index 0000000..b624bf3
--- /dev/null
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingApplicationReconciler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator;
+
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
+import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
+import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
+import org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler;
+import org.apache.flink.kubernetes.operator.service.FlinkService;
+import org.apache.flink.kubernetes.operator.utils.EventRecorder;
+import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
+
+/** Testing wrapper for {@link ApplicationReconciler}. */
+public class TestingApplicationReconciler extends ApplicationReconciler {
+ public TestingApplicationReconciler(
+ KubernetesClient kubernetesClient,
+ FlinkService flinkService,
+ FlinkConfigManager configManager,
+ EventRecorder eventRecorder,
+ StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
+ KubernetesOperatorMetricGroup operatorMetricGroup) {
+ super(
+ kubernetesClient,
+ flinkService,
+ configManager,
+ eventRecorder,
+ statusRecorder,
+ operatorMetricGroup);
+ }
+
+ @Override
+ public void reconcile(FlinkDeployment flinkDeployment, Context<?> context) throws Exception {
+ var cr = ReconciliationUtils.clone(flinkDeployment);
+ cr.setStatus(flinkDeployment.getStatus());
+ super.reconcile(cr, context);
+ }
+
+ @Override
+ public DeleteControl cleanup(FlinkDeployment flinkDeployment, Context<?> context) {
+ var cr = ReconciliationUtils.clone(flinkDeployment);
+ cr.setStatus(flinkDeployment.getStatus());
+ return super.cleanup(cr, context);
+ }
+}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index 22efa1f..72aa6c8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -96,6 +96,7 @@
private final Set<String> sessions = new HashSet<>();
private boolean isPortReady = true;
private boolean haDataAvailable = true;
+ private boolean jobManagerReady = true;
private boolean deployFailure = false;
private Runnable sessionJobSubmittedCallback;
private PodList podList = new PodList();
@@ -123,7 +124,7 @@
if (jobs.isEmpty() && sessions.isEmpty()) {
return Optional.empty();
}
- return (Optional<T>) Optional.of(TestUtils.createDeployment(true));
+ return (Optional<T>) Optional.of(TestUtils.createDeployment(jobManagerReady));
}
};
}
@@ -194,6 +195,10 @@
this.haDataAvailable = haDataAvailable;
}
+ public void setJobManagerReady(boolean jmReady) {
+ this.jobManagerReady = jmReady;
+ }
+
public void setDeployFailure(boolean deployFailure) {
this.deployFailure = deployFailure;
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
index 99a6ca3..f34d804 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FailedDeploymentRestartTest.java
@@ -96,6 +96,9 @@
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
+
+ // We started without savepoint
+ appCluster.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
assertEquals(
appCluster.getSpec(),
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
index 9772ab0..b0b09a3 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentControllerTest.java
@@ -767,8 +767,12 @@
private void testUpgradeNotReadyCluster(FlinkDeployment appCluster) throws Exception {
flinkService.clear();
testController.reconcile(appCluster, context);
+ var specClone = ReconciliationUtils.clone(appCluster.getSpec());
+ if (specClone.getJob() != null) {
+ specClone.getJob().setUpgradeMode(UpgradeMode.STATELESS);
+ }
assertEquals(
- appCluster.getSpec(),
+ specClone,
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
flinkService.setPortReady(false);
@@ -786,8 +790,15 @@
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
appCluster.getStatus().getJobManagerDeploymentStatus());
+
+ var expectedSpec = ReconciliationUtils.clone(appCluster.getSpec());
+ if (expectedSpec.getJob() != null
+ && expectedSpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
+ expectedSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
+ }
+
assertEquals(
- appCluster.getSpec(),
+ expectedSpec,
appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
flinkService.setPortReady(true);
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
index 07dae7d..a4f6754 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.java
@@ -100,9 +100,6 @@
JobManagerDeploymentStatus.READY,
appCluster.getStatus().getJobManagerDeploymentStatus());
assertEquals("RUNNING", appCluster.getStatus().getJobStatus().getState());
- assertEquals(
- appCluster.getSpec(),
- appCluster.getStatus().getReconciliationStatus().deserializeLastReconciledSpec());
}
private static Stream<Arguments> applicationTestParams() {
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index a4f4acb..813c490 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -27,6 +27,7 @@
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -94,6 +95,7 @@
private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
private TestingFlinkService flinkService;
private ApplicationReconciler reconciler;
+
private Context<FlinkDeployment> context;
private StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder;
@@ -107,7 +109,7 @@
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
reconciler =
- new ApplicationReconciler(
+ new TestingApplicationReconciler(
kubernetesClient,
flinkService,
configManager,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
index 0e8549a..675055b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerUpgradeModeTest.java
@@ -22,6 +22,7 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
+import org.apache.flink.kubernetes.operator.TestingApplicationReconciler;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.TestingStatusRecorder;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
@@ -47,10 +48,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.MethodSource;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -75,7 +80,7 @@
flinkService = new TestingFlinkService(kubernetesClient);
context = flinkService.getContext();
reconciler =
- new ApplicationReconciler(
+ new TestingApplicationReconciler(
kubernetesClient,
flinkService,
configManager,
@@ -260,36 +265,213 @@
}
@ParameterizedTest
- @EnumSource(UpgradeMode.class)
- public void testUpgradeBeforeReachingStableSpec(UpgradeMode upgradeMode) throws Exception {
+ @MethodSource("testUpgradeJmDeployCannotStartParams")
+ public void testUpgradeJmDeployCannotStart(UpgradeMode fromMode, UpgradeMode toMode)
+ throws Exception {
+
+ flinkService.setHaDataAvailable(true);
+ flinkService.setJobManagerReady(true);
+
+ // Prepare running deployment
+ var deployment = TestUtils.buildApplicationCluster();
+ var jobSpec = deployment.getSpec().getJob();
+ jobSpec.setUpgradeMode(fromMode);
+
+ reconciler.reconcile(deployment, context);
+ var runningJobs = flinkService.listJobs();
+ verifyAndSetRunningJobsToStatus(deployment, runningJobs);
+
+ // Suspend running deployment and assert that correct upgradeMode is set
+ jobSpec.setState(JobState.SUSPENDED);
+ reconciler.reconcile(deployment, context);
+
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+ assertEquals(fromMode, lastReconciledSpec.getJob().getUpgradeMode());
+
+ // Restore deployment and assert that correct upgradeMode is set
+ jobSpec.setState(JobState.RUNNING);
+ jobSpec.setUpgradeMode(toMode);
+ reconciler.reconcile(deployment, context);
+
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+ assertEquals(
+ toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : fromMode,
+ lastReconciledSpec.getJob().getUpgradeMode());
+
+ // Simulate JM failure after deployment, we need this to test the actual upgrade behaviour
+ // with a jobmanager that never started
+ flinkService.setJobManagerReady(false);
flinkService.setHaDataAvailable(false);
- final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
+ deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
+
+ // Send in a new upgrade while the jobmanager still not started
+ jobSpec.setState(JobState.RUNNING);
+ jobSpec.setEntryClass("newClass");
+ reconciler.reconcile(deployment, context);
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure the upgrade was executed as long as we have the savepoint information
+ if (fromMode == UpgradeMode.LAST_STATE && toMode != UpgradeMode.STATELESS) {
+ // We cant make progress as no HA meta available after LAST_STATE, upgrade. It means the
+ // job started and terminated, but we didn't see...
+ assertEquals(
+ JobManagerDeploymentStatus.DEPLOYING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+ } else {
+ assertEquals(
+ JobManagerDeploymentStatus.MISSING,
+ deployment.getStatus().getJobManagerDeploymentStatus());
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
+ assertEquals(
+ toMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.SAVEPOINT,
+ lastReconciledSpec.getJob().getUpgradeMode());
+
+ // Complete upgrade and recover succesfully with the latest savepoint
+ reconciler.reconcile(deployment, context);
+ lastReconciledSpec =
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec();
+
+ assertEquals(JobState.RUNNING, lastReconciledSpec.getJob().getState());
+ assertEquals(1, flinkService.listJobs().size());
+ if (fromMode == UpgradeMode.STATELESS || toMode == UpgradeMode.STATELESS) {
+ assertNull(flinkService.listJobs().get(0).f0);
+ } else {
+ assertEquals("savepoint_0", flinkService.listJobs().get(0).f0);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("testInitialJmDeployCannotStartParams")
+ public void testInitialJmDeployCannotStart(UpgradeMode upgradeMode, boolean initSavepoint)
+ throws Exception {
+
+ // We simulate JM failure to test the initial submission/upgrade behavior when the JM can
+ // never start initially
+ flinkService.setHaDataAvailable(false);
+ flinkService.setJobManagerReady(false);
+
+ var deployment = TestUtils.buildApplicationCluster();
+ if (initSavepoint) {
+ deployment.getSpec().getJob().setInitialSavepointPath("init-sp");
+ }
reconciler.reconcile(deployment, context);
assertEquals(
JobManagerDeploymentStatus.DEPLOYING,
deployment.getStatus().getJobManagerDeploymentStatus());
- // Ready for spec changes, the reconciliation should be performed
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure savepoint path is recorded in status and upgradeMode set correctly for initial
+ // startup. Either stateless or savepoint depending only on the initialSavepointPath
+ // setting.
+ if (initSavepoint) {
+ assertEquals("init-sp", flinkService.listJobs().get(0).f0);
+ assertEquals(
+ "init-sp",
+ deployment
+ .getStatus()
+ .getJobStatus()
+ .getSavepointInfo()
+ .getLastSavepoint()
+ .getLocation());
+ assertEquals(UpgradeMode.SAVEPOINT, lastReconciledSpec.getJob().getUpgradeMode());
+ } else {
+ assertNull(flinkService.listJobs().get(0).f0);
+ assertNull(deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
+ assertEquals(UpgradeMode.STATELESS, lastReconciledSpec.getJob().getUpgradeMode());
+ }
+
+ // JM is failed, but we submit an upgrade, this should always be possible on initial deploy
+ // failure
final String newImage = "new-image-1";
deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
deployment.getSpec().setImage(newImage);
reconciler.reconcile(deployment, context);
- if (!UpgradeMode.STATELESS.equals(upgradeMode)) {
- assertNull(deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
- assertEquals(
- ReconciliationState.UPGRADING,
- deployment.getStatus().getReconciliationStatus().getState());
- reconciler.reconcile(deployment, context);
- }
assertEquals(
- newImage,
- deployment
- .getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpec()
- .getImage());
+ ReconciliationState.UPGRADING,
+ deployment.getStatus().getReconciliationStatus().getState());
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // We make sure that stateless upgrade request is respected (drop state)
+ assertEquals(
+ upgradeMode == UpgradeMode.STATELESS
+ ? UpgradeMode.STATELESS
+ : UpgradeMode.SAVEPOINT,
+ lastReconciledSpec.getJob().getUpgradeMode());
+
+ reconciler.reconcile(deployment, context);
+ lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+ assertEquals(newImage, lastReconciledSpec.getImage());
+ assertEquals(
+ upgradeMode == UpgradeMode.STATELESS
+ ? UpgradeMode.STATELESS
+ : UpgradeMode.SAVEPOINT,
+ lastReconciledSpec.getJob().getUpgradeMode());
+ assertEquals(1, flinkService.listJobs().size());
+ assertEquals(
+ initSavepoint && upgradeMode != UpgradeMode.STATELESS ? "init-sp" : null,
+ flinkService.listJobs().get(0).f0);
+ }
+
+ private static Stream<Arguments> testInitialJmDeployCannotStartParams() {
+ return Stream.of(
+ Arguments.of(UpgradeMode.LAST_STATE, true),
+ Arguments.of(UpgradeMode.LAST_STATE, false),
+ Arguments.of(UpgradeMode.SAVEPOINT, true),
+ Arguments.of(UpgradeMode.SAVEPOINT, false),
+ Arguments.of(UpgradeMode.STATELESS, true),
+ Arguments.of(UpgradeMode.STATELESS, false));
+ }
+
+ private static Stream<Arguments> testUpgradeJmDeployCannotStartParams() {
+ var args = new ArrayList<Arguments>();
+ for (UpgradeMode from : UpgradeMode.values()) {
+ for (UpgradeMode to : UpgradeMode.values()) {
+ args.add(Arguments.of(from, to));
+ }
+ }
+ return args.stream();
+ }
+
+ @Test
+ public void testLastStateOnDeletedDeployment() throws Exception {
+ // Bootstrap running deployment
+ var deployment = TestUtils.buildApplicationCluster();
+ deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
+
+ reconciler.reconcile(deployment, context);
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+
+ // Delete cluster and keep HA metadata
+ flinkService.deleteClusterDeployment(
+ deployment.getMetadata(), deployment.getStatus(), false);
+ flinkService.setHaDataAvailable(true);
+
+ // Submit upgrade
+ deployment.getSpec().setRestartNonce(123L);
+ reconciler.reconcile(deployment, context);
+
+ var lastReconciledSpec =
+ deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
+
+ // Make sure we correctly record upgrade mode to last state
+ assertEquals(UpgradeMode.LAST_STATE, lastReconciledSpec.getJob().getUpgradeMode());
+ assertEquals(JobState.SUSPENDED, lastReconciledSpec.getJob().getState());
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
index ca7d0c3..136e931 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/FlinkUtilsTest.java
@@ -30,16 +30,22 @@
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.apps.Deployment;
+import io.fabric8.kubernetes.api.model.apps.DeploymentCondition;
+import io.fabric8.kubernetes.api.model.apps.DeploymentStatus;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.junit.jupiter.api.Test;
import java.net.HttpURLConnection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -150,6 +156,43 @@
}
@Test
+ public void testJmNeverStartedDetection() {
+ var jmDeployment = new Deployment();
+ jmDeployment.setMetadata(new ObjectMeta());
+ jmDeployment.getMetadata().setCreationTimestamp("create-ts");
+ jmDeployment.setStatus(new DeploymentStatus());
+ var deployStatus = jmDeployment.getStatus();
+ var jmNeverStartedCondition =
+ new DeploymentCondition("create-ts", null, null, null, "False", "Available");
+ var jmStartedButStopped =
+ new DeploymentCondition("other-ts", null, null, null, "False", "Available");
+ var jmAvailable =
+ new DeploymentCondition("other-ts", null, null, null, "True", "Available");
+
+ var context =
+ new TestUtils.TestingContext<Deployment>() {
+ @Override
+ public <R> Optional<R> getSecondaryResource(Class<R> aClass, String name) {
+ return (Optional<R>) Optional.of(jmDeployment);
+ }
+ };
+
+ deployStatus.setConditions(Collections.emptyList());
+ assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+ deployStatus.setConditions(List.of(jmNeverStartedCondition));
+ assertTrue(FlinkUtils.jmPodNeverStarted(context));
+
+ deployStatus.setConditions(List.of(jmStartedButStopped));
+ assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+ deployStatus.setConditions(List.of(jmAvailable));
+ assertFalse(FlinkUtils.jmPodNeverStarted(context));
+
+ assertFalse(FlinkUtils.jmPodNeverStarted(TestUtils.createEmptyContext()));
+ }
+
+ @Test
public void testDeleteJobGraphInKubernetesHAShouldNotUpdateWithEmptyConfigMap() {
final String name = "empty-ha-configmap";
final String clusterId = "cluster-id-2";