[FLINK-31936] Support setting scale up max factor

diff --git a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
index e97f938..da3c5b9 100644
--- a/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
+++ b/docs/layouts/shortcodes/generated/auto_scaler_configuration.html
@@ -69,6 +69,12 @@
             <td>Duration in which no scale down of a vertex is allowed after it has been scaled up.</td>
         </tr>
         <tr>
+            <td><h5>kubernetes.operator.job.autoscaler.scale-up.max-factor</h5></td>
+            <td style="word-wrap: break-word;">2.147483647E9</td>
+            <td>Double</td>
+            <td>Max scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism.</td>
+        </tr>
+        <tr>
             <td><h5>kubernetes.operator.job.autoscaler.scaling.effectiveness.detection.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
             <td>Boolean</td>
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
index f0d6a18..90d46a8 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java
@@ -38,6 +38,7 @@
 import java.util.SortedMap;
 
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_DOWN_FACTOR;
+import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.MAX_SCALE_UP_FACTOR;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.SCALE_UP_GRACE_PERIOD;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions.VERTEX_MAX_PARALLELISM;
@@ -93,6 +94,7 @@
         LOG.debug("Target processing capacity for {} is {}", vertex, targetCapacity);
         double scaleFactor = targetCapacity / averageTrueProcessingRate;
         double minScaleFactor = 1 - conf.get(MAX_SCALE_DOWN_FACTOR);
+        double maxScaleFactor = 1 + conf.get(MAX_SCALE_UP_FACTOR);
         if (scaleFactor < minScaleFactor) {
             LOG.debug(
                     "Computed scale factor of {} for {} is capped by maximum scale down factor to {}",
@@ -100,8 +102,19 @@
                     vertex,
                     minScaleFactor);
             scaleFactor = minScaleFactor;
+        } else if (scaleFactor > maxScaleFactor) {
+            LOG.debug(
+                    "Computed scale factor of {} for {} is capped by maximum scale up factor to {}",
+                    scaleFactor,
+                    vertex,
+                    maxScaleFactor);
+            scaleFactor = maxScaleFactor;
         }
 
+        // Cap target capacity according to the capped scale factor
+        double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
+        LOG.debug("Capped target processing capacity for {} is {}", vertex, cappedTargetCapacity);
+
         int newParallelism =
                 scale(
                         currentParallelism,
@@ -124,7 +137,8 @@
 
         // We record our expectations for this scaling operation
         evaluatedMetrics.put(
-                ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(targetCapacity));
+                ScalingMetric.EXPECTED_PROCESSING_RATE,
+                EvaluatedScalingMetric.of(cappedTargetCapacity));
         return newParallelism;
     }
 
diff --git a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
index df32aa5..4f21fb9 100644
--- a/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
+++ b/flink-kubernetes-operator-autoscaler/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/config/AutoScalerOptions.java
@@ -99,6 +99,13 @@
                     .withDescription(
                             "Max scale down factor. 1 means no limit on scale down, 0.6 means job can only be scaled down with 60% of the original parallelism.");
 
+    public static final ConfigOption<Double> MAX_SCALE_UP_FACTOR =
+            autoScalerConfig("scale-up.max-factor")
+                    .doubleType()
+                    .defaultValue((double) Integer.MAX_VALUE)
+                    .withDescription(
+                            "Max scale up factor. 2.0 means job can only be scaled up with 200% of the current parallelism.");
+
     public static final ConfigOption<Duration> CATCH_UP_DURATION =
             autoScalerConfig("catch-up.duration")
                     .durationType()
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
index 7df42c6..6809dd8 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java
@@ -100,6 +100,7 @@
         defaultConf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2));
         defaultConf.set(AutoScalerOptions.SCALING_ENABLED, true);
         defaultConf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        defaultConf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8);
         defaultConf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.1);
         defaultConf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
index d4c3d6a..7f50fac 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java
@@ -66,6 +66,7 @@
         vertexScaler = new JobVertexScaler(new EventRecorder(kubernetesClient, eventCollector));
         conf = new Configuration();
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
     }
 
@@ -125,6 +126,19 @@
                 4,
                 vertexScaler.computeScaleTargetParallelism(
                         flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.5);
+        assertEquals(
+                15,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
+
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.6);
+        assertEquals(
+                16,
+                vertexScaler.computeScaleTargetParallelism(
+                        flinkDep, conf, op, evaluated(10, 200, 10), Collections.emptySortedMap()));
     }
 
     @Test
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
index 88ae09b..2734151 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -119,6 +119,7 @@
         conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
         conf.set(AutoScalerOptions.SCALING_ENABLED, true);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         ReconciliationUtils.updateStatusForDeployedSpec(app, conf);
         clock = Clock.fixed(Instant.ofEpochSecond(0), ZoneId.systemDefault());
         metricsCollector.setClock(clock);
diff --git a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
index 0bff735..71103e0 100644
--- a/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
+++ b/flink-kubernetes-operator-autoscaler/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java
@@ -74,6 +74,7 @@
         conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
         conf.set(AutoScalerOptions.SCALING_ENABLED, true);
         conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.);
+        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
         conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
 
         flinkDep = TestUtils.buildApplicationCluster();