[hotfix] Fix incorrect `messageKey` passed in `ScalingLimited` event
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 87099bc..84520d0 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -464,7 +464,7 @@
AutoScalerEventHandler.Type.Warning,
SCALING_LIMITED,
message,
- SCALING_LIMITED + vertex + (scaleFactor * currentParallelism),
+ SCALING_LIMITED + vertex + newParallelism,
context.getConfiguration().get(SCALING_EVENT_INTERVAL));
return p;
}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 704ddd5..6b6c3b1 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -1114,6 +1114,30 @@
.isEqualTo(
String.format(
SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
+ // small changes for scaleFactor, verify that the event messageKey is the same.
+ var smallChangesForScaleFactor = evaluated(10, 199, 100);
+ smallChangesForScaleFactor.put(
+ ScalingMetric.NUM_SOURCE_PARTITIONS, EvaluatedScalingMetric.of(15));
+ assertEquals(
+ ParallelismChange.required(15),
+ vertexScaler.computeScaleTargetParallelism(
+ context,
+ jobVertexID,
+ List.of(),
+ smallChangesForScaleFactor,
+ history,
+ restartTime,
+ delayedScaleDown));
+ assertEquals(1, eventCollector.events.size());
+ TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>>
+ smallChangesForScaleFactorLimitedEvent = eventCollector.events.poll();
+ assertThat(partitionLimitedEvent.getMessage())
+ .isEqualTo(
+ String.format(
+ SCALE_LIMITED_MESSAGE_FORMAT, jobVertexID, 20, 15, 15, 200, 1));
+ assertThat(smallChangesForScaleFactorLimitedEvent).isNotNull();
+ assertThat(partitionLimitedEvent.getMessageKey())
+ .isEqualTo(smallChangesForScaleFactorLimitedEvent.getMessageKey());
}
private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(