blob: e0c0f89519eb5a22c99a25147ec9fd478495feaf [file] [log] [blame]
package org.apache.helix.monitoring.mbeans;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
import javax.management.ObjectName;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
/**
* Implementation of the instance status bean
*/
public class InstanceMonitor extends DynamicMBeanProvider {
/**
* Metric names for instance capacity.
*/
public enum InstanceMonitorMetric {
// TODO: change the metric names with Counter and Gauge suffix and deprecate old names.
TOTAL_MESSAGE_RECEIVED_COUNTER("TotalMessageReceived"),
ENABLED_STATUS_GAUGE("Enabled"),
ONLINE_STATUS_GAUGE("Online"),
DISABLED_PARTITIONS_GAUGE("DisabledPartitions"),
MAX_CAPACITY_USAGE_GAUGE("MaxCapacityUsageGauge");
private final String metricName;
InstanceMonitorMetric(String name) {
metricName = name;
}
public String metricName() {
return metricName;
}
}
private final String _clusterName;
private final String _participantName;
private final ObjectName _initObjectName;
private List<String> _tags;
// Counters
private SimpleDynamicMetric<Long> _totalMessagedReceivedCounter;
// Gauges
private SimpleDynamicMetric<Long> _enabledStatusGauge;
private SimpleDynamicMetric<Long> _disabledPartitionsGauge;
private SimpleDynamicMetric<Long> _onlineStatusGauge;
private SimpleDynamicMetric<Double> _maxCapacityUsageGauge;
// A map of dynamic capacity Gauges. The map's keys could change.
private final Map<String, SimpleDynamicMetric<Long>> _dynamicCapacityMetricsMap;
/**
* Initialize the bean
* @param clusterName the cluster to monitor
* @param participantName the instance whose statistics this holds
*/
public InstanceMonitor(String clusterName, String participantName, ObjectName objectName) {
_clusterName = clusterName;
_participantName = participantName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
_initObjectName = objectName;
_dynamicCapacityMetricsMap = new ConcurrentHashMap<>();
createMetrics();
}
private void createMetrics() {
_totalMessagedReceivedCounter = new SimpleDynamicMetric<>(
InstanceMonitorMetric.TOTAL_MESSAGE_RECEIVED_COUNTER.metricName(), 0L);
_disabledPartitionsGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.DISABLED_PARTITIONS_GAUGE.metricName(),
0L);
_enabledStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.ENABLED_STATUS_GAUGE.metricName(), 0L);
_onlineStatusGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.ONLINE_STATUS_GAUGE.metricName(), 0L);
_maxCapacityUsageGauge =
new SimpleDynamicMetric<>(InstanceMonitorMetric.MAX_CAPACITY_USAGE_GAUGE.metricName(),
0.0d);
}
private List<DynamicMetric<?, ?>> buildAttributeList() {
List<DynamicMetric<?, ?>> attributeList = Lists.newArrayList(
_totalMessagedReceivedCounter,
_disabledPartitionsGauge,
_enabledStatusGauge,
_onlineStatusGauge,
_maxCapacityUsageGauge
);
attributeList.addAll(_dynamicCapacityMetricsMap.values());
return attributeList;
}
@Override
public String getSensorName() {
return String.format("%s.%s.%s.%s", ClusterStatusMonitor.PARTICIPANT_STATUS_KEY, _clusterName,
serializedTags(), _participantName);
}
protected long getOnline() {
return _onlineStatusGauge.getValue();
}
protected long getEnabled() {
return _enabledStatusGauge.getValue();
}
protected long getTotalMessageReceived() {
return _totalMessagedReceivedCounter.getValue();
}
protected long getDisabledPartitions() {
return _disabledPartitionsGauge.getValue();
}
/**
* Get the name of the monitored instance
* @return instance name as a string
*/
protected String getInstanceName() {
return _participantName;
}
private String serializedTags() {
return Joiner.on('|').skipNulls().join(_tags);
}
/**
* Update the gauges for this instance
* @param tags current tags
* @param disabledPartitions current disabled partitions
* @param isLive true if running, false otherwise
* @param isEnabled true if enabled, false if disabled
*/
public synchronized void updateInstance(Set<String> tags,
Map<String, List<String>> disabledPartitions, List<String> oldDisabledPartitions,
boolean isLive, boolean isEnabled) {
if (tags == null || tags.isEmpty()) {
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
} else {
_tags = Lists.newArrayList(tags);
Collections.sort(_tags);
}
long numDisabledPartitions = 0L;
if (disabledPartitions != null) {
for (List<String> partitions : disabledPartitions.values()) {
if (partitions != null) {
numDisabledPartitions += partitions.size();
}
}
}
// TODO : Get rid of this when old API removed.
if (oldDisabledPartitions != null) {
numDisabledPartitions += oldDisabledPartitions.size();
}
_onlineStatusGauge.updateValue(isLive ? 1L : 0L);
_enabledStatusGauge.updateValue(isEnabled ? 1L : 0L);
_disabledPartitionsGauge.updateValue(numDisabledPartitions);
}
/**
* Increase message received for this instance
* @param messageReceived received message numbers
*/
public synchronized void increaseMessageCount(long messageReceived) {
_totalMessagedReceivedCounter
.updateValue(_totalMessagedReceivedCounter.getValue() + messageReceived);
}
/**
* Updates max capacity usage for this instance.
* @param maxUsage max capacity usage of this instance
*/
public synchronized void updateMaxCapacityUsage(double maxUsage) {
_maxCapacityUsageGauge.updateValue(maxUsage);
}
/**
* Gets max capacity usage of this instance.
* @return Max capacity usage of this instance.
*/
protected synchronized double getMaxCapacityUsageGauge() {
return _maxCapacityUsageGauge.getValue();
}
/**
* Updates instance capacity metrics.
* @param capacity A map of instance capacity.
*/
public void updateCapacity(Map<String, Integer> capacity) {
synchronized (_dynamicCapacityMetricsMap) {
// If capacity keys don't have any change, we just update the metric values.
if (_dynamicCapacityMetricsMap.keySet().equals(capacity.keySet())) {
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
_dynamicCapacityMetricsMap.get(entry.getKey()).updateValue((long) entry.getValue());
}
return;
}
// If capacity keys have any changes, we need to retain the capacity metrics.
// Make sure capacity metrics map has the same capacity keys.
// And update metrics values.
_dynamicCapacityMetricsMap.keySet().retainAll(capacity.keySet());
for (Map.Entry<String, Integer> entry : capacity.entrySet()) {
String capacityName = entry.getKey();
if (_dynamicCapacityMetricsMap.containsKey(capacityName)) {
_dynamicCapacityMetricsMap.get(capacityName).updateValue((long) entry.getValue());
} else {
_dynamicCapacityMetricsMap.put(capacityName,
new SimpleDynamicMetric<>(capacityName + "Gauge", (long) entry.getValue()));
}
}
}
// Update MBean's all attributes.
updateAttributesInfo(buildAttributeList(),
"Instance monitor for instance: " + getInstanceName());
}
@Override
public DynamicMBeanProvider register() throws JMException {
doRegister(buildAttributeList(), _initObjectName);
return this;
}
}