Revert "Change participant message monitor to use dynamic metric (#1685)" (#1693)
This reverts commit d5f5273d483ba54c51c79d497dead190cf758bb6.
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
index 9c90295..261790d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitor.java
@@ -19,29 +19,10 @@
* under the License.
*/
-import java.util.ArrayList;
-import java.util.List;
-import javax.management.JMException;
-import javax.management.ObjectName;
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
-import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
-
-
-public class ParticipantMessageMonitor extends DynamicMBeanProvider {
+public class ParticipantMessageMonitor implements ParticipantMessageMonitorMBean {
public static final String PARTICIPANT_KEY = "ParticipantName";
public static final String PARTICIPANT_STATUS_KEY = "ParticipantMessageStatus";
- // For registering dynamic metrics
- private final ObjectName _initObjectName;
- private final String _participantName;
-
- private SimpleDynamicMetric<Long> _receivedMessages;
- private SimpleDynamicMetric<Long> _discardedMessages;
- private SimpleDynamicMetric<Long> _completedMessages;
- private SimpleDynamicMetric<Long> _failedMessages;
- private SimpleDynamicMetric<Long> _pendingMessages;
-
/**
* The current processed state of the message
*/
@@ -51,42 +32,68 @@
COMPLETED
}
- public ParticipantMessageMonitor(String participantName, ObjectName objectName) {
+ private final String _participantName;
+ private long _receivedMessages = 0;
+ private long _discardedMessages = 0;
+ private long _completedMessages = 0;
+ private long _failedMessages = 0;
+ private long _pendingMessages = 0;
+
+ public ParticipantMessageMonitor(String participantName) {
_participantName = participantName;
- _initObjectName = objectName;
- _receivedMessages = new SimpleDynamicMetric("ReceivedMessages", 0L);
- _discardedMessages = new SimpleDynamicMetric("DiscardedMessages", 0L);
- _completedMessages = new SimpleDynamicMetric("CompletedMessages", 0L);
- _failedMessages = new SimpleDynamicMetric("FailedMessages", 0L);
- _pendingMessages = new SimpleDynamicMetric("PendingMessages", 0L);
}
public String getParticipantBeanName() {
return String.format("%s=%s", PARTICIPANT_KEY, _participantName);
}
- public void incrementReceivedMessages(long count) {
- incrementSimpleDynamicMetric(_receivedMessages, count);
+ public void incrementReceivedMessages(int count) {
+ _receivedMessages += count;
}
public void incrementDiscardedMessages(int count) {
- incrementSimpleDynamicMetric(_discardedMessages, count);
+ _discardedMessages += count;
}
public void incrementCompletedMessages(int count) {
- incrementSimpleDynamicMetric(_completedMessages, count);
+ _completedMessages += count;
}
public void incrementFailedMessages(int count) {
- incrementSimpleDynamicMetric(_failedMessages, count);
+ _failedMessages += count;
}
public void incrementPendingMessages(int count) {
- incrementSimpleDynamicMetric(_pendingMessages, count);
+ _pendingMessages += count;
}
public void decrementPendingMessages(int count) {
- incrementSimpleDynamicMetric(_pendingMessages, -1 * count);
+ _pendingMessages -= count;
+ }
+
+ @Override
+ public long getReceivedMessages() {
+ return _receivedMessages;
+ }
+
+ @Override
+ public long getDiscardedMessages() {
+ return _discardedMessages;
+ }
+
+ @Override
+ public long getCompletedMessages() {
+ return _completedMessages;
+ }
+
+ @Override
+ public long getFailedMessages() {
+ return _failedMessages;
+ }
+
+ @Override
+ public long getPendingMessages() {
+ return _pendingMessages;
}
@Override
@@ -94,20 +101,4 @@
return PARTICIPANT_STATUS_KEY;
}
- /**
- * This method registers the dynamic metrics.
- * @return
- * @throws JMException
- */
- @Override
- public DynamicMBeanProvider register() throws JMException {
- List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
- attributeList.add(_receivedMessages);
- attributeList.add(_discardedMessages);
- attributeList.add(_completedMessages);
- attributeList.add(_failedMessages);
- attributeList.add(_pendingMessages);
- doRegister(attributeList, _initObjectName);
- return this;
- }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
index f806d1b..d4a899f 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantMessageMonitorMBean.java
@@ -8,6 +8,7 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
@@ -20,7 +21,7 @@
import org.apache.helix.monitoring.SensorNameProvider;
-@Deprecated
+
public interface ParticipantMessageMonitorMBean extends SensorNameProvider {
public long getReceivedMessages();
public long getDiscardedMessages();
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
index 5b91cff..6e5b346 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ParticipantStatusMonitor.java
@@ -49,8 +49,7 @@
try {
_beanServer = ManagementFactory.getPlatformMBeanServer();
if (isParticipant) {
- _messageMonitor =
- new ParticipantMessageMonitor(instanceName, getObjectName(_messageMonitor.getParticipantBeanName()));
+ _messageMonitor = new ParticipantMessageMonitor(instanceName);
_messageLatencyMonitor =
new MessageLatencyMonitor(MonitorDomainNames.CLMParticipantReport.name(), instanceName);
_messageLatencyMonitor.register();