Make LoadBalancer's ResourceUsage class immutable (#13639)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
index 15b6619..0c8f8a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
@@ -23,7 +23,6 @@
import com.beust.jcommander.internal.Lists;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.FastThreadLocal;
-import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashSet;
@@ -46,6 +45,7 @@
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -211,19 +211,19 @@
}
// Get the system resource usage for this broker.
- public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) throws IOException {
+ public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) {
SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();
// Override System memory usage and limit with JVM heap usage and limit
- long maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
- long memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- systemResourceUsage.memory.usage = (double) memoryUsageInBytes / MIBI;
- systemResourceUsage.memory.limit = (double) maxHeapMemoryInBytes / MIBI;
+ double maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
+ double memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ double memoryUsage = memoryUsageInBytes / MIBI;
+ double memoryLimit = maxHeapMemoryInBytes / MIBI;
+ systemResourceUsage.setMemory(new ResourceUsage(memoryUsage, memoryLimit));
// Collect JVM direct memory
- systemResourceUsage.directMemory.usage = (double) (getJvmDirectMemoryUsed() / MIBI);
- systemResourceUsage.directMemory.limit =
- (double) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / MIBI);
+ systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
+ (double) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / MIBI)));
return systemResourceUsage;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
index 017a040..f9ad5cc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java
@@ -30,7 +30,6 @@
import com.google.common.collect.Sets;
import com.google.common.collect.TreeMultimap;
import io.netty.util.concurrent.DefaultThreadFactory;
-import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
@@ -70,6 +69,7 @@
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.policies.data.loadbalancer.ResourceUnitRanking;
+import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage.ResourceType;
import org.slf4j.Logger;
@@ -1026,10 +1026,11 @@
}
}
- public SystemResourceUsage getSystemResourceUsage() throws IOException {
+ public SystemResourceUsage getSystemResourceUsage() {
SystemResourceUsage systemResourceUsage = LoadManagerShared.getSystemResourceUsage(brokerHostUsage);
long memoryUsageInMBytes = getAverageJvmHeapUsageMBytes();
- systemResourceUsage.memory.usage = (double) memoryUsageInMBytes;
+ systemResourceUsage
+ .setMemory(new ResourceUsage((double) memoryUsageInMBytes, systemResourceUsage.memory.limit));
return systemResourceUsage;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
index d09158e..aa2528b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/ModularLoadManagerImplTest.java
@@ -342,7 +342,7 @@
// 80% is below overload threshold: verify nothing is unloaded.
verify(namespacesSpy1, Mockito.times(0)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
- localBrokerData.getCpu().usage = 90;
+ localBrokerData.setCpu(new ResourceUsage(90, 100));
primaryLoadManager.doLoadShedding();
// Most expensive bundle will be unloaded.
verify(namespacesSpy1, Mockito.times(1)).unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString());
@@ -412,17 +412,13 @@
assert (!needUpdate.get());
// Minimally test other absolute values to ensure they are included.
- lastData.getCpu().usage = 100;
- lastData.getCpu().limit = 1000;
- currentData.getCpu().usage = 106;
- currentData.getCpu().limit = 1000;
+ lastData.setCpu(new ResourceUsage(100, 1000));
+ currentData.setCpu(new ResourceUsage(106, 1000));
assert (!needUpdate.get());
// Minimally test other absolute values to ensure they are included.
- lastData.getCpu().usage = 100;
- lastData.getCpu().limit = 1000;
- currentData.getCpu().usage = 206;
- currentData.getCpu().limit = 1000;
+ lastData.setCpu(new ResourceUsage(100, 1000));
+ currentData.setCpu(new ResourceUsage(206, 1000));
assert (needUpdate.get());
lastData.setCpu(new ResourceUsage());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
index 021326e..e972a52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/SimpleLoadManagerImplTest.java
@@ -473,8 +473,6 @@
double usageLimit = 10.0;
usage.setBandwidthIn(new ResourceUsage(usageLimit, usageLimit));
assertEquals(usage.getBandwidthIn().usage, usageLimit);
- usage.reset();
- assertNotEquals(usage.getBandwidthIn().usage, usageLimit);
}
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
index 752f0f0..55093d3 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUsage.java
@@ -21,29 +21,27 @@
import lombok.EqualsAndHashCode;
/**
- * POJO used to represents any system specific resource usage this is the format that load manager expects it in.
+ * POJO used to represent any system specific resource usage this is the format that load manager expects it in.
*/
@EqualsAndHashCode
public class ResourceUsage {
- public double usage;
- public double limit;
+ public final double usage;
+ public final double limit;
+ @EqualsAndHashCode.Exclude
+ private final float percentUsage;
public ResourceUsage(double usage, double limit) {
this.usage = usage;
this.limit = limit;
- }
-
- public ResourceUsage(ResourceUsage that) {
- this.usage = that.usage;
- this.limit = that.limit;
+ float proportion = 0;
+ if (limit > 0) {
+ proportion = ((float) usage) / ((float) limit);
+ }
+ percentUsage = proportion * 100;
}
public ResourceUsage() {
- }
-
- public void reset() {
- this.usage = -1;
- this.limit = -1;
+ this(0, 0);
}
/**
@@ -59,10 +57,6 @@
}
public float percentUsage() {
- float proportion = 0;
- if (limit > 0) {
- proportion = ((float) usage) / ((float) limit);
- }
- return proportion * 100;
+ return percentUsage;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
index 08abcbc..e63de9b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/LocalBrokerData.java
@@ -181,11 +181,11 @@
// Update resource usage given each individual usage.
private void updateSystemResourceUsage(final ResourceUsage cpu, final ResourceUsage memory,
final ResourceUsage directMemory, final ResourceUsage bandwidthIn, final ResourceUsage bandwidthOut) {
- this.cpu = new ResourceUsage(cpu);
- this.memory = new ResourceUsage(memory);
- this.directMemory = new ResourceUsage(directMemory);
- this.bandwidthIn = new ResourceUsage(bandwidthIn);
- this.bandwidthOut = new ResourceUsage(bandwidthOut);
+ this.cpu = cpu;
+ this.memory = memory;
+ this.directMemory = directMemory;
+ this.bandwidthIn = bandwidthIn;
+ this.bandwidthOut = bandwidthOut;
}
// Aggregate all message, throughput, topic count, bundle count, consumer
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java
index 168bbca..6b7ef1f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/SystemResourceUsage.java
@@ -44,14 +44,6 @@
directMemory = new ResourceUsage(-1, -1);
}
- public void reset() {
- bandwidthIn.reset();
- bandwidthOut.reset();
- cpu.reset();
- memory.reset();
- directMemory.reset();
- }
-
public ResourceUsage getBandwidthIn() {
return bandwidthIn;
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/lookup/data/LookupDataTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/lookup/data/LookupDataTest.java
index ef2ad90..9023ba5 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/lookup/data/LookupDataTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/lookup/data/LookupDataTest.java
@@ -79,9 +79,8 @@
final String simpleLmReportName = "simpleLoadManager";
final String modularLmBrokerUrl = "modular";
final SystemResourceUsage simpleLmSystemResourceUsage = new SystemResourceUsage();
- final ResourceUsage resource = new ResourceUsage();
final double usage = 55.0;
- resource.usage = usage;
+ final ResourceUsage resource = new ResourceUsage(usage, 0);
simpleLmSystemResourceUsage.bandwidthIn = resource;
LoadReport simpleReport = getSimpleLoadManagerLoadReport(simpleLmBrokerUrl, simpleLmReportName,