[FLINK-36640] Add autoscalerResetNonce to jobSpec
diff --git a/docs/content/docs/custom-resource/reference.md b/docs/content/docs/custom-resource/reference.md
index 3405183..41bd17e 100644
--- a/docs/content/docs/custom-resource/reference.md
+++ b/docs/content/docs/custom-resource/reference.md
@@ -182,6 +182,7 @@
| upgradeMode | org.apache.flink.kubernetes.operator.api.spec.UpgradeMode | Upgrade mode of the Flink job. |
| allowNonRestoredState | java.lang.Boolean | Allow checkpoint state that cannot be mapped to any job vertex in tasks. |
| savepointRedeployNonce | java.lang.Long | Nonce used to trigger a full redeployment of the job from the savepoint path specified in initialSavepointPath. In order to trigger redeployment, change the number to a different non-null value. Rollback is not possible after redeployment. |
+| autoscalerResetNonce | java.lang.Long | Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job. This can be used to quickly go back to the initial user-provided parallelism settings without having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply change the nonce to a new non-null value. |
### JobState
**Class**: org.apache.flink.kubernetes.operator.api.spec.JobState
diff --git a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
index a6c582c..7a08260 100644
--- a/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
+++ b/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/spec/JobSpec.java
@@ -97,4 +97,13 @@
*/
@SpecDiff(value = DiffType.SAVEPOINT_REDEPLOY, onNullIgnore = true)
private Long savepointRedeployNonce;
+
+ /**
+ * Nonce used to reset the autoscaler metrics, parallelism overrides and history for the job.
+ * This can be used to quickly go back to the initial user-provided parallelism settings without
+ * having to toggle the autoscaler on and off. In order to trigger the reset behaviour simply
+ * change the nonce to a new non-null value.
+ */
+ @SpecDiff(value = DiffType.IGNORE)
+ private Long autoscalerResetNonce;
}
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
index 37e2bde..2936501 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/ReconciliationUtils.java
@@ -211,6 +211,19 @@
reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
}
+ /**
+ * Copies the autoscaler reset nonce of the current spec into the last reconciled spec of the
+ * resource and refreshes the reconciliation timestamp. Only the nonce is changed on the
+ * deserialized last reconciled spec before it is serialized back into the status.
+ *
+ * <p>Both the current spec and the last reconciled spec are expected to contain a job spec:
+ * {@code getJob()} is dereferenced on each of them without a null check.
+ *
+ * @param target Resource whose reconciliation status is updated in place.
+ * @param <SPEC> Spec type of the resource.
+ */
+ public static <SPEC extends AbstractFlinkSpec> void updateLastReconciledAutoscalerResetNonce(
+ AbstractFlinkResource<SPEC, ?> target) {
+ var spec = target.getSpec();
+ var reconciliationStatus = target.getStatus().getReconciliationStatus();
+ var lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
+
+ // Carry over only the nonce; every other field keeps its last reconciled value.
+ lastReconciledSpec
+ .getJob()
+ .setAutoscalerResetNonce(spec.getJob().getAutoscalerResetNonce());
+ // Write the modified spec back into the status and record when this happened.
+ reconciliationStatus.serializeAndSetLastReconciledSpec(lastReconciledSpec, target);
+ reconciliationStatus.setReconciliationTimestamp(System.currentTimeMillis());
+ }
+
private static void updateLastReconciledJobSpec(
JobSpec lastReconciledJobSpec, JobSpec jobSpec, SnapshotType snapshotType) {
switch (snapshotType) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index 3b2048c..7f8af3c 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -185,11 +185,27 @@
private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
var autoScalerCtx = ctx.getJobAutoScalerContext();
+ var resource = ctx.getResource();
+ // The autoscaler only counts as enabled when the resource has a job spec and the
+ // observe config switches it on; the result is written into the autoscaler context.
boolean autoscalerEnabled =
- ctx.getResource().getSpec().getJob() != null
+ resource.getSpec().getJob() != null
&& ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+ var reconStatus = resource.getStatus().getReconciliationStatus();
+ // A nonce-triggered reset is only considered after the first deployment. The
+ // autoscalerEnabled check also guarantees that the current spec has a job spec, so
+ // getJob() on the current spec cannot return null inside this branch.
+ if (!reconStatus.isBeforeFirstDeployment() && autoscalerEnabled) {
+ var newResetNonce = resource.getSpec().getJob().getAutoscalerResetNonce();
+ // check if the nonce changed to a non-null value
+ // (a null nonce never triggers a reset; a non-null nonce is compared against the
+ // value stored in the last reconciled spec)
+ if (newResetNonce != null
+ && !newResetNonce.equals(
+ reconStatus
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getAutoscalerResetNonce())) {
+ // Clean up the autoscaler for this job, then record the new nonce in the last
+ // reconciled spec so the same value does not trigger another cleanup on the
+ // next call.
+ autoscaler.cleanup(autoScalerCtx);
+ ReconciliationUtils.updateLastReconciledAutoscalerResetNonce(resource);
+ }
+ }
+
+ // scale() is invoked on every call, whether or not a reset was performed above.
autoscaler.scale(autoScalerCtx);
}
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
index 34a71d3..47ce5e4 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
+import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
@@ -57,6 +58,7 @@
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils;
+import org.apache.flink.kubernetes.operator.api.utils.SpecUtils;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
@@ -947,29 +949,43 @@
public void scale(KubernetesJobAutoScalerContext ctx) {
overrideFunction.get().accept(ctx.getResource().getSpec());
}
+
+ @Override
+ public void cleanup(KubernetesJobAutoScalerContext ctx) {
+ // Swap in a no-op override function so that later scale() calls leave the spec
+ // untouched, which lets the test observe the effect of a reset.
+ overrideFunction.set(s -> {});
+ }
};
+ var v1 = new JobVertexID();
appReconciler = new ApplicationReconciler(eventRecorder, statusRecorder, autoscaler);
var deployment = TestUtils.buildApplicationCluster();
+ var config = deployment.getSpec().getFlinkConfiguration();
+ config.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
+ config.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":1");
+
+ var specCopy = SpecUtils.clone(deployment.getSpec());
+
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+ deployment.setSpec(SpecUtils.clone(specCopy));
// Job running verify no upgrades if overrides are empty
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+ deployment.setSpec(SpecUtils.clone(specCopy));
assertEquals(
ReconciliationState.DEPLOYED,
deployment.getStatus().getReconciliationStatus().getState());
assertEquals(RUNNING, deployment.getStatus().getJobStatus().getState());
// Test overrides are applied correctly
- var v1 = new JobVertexID();
overrideFunction.set(
s ->
s.getFlinkConfiguration()
.put(PipelineOptions.PARALLELISM_OVERRIDES.key(), v1 + ":2"));
appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+ deployment.setSpec(SpecUtils.clone(specCopy));
assertEquals(
ReconciliationState.UPGRADING,
deployment.getStatus().getReconciliationStatus().getState());
@@ -979,6 +995,55 @@
.getResourceContext(deployment, context)
.getObserveConfig()
.get(PipelineOptions.PARALLELISM_OVERRIDES));
+
+ // Set the job into running state (scale up completed)
+ appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+ deployment.setSpec(SpecUtils.clone(specCopy));
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+ deployment.setSpec(SpecUtils.clone(specCopy));
+
+ // Make sure new reset nonce clears autoscaler
+ deployment.getSpec().getJob().setAutoscalerResetNonce(1L);
+ appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+ deployment.setSpec(SpecUtils.clone(specCopy));
+ assertEquals(
+ ReconciliationState.UPGRADING,
+ deployment.getStatus().getReconciliationStatus().getState());
+ assertEquals(
+ Map.of(v1.toHexString(), "1"),
+ ctxFactory
+ .getResourceContext(deployment, context)
+ .getObserveConfig()
+ .get(PipelineOptions.PARALLELISM_OVERRIDES));
+ assertEquals(
+ 1L,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getAutoscalerResetNonce());
+
+ appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+ verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
+ deployment.setSpec(SpecUtils.clone(specCopy));
+
+ // Make sure the autoscaler reset nonce is properly updated even if no deployment happens
+
+ deployment.getSpec().getJob().setAutoscalerResetNonce(2L);
+ appReconciler.reconcile(ctxFactory.getResourceContext(deployment, context));
+ deployment.setSpec(SpecUtils.clone(specCopy));
+ assertEquals(
+ 2L,
+ deployment
+ .getStatus()
+ .getReconciliationStatus()
+ .deserializeLastReconciledSpec()
+ .getJob()
+ .getAutoscalerResetNonce());
+ assertEquals(
+ ReconciliationState.DEPLOYED,
+ deployment.getStatus().getReconciliationStatus().getState());
}
@ParameterizedTest
diff --git a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
index 6042799..c94c138 100644
--- a/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
@@ -84,6 +84,8 @@
items:
type: string
type: array
+ autoscalerResetNonce:
+ type: integer
checkpointTriggerNonce:
type: integer
entryClass:
diff --git a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
index 431787c..93515bf 100644
--- a/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
+++ b/helm/flink-kubernetes-operator/crds/flinksessionjobs.flink.apache.org-v1.yml
@@ -45,6 +45,8 @@
items:
type: string
type: array
+ autoscalerResetNonce:
+ type: integer
checkpointTriggerNonce:
type: integer
entryClass: