[hotfix] Fix incorrect `messageKey` passed in `ScalingLimited` event
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 87099bc..84520d0 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -464,7 +464,7 @@
                 AutoScalerEventHandler.Type.Warning,
                 SCALING_LIMITED,
                 message,
-                SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
+                SCALING_LIMITED + vertex + newParallelism,
                 context.getConfiguration().get(SCALING_EVENT_INTERVAL));
         return p;
     }
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 704ddd5..6b6c3b1 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -1114,6 +1114,30 @@
                 .isEqualTo(
                         String.format(
                                 SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
+        // small changes for scaleFactor, verify that the event messageKey is the same.
+        var smallChangesForScaleFactor = evaluated(10, 199, 100);
+        smallChangesForScaleFactor.put(
+                ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
+        assertEquals(
+                ParallelismChange.required(15),
+                vertexScaler.computeScaleTargetParallelism(
+                        context,
+                        jobVertexID,
+                        List.of(),
+                        smallChangesForScaleFactor,
+                        history,
+                        restartTime,
+                        delayedScaleDown));
+        assertEquals(1, eventCollector.events.size());
+        TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>>
+                smallChangesForScaleFactorLimitedEvent = eventCollector.events.poll();
+        assertThat(partitionLimitedEvent.getMessage())
+                .isEqualTo(
+                        String.format(
+                                SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
+        assertThat(smallChangesForScaleFactorLimitedEvent).isNotNull();
+        assertThat(partitionLimitedEvent.getMessageKey())
+                .isEqualTo(smallChangesForScaleFactorLimitedEvent.getMessageKey());
     }
 
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(