[FLINK-33526] Autoscaler config improvement + cleanup
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 0b5f33a..c3a1d79 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -16,7 +16,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.catch-up.duration</h5></td>
-            <td style="word-wrap: break-word;">15 min</td>
+            <td style="word-wrap: break-word;">30 min</td>
             <td>Duration</td>
             <td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td>
         </tr>
@@ -52,7 +52,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.metrics.window</h5></td>
-            <td style="word-wrap: break-word;">10 min</td>
+            <td style="word-wrap: break-word;">15 min</td>
             <td>Duration</td>
             <td>Scaling metrics aggregation window size.</td>
         </tr>
@@ -76,7 +76,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.restart.time</h5></td>
-            <td style="word-wrap: break-word;">3 min</td>
+            <td style="word-wrap: break-word;">5 min</td>
             <td>Duration</td>
             <td>Expected restart time to be used until the operator can determine it reliably from history.</td>
         </tr>
@@ -136,7 +136,7 @@
         </tr>
         <tr>
             <td><h5>job.autoscaler.target.utilization.boundary</h5></td>
-            <td style="word-wrap: break-word;">0.4</td>
+            <td style="word-wrap: break-word;">0.3</td>
             <td>Double</td>
             <td>Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]</td>
         </tr>
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
index ff2b733..f31fac1 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
@@ -20,16 +20,26 @@
 import org.apache.flink.annotation.Internal;
 
 /**
- * The generic Autoscaler.
+ * Flink Job AutoScaler.
  *
  * @param <KEY> The job key.
+ * @param <Context> Instance of {@link JobAutoScalerContext}.
  */
 @Internal
 public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
 
-    /** Called as part of the reconciliation loop. */
+    /**
+     * Compute and apply new parallelism overrides for the provided job context.
+     *
+     * @param context Job context.
+     * @throws Exception
+     */
     void scale(Context context) throws Exception;
 
-    /** Called when the job is deleted. */
-    void cleanup(KEY key);
+    /**
+     * Called when the job is deleted.
+     *
+     * @param jobKey Job key.
+     */
+    void cleanup(KEY jobKey);
 }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index ca64037..69c646f 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -91,6 +91,7 @@
             }
 
             if (ctx.getJobStatus() != JobStatus.RUNNING) {
+                LOG.debug("Autoscaler is waiting for stable, running state");
                 lastEvaluatedMetrics.remove(ctx.getJobKey());
                 return;
             }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e8872a3..4ac06fb 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -60,7 +60,7 @@
     public static final ConfigOption<Duration> METRICS_WINDOW =
             autoScalerConfig("metrics.window")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(10))
+                    .defaultValue(Duration.ofMinutes(15))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("metrics.window"))
                     .withDescription("Scaling metrics aggregation window size.");
 
@@ -82,7 +82,7 @@
     public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
             autoScalerConfig("target.utilization.boundary")
                     .doubleType()
-                    .defaultValue(0.4)
+                    .defaultValue(0.3)
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("target.utilization.boundary"))
                     .withDescription(
                             "Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
@@ -129,7 +129,7 @@
     public static final ConfigOption<Duration> CATCH_UP_DURATION =
             autoScalerConfig("catch-up.duration")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(15))
+                    .defaultValue(Duration.ofMinutes(30))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("catch-up.duration"))
                     .withDescription(
                             "The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
@@ -137,7 +137,7 @@
     public static final ConfigOption<Duration> RESTART_TIME =
             autoScalerConfig("restart.time")
                     .durationType()
-                    .defaultValue(Duration.ofMinutes(3))
+                    .defaultValue(Duration.ofMinutes(5))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("restart.time"))
                     .withDescription(
                             "Expected restart time to be used until the operator can determine it reliably from history.");
@@ -234,7 +234,7 @@
     public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
             autoScalerConfig("scaling.event.interval")
                     .durationType()
-                    .defaultValue(Duration.ofSeconds(1800))
+                    .defaultValue(Duration.ofMinutes(30))
                     .withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
                     .withDescription("Time interval to resend the identical event");
 
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index ef84c52..d216ddf 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -376,7 +376,7 @@
                 5000.,
                 evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
         assertEquals(
-                1667.,
+                1250.,
                 evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
         assertEquals(
                 500.,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3dbf4fe..0735bf3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -166,8 +166,7 @@
                 MetricManager.createFlinkDeploymentMetricManager(baseConfig, metricGroup);
         var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
         var autoscaler = AutoscalerFactory.create(client, eventRecorder);
-        var reconcilerFactory =
-                new ReconcilerFactory(configManager, eventRecorder, statusRecorder, autoscaler);
+        var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler);
         var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder);
         var canaryResourceManager = new CanaryResourceManager<FlinkDeployment>(configManager);
         HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index dfe98a8..5bef0ba 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,7 +18,6 @@
 package org.apache.flink.kubernetes.operator.reconciler.deployment;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.autoscaler.JobAutoScaler;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -138,7 +137,7 @@
                 cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
         SPEC currentDeploySpec = cr.getSpec();
 
-        scaling(ctx);
+        applyAutoscaler(ctx);
 
         var reconciliationState = reconciliationStatus.getState();
         var specDiff =
@@ -181,21 +180,13 @@
         }
     }
 
-    private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
-        KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
-
-        if (autoscalerDisabled(ctx)) {
-            autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
-        } else if (autoScalerContext.getJobStatus() != JobStatus.RUNNING) {
-            LOG.info("Autoscaler is waiting for stable, running state");
-        }
-
-        autoscaler.scale(autoScalerContext);
-    }
-
-    private boolean autoscalerDisabled(FlinkResourceContext<CR> ctx) {
-        return ctx.getResource().getSpec().getJob() == null
-                || !ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
+    private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
+        var autoScalerCtx = ctx.getJobAutoScalerContext();
+        boolean autoscalerEnabled =
+                ctx.getResource().getSpec().getJob() != null
+                        && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
+        autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+        autoscaler.scale(autoScalerCtx);
     }
 
     private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index beba19d..429caaa 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -23,7 +23,6 @@
 import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
 import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
 import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
 import org.apache.flink.kubernetes.operator.config.Mode;
 import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
 import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -37,7 +36,6 @@
 /** The factory to create reconciler based on app mode. */
 public class ReconcilerFactory {
 
-    private final FlinkConfigManager configManager;
     private final EventRecorder eventRecorder;
     private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder;
     private final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler;
@@ -45,11 +43,9 @@
             reconcilerMap;
 
     public ReconcilerFactory(
-            FlinkConfigManager configManager,
             EventRecorder eventRecorder,
             StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder,
             JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
-        this.configManager = configManager;
         this.eventRecorder = eventRecorder;
         this.deploymentStatusRecorder = deploymentStatusRecorder;
         this.autoscaler = autoscaler;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index b174a29..3d241b6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -92,7 +92,6 @@
         statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter);
         reconcilerFactory =
                 new ReconcilerFactory(
-                        configManager,
                         eventRecorder,
                         statusRecorder,
                         AutoscalerFactory.create(