[FLINK-33526] Autoscaler config improvement + cleanup
diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index 0b5f33a..c3a1d79 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -16,7 +16,7 @@
</tr>
<tr>
<td><h5>job.autoscaler.catch-up.duration</h5></td>
- <td style="word-wrap: break-word;">15 min</td>
+ <td style="word-wrap: break-word;">30 min</td>
<td>Duration</td>
<td>The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.</td>
</tr>
@@ -52,7 +52,7 @@
</tr>
<tr>
<td><h5>job.autoscaler.metrics.window</h5></td>
- <td style="word-wrap: break-word;">10 min</td>
+ <td style="word-wrap: break-word;">15 min</td>
<td>Duration</td>
<td>Scaling metrics aggregation window size.</td>
</tr>
@@ -76,7 +76,7 @@
</tr>
<tr>
<td><h5>job.autoscaler.restart.time</h5></td>
- <td style="word-wrap: break-word;">3 min</td>
+ <td style="word-wrap: break-word;">5 min</td>
<td>Duration</td>
<td>Expected restart time to be used until the operator can determine it reliably from history.</td>
</tr>
@@ -136,7 +136,7 @@
</tr>
<tr>
<td><h5>job.autoscaler.target.utilization.boundary</h5></td>
- <td style="word-wrap: break-word;">0.4</td>
+ <td style="word-wrap: break-word;">0.3</td>
<td>Double</td>
<td>Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]</td>
</tr>
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
index ff2b733..f31fac1 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScaler.java
@@ -20,16 +20,26 @@
import org.apache.flink.annotation.Internal;
/**
- * The generic Autoscaler.
+ * Flink Job AutoScaler.
*
* @param <KEY> The job key.
+ * @param <Context> Instance of {@link JobAutoScalerContext}.
*/
@Internal
public interface JobAutoScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
- /** Called as part of the reconciliation loop. */
+ /**
+ * Compute and apply new parallelism overrides for the provided job context.
+ *
+ * @param context Job context.
+ * @throws Exception
+ */
void scale(Context context) throws Exception;
- /** Called when the job is deleted. */
- void cleanup(KEY key);
+ /**
+ * Called when the job is deleted.
+ *
+ * @param jobKey Job key.
+ */
+ void cleanup(KEY jobKey);
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
index ca64037..69c646f 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobAutoScalerImpl.java
@@ -91,6 +91,7 @@
}
if (ctx.getJobStatus() != JobStatus.RUNNING) {
+ LOG.debug("Autoscaler is waiting for stable, running state");
lastEvaluatedMetrics.remove(ctx.getJobKey());
return;
}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
index e8872a3..4ac06fb 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java
@@ -60,7 +60,7 @@
public static final ConfigOption<Duration> METRICS_WINDOW =
autoScalerConfig("metrics.window")
.durationType()
- .defaultValue(Duration.ofMinutes(10))
+ .defaultValue(Duration.ofMinutes(15))
.withDeprecatedKeys(deprecatedOperatorConfigKey("metrics.window"))
.withDescription("Scaling metrics aggregation window size.");
@@ -82,7 +82,7 @@
public static final ConfigOption<Double> TARGET_UTILIZATION_BOUNDARY =
autoScalerConfig("target.utilization.boundary")
.doubleType()
- .defaultValue(0.4)
+ .defaultValue(0.3)
.withDeprecatedKeys(deprecatedOperatorConfigKey("target.utilization.boundary"))
.withDescription(
"Target vertex utilization boundary. Scaling won't be performed if the current processing rate is within [target_rate / (target_utilization - boundary), (target_rate / (target_utilization + boundary)]");
@@ -129,7 +129,7 @@
public static final ConfigOption<Duration> CATCH_UP_DURATION =
autoScalerConfig("catch-up.duration")
.durationType()
- .defaultValue(Duration.ofMinutes(15))
+ .defaultValue(Duration.ofMinutes(30))
.withDeprecatedKeys(deprecatedOperatorConfigKey("catch-up.duration"))
.withDescription(
"The target duration for fully processing any backlog after a scaling operation. Set to 0 to disable backlog based scaling.");
@@ -137,7 +137,7 @@
public static final ConfigOption<Duration> RESTART_TIME =
autoScalerConfig("restart.time")
.durationType()
- .defaultValue(Duration.ofMinutes(3))
+ .defaultValue(Duration.ofMinutes(5))
.withDeprecatedKeys(deprecatedOperatorConfigKey("restart.time"))
.withDescription(
"Expected restart time to be used until the operator can determine it reliably from history.");
@@ -234,7 +234,7 @@
public static final ConfigOption<Duration> SCALING_EVENT_INTERVAL =
autoScalerConfig("scaling.event.interval")
.durationType()
- .defaultValue(Duration.ofSeconds(1800))
+ .defaultValue(Duration.ofMinutes(30))
.withDeprecatedKeys(deprecatedOperatorConfigKey("scaling.event.interval"))
.withDescription("Time interval to resend the identical event");
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index ef84c52..d216ddf 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -376,7 +376,7 @@
5000.,
evaluation.get(source1).get(ScalingMetric.TRUE_PROCESSING_RATE).getCurrent());
assertEquals(
- 1667.,
+ 1250.,
evaluation.get(source1).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent());
assertEquals(
500.,
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
index 3dbf4fe..0735bf3 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/FlinkOperator.java
@@ -166,8 +166,7 @@
MetricManager.createFlinkDeploymentMetricManager(baseConfig, metricGroup);
var statusRecorder = StatusRecorder.create(client, metricManager, listeners);
var autoscaler = AutoscalerFactory.create(client, eventRecorder);
- var reconcilerFactory =
- new ReconcilerFactory(configManager, eventRecorder, statusRecorder, autoscaler);
+ var reconcilerFactory = new ReconcilerFactory(eventRecorder, statusRecorder, autoscaler);
var observerFactory = new FlinkDeploymentObserverFactory(eventRecorder);
var canaryResourceManager = new CanaryResourceManager<FlinkDeployment>(configManager);
HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
index dfe98a8..5bef0ba 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java
@@ -18,7 +18,6 @@
package org.apache.flink.kubernetes.operator.reconciler.deployment;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
@@ -138,7 +137,7 @@
cr.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
SPEC currentDeploySpec = cr.getSpec();
- scaling(ctx);
+ applyAutoscaler(ctx);
var reconciliationState = reconciliationStatus.getState();
var specDiff =
@@ -181,21 +180,13 @@
}
}
- private void scaling(FlinkResourceContext<CR> ctx) throws Exception {
- KubernetesJobAutoScalerContext autoScalerContext = ctx.getJobAutoScalerContext();
-
- if (autoscalerDisabled(ctx)) {
- autoScalerContext.getConfiguration().set(AUTOSCALER_ENABLED, false);
- } else if (autoScalerContext.getJobStatus() != JobStatus.RUNNING) {
- LOG.info("Autoscaler is waiting for stable, running state");
- }
-
- autoscaler.scale(autoScalerContext);
- }
-
- private boolean autoscalerDisabled(FlinkResourceContext<CR> ctx) {
- return ctx.getResource().getSpec().getJob() == null
- || !ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
+ private void applyAutoscaler(FlinkResourceContext<CR> ctx) throws Exception {
+ var autoScalerCtx = ctx.getJobAutoScalerContext();
+ boolean autoscalerEnabled =
+ ctx.getResource().getSpec().getJob() != null
+ && ctx.getObserveConfig().getBoolean(AUTOSCALER_ENABLED);
+ autoScalerCtx.getConfiguration().set(AUTOSCALER_ENABLED, autoscalerEnabled);
+ autoscaler.scale(autoScalerCtx);
}
private void triggerSpecChangeEvent(CR cr, DiffResult<SPEC> specDiff, KubernetesClient client) {
diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
index beba19d..429caaa 100644
--- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
+++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/ReconcilerFactory.java
@@ -23,7 +23,6 @@
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
-import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
@@ -37,7 +36,6 @@
/** The factory to create reconciler based on app mode. */
public class ReconcilerFactory {
- private final FlinkConfigManager configManager;
private final EventRecorder eventRecorder;
private final StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder;
private final JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler;
@@ -45,11 +43,9 @@
reconcilerMap;
public ReconcilerFactory(
- FlinkConfigManager configManager,
EventRecorder eventRecorder,
StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> deploymentStatusRecorder,
JobAutoScaler<ResourceID, KubernetesJobAutoScalerContext> autoscaler) {
- this.configManager = configManager;
this.eventRecorder = eventRecorder;
this.deploymentStatusRecorder = deploymentStatusRecorder;
this.autoscaler = autoscaler;
diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
index b174a29..3d241b6 100644
--- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
+++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/controller/TestingFlinkDeploymentController.java
@@ -92,7 +92,6 @@
statusRecorder = new StatusRecorder<>(new MetricManager<>(), statusUpdateCounter);
reconcilerFactory =
new ReconcilerFactory(
- configManager,
eventRecorder,
statusRecorder,
AutoscalerFactory.create(