[fix][broker] usedLocallySinceLastReport should always be reset (#22672)
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
(cherry picked from commit 8f015d89e5d246325ae5cada02c4af3017a97ed9)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
index c61a8e5..96a3a9d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroup.java
@@ -454,14 +454,13 @@
bytesUsed = monEntity.usedLocallySinceLastReport.bytes;
messagesUsed = monEntity.usedLocallySinceLastReport.messages;
-
+ monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
if (sendReport) {
p.setBytesPerPeriod(bytesUsed);
p.setMessagesPerPeriod(messagesUsed);
monEntity.lastReportedValues.bytes = bytesUsed;
monEntity.lastReportedValues.messages = messagesUsed;
monEntity.numSuppressedUsageReports = 0;
- monEntity.usedLocallySinceLastReport.bytes = monEntity.usedLocallySinceLastReport.messages = 0;
monEntity.totalUsedLocally.bytes += bytesUsed;
monEntity.totalUsedLocally.messages += messagesUsed;
monEntity.lastResourceUsageFillTimeMSecsSinceEpoch = System.currentTimeMillis();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java
index 658b7c9..139d198 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupReportLocalUsageTest.java
@@ -72,34 +72,50 @@
rgConfig.setPublishRateInMsgs(2000);
service.resourceGroupCreate(rgName, rgConfig);
- org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
BytesAndMessagesCount bytesAndMessagesCount = new BytesAndMessagesCount();
bytesAndMessagesCount.bytes = 20;
bytesAndMessagesCount.messages = 10;
- resourceGroup.incrementLocalUsageStats(ResourceGroupMonitoringClass.Publish, bytesAndMessagesCount);
+
+ org.apache.pulsar.broker.resourcegroup.ResourceGroup resourceGroup = service.resourceGroupGet(rgName);
+ for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
+ resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount);
+ }
+
+ // Case 1: the usage report is suppressed (needReport is false).
+ needReport.set(false);
ResourceUsage resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertFalse(resourceUsage.hasDispatch());
assertFalse(resourceUsage.hasPublish());
+ for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
+ PerMonitoringClassFields monitoredEntity =
+ resourceGroup.getMonitoredEntity(value);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0);
+ assertEquals(monitoredEntity.totalUsedLocally.messages, 0);
+ assertEquals(monitoredEntity.totalUsedLocally.bytes, 0);
+ assertEquals(monitoredEntity.lastReportedValues.messages, 0);
+ assertEquals(monitoredEntity.lastReportedValues.bytes, 0);
+ }
- PerMonitoringClassFields publishMonitoredEntity =
- resourceGroup.getMonitoredEntity(ResourceGroupMonitoringClass.Publish);
- assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, bytesAndMessagesCount.messages);
- assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, bytesAndMessagesCount.bytes);
- assertEquals(publishMonitoredEntity.totalUsedLocally.messages, 0);
- assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, 0);
- assertEquals(publishMonitoredEntity.lastReportedValues.messages, 0);
- assertEquals(publishMonitoredEntity.lastReportedValues.bytes, 0);
-
+ // Case 2: the usage report is sent (needReport is true).
+ for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
+ resourceGroup.incrementLocalUsageStats(value, bytesAndMessagesCount);
+ }
needReport.set(true);
+ resourceUsage = new ResourceUsage();
resourceGroup.rgFillResourceUsage(resourceUsage);
assertTrue(resourceUsage.hasDispatch());
assertTrue(resourceUsage.hasPublish());
- assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.messages, 0);
- assertEquals(publishMonitoredEntity.usedLocallySinceLastReport.bytes, 0);
- assertEquals(publishMonitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
- assertEquals(publishMonitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
- assertEquals(publishMonitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
- assertEquals(publishMonitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
+ for (ResourceGroupMonitoringClass value : ResourceGroupMonitoringClass.values()) {
+ PerMonitoringClassFields monitoredEntity =
+ resourceGroup.getMonitoredEntity(value);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.messages, 0);
+ assertEquals(monitoredEntity.usedLocallySinceLastReport.bytes, 0);
+ assertEquals(monitoredEntity.totalUsedLocally.messages, bytesAndMessagesCount.messages);
+ assertEquals(monitoredEntity.totalUsedLocally.bytes, bytesAndMessagesCount.bytes);
+ assertEquals(monitoredEntity.lastReportedValues.messages, bytesAndMessagesCount.messages);
+ assertEquals(monitoredEntity.lastReportedValues.bytes, bytesAndMessagesCount.bytes);
+ }
}
}
\ No newline at end of file