[FLINK-34619] Do not wait for scaling completion in UPGRADE state with in-place scaling (#793)
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
index 51203f1..35363b8 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
@@ -101,7 +101,15 @@
return lastReconciledSpec == null;
}
+ /**
+ * This method is only here for backward compatibility reasons. The current version of the
+ * operator does not leave the resources in UPGRADING state during in-place scaling therefore
+ * this method will always return false.
+ *
+ * @return True if in-place scaling is in progress.
+ */
@JsonIgnore
+ @Deprecated
public boolean scalingInProgress() {
if (isBeforeFirstDeployment() || state != ReconciliationState.UPGRADING) {
return false;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
index 0d30292..c8360db 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
@@ -78,9 +78,8 @@
// We must check if the upgrade went through without the status upgrade for some reason
if (reconciliationStatus.scalingInProgress()) {
- if (ctx.getFlinkService().scalingCompleted(ctx)) {
- reconciliationStatus.setState(ReconciliationState.DEPLOYED);
- }
+ // Keep this for backward compatibility
+ reconciliationStatus.setState(ReconciliationState.DEPLOYED);
} else if (checkIfAlreadyUpgraded(ctx)) {
ReconciliationUtils.updateStatusForAlreadyUpgraded(resource);
} else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 5929bfc..183273b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -39,7 +39,6 @@
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.ValidationException;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -111,7 +110,8 @@
updateStatusBeforeDeploymentAttempt(target, conf, Clock.systemDefaultZone());
}
- private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
+ @VisibleForTesting
+ public static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
AbstractFlinkResource<SPEC, ?> target,
JobState stateAfterReconcile,
Configuration conf,
@@ -181,24 +181,6 @@
}
}
- public static <SPEC extends AbstractFlinkSpec> void updateAfterScaleUp(
- AbstractFlinkResource<SPEC, ?> target,
- Configuration deployConfig,
- Clock clock,
- FlinkService.ScalingResult scalingResult) {
-
- var reconState = target.getStatus().getReconciliationStatus().getState();
- // We mark the spec reconciled, and set state upgrading only if it was already upgrading or
- // we actually triggered a new scale up
- ReconciliationUtils.updateStatusForSpecReconciliation(
- target,
- JobState.RUNNING,
- deployConfig,
- reconState == ReconciliationState.UPGRADING
- || scalingResult == FlinkService.ScalingResult.SCALING_TRIGGERED,
- clock);
- }
-
public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSnapshotTriggerNonce(
SnapshotInfo snapshotInfo,
AbstractFlinkResource<SPEC, ?> target,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 7172e9b..be99fb1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -41,7 +41,6 @@
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -194,6 +193,11 @@
}
private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
+ if (DiffType.IGNORE == specDiff.getType()) {
+ // This can happen if an ignore change comes in while we are waiting in upgrading state
+ // for scaling completion
+ return;
+ }
eventRecorder.triggerEventOnce(
cr,
EventRecorder.Type.Normal,
@@ -341,14 +345,13 @@
private boolean scale(FlinkResourceContext<CR> ctx, Configuration deployConfig)
throws Exception {
- var scalingResult = ctx.getFlinkService().scale(ctx, deployConfig);
- if (scalingResult == FlinkService.ScalingResult.CANNOT_SCALE) {
- return false;
+ var scaled = ctx.getFlinkService().scale(ctx, deployConfig);
+
+ if (scaled) {
+ ReconciliationUtils.updateStatusForDeployedSpec(ctx.getResource(), deployConfig, clock);
}
- ReconciliationUtils.updateAfterScaleUp(
- ctx.getResource(), deployConfig, clock, scalingResult);
- return true;
+ return scaled;
}
/**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index c7e60e5..42ce5eb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -69,7 +69,6 @@
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -887,16 +886,6 @@
}
}
- @Override
- public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws Exception {
-
- try (var restClient = getClusterClient(conf)) {
- return restClient
- .getJobDetails(jobID)
- .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
- }
- }
-
/** Wait until Deployment is removed, return remaining timeout. */
@VisibleForTesting
protected Duration deleteDeploymentBlocking(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index ac4c41a..3c66cd1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -36,7 +36,6 @@
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.PodList;
@@ -118,25 +117,11 @@
PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
- ScalingResult scale(FlinkResourceContext<?> resourceContext, Configuration deployConfig)
+ boolean scale(FlinkResourceContext<?> resourceContext, Configuration deployConfig)
throws Exception;
- boolean scalingCompleted(FlinkResourceContext<?> resourceContext);
-
Map<String, String> getMetrics(Configuration conf, String jobId, List<String> metricNames)
throws Exception;
RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;
-
- JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws Exception;
-
- /** Result of an in-place scaling operation. */
- enum ScalingResult {
- // Scaling triggered by the operation
- SCALING_TRIGGERED,
- // Job already scaled to target previously
- ALREADY_SCALED,
- // Cannot execute scaling, full upgrade required
- CANNOT_SCALE;
- }
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index bee991a..4216312 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -36,7 +36,6 @@
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -49,7 +48,6 @@
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
@@ -70,7 +68,6 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
@@ -177,25 +174,24 @@
}
@Override
- public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig)
- throws Exception {
+ public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) throws Exception {
var resource = ctx.getResource();
var observeConfig = ctx.getObserveConfig();
if (!supportsInPlaceScaling(resource, observeConfig)) {
- return ScalingResult.CANNOT_SCALE;
+ return false;
}
var newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
var previousOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
LOG.info("No overrides defined before or after. Cannot scale in-place.");
- return ScalingResult.CANNOT_SCALE;
+ return false;
}
try (var client = getClusterClient(observeConfig)) {
var requirements = new HashMap<>(getVertexResources(client, resource));
- var result = ScalingResult.ALREADY_SCALED;
+ var alreadyScaled = true;
for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
requirements.entrySet()) {
@@ -216,18 +212,18 @@
// If the requirements changed we mark this as scaling triggered
if (!parallelism.equals(newParallelism)) {
entry.setValue(new JobVertexResourceRequirements(newParallelism));
- result = ScalingResult.SCALING_TRIGGERED;
+ alreadyScaled = false;
}
} else if (previousOverrides.containsKey(jobId)) {
LOG.info(
"Parallelism override for {} has been removed, falling back to regular upgrade.",
jobId);
- return ScalingResult.CANNOT_SCALE;
+ return false;
} else {
// No overrides for this vertex
}
}
- if (result == ScalingResult.ALREADY_SCALED) {
+ if (alreadyScaled) {
LOG.info("Vertex resources requirements already match target, nothing to do...");
} else {
updateVertexResources(client, resource, requirements);
@@ -239,10 +235,10 @@
"In-place scaling triggered",
ctx.getKubernetesClient());
}
- return result;
+ return true;
} catch (Throwable t) {
LOG.error("Error while rescaling, falling back to regular upgrade", t);
- return ScalingResult.CANNOT_SCALE;
+ return false;
}
}
@@ -309,52 +305,6 @@
return currentRequirements.asJobResourceRequirements().get().getJobVertexParallelisms();
}
- @Override
- public boolean scalingCompleted(FlinkResourceContext<?> ctx) {
- var conf = ctx.getObserveConfig();
- var status = ctx.getResource().getStatus();
- try (var client = ctx.getFlinkService().getClusterClient(conf)) {
- var jobId = JobID.fromHexString(status.getJobStatus().getJobId());
- var jobDetailsInfo = client.getJobDetails(jobId).get();
-
- // Return false on empty jobgraph
- if (jobDetailsInfo.getJobVertexInfos().isEmpty()) {
- return false;
- }
-
- Map<JobVertexID, Integer> currentParallelisms =
- jobDetailsInfo.getJobVertexInfos().stream()
- .collect(
- Collectors.toMap(
- JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
- JobDetailsInfo.JobVertexDetailsInfo::getParallelism));
-
- Map<String, String> parallelismOverrides =
- conf.get(PipelineOptions.PARALLELISM_OVERRIDES);
- for (Map.Entry<JobVertexID, Integer> entry : currentParallelisms.entrySet()) {
- String override = parallelismOverrides.get(entry.getKey().toHexString());
- if (override == null) {
- // No override defined for this vertex
- continue;
- }
- Integer overrideParallelism = Integer.valueOf(override);
- if (!overrideParallelism.equals(entry.getValue())) {
- LOG.info(
- "Scaling still in progress for vertex {}, {} -> {}",
- entry.getKey(),
- entry.getValue(),
- overrideParallelism);
- return false;
- }
- }
- LOG.info("All vertexes have successfully scaled");
- status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
- return true;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
/**
* Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
* This avoids race conditions between JM shutdown and TM shutdown / failure handling.
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 46795bb..513b4e6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -173,14 +173,14 @@
}
@Override
- public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
+ public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
var observeConfig = ctx.getObserveConfig();
var jobSpec = ctx.getResource().getSpec();
var meta = ctx.getResource().getMetadata();
if (observeConfig.get(JobManagerOptions.SCHEDULER_MODE) != SchedulerExecutionMode.REACTIVE
&& jobSpec != null) {
LOG.info("Reactive scaling is not enabled");
- return ScalingResult.CANNOT_SCALE;
+ return false;
}
var clusterId = meta.getName();
@@ -191,7 +191,7 @@
if (deployment == null || deployment.get() == null) {
LOG.warn("TM Deployment ({}) not found", name);
- return ScalingResult.CANNOT_SCALE;
+ return false;
}
var actualReplicas = deployment.get().getSpec().getReplicas();
@@ -204,19 +204,12 @@
actualReplicas,
desiredReplicas);
deployment.scale(desiredReplicas);
- return ScalingResult.SCALING_TRIGGERED;
} else {
LOG.info(
"Not scaling TM replicas: actual({}) == desired({})",
actualReplicas,
desiredReplicas);
- return ScalingResult.ALREADY_SCALED;
}
- }
-
- @Override
- public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
- // Currently there is no good way of checking whether reactive scaling has completed or not.
return true;
}
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index fbf07f4..7936e4e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -50,7 +50,6 @@
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
-import org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest;
import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -65,7 +64,6 @@
import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
@@ -148,8 +146,6 @@
@Setter
private Collection<AggregatedMetric> aggregatedMetricsResponse = Collections.emptyList();
- @Setter private boolean scalingCompleted;
-
public TestingFlinkService() {
this(null);
}
@@ -599,7 +595,7 @@
}
@Override
- public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
+ public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
boolean standalone = ctx.getDeploymentMode() == KubernetesDeploymentMode.STANDALONE;
boolean session = ctx.getResource().getSpec().getJob() == null;
boolean reactive =
@@ -612,15 +608,10 @@
.get(
StandaloneKubernetesConfigOptionsInternal
.KUBERNETES_TASKMANAGER_REPLICAS);
- return ScalingResult.SCALING_TRIGGERED;
+ return true;
}
- return ScalingResult.CANNOT_SCALE;
- }
-
- @Override
- public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
- return scalingCompleted;
+ return false;
}
public void setMetricValue(String name, String value) {
@@ -632,9 +623,4 @@
Configuration conf, String jobId, List<String> metricNames) {
return metricsValues;
}
-
- @Override
- public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) {
- return NativeFlinkServiceTest.createJobDetailsFor(List.of());
- }
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 5d847cf..03062a8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -35,7 +35,6 @@
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
@@ -867,25 +866,12 @@
conf.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "2"));
deployment.getSpec().setFlinkConfiguration(conf.toMap());
- // Update status after triggering scale operation
- ReconciliationUtils.updateAfterScaleUp(
- deployment,
- new Configuration(),
- Clock.systemDefaultZone(),
- FlinkService.ScalingResult.SCALING_TRIGGERED);
+ // Assert that we move to deployed when in deprecated scaling UPGRADING state
+ ReconciliationUtils.updateStatusForSpecReconciliation(
+ deployment, JobState.RUNNING, conf, true, Clock.systemDefaultZone());
assertEquals(
ReconciliationState.UPGRADING,
deployment.getStatus().getReconciliationStatus().getState());
-
- // Assert that we remain in upgrading until scaling completes
- flinkService.setScalingCompleted(false);
- observer.observe(deployment, context);
- assertEquals(
- ReconciliationState.UPGRADING,
- deployment.getStatus().getReconciliationStatus().getState());
-
- // Assert that we move to deployed when scaling completes
- flinkService.setScalingCompleted(true);
observer.observe(deployment, context);
assertEquals(
ReconciliationState.DEPLOYED,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index c3059b9..394be6e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -725,7 +725,6 @@
getJobSpec(deployment).setParallelism(100);
reconciler.reconcile(deployment, context);
assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
- assertFalse(deployment.getStatus().getReconciliationStatus().scalingInProgress());
}
@Test
@@ -749,7 +748,6 @@
.getFlinkConfiguration()
.put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
- assertFalse(deployment.getStatus().getReconciliationStatus().scalingInProgress());
reconciler.reconcile(deployment, context);
assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
assertEquals(0, flinkService.getDesiredReplicas());
@@ -758,7 +756,6 @@
reconciler.reconcile(deployment, context);
assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
assertEquals(2, flinkService.getDesiredReplicas());
- assertTrue(deployment.getStatus().getReconciliationStatus().scalingInProgress());
getJobSpec(deployment).setParallelism(8);
reconciler.reconcile(deployment, context);
@@ -839,49 +836,21 @@
// Job should not be stopped, we simply call the rescale api
assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
- assertTrue(deployment.getStatus().getReconciliationStatus().scalingInProgress());
+ var reconStatus = deployment.getStatus().getReconciliationStatus();
assertEquals(
v1.toHexString() + ":2",
- deployment
- .getStatus()
- .getReconciliationStatus()
+ reconStatus
.deserializeLastReconciledSpec()
.getFlinkConfiguration()
.get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
+ assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+ assertFalse(reconStatus.isLastReconciledSpecStable());
- // Reconciler should not do anything while waiting for scaling completion
+ // Reconciler should not do anything after successful scaling
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
- assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
- assertTrue(deployment.getStatus().getReconciliationStatus().scalingInProgress());
- assertEquals(
- v1.toHexString() + ":2",
- deployment
- .getStatus()
- .getReconciliationStatus()
- .deserializeLastReconciledSpec()
- .getFlinkConfiguration()
- .get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
assertEquals(1, rescaleCounter.get());
assertEquals(3, eventCollector.events.size());
-
- var deploymentClone = ReconciliationUtils.clone(deployment);
-
- // Make sure to trigger regular upgrade on other spec changes
- deployment.getSpec().setRestartNonce(5L);
- deployment.getMetadata().setGeneration(3L);
- appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
- assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
- assertEquals(1, rescaleCounter.get());
- assertEquals(
- EventRecorder.Reason.SpecChanged.toString(),
- eventCollector.events.get(eventCollector.events.size() - 2).getReason());
-
- // If the job failed while rescaling we fall back to the regular upgrade mechanism
- deployment = deploymentClone;
- getJobStatus(deployment).setState(org.apache.flink.api.common.JobStatus.FAILED.name());
- appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
- assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
- assertEquals(1, rescaleCounter.get());
+ assertFalse(reconStatus.isLastReconciledSpecStable());
}
@Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index a51bde1..8a928a2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -1278,12 +1278,7 @@
}
@Override
- public ScalingResult scale(FlinkResourceContext<?> resourceContext, Configuration conf) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
+ public boolean scale(FlinkResourceContext<?> resourceContext, Configuration conf) {
throw new UnsupportedOperationException();
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 094546c..c32957b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -42,10 +42,8 @@
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
-import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
@@ -267,8 +265,7 @@
v2,
new JobVertexResourceRequirements(
new JobVertexResourceRequirements.Parallelism(2, 2))));
- assertEquals(
- FlinkService.ScalingResult.SCALING_TRIGGERED,
+ assertTrue(
service.scale(
new FlinkDeploymentContext(
flinkDep,
@@ -290,10 +287,8 @@
// Baseline
appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "4"));
spec.setFlinkConfiguration(appConfig.toMap());
- testScaleConditionDep(
- flinkDep, service, d -> {}, FlinkService.ScalingResult.SCALING_TRIGGERED);
- testScaleConditionLastSpec(
- flinkDep, service, d -> {}, FlinkService.ScalingResult.SCALING_TRIGGERED);
+ testScaleConditionDep(flinkDep, service, d -> {}, true);
+ testScaleConditionLastSpec(flinkDep, service, d -> {}, true);
// Do not scale if config disabled
testScaleConditionDep(
@@ -307,7 +302,7 @@
.JOB_UPGRADE_INPLACE_SCALING_ENABLED
.key(),
"false"),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ false);
// Do not scale without adaptive scheduler deployed
testScaleConditionLastSpec(
@@ -318,45 +313,29 @@
.put(
JobManagerOptions.SCHEDULER.key(),
JobManagerOptions.SchedulerType.Default.name()),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ false);
// Do not scale without adaptive scheduler deployed
testScaleConditionLastSpec(
- flinkDep,
- service,
- ls -> ls.setFlinkVersion(FlinkVersion.v1_17),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ flinkDep, service, ls -> ls.setFlinkVersion(FlinkVersion.v1_17), false);
testScaleConditionLastSpec(
- flinkDep,
- service,
- ls -> ls.setFlinkVersion(FlinkVersion.v1_18),
- FlinkService.ScalingResult.SCALING_TRIGGERED);
+ flinkDep, service, ls -> ls.setFlinkVersion(FlinkVersion.v1_18), true);
// Make sure we only try to rescale non-terminal
testScaleConditionDep(
- flinkDep,
- service,
- d -> d.getStatus().getJobStatus().setState("FAILED"),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ flinkDep, service, d -> d.getStatus().getJobStatus().setState("FAILED"), false);
testScaleConditionDep(
flinkDep,
service,
d -> d.getStatus().getJobStatus().setState("RECONCILING"),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ false);
testScaleConditionDep(
- flinkDep,
- service,
- d -> d.getStatus().getJobStatus().setState("RUNNING"),
- FlinkService.ScalingResult.SCALING_TRIGGERED);
+ flinkDep, service, d -> d.getStatus().getJobStatus().setState("RUNNING"), true);
- testScaleConditionDep(
- flinkDep,
- service,
- d -> d.getSpec().setJob(null),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ testScaleConditionDep(flinkDep, service, d -> d.getSpec().setJob(null), false);
// Do not scale if parallelism overrides were removed from an active vertex
testScaleConditionLastSpec(
@@ -365,7 +344,7 @@
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":3"),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ false);
// Scale if parallelism overrides were removed only from a non-active vertex
testScaleConditionLastSpec(
@@ -376,7 +355,7 @@
.put(
PipelineOptions.PARALLELISM_OVERRIDES.key(),
v1 + ":1," + new JobVertexID() + ":5"),
- FlinkService.ScalingResult.SCALING_TRIGGERED);
+ true);
// Do not scale if parallelism overrides were completely removed
var flinkDep2 = ReconciliationUtils.clone(flinkDep);
@@ -390,7 +369,7 @@
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":3"),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ false);
// Do not scale if overrides never set
testScaleConditionDep(
@@ -400,7 +379,7 @@
d.getSpec()
.getFlinkConfiguration()
.remove(PipelineOptions.PARALLELISM_OVERRIDES.key()),
- FlinkService.ScalingResult.CANNOT_SCALE);
+ false);
// Do not scale if non active vertices are overridden only
current.set(
@@ -418,7 +397,7 @@
d.getSpec()
.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":5"),
- FlinkService.ScalingResult.ALREADY_SCALED);
+ true);
assertNull(updated.get());
// Override v2 (not in graph) + v1 with current parallelism
@@ -431,7 +410,7 @@
.put(
PipelineOptions.PARALLELISM_OVERRIDES.key(),
v2 + ":5," + v1 + ":1"),
- FlinkService.ScalingResult.ALREADY_SCALED);
+ true);
assertNull(updated.get());
// Scale if requirements upper/lower bound doesn't match
@@ -449,26 +428,26 @@
.put(
PipelineOptions.PARALLELISM_OVERRIDES.key(),
v2 + ":5," + v1 + ":1"),
- FlinkService.ScalingResult.SCALING_TRIGGERED);
+ true);
assertEquals(
new JobVertexResourceRequirements.Parallelism(1, 1),
updated.get().get(v1).getParallelism());
// Test error handling
current.set(null);
- testScaleConditionDep(flinkDep, service, d -> {}, FlinkService.ScalingResult.CANNOT_SCALE);
+ testScaleConditionDep(flinkDep, service, d -> {}, false);
}
private void testScaleConditionDep(
FlinkDeployment dep,
NativeFlinkService service,
Consumer<FlinkDeployment> f,
- FlinkService.ScalingResult scalingResult)
+ boolean scaled)
throws Exception {
var depCopy = ReconciliationUtils.clone(dep);
f.accept(depCopy);
assertEquals(
- scalingResult,
+ scaled,
service.scale(
new FlinkDeploymentContext(
depCopy,
@@ -483,7 +462,7 @@
FlinkDeployment dep,
NativeFlinkService service,
Consumer<FlinkDeploymentSpec> f,
- FlinkService.ScalingResult scalingResult)
+ boolean scaled)
throws Exception {
testScaleConditionDep(
dep,
@@ -494,77 +473,7 @@
f.accept(lastReconciledSpec);
reconStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, fd);
},
- scalingResult);
- }
-
- @Test
- public void testScalingCompleted() throws Exception {
- var v1 = new JobVertexID();
- var v2 = new JobVertexID();
-
- var testingClusterClient =
- new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
- var service = (NativeFlinkService) createFlinkService(testingClusterClient);
-
- var flinkDep = TestUtils.buildApplicationCluster();
- var spec = flinkDep.getSpec();
- spec.setFlinkVersion(FlinkVersion.v1_18);
-
- var appConfig = Configuration.fromMap(spec.getFlinkConfiguration());
- appConfig.set(
- PipelineOptions.PARALLELISM_OVERRIDES,
- Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
- var reconStatus = flinkDep.getStatus().getReconciliationStatus();
- spec.setFlinkConfiguration(appConfig.toMap());
- reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
- var jobStatus = flinkDep.getStatus().getJobStatus();
- jobStatus.setJobId(new JobID().toHexString());
- var ctx =
- new FlinkDeploymentContext(
- flinkDep,
- TestUtils.createEmptyContext(),
- null,
- configManager,
- c -> service);
-
- var currentJobDetails = new AtomicReference<JobDetailsInfo>();
- testingClusterClient.setRequestProcessor(
- (headers, parameters, requestBody) -> {
- if (headers instanceof JobDetailsHeaders) {
- return CompletableFuture.completedFuture(currentJobDetails.get());
- }
- return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
- });
-
- currentJobDetails.set(createJobDetailsFor(List.of()));
- assertFalse(service.scalingCompleted(ctx));
-
- currentJobDetails.set(
- createJobDetailsFor(
- List.of(jobVertexDetailsInfo(v1, 1), jobVertexDetailsInfo(v2, 1))));
- assertFalse(service.scalingCompleted(ctx));
-
- currentJobDetails.set(
- createJobDetailsFor(
- List.of(jobVertexDetailsInfo(v1, 4), jobVertexDetailsInfo(v2, 1))));
- assertTrue(service.scalingCompleted(ctx));
-
- // Make sure we don't wait for non-active vertex
- var v3 = new JobVertexID();
- spec.getFlinkConfiguration()
- .put(
- PipelineOptions.PARALLELISM_OVERRIDES.key(),
- v1 + ":4," + v2 + ":1," + v3 + ":100");
- reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
- ctx =
- new FlinkDeploymentContext(
- flinkDep,
- TestUtils.createEmptyContext(),
- null,
- configManager,
- c -> service);
-
- assertTrue(service.scalingCompleted(ctx));
+ scaled);
}
private JobDetailsInfo.JobVertexDetailsInfo jobVertexDetailsInfo(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index adb90de..fde2b12 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -150,8 +150,7 @@
TestUtils.createTestMetricGroup(new Configuration()),
null)
.getResourceContext(flinkDeployment, TestUtils.createEmptyContext());
- assertEquals(
- FlinkService.ScalingResult.SCALING_TRIGGERED,
+ assertTrue(
flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
assertEquals(
5,
@@ -181,8 +180,7 @@
// Add replicas and verify that the scaling is not honoured as reactive mode not enabled
flinkDeployment.getSpec().getTaskManager().setReplicas(10);
- assertEquals(
- FlinkService.ScalingResult.CANNOT_SCALE,
+ assertFalse(
flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
}
@@ -210,8 +208,7 @@
TestUtils.createTestMetricGroup(new Configuration()),
null)
.getResourceContext(flinkDeployment, TestUtils.createEmptyContext());
- assertEquals(
- FlinkService.ScalingResult.SCALING_TRIGGERED,
+ assertTrue(
flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
assertEquals(
@@ -228,8 +225,7 @@
// Scale the replica count of the task managers
flinkDeployment.getSpec().getTaskManager().setReplicas(10);
createDeployments(flinkDeployment);
- assertEquals(
- FlinkService.ScalingResult.SCALING_TRIGGERED,
+ assertTrue(
flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
assertEquals(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 63751ce..75ab932 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -24,13 +24,10 @@
import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import org.junit.jupiter.api.Test;
-import java.time.Clock;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -69,24 +66,6 @@
}
@Test
- public void testRescheduleDuringScaling() {
- FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
- app.getSpec().getJob().setState(JobState.RUNNING);
- app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
- var previous = ReconciliationUtils.clone(app);
- ReconciliationUtils.updateAfterScaleUp(
- app,
- new Configuration(),
- Clock.systemDefaultZone(),
- FlinkService.ScalingResult.SCALING_TRIGGERED);
-
- var updateControl =
- ReconciliationUtils.toUpdateControl(operatorConfiguration, app, previous, true);
-
- assertTrue(updateControl.getScheduleDelay().get() > 0);
- }
-
- @Test
public void testRescheduleIfImmediateFlagSet() {
var previous = BaseTestUtils.buildApplicationCluster();
var current = BaseTestUtils.buildApplicationCluster();