[FLINK-36640] Add autoscalerResetNonce to jobSpec
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 3405183..41bd17e 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -182,6 +182,7 @@
 | upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
 | allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
 | savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
+| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply change the nonce to a new non-null value. |
 
 ### JobState
 **Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
index a6c582c..7a08260 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
@@ -97,4 +97,13 @@
      */
     @SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
     private Long savepointRedeployNonce;
+
+    /**
+     * Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job.
+     * This can be used to quickly go back to the initial user-provided parallelism settings without
+     * having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply
+     * change the nonce to a new non-null value.
+     */
+    @SpecDiff(value = DiffType.IGNORE)
+    private Long autoscalerResetNonce;
 }
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 37e2bde..2936501 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -211,6 +211,19 @@
         reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
     }
 
+    public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledAutoscalerResetNonce(
+            AbstractFlinkResource<SPEC, ?> target) {
+        var spec = target.getSpec();
+        var reconciliationStatus = target.getStatus().getReconciliationStatus();
+        var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+
+        lastReconciledSpec
+                .getJob()
+                .setAutoscalerResetNonce(spec.getJob().getAutoscalerResetNonce());
+        reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target);
+        reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
+    }
+
     private static void updateLastReconciledJobSpec(
             JobSpec lastReconciledJobSpec, JobSpec jobSpec, SnapshotType snapshotType) {
         switch (snapshotType) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 3b2048c..7f8af3c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -185,11 +185,27 @@
 
     private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
         var autoScalerCtx = ctx.getJobAutoScalerContext();
+        var resource = ctx.getResource();
         boolean autoscalerEnabled =
-                ctx.getResource().getSpec().getJob() != null
+                resource.getSpec().getJob() != null
                         && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
         autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
 
+        var reconStatus = resource.getStatus().getReconciliationStatus();
+        if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) {
+            var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce();
+            // check if the nonce changed to a non-null value
+            if (newResetNonce != null
+                    && !newResetNonce.equals(
+                            reconStatus
+                                    .deserializeLastReconciledSpec()
+                                    .getJob()
+                                    .getAutoscalerResetNonce())) {
+                autoscaler.cleanup(autoScalerCtx);
+                ReconciliationUtils.updateLastReconciledAutoscalerResetNonce(resource);
+            }
+        }
+
         autoscaler.scale(autoScalerCtx);
     }
 
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 34a71d3..47ce5e4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.autoscaler.NoopJobAutoscaler;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
@@ -57,6 +58,7 @@
 import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
 import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
 import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
 import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
 import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
 import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -947,29 +949,43 @@
                     public void scale(KubernetesJobAutoScalerContext ctx) {
                         overrideFunction.get().accept(ctx.getResource().getSpec());
                     }
+
+                    @Override
+                    public void cleanup(KubernetesJobAutoScalerContext ctx) {
+                        overrideFunction.set(s -> {});
+                    }
                 };
+        var v1 = new JobVertexID();
 
         appReconciler = new ApplicationReconciler(eventRecorder, statusRecorder, autoscaler);
 
         var deployment = TestUtils.buildApplicationCluster();
+        var config = deployment.getSpec().getFlinkConfiguration();
+        config.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
+        config.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1");
+
+        var specCopy = SpecUtils.clone(deployment.getSpec());
+
         appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
         verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        deployment.setSpec(SpecUtils.clone(specCopy));
 
         // Job running verify no upgrades if overrides are empty
         appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+        deployment.setSpec(SpecUtils.clone(specCopy));
         assertEquals(
                 ReconciliationState.DEPLOYED,
                 deployment.getStatus().getReconciliationStatus().getState());
         assertEquals(RUNNING, deployment.getStatus().getJobStatus().getState());
 
         // Test overrides are applied correctly
-        var v1 = new JobVertexID();
         overrideFunction.set(
                 s ->
                         s.getFlinkConfiguration()
                                 .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2"));
 
         appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+        deployment.setSpec(SpecUtils.clone(specCopy));
         assertEquals(
                 ReconciliationState.UPGRADING,
                 deployment.getStatus().getReconciliationStatus().getState());
@@ -979,6 +995,55 @@
                         .getResourceContext(deployment, context)
                         .getObserveConfig()
                         .get(PipelineOptions.PARALLELISM_OVERRIDES));
+
+        // Set the job into running state (scale up completed)
+        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+        deployment.setSpec(SpecUtils.clone(specCopy));
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        deployment.setSpec(SpecUtils.clone(specCopy));
+
+        // Make sure new reset nonce clears autoscaler
+        deployment.getSpec().getJob().setAutoscalerResetNonce(1L);
+        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+        deployment.setSpec(SpecUtils.clone(specCopy));
+        assertEquals(
+                ReconciliationState.UPGRADING,
+                deployment.getStatus().getReconciliationStatus().getState());
+        assertEquals(
+                Map.of(v1.toHexString(), "1"),
+                ctxFactory
+                        .getResourceContext(deployment, context)
+                        .getObserveConfig()
+                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
+        assertEquals(
+                1L,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getAutoscalerResetNonce());
+
+        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+        deployment.setSpec(SpecUtils.clone(specCopy));
+
+        // Make sure autoscaler reset nonce properly updated even if no deployment happens
+
+        deployment.getSpec().getJob().setAutoscalerResetNonce(2L);
+        appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+        deployment.setSpec(SpecUtils.clone(specCopy));
+        assertEquals(
+                2L,
+                deployment
+                        .getStatus()
+                        .getReconciliationStatus()
+                        .deserializeLastReconciledSpec()
+                        .getJob()
+                        .getAutoscalerResetNonce());
+        assertEquals(
+                ReconciliationState.DEPLOYED,
+                deployment.getStatus().getReconciliationStatus().getState());
     }
 
     @ParameterizedTest
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 6042799..c94c138 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -84,6 +84,8 @@
                     items:
                       type: string
                     type: array
+                  autoscalerResetNonce:
+                    type: integer
                   checkpointTriggerNonce:
                     type: integer
                   entryClass:
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 431787c..93515bf 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -45,6 +45,8 @@
                     items:
                       type: string
                     type: array
+                  autoscalerResetNonce:
+                    type: integer
                   checkpointTriggerNonce:
                     type: integer
                   entryClass: