[FLINK-34619] Do not wait for scaling completion in UPGRADE state with in-place scaling (#793)

diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
index 51203f1..35363b8 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/ReconciliationStatus.java
@@ -101,7 +101,15 @@
         return lastReconciledSpec == null;
     }
 
+    /**
+     * This method is only here for backward compatibility reasons. The current version of the
+     * operator does not leave resources in UPGRADING state during in-place scaling, so this can
+     * only return true for a status written by an older operator version.
+     *
+     * @return True if in-place scaling is in progress.
+     */
     @JsonIgnore
+    @Deprecated
     public boolean scalingInProgress() {
         if (isBeforeFirstDeployment() || state != ReconciliationState.UPGRADING) {
             return false;
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
index 0d30292..c8360db 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/AbstractFlinkResourceObserver.java
@@ -78,9 +78,8 @@
             // We must check if the upgrade went through without the status upgrade for some reason
 
             if (reconciliationStatus.scalingInProgress()) {
-                if (ctx.getFlinkService().scalingCompleted(ctx)) {
-                    reconciliationStatus.setState(ReconciliationState.DEPLOYED);
-                }
+                // Backward compatibility: status left in UPGRADING by an older operator version
+                reconciliationStatus.setState(ReconciliationState.DEPLOYED);
             } else if (checkIfAlreadyUpgraded(ctx)) {
                 ReconciliationUtils.updateStatusForAlreadyUpgraded(resource);
             } else {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 5929bfc..183273b 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -39,7 +39,6 @@
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
 import org.apache.flink.kubernetes.operator.exception.ValidationException;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
@@ -111,7 +110,8 @@
         updateStatusBeforeDeploymentAttempt(target, conf, Clock.systemDefaultZone());
     }
 
-    private static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
+    @VisibleForTesting
+    public static <SPEC extends AbstractFlinkSpec> void updateStatusForSpecReconciliation(
             AbstractFlinkResource<SPEC, ?> target,
             JobState stateAfterReconcile,
             Configuration conf,
@@ -181,24 +181,6 @@
         }
     }
 
-    public static <SPEC extends AbstractFlinkSpec> void updateAfterScaleUp(
-            AbstractFlinkResource<SPEC, ?> target,
-            Configuration deployConfig,
-            Clock clock,
-            FlinkService.ScalingResult scalingResult) {
-
-        var reconState = target.getStatus().getReconciliationStatus().getState();
-        // We mark the spec reconciled, and set state upgrading only if it was already upgrading or
-        // we actually triggered a new scale up
-        ReconciliationUtils.updateStatusForSpecReconciliation(
-                target,
-                JobState.RUNNING,
-                deployConfig,
-                reconState == ReconciliationState.UPGRADING
-                        || scalingResult == FlinkService.ScalingResult.SCALING_TRIGGERED,
-                clock);
-    }
-
     public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledSnapshotTriggerNonce(
             SnapshotInfo snapshotInfo,
             AbstractFlinkResource<SPEC, ?> target,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 7172e9b..be99fb1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -41,7 +41,6 @@
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
 import org.apache.flink.kubernetes.operator.reconciler.diff.DiffResult;
 import org.apache.flink.kubernetes.operator.reconciler.diff.ReflectiveDiffBuilder;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
@@ -194,6 +193,11 @@
     }
 
     private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
+        if (DiffType.IGNORE == specDiff.getType()) {
+            // This can happen if an ignore change comes in while we are waiting in upgrading state
+            // for scaling completion
+            return;
+        }
         eventRecorder.triggerEventOnce(
                 cr,
                 EventRecorder.Type.Normal,
@@ -341,14 +345,13 @@
     private boolean scale(FlinkResourceContext<CR> ctx, Configuration deployConfig)
             throws Exception {
 
-        var scalingResult = ctx.getFlinkService().scale(ctx, deployConfig);
-        if (scalingResult == FlinkService.ScalingResult.CANNOT_SCALE) {
-            return false;
+        var scaled = ctx.getFlinkService().scale(ctx, deployConfig);
+
+        if (scaled) {
+            ReconciliationUtils.updateStatusForDeployedSpec(ctx.getResource(), deployConfig, clock);
         }
 
-        ReconciliationUtils.updateAfterScaleUp(
-                ctx.getResource(), deployConfig, clock, scalingResult);
-        return true;
+        return scaled;
     }
 
     /**
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
index c7e60e5..42ce5eb 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java
@@ -69,7 +69,6 @@
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatusMessageParameters;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
 import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -887,16 +886,6 @@
         }
     }
 
-    @Override
-    public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws Exception {
-
-        try (var restClient = getClusterClient(conf)) {
-            return restClient
-                    .getJobDetails(jobID)
-                    .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
-        }
-    }
-
     /** Wait until Deployment is removed, return remaining timeout. */
     @VisibleForTesting
     protected Duration deleteDeploymentBlocking(
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
index ac4c41a..3c66cd1 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java
@@ -36,7 +36,6 @@
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.jobmaster.JobResult;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 
 import io.fabric8.kubernetes.api.model.ObjectMeta;
 import io.fabric8.kubernetes.api.model.PodList;
@@ -118,25 +117,11 @@
 
     PodList getJmPodList(FlinkDeployment deployment, Configuration conf);
 
-    ScalingResult scale(FlinkResourceContext<?> resourceContext, Configuration deployConfig)
+    boolean scale(FlinkResourceContext<?> resourceContext, Configuration deployConfig)
             throws Exception;
 
-    boolean scalingCompleted(FlinkResourceContext<?> resourceContext);
-
     Map<String, String> getMetrics(Configuration conf, String jobId, List<String> metricNames)
             throws Exception;
 
     RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;
-
-    JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) throws Exception;
-
-    /** Result of an in-place scaling operation. */
-    enum ScalingResult {
-        // Scaling triggered by the operation
-        SCALING_TRIGGERED,
-        // Job already scaled to target previously
-        ALREADY_SCALED,
-        // Cannot execute scaling, full upgrade required
-        CANNOT_SCALE;
-    }
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
index bee991a..4216312 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/NativeFlinkService.java
@@ -36,7 +36,6 @@
 import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
 import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
 import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
-import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
 import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -49,7 +48,6 @@
 import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobResourcesRequirementsUpdateHeaders;
@@ -70,7 +68,6 @@
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder.FLINK_VERSION;
 
@@ -177,25 +174,24 @@
     }
 
     @Override
-    public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig)
-            throws Exception {
+    public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) throws Exception {
         var resource = ctx.getResource();
         var observeConfig = ctx.getObserveConfig();
 
         if (!supportsInPlaceScaling(resource, observeConfig)) {
-            return ScalingResult.CANNOT_SCALE;
+            return false;
         }
 
         var newOverrides = deployConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
         var previousOverrides = observeConfig.get(PipelineOptions.PARALLELISM_OVERRIDES);
         if (newOverrides.isEmpty() && previousOverrides.isEmpty()) {
             LOG.info("No overrides defined before or after. Cannot scale in-place.");
-            return ScalingResult.CANNOT_SCALE;
+            return false;
         }
 
         try (var client = getClusterClient(observeConfig)) {
             var requirements = new HashMap<>(getVertexResources(client, resource));
-            var result = ScalingResult.ALREADY_SCALED;
+            var alreadyScaled = true;
 
             for (Map.Entry<JobVertexID, JobVertexResourceRequirements> entry :
                     requirements.entrySet()) {
@@ -216,18 +212,18 @@
                     // If the requirements changed we mark this as scaling triggered
                     if (!parallelism.equals(newParallelism)) {
                         entry.setValue(new JobVertexResourceRequirements(newParallelism));
-                        result = ScalingResult.SCALING_TRIGGERED;
+                        alreadyScaled = false;
                     }
                 } else if (previousOverrides.containsKey(jobId)) {
                     LOG.info(
                             "Parallelism override for {} has been removed, falling back to regular upgrade.",
                             jobId);
-                    return ScalingResult.CANNOT_SCALE;
+                    return false;
                 } else {
                     // No overrides for this vertex
                 }
             }
-            if (result == ScalingResult.ALREADY_SCALED) {
+            if (alreadyScaled) {
                 LOG.info("Vertex resources requirements already match target, nothing to do...");
             } else {
                 updateVertexResources(client, resource, requirements);
@@ -239,10 +235,10 @@
                         "In-place scaling triggered",
                         ctx.getKubernetesClient());
             }
-            return result;
+            return true;
         } catch (Throwable t) {
             LOG.error("Error while rescaling, falling back to regular upgrade", t);
-            return ScalingResult.CANNOT_SCALE;
+            return false;
         }
     }
 
@@ -309,52 +305,6 @@
         return currentRequirements.asJobResourceRequirements().get().getJobVertexParallelisms();
     }
 
-    @Override
-    public boolean scalingCompleted(FlinkResourceContext<?> ctx) {
-        var conf = ctx.getObserveConfig();
-        var status = ctx.getResource().getStatus();
-        try (var client = ctx.getFlinkService().getClusterClient(conf)) {
-            var jobId = JobID.fromHexString(status.getJobStatus().getJobId());
-            var jobDetailsInfo = client.getJobDetails(jobId).get();
-
-            // Return false on empty jobgraph
-            if (jobDetailsInfo.getJobVertexInfos().isEmpty()) {
-                return false;
-            }
-
-            Map<JobVertexID, Integer> currentParallelisms =
-                    jobDetailsInfo.getJobVertexInfos().stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            JobDetailsInfo.JobVertexDetailsInfo::getJobVertexID,
-                                            JobDetailsInfo.JobVertexDetailsInfo::getParallelism));
-
-            Map<String, String> parallelismOverrides =
-                    conf.get(PipelineOptions.PARALLELISM_OVERRIDES);
-            for (Map.Entry<JobVertexID, Integer> entry : currentParallelisms.entrySet()) {
-                String override = parallelismOverrides.get(entry.getKey().toHexString());
-                if (override == null) {
-                    // No override defined for this vertex
-                    continue;
-                }
-                Integer overrideParallelism = Integer.valueOf(override);
-                if (!overrideParallelism.equals(entry.getValue())) {
-                    LOG.info(
-                            "Scaling still in progress for vertex {}, {} -> {}",
-                            entry.getKey(),
-                            entry.getValue(),
-                            overrideParallelism);
-                    return false;
-                }
-            }
-            LOG.info("All vertexes have successfully scaled");
-            status.getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
-            return true;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     /**
      * Scale JM deployment to zero to gracefully stop all JM instances before any TMs are stopped.
      * This avoids race conditions between JM shutdown and TM shutdown / failure handling.
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
index 46795bb..513b4e6 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkService.java
@@ -173,14 +173,14 @@
     }
 
     @Override
-    public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
+    public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
         var observeConfig = ctx.getObserveConfig();
         var jobSpec = ctx.getResource().getSpec();
         var meta = ctx.getResource().getMetadata();
         if (observeConfig.get(JobManagerOptions.SCHEDULER_MODE) != SchedulerExecutionMode.REACTIVE
                 && jobSpec != null) {
             LOG.info("Reactive scaling is not enabled");
-            return ScalingResult.CANNOT_SCALE;
+            return false;
         }
 
         var clusterId = meta.getName();
@@ -191,7 +191,7 @@
 
         if (deployment == null || deployment.get() == null) {
             LOG.warn("TM Deployment ({}) not found", name);
-            return ScalingResult.CANNOT_SCALE;
+            return false;
         }
 
         var actualReplicas = deployment.get().getSpec().getReplicas();
@@ -204,19 +204,12 @@
                     actualReplicas,
                     desiredReplicas);
             deployment.scale(desiredReplicas);
-            return ScalingResult.SCALING_TRIGGERED;
         } else {
             LOG.info(
                     "Not scaling TM replicas: actual({}) == desired({})",
                     actualReplicas,
                     desiredReplicas);
-            return ScalingResult.ALREADY_SCALED;
         }
-    }
-
-    @Override
-    public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
-        // Currently there is no good way of checking whether reactive scaling has completed or not.
         return true;
     }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
index fbf07f4..7936e4e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java
@@ -50,7 +50,6 @@
 import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
 import org.apache.flink.kubernetes.operator.service.AbstractFlinkService;
 import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
-import org.apache.flink.kubernetes.operator.service.NativeFlinkServiceTest;
 import org.apache.flink.kubernetes.operator.standalone.StandaloneKubernetesConfigOptionsInternal;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -65,7 +64,6 @@
 import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
@@ -148,8 +146,6 @@
     @Setter
     private Collection<AggregatedMetric> aggregatedMetricsResponse = Collections.emptyList();
 
-    @Setter private boolean scalingCompleted;
-
     public TestingFlinkService() {
         this(null);
     }
@@ -599,7 +595,7 @@
     }
 
     @Override
-    public ScalingResult scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
+    public boolean scale(FlinkResourceContext<?> ctx, Configuration deployConfig) {
         boolean standalone = ctx.getDeploymentMode() == KubernetesDeploymentMode.STANDALONE;
         boolean session = ctx.getResource().getSpec().getJob() == null;
         boolean reactive =
@@ -612,15 +608,10 @@
                             .get(
                                     StandaloneKubernetesConfigOptionsInternal
                                             .KUBERNETES_TASKMANAGER_REPLICAS);
-            return ScalingResult.SCALING_TRIGGERED;
+            return true;
         }
 
-        return ScalingResult.CANNOT_SCALE;
-    }
-
-    @Override
-    public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
-        return scalingCompleted;
+        return false;
     }
 
     public void setMetricValue(String name, String value) {
@@ -632,9 +623,4 @@
             Configuration conf, String jobId, List<String> metricNames) {
         return metricsValues;
     }
-
-    @Override
-    public JobDetailsInfo getJobDetailsInfo(JobID jobID, Configuration conf) {
-        return NativeFlinkServiceTest.createJobDetailsFor(List.of());
-    }
 }
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
index 5d847cf..03062a8 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.java
@@ -35,7 +35,6 @@
 import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
 import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
 import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
 import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
@@ -867,25 +866,12 @@
         conf.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "2"));
         deployment.getSpec().setFlinkConfiguration(conf.toMap());
 
-        // Update status after triggering scale operation
-        ReconciliationUtils.updateAfterScaleUp(
-                deployment,
-                new Configuration(),
-                Clock.systemDefaultZone(),
-                FlinkService.ScalingResult.SCALING_TRIGGERED);
+        // Assert that we move to deployed when in deprecated scaling UPGRADING state
+        ReconciliationUtils.updateStatusForSpecReconciliation(
+                deployment, JobState.RUNNING, conf, true, Clock.systemDefaultZone());
         assertEquals(
                 ReconciliationState.UPGRADING,
                 deployment.getStatus().getReconciliationStatus().getState());
-
-        // Assert that we remain in upgrading until scaling completes
-        flinkService.setScalingCompleted(false);
-        observer.observe(deployment, context);
-        assertEquals(
-                ReconciliationState.UPGRADING,
-                deployment.getStatus().getReconciliationStatus().getState());
-
-        // Assert that we move to deployed when scaling completes
-        flinkService.setScalingCompleted(true);
         observer.observe(deployment, context);
         assertEquals(
                 ReconciliationState.DEPLOYED,
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index c3059b9..394be6e 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -725,7 +725,6 @@
         getJobSpec(deployment).setParallelism(100);
         reconciler.reconcile(deployment, context);
         assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
-        assertFalse(deployment.getStatus().getReconciliationStatus().scalingInProgress());
     }
 
     @Test
@@ -749,7 +748,6 @@
                 .getFlinkConfiguration()
                 .put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
 
-        assertFalse(deployment.getStatus().getReconciliationStatus().scalingInProgress());
         reconciler.reconcile(deployment, context);
         assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
         assertEquals(0, flinkService.getDesiredReplicas());
@@ -758,7 +756,6 @@
         reconciler.reconcile(deployment, context);
         assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
         assertEquals(2, flinkService.getDesiredReplicas());
-        assertTrue(deployment.getStatus().getReconciliationStatus().scalingInProgress());
 
         getJobSpec(deployment).setParallelism(8);
         reconciler.reconcile(deployment, context);
@@ -839,49 +836,21 @@
 
         // Job should not be stopped, we simply call the rescale api
         assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
-        assertTrue(deployment.getStatus().getReconciliationStatus().scalingInProgress());
+        var reconStatus = deployment.getStatus().getReconciliationStatus();
         assertEquals(
                 v1.toHexString() + ":2",
-                deployment
-                        .getStatus()
-                        .getReconciliationStatus()
+                reconStatus
                         .deserializeLastReconciledSpec()
                         .getFlinkConfiguration()
                         .get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
+        assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
+        assertFalse(reconStatus.isLastReconciledSpecStable());
 
-        // Reconciler should not do anything while waiting for scaling completion
+        // Reconciler should not do anything after successful scaling
         appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
-        assertEquals(JobState.RUNNING, getReconciledJobState(deployment));
-        assertTrue(deployment.getStatus().getReconciliationStatus().scalingInProgress());
-        assertEquals(
-                v1.toHexString() + ":2",
-                deployment
-                        .getStatus()
-                        .getReconciliationStatus()
-                        .deserializeLastReconciledSpec()
-                        .getFlinkConfiguration()
-                        .get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
         assertEquals(1, rescaleCounter.get());
         assertEquals(3, eventCollector.events.size());
-
-        var deploymentClone = ReconciliationUtils.clone(deployment);
-
-        // Make sure to trigger regular upgrade on other spec changes
-        deployment.getSpec().setRestartNonce(5L);
-        deployment.getMetadata().setGeneration(3L);
-        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
-        assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
-        assertEquals(1, rescaleCounter.get());
-        assertEquals(
-                EventRecorder.Reason.SpecChanged.toString(),
-                eventCollector.events.get(eventCollector.events.size() - 2).getReason());
-
-        // If the job failed while rescaling we fall back to the regular upgrade mechanism
-        deployment = deploymentClone;
-        getJobStatus(deployment).setState(org.apache.flink.api.common.JobStatus.FAILED.name());
-        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
-        assertEquals(JobState.SUSPENDED, getReconciledJobState(deployment));
-        assertEquals(1, rescaleCounter.get());
+        assertFalse(reconStatus.isLastReconciledSpecStable());
     }
 
     @Test
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
index a51bde1..8a928a2 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkServiceTest.java
@@ -1278,12 +1278,7 @@
         }
 
         @Override
-        public ScalingResult scale(FlinkResourceContext<?> resourceContext, Configuration conf) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public boolean scalingCompleted(FlinkResourceContext<?> resourceContext) {
+        public boolean scale(FlinkResourceContext<?> resourceContext, Configuration conf) {
             throw new UnsupportedOperationException();
         }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
index 094546c..c32957b 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/NativeFlinkServiceTest.java
@@ -42,10 +42,8 @@
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
-import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
-import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsBody;
 import org.apache.flink.runtime.rest.messages.job.JobResourceRequirementsHeaders;
@@ -267,8 +265,7 @@
                         v2,
                                 new JobVertexResourceRequirements(
                                         new JobVertexResourceRequirements.Parallelism(2, 2))));
-        assertEquals(
-                FlinkService.ScalingResult.SCALING_TRIGGERED,
+        assertTrue(
                 service.scale(
                         new FlinkDeploymentContext(
                                 flinkDep,
@@ -290,10 +287,8 @@
         // Baseline
         appConfig.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(v1.toHexString(), "4"));
         spec.setFlinkConfiguration(appConfig.toMap());
-        testScaleConditionDep(
-                flinkDep, service, d -> {}, FlinkService.ScalingResult.SCALING_TRIGGERED);
-        testScaleConditionLastSpec(
-                flinkDep, service, d -> {}, FlinkService.ScalingResult.SCALING_TRIGGERED);
+        testScaleConditionDep(flinkDep, service, d -> {}, true);
+        testScaleConditionLastSpec(flinkDep, service, d -> {}, true);
 
         // Do not scale if config disabled
         testScaleConditionDep(
@@ -307,7 +302,7 @@
                                                 .JOB_UPGRADE_INPLACE_SCALING_ENABLED
                                                 .key(),
                                         "false"),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                false);
 
         // Do not scale without adaptive scheduler deployed
         testScaleConditionLastSpec(
@@ -318,45 +313,29 @@
                                 .put(
                                         JobManagerOptions.SCHEDULER.key(),
                                         JobManagerOptions.SchedulerType.Default.name()),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                false);
 
         // Do not scale without adaptive scheduler deployed
         testScaleConditionLastSpec(
-                flinkDep,
-                service,
-                ls -> ls.setFlinkVersion(FlinkVersion.v1_17),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                flinkDep, service, ls -> ls.setFlinkVersion(FlinkVersion.v1_17), false);
 
         testScaleConditionLastSpec(
-                flinkDep,
-                service,
-                ls -> ls.setFlinkVersion(FlinkVersion.v1_18),
-                FlinkService.ScalingResult.SCALING_TRIGGERED);
+                flinkDep, service, ls -> ls.setFlinkVersion(FlinkVersion.v1_18), true);
 
         // Make sure we only try to rescale non-terminal
         testScaleConditionDep(
-                flinkDep,
-                service,
-                d -> d.getStatus().getJobStatus().setState("FAILED"),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                flinkDep, service, d -> d.getStatus().getJobStatus().setState("FAILED"), false);
 
         testScaleConditionDep(
                 flinkDep,
                 service,
                 d -> d.getStatus().getJobStatus().setState("RECONCILING"),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                false);
 
         testScaleConditionDep(
-                flinkDep,
-                service,
-                d -> d.getStatus().getJobStatus().setState("RUNNING"),
-                FlinkService.ScalingResult.SCALING_TRIGGERED);
+                flinkDep, service, d -> d.getStatus().getJobStatus().setState("RUNNING"), true);
 
-        testScaleConditionDep(
-                flinkDep,
-                service,
-                d -> d.getSpec().setJob(null),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+        testScaleConditionDep(flinkDep, service, d -> d.getSpec().setJob(null), false);
 
         // Do not scale if parallelism overrides were removed from an active vertex
         testScaleConditionLastSpec(
@@ -365,7 +344,7 @@
                 s ->
                         s.getFlinkConfiguration()
                                 .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":3"),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                false);
 
         // Scale if parallelism overrides were removed only from a non-active vertex
         testScaleConditionLastSpec(
@@ -376,7 +355,7 @@
                                 .put(
                                         PipelineOptions.PARALLELISM_OVERRIDES.key(),
                                         v1 + ":1," + new JobVertexID() + ":5"),
-                FlinkService.ScalingResult.SCALING_TRIGGERED);
+                true);
 
         // Do not scale if parallelism overrides were completely removed
         var flinkDep2 = ReconciliationUtils.clone(flinkDep);
@@ -390,7 +369,7 @@
                 s ->
                         s.getFlinkConfiguration()
                                 .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":3"),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                false);
 
         // Do not scale if overrides never set
         testScaleConditionDep(
@@ -400,7 +379,7 @@
                         d.getSpec()
                                 .getFlinkConfiguration()
                                 .remove(PipelineOptions.PARALLELISM_OVERRIDES.key()),
-                FlinkService.ScalingResult.CANNOT_SCALE);
+                false);
 
         // Do not scale if non active vertices are overridden only
         current.set(
@@ -418,7 +397,7 @@
                         d.getSpec()
                                 .getFlinkConfiguration()
                                 .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v2 + ":5"),
-                FlinkService.ScalingResult.ALREADY_SCALED);
+                true);
         assertNull(updated.get());
 
         // Override v2 (not in graph) + v1 with current parallelism
@@ -431,7 +410,7 @@
                                 .put(
                                         PipelineOptions.PARALLELISM_OVERRIDES.key(),
                                         v2 + ":5," + v1 + ":1"),
-                FlinkService.ScalingResult.ALREADY_SCALED);
+                true);
         assertNull(updated.get());
 
         // Scale if requirements upper/lower bound doesn't match
@@ -449,26 +428,26 @@
                                 .put(
                                         PipelineOptions.PARALLELISM_OVERRIDES.key(),
                                         v2 + ":5," + v1 + ":1"),
-                FlinkService.ScalingResult.SCALING_TRIGGERED);
+                true);
         assertEquals(
                 new JobVertexResourceRequirements.Parallelism(1, 1),
                 updated.get().get(v1).getParallelism());
 
         // Test error handling
         current.set(null);
-        testScaleConditionDep(flinkDep, service, d -> {}, FlinkService.ScalingResult.CANNOT_SCALE);
+        testScaleConditionDep(flinkDep, service, d -> {}, false);
     }
 
     private void testScaleConditionDep(
             FlinkDeployment dep,
             NativeFlinkService service,
             Consumer<FlinkDeployment> f,
-            FlinkService.ScalingResult scalingResult)
+            boolean scaled)
             throws Exception {
         var depCopy = ReconciliationUtils.clone(dep);
         f.accept(depCopy);
         assertEquals(
-                scalingResult,
+                scaled,
                 service.scale(
                         new FlinkDeploymentContext(
                                 depCopy,
@@ -483,7 +462,7 @@
             FlinkDeployment dep,
             NativeFlinkService service,
             Consumer<FlinkDeploymentSpec> f,
-            FlinkService.ScalingResult scalingResult)
+            boolean scaled)
             throws Exception {
         testScaleConditionDep(
                 dep,
@@ -494,77 +473,7 @@
                     f.accept(lastReconciledSpec);
                     reconStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, fd);
                 },
-                scalingResult);
-    }
-
-    @Test
-    public void testScalingCompleted() throws Exception {
-        var v1 = new JobVertexID();
-        var v2 = new JobVertexID();
-
-        var testingClusterClient =
-                new TestingClusterClient<>(configuration, TestUtils.TEST_DEPLOYMENT_NAME);
-        var service = (NativeFlinkService) createFlinkService(testingClusterClient);
-
-        var flinkDep = TestUtils.buildApplicationCluster();
-        var spec = flinkDep.getSpec();
-        spec.setFlinkVersion(FlinkVersion.v1_18);
-
-        var appConfig = Configuration.fromMap(spec.getFlinkConfiguration());
-        appConfig.set(
-                PipelineOptions.PARALLELISM_OVERRIDES,
-                Map.of(v1.toHexString(), "4", v2.toHexString(), "1"));
-        var reconStatus = flinkDep.getStatus().getReconciliationStatus();
-        spec.setFlinkConfiguration(appConfig.toMap());
-        reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
-        var jobStatus = flinkDep.getStatus().getJobStatus();
-        jobStatus.setJobId(new JobID().toHexString());
-        var ctx =
-                new FlinkDeploymentContext(
-                        flinkDep,
-                        TestUtils.createEmptyContext(),
-                        null,
-                        configManager,
-                        c -> service);
-
-        var currentJobDetails = new AtomicReference<JobDetailsInfo>();
-        testingClusterClient.setRequestProcessor(
-                (headers, parameters, requestBody) -> {
-                    if (headers instanceof JobDetailsHeaders) {
-                        return CompletableFuture.completedFuture(currentJobDetails.get());
-                    }
-                    return CompletableFuture.completedFuture(EmptyResponseBody.getInstance());
-                });
-
-        currentJobDetails.set(createJobDetailsFor(List.of()));
-        assertFalse(service.scalingCompleted(ctx));
-
-        currentJobDetails.set(
-                createJobDetailsFor(
-                        List.of(jobVertexDetailsInfo(v1, 1), jobVertexDetailsInfo(v2, 1))));
-        assertFalse(service.scalingCompleted(ctx));
-
-        currentJobDetails.set(
-                createJobDetailsFor(
-                        List.of(jobVertexDetailsInfo(v1, 4), jobVertexDetailsInfo(v2, 1))));
-        assertTrue(service.scalingCompleted(ctx));
-
-        // Make sure we don't wait for non-active vertex
-        var v3 = new JobVertexID();
-        spec.getFlinkConfiguration()
-                .put(
-                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
-                        v1 + ":4," + v2 + ":1," + v3 + ":100");
-        reconStatus.serializeAndSetLastReconciledSpec(spec, flinkDep);
-        ctx =
-                new FlinkDeploymentContext(
-                        flinkDep,
-                        TestUtils.createEmptyContext(),
-                        null,
-                        configManager,
-                        c -> service);
-
-        assertTrue(service.scalingCompleted(ctx));
+                scaled);
     }
 
     private JobDetailsInfo.JobVertexDetailsInfo jobVertexDetailsInfo(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
index adb90de..fde2b12 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.java
@@ -150,8 +150,7 @@
                                 TestUtils.createTestMetricGroup(new Configuration()),
                                 null)
                         .getResourceContext(flinkDeployment, TestUtils.createEmptyContext());
-        assertEquals(
-                FlinkService.ScalingResult.SCALING_TRIGGERED,
+        assertTrue(
                 flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
         assertEquals(
                 5,
@@ -181,8 +180,7 @@
 
         // Add replicas and verify that the scaling is not honoured as reactive mode not enabled
         flinkDeployment.getSpec().getTaskManager().setReplicas(10);
-        assertEquals(
-                FlinkService.ScalingResult.CANNOT_SCALE,
+        assertFalse(
                 flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
     }
 
@@ -210,8 +208,7 @@
                                 TestUtils.createTestMetricGroup(new Configuration()),
                                 null)
                         .getResourceContext(flinkDeployment, TestUtils.createEmptyContext());
-        assertEquals(
-                FlinkService.ScalingResult.SCALING_TRIGGERED,
+        assertTrue(
                 flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
 
         assertEquals(
@@ -228,8 +225,7 @@
         // Scale the replica count of the task managers
         flinkDeployment.getSpec().getTaskManager().setReplicas(10);
         createDeployments(flinkDeployment);
-        assertEquals(
-                FlinkService.ScalingResult.SCALING_TRIGGERED,
+        assertTrue(
                 flinkStandaloneService.scale(ctx, ctx.getDeployConfig(flinkDeployment.getSpec())));
 
         assertEquals(
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
index 63751ce..75ab932 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/ReconciliationUtilsTest.java
@@ -24,13 +24,10 @@
 import org.apache.flink.kubernetes.operator.api.utils.BaseTestUtils;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
-import org.apache.flink.kubernetes.operator.service.FlinkService;
 
 import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
 import org.junit.jupiter.api.Test;
 
-import java.time.Clock;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -69,24 +66,6 @@
     }
 
     @Test
-    public void testRescheduleDuringScaling() {
-        FlinkDeployment app = BaseTestUtils.buildApplicationCluster();
-        app.getSpec().getJob().setState(JobState.RUNNING);
-        app.getStatus().getReconciliationStatus().setState(ReconciliationState.DEPLOYED);
-        var previous = ReconciliationUtils.clone(app);
-        ReconciliationUtils.updateAfterScaleUp(
-                app,
-                new Configuration(),
-                Clock.systemDefaultZone(),
-                FlinkService.ScalingResult.SCALING_TRIGGERED);
-
-        var updateControl =
-                ReconciliationUtils.toUpdateControl(operatorConfiguration, app, previous, true);
-
-        assertTrue(updateControl.getScheduleDelay().get() > 0);
-    }
-
-    @Test
     public void testRescheduleIfImmediateFlagSet() {
         var previous = BaseTestUtils.buildApplicationCluster();
         var current = BaseTestUtils.buildApplicationCluster();