Add instance capacity gauge (#557)
We need to monitor instance utilization in purpose of understanding what the instance capacity is.
Change list:
- Change instance monitor to update capacity
- Change getAttribute to throw AttributeNotFoundException in DynamicMBeanProvider
- Combine max usage and instance capacity update into one method in cluster status monitor
- Add unit test
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 9ad8fcf..727804b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -249,7 +248,7 @@
// Only use the resources in ideal states to parse all replicas.
Map<String, IdealState> idealStateMap = dataProvider.getIdealStates();
Map<String, Resource> resourceToMonitorMap = resourceMap.entrySet().stream()
- .filter(resourceName -> idealStateMap.containsKey(resourceName))
+ .filter(idealStateMap::containsKey)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Map<String, ResourceAssignment> currentStateAssignment =
@@ -257,16 +256,16 @@
ClusterModel clusterModel = ClusterModelProvider.generateClusterModelFromExistingAssignment(
dataProvider, resourceToMonitorMap, currentStateAssignment);
- Map<String, Double> maxUsageMap = new HashMap<>();
for (AssignableNode node : clusterModel.getAssignableNodes().values()) {
String instanceName = node.getInstanceName();
+ // There is no new usage adding to this node, so an empty map is passed in.
double usage = node.getProjectedHighestUtilization(Collections.emptyMap());
- maxUsageMap.put(instanceName, usage);
+ clusterStatusMonitor
+ .updateInstanceCapacityStatus(instanceName, usage, node.getMaxCapacity());
}
-
- clusterStatusMonitor.updateInstanceMaxUsage(maxUsageMap);
} catch (Exception ex) {
- LOG.error("Failed to report instance capacity metrics.", ex);
+ LOG.error("Failed to report instance capacity metrics. Exception message: {}",
+ ex.getMessage());
}
return null;
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 15fdbcd..96f85bf 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -367,24 +367,25 @@
}
/**
- * Update max capacity usage for per instance. Before calling this API, we assume the instance
- * monitors are already registered in ReadClusterDataStage. If the monitor is not registered, this
- * max usage update will fail.
+ * Updates instance capacity status for per instance, including max usage and capacity of each
+ * capacity key. Before calling this API, we assume the instance monitors are already registered
+ * in ReadClusterDataStage. If the monitor is not registered, this instance capacity status update
+ * will fail.
*
- * @param maxUsageMap a map of max capacity usage, {instance: maxCapacityUsage}
+ * @param instanceName This instance name
+ * @param maxUsage Max capacity usage of this instance
+ * @param capacityMap A map of this instance capacity, {capacity key: capacity value}
*/
- public void updateInstanceMaxUsage(Map<String, Double> maxUsageMap) {
- synchronized (_instanceMonitorMap) {
- for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
- InstanceMonitor monitor = _instanceMonitorMap.get(entry.getKey());
- if (monitor == null) {
- LOG.warn("Failed to update max usage because instance monitor is not found, instance: {}.",
- entry.getKey());
- continue;
- }
- monitor.updateMaxCapacityUsage(entry.getValue());
- }
+ public void updateInstanceCapacityStatus(String instanceName, double maxUsage,
+ Map<String, Integer> capacityMap) {
+ InstanceMonitor monitor = _instanceMonitorMap.get(instanceName);
+ if (monitor == null) {
+ LOG.warn("Failed to update instance capacity status because instance monitor is not found, "
+ + "instance: {}.", instanceName);
+ return;
}
+ monitor.updateMaxCapacityUsage(maxUsage);
+ monitor.updateCapacity(capacityMap);
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index b93d3b9..e0c0f89 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
import javax.management.ObjectName;
@@ -41,7 +42,7 @@
/**
* Metric names for instance capacity.
*/
- public enum InstanceMonitorMetrics {
+ public enum InstanceMonitorMetric {
// TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
ENABLED_STATUS_GAUGE("Enabled"),
@@ -49,9 +50,9 @@
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
- private String metricName;
+ private final String metricName;
- InstanceMonitorMetrics(String name) {
+ InstanceMonitorMetric(String name) {
metricName = name;
}
@@ -75,6 +76,9 @@
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
+ // A map of dynamic capacity Gauges. The map's keys could change.
+ private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
+
/**
* Initialize the bean
* @param clusterName the cluster to monitor
@@ -85,26 +89,41 @@
_participantName = participantName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
_initObjectName = objectName;
+ _dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
createMetrics();
}
private void createMetrics() {
_totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
- InstanceMonitorMetrics.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
+ InstanceMonitorMetric.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
_disabledPartitionsGauge =
- new SimpleDynamicMetric<>(InstanceMonitorMetrics.DISABLED_PARTITIONS_GAUGE.metricName(),
+ new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
0L);
_enabledStatusGauge =
- new SimpleDynamicMetric<>(InstanceMonitorMetrics.ENABLED_STATUS_GAUGE.metricName(), 0L);
+ new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
_onlineStatusGauge =
- new SimpleDynamicMetric<>(InstanceMonitorMetrics.ONLINE_STATUS_GAUGE.metricName(), 0L);
+ new SimpleDynamicMetric<>(InstanceMonitorMetric.ONLINE_STATUS_GAUGE.metricName(), 0L);
_maxCapacityUsageGauge =
- new SimpleDynamicMetric<>(InstanceMonitorMetrics.MAX_CAPACITY_USAGE_GAUGE.metricName(),
+ new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
0.0d);
}
+ private List<DynamicMetric<?, ?>> buildAttributeList() {
+ List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
+ _totalMessagedReceivedCounter,
+ _disabledPartitionsGauge,
+ _enabledStatusGauge,
+ _onlineStatusGauge,
+ _maxCapacityUsageGauge
+ );
+
+ attributeList.addAll(_dynamicCapacityMetricsMap.values());
+
+ return attributeList;
+ }
+
@Override
public String getSensorName() {
return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
@@ -183,7 +202,7 @@
}
/**
- * Update max capacity usage for this instance.
+ * Updates max capacity usage for this instance.
* @param maxUsage max capacity usage of this instance
*/
public synchronized void updateMaxCapacityUsage(double maxUsage) {
@@ -191,25 +210,50 @@
}
/**
- * Get max capacity usage of this instance.
+ * Gets max capacity usage of this instance.
* @return Max capacity usage of this instance.
*/
protected synchronized double getMaxCapacityUsageGauge() {
return _maxCapacityUsageGauge.getValue();
}
- @Override
- public DynamicMBeanProvider register()
- throws JMException {
- List<DynamicMetric<?, ?>> attributeList = ImmutableList.of(
- _totalMessagedReceivedCounter,
- _disabledPartitionsGauge,
- _enabledStatusGauge,
- _onlineStatusGauge,
- _maxCapacityUsageGauge
- );
+ /**
+ * Updates instance capacity metrics.
+ * @param capacity A map of instance capacity.
+ */
+ public void updateCapacity(Map<String, Integer> capacity) {
+ synchronized (_dynamicCapacityMetricsMap) {
+ // If capacity keys don't have any change, we just update the metric values.
+ if (_dynamicCapacityMetricsMap.keySet().equals(capacity.keySet())) {
+ for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
+ _dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
+ }
+ return;
+ }
- doRegister(attributeList, _initObjectName);
+ // If capacity keys have any changes, we need to retain the capacity metrics.
+ // Make sure capacity metrics map has the same capacity keys.
+ // And update metrics values.
+ _dynamicCapacityMetricsMap.keySet().retainAll(capacity.keySet());
+ for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
+ String capacityName = entry.getKey();
+ if (_dynamicCapacityMetricsMap.containsKey(capacityName)) {
+ _dynamicCapacityMetricsMap.get(capacityName).updateValue((long) entry.getValue());
+ } else {
+ _dynamicCapacityMetricsMap.put(capacityName,
+ new SimpleDynamicMetric<>(capacityName + "Gauge", (long) entry.getValue()));
+ }
+ }
+ }
+
+ // Update MBean's all attributes.
+ updateAttributesInfo(buildAttributeList(),
+ "Instance monitor for instance: " + getInstanceName());
+ }
+
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ doRegister(buildAttributeList(), _initObjectName);
return this;
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
index 0ce0b44..407a714 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/dynamicMBeans/DynamicMBeanProvider.java
@@ -22,23 +22,19 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
-import javax.management.InvalidAttributeValueException;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanConstructorInfo;
-import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanOperationInfo;
import javax.management.ObjectName;
-import javax.management.ReflectionException;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.monitoring.SensorNameProvider;
@@ -53,12 +49,12 @@
public abstract class DynamicMBeanProvider implements DynamicMBean, SensorNameProvider {
protected final Logger _logger = LoggerFactory.getLogger(getClass());
protected static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // Reset time every hour
- private static String SENSOR_NAME_TAG = "SensorName";
- private static String DEFAULT_DESCRIPTION =
+ private static final String SENSOR_NAME_TAG = "SensorName";
+ private static final String DEFAULT_DESCRIPTION =
"Information on the management interface of the MBean";
// Attribute name to the DynamicMetric object mapping
- private final Map<String, DynamicMetric> _attributeMap = new HashMap<>();
+ private Map<String, DynamicMetric> _attributeMap = new HashMap<>();
private ObjectName _objectName = null;
private MBeanInfo _mBeanInfo;
@@ -88,7 +84,7 @@
objectName.getCanonicalName());
return false;
}
- updateAttributtInfos(dynamicMetrics, description);
+ updateAttributesInfo(dynamicMetrics, description);
_objectName = MBeanRegistrar.register(this, objectName);
return true;
}
@@ -99,26 +95,30 @@
}
/**
- * Update the Dynamic MBean provider with new metric list.
+ * Updates the Dynamic MBean provider with new metric list.
+ * If the pass-in metrics collection is empty, the original attributes will be removed.
+ *
* @param description description of the MBean
- * @param dynamicMetrics the DynamicMetrics
+ * @param dynamicMetrics the DynamicMetrics. Empty collection will remove the metric attributes.
*/
- private void updateAttributtInfos(Collection<DynamicMetric<?, ?>> dynamicMetrics,
+ protected void updateAttributesInfo(Collection<DynamicMetric<?, ?>> dynamicMetrics,
String description) {
- _attributeMap.clear();
+ if (dynamicMetrics == null) {
+ _logger.warn("Cannot update attributes info because dynamicMetrics is null.");
+ return;
+ }
- // get all attributes that can be emit by the dynamicMetrics.
List<MBeanAttributeInfo> attributeInfoList = new ArrayList<>();
- if (dynamicMetrics != null) {
- for (DynamicMetric dynamicMetric : dynamicMetrics) {
- Iterator<MBeanAttributeInfo> iter = dynamicMetric.getAttributeInfos().iterator();
- while (iter.hasNext()) {
- MBeanAttributeInfo attributeInfo = iter.next();
- // Info list to create MBean info
- attributeInfoList.add(attributeInfo);
- // Attribute mapping for getting attribute value when getAttribute() is called
- _attributeMap.put(attributeInfo.getName(), dynamicMetric);
- }
+ // Use a new attribute map to avoid concurrency issue.
+ Map<String, DynamicMetric> newAttributeMap = new HashMap<>();
+
+ // Get all attributes that can be emitted by the dynamicMetrics.
+ for (DynamicMetric<?, ?> dynamicMetric : dynamicMetrics) {
+ for (MBeanAttributeInfo attributeInfo : dynamicMetric.getAttributeInfos()) {
+ // Info list to create MBean info
+ attributeInfoList.add(attributeInfo);
+ // Attribute mapping for getting attribute value when getAttribute() is called
+ newAttributeMap.put(attributeInfo.getName(), dynamicMetric);
}
}
@@ -130,17 +130,19 @@
String.format("Default %s Constructor", getClass().getSimpleName()),
getClass().getConstructors()[0]);
- MBeanAttributeInfo[] attributeInfos = new MBeanAttributeInfo[attributeInfoList.size()];
- attributeInfos = attributeInfoList.toArray(attributeInfos);
+ MBeanAttributeInfo[] attributesInfo = new MBeanAttributeInfo[attributeInfoList.size()];
+ attributesInfo = attributeInfoList.toArray(attributesInfo);
if (description == null) {
description = DEFAULT_DESCRIPTION;
}
- _mBeanInfo = new MBeanInfo(getClass().getName(), description, attributeInfos,
- new MBeanConstructorInfo[] {
- constructorInfo
- }, new MBeanOperationInfo[0], new MBeanNotificationInfo[0]);
+ _mBeanInfo = new MBeanInfo(getClass().getName(), description, attributesInfo,
+ new MBeanConstructorInfo[]{constructorInfo}, new MBeanOperationInfo[0],
+ new MBeanNotificationInfo[0]);
+
+ // Update _attributeMap reference.
+ _attributeMap = newAttributeMap;
}
/**
@@ -158,17 +160,17 @@
}
@Override
- public Object getAttribute(String attribute)
- throws AttributeNotFoundException, MBeanException, ReflectionException {
+ public Object getAttribute(String attribute) throws AttributeNotFoundException {
if (SENSOR_NAME_TAG.equals(attribute)) {
return getSensorName();
}
- if (!_attributeMap.containsKey(attribute)) {
- return null;
+ DynamicMetric metric = _attributeMap.get(attribute);
+ if (metric == null) {
+ throw new AttributeNotFoundException("Attribute[" + attribute + "] is not found.");
}
- return _attributeMap.get(attribute).getAttributeValue(attribute);
+ return metric.getAttributeValue(attribute);
}
@Override
@@ -178,7 +180,7 @@
try {
Object value = getAttribute(attributeName);
attributeList.add(new Attribute(attributeName, value));
- } catch (AttributeNotFoundException | MBeanException | ReflectionException ex) {
+ } catch (AttributeNotFoundException ex) {
_logger.error("Failed to get attribute: " + attributeName, ex);
}
}
@@ -191,8 +193,7 @@
}
@Override
- public void setAttribute(Attribute attribute) throws AttributeNotFoundException,
- InvalidAttributeValueException, MBeanException, ReflectionException {
+ public void setAttribute(Attribute attribute) {
// All MBeans are readonly
return;
}
@@ -204,8 +205,7 @@
}
@Override
- public Object invoke(String actionName, Object[] params, String[] signature)
- throws MBeanException, ReflectionException {
+ public Object invoke(String actionName, Object[] params, String[] signature) {
// No operation supported
return null;
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
index a9e6c24..f4ba01f 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestClusterStatusMonitor.java
@@ -39,6 +39,7 @@
import javax.management.ReflectionException;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.helix.TestHelper;
import org.apache.helix.ZNRecord;
@@ -62,8 +63,7 @@
private String testDB_0 = testDB + "_0";
@Test()
- public void testReportData()
- throws Exception {
+ public void testReportData() throws Exception {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
@@ -166,8 +166,7 @@
@Test
- public void testResourceAggregation()
- throws JMException, IOException {
+ public void testResourceAggregation() throws JMException, IOException {
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String clusterName = className + "_" + methodName;
@@ -315,14 +314,20 @@
}
@Test
- public void testUpdateMaxCapacityUsage()
+ public void testUpdateInstanceCapacityStatus()
throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
ReflectionException, InstanceNotFoundException {
String clusterName = "testCluster";
List<Double> maxUsageList = ImmutableList.of(0.0d, 0.32d, 0.85d, 1.0d, 0.50d, 0.75d);
Map<String, Double> maxUsageMap = new HashMap<>();
+ Map<String, Map<String, Integer>> instanceCapacityMap = new HashMap<>();
+ Random rand = new Random();
+
for (int i = 0; i < maxUsageList.size(); i++) {
- maxUsageMap.put("instance" + i, maxUsageList.get(i));
+ String instanceName = "instance" + i;
+ maxUsageMap.put(instanceName, maxUsageList.get(i));
+ instanceCapacityMap.put(instanceName,
+ ImmutableMap.of("capacity1", rand.nextInt(100), "capacity2", rand.nextInt(100)));
}
// Setup cluster status monitor.
@@ -330,13 +335,15 @@
monitor.active();
ObjectName clusterMonitorObjName = monitor.getObjectName(monitor.clusterBeanName());
+ // Cluster status monitor is registered.
Assert.assertTrue(_server.isRegistered(clusterMonitorObjName));
// Before calling setClusterInstanceStatus, instance monitors are not yet registered.
for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
String instance = entry.getKey();
- String instanceBeanName =
- String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+ String instanceBeanName = String
+ .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+ instance);
ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
Assert.assertFalse(_server.isRegistered(instanceObjectName));
@@ -346,31 +353,86 @@
monitor.setClusterInstanceStatus(maxUsageMap.keySet(), maxUsageMap.keySet(),
Collections.emptySet(), Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap());
- // Update max usage stats.
- monitor.updateInstanceMaxUsage(maxUsageMap);
- // Verify results.
- for (Map.Entry<String, Double> entry : maxUsageMap.entrySet()) {
- String instance = entry.getKey();
- double usage = entry.getValue();
- String instanceBeanName =
- String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+ // Update instance capacity status.
+ for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
+ String instanceName = usageEntry.getKey();
+ monitor.updateInstanceCapacityStatus(instanceName, usageEntry.getValue(),
+ instanceCapacityMap.get(instanceName));
+ }
+
+ verifyCapacityMetrics(monitor, maxUsageMap, instanceCapacityMap);
+
+ // Change capacity keys: "capacity2" -> "capacity3"
+ for (String instanceName : instanceCapacityMap.keySet()) {
+ instanceCapacityMap.put(instanceName,
+ ImmutableMap.of("capacity1", rand.nextInt(100), "capacity3", rand.nextInt(100)));
+ }
+
+ // Update instance capacity status.
+ for (Map.Entry<String, Double> usageEntry : maxUsageMap.entrySet()) {
+ String instanceName = usageEntry.getKey();
+ monitor.updateInstanceCapacityStatus(instanceName, usageEntry.getValue(),
+ instanceCapacityMap.get(instanceName));
+ }
+
+ // "capacity2" metric should not exist in MBean server.
+ String removedAttribute = "capacity2Gauge";
+ for (Map.Entry<String, Map<String, Integer>> instanceEntry : instanceCapacityMap.entrySet()) {
+ String instance = instanceEntry.getKey();
+ String instanceBeanName = String
+ .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+ instance);
ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
- Assert.assertTrue(_server.isRegistered(instanceObjectName));
- Assert.assertEquals(_server.getAttribute(instanceObjectName, "MaxCapacityUsageGauge"), usage);
+ try {
+ _server.getAttribute(instanceObjectName, removedAttribute);
+ Assert.fail();
+ } catch (AttributeNotFoundException ex) {
+ // Expected AttributeNotFoundException because "capacity2Gauge" metric does not exist in
+ // MBean server.
+ }
}
+ verifyCapacityMetrics(monitor, maxUsageMap, instanceCapacityMap);
+
// Reset monitor.
monitor.reset();
Assert.assertFalse(_server.isRegistered(clusterMonitorObjName),
"Failed to unregister ClusterStatusMonitor.");
for (String instance : maxUsageMap.keySet()) {
String instanceBeanName =
- String.format("%s,%s=%s", monitor.clusterBeanName(), monitor.INSTANCE_DN_KEY, instance);
+ String.format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY, instance);
ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
Assert.assertFalse(_server.isRegistered(instanceObjectName),
"Failed to unregister instance monitor for instance: " + instance);
}
}
+
+ private void verifyCapacityMetrics(ClusterStatusMonitor monitor, Map<String, Double> maxUsageMap,
+ Map<String, Map<String, Integer>> instanceCapacityMap)
+ throws MalformedObjectNameException, IOException, AttributeNotFoundException, MBeanException,
+ ReflectionException, InstanceNotFoundException {
+ // Verify results.
+ for (Map.Entry<String, Map<String, Integer>> instanceEntry : instanceCapacityMap.entrySet()) {
+ String instance = instanceEntry.getKey();
+ Map<String, Integer> capacityMap = instanceEntry.getValue();
+ String instanceBeanName = String
+ .format("%s,%s=%s", monitor.clusterBeanName(), ClusterStatusMonitor.INSTANCE_DN_KEY,
+ instance);
+ ObjectName instanceObjectName = monitor.getObjectName(instanceBeanName);
+
+ Assert.assertTrue(_server.isRegistered(instanceObjectName));
+ Assert.assertEquals(_server.getAttribute(instanceObjectName,
+ InstanceMonitor.InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName()),
+ maxUsageMap.get(instance));
+
+ for (Map.Entry<String, Integer> capacityEntry : capacityMap.entrySet()) {
+ String capacityKey = capacityEntry.getKey();
+ String attributeName = capacityKey + "Gauge";
+ Assert.assertEquals((long) _server.getAttribute(instanceObjectName, attributeName),
+ (long) instanceCapacityMap.get(instance).get(capacityKey));
+ }
+ }
+ }
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
index f2b7631..5119d81 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
@@ -3,6 +3,7 @@
import java.lang.management.ManagementFactory;
import java.util.HashSet;
import java.util.Set;
+import javax.management.AttributeNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -82,8 +83,15 @@
Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 15);
Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max"), 0);
Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 0);
+
// StatePropagationLatencyGauge only apply for current state
- Assert.assertEquals(_beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max"), null);
+ try {
+ _beanServer.getAttribute(name, "StatePropagationLatencyGauge.Max");
+ Assert.fail();
+ } catch (AttributeNotFoundException ex) {
+ // Expected AttributeNotFoundException because the metric does not exist in
+ // MBean server.
+ }
long startTime = System.currentTimeMillis();
Thread.sleep(5);
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 22be0a5..2695ee4 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -20,6 +20,7 @@
*/
import java.lang.management.ManagementFactory;
+import javax.management.AttributeNotFoundException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
@@ -117,7 +118,13 @@
requestGauge = (long) _beanServer.getAttribute(name, "OutstandingRequestGauge");
Assert.assertEquals(requestGauge, 0);
- Assert.assertNull(_beanServer.getAttribute(name, "PendingCallbackGauge"));
+ try {
+ _beanServer.getAttribute(name, "PendingCallbackGauge");
+ Assert.fail();
+ } catch (AttributeNotFoundException ex) {
+ // Expected AttributeNotFoundException because the metric does not exist in
+ // MBean server.
+ }
monitor.record("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10,
ZkClientMonitor.AccessType.READ);