Change Workflow monitor and PerInstanceResource monitor from static m… (#1732)
- Deprecate static metrics
- Add dynamic metrics and modify corresponding calling functions and tests
Co-authored-by: Meng Zhang <mnzhang@mnzhang-mn2.linkedin.biz>
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 1ffd7ec..a9c1811 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -64,7 +64,6 @@
public static final String RESOURCE_DN_KEY = "resourceName";
static final String INSTANCE_DN_KEY = "instanceName";
static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
- static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
static final String JOB_TYPE_DN_KEY = "jobType";
static final String DEFAULT_WORKFLOW_JOB_TYPE = "DEFAULT";
public static final String DEFAULT_TAG = "DEFAULT";
@@ -459,7 +458,7 @@
for (String instance : instanceStateMap.keySet()) {
String state = instanceStateMap.get(instance);
PerInstanceResourceMonitor.BeanName beanName =
- new PerInstanceResourceMonitor.BeanName(instance, resource);
+ new PerInstanceResourceMonitor.BeanName(_clusterName, instance, resource);
if (!beanMap.containsKey(beanName)) {
beanMap.put(beanName, new HashMap<Partition, String>());
}
@@ -492,7 +491,7 @@
}
try {
registerPerInstanceResources(monitorsToRegister);
- } catch (MalformedObjectNameException e) {
+ } catch (JMException e) {
LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
}
// Update existing beans
@@ -710,8 +709,8 @@
if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
try {
- registerWorkflow(monitor);
- } catch (MalformedObjectNameException e) {
+ monitor.register();
+ } catch (JMException e) {
LOG.error("Failed to register object for workflow type : " + workflowType, e);
}
_perTypeWorkflowMonitorMap.put(workflowType, monitor);
@@ -874,15 +873,15 @@
}
private void registerPerInstanceResources(Collection<PerInstanceResourceMonitor> monitors)
- throws MalformedObjectNameException {
+ throws JMException {
synchronized (_perInstanceResourceMonitorMap) {
for (PerInstanceResourceMonitor monitor : monitors) {
String instanceName = monitor.getInstanceName();
String resourceName = monitor.getResourceName();
- String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
- register(monitor, getObjectName(beanName));
+ monitor.register();
_perInstanceResourceMonitorMap
- .put(new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+ .put(new PerInstanceResourceMonitor.BeanName(_clusterName, instanceName, resourceName),
+ monitor);
}
}
}
@@ -898,26 +897,21 @@
throws MalformedObjectNameException {
synchronized (_perInstanceResourceMonitorMap) {
for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
- unregister(getObjectName(
- getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
+ if (_perInstanceResourceMonitorMap.get(beanName) != null) {
+ _perInstanceResourceMonitorMap.get(beanName).unregister();
+ }
}
_perInstanceResourceMonitorMap.keySet().removeAll(beanNames);
}
}
- private void registerWorkflow(WorkflowMonitor workflowMonitor)
- throws MalformedObjectNameException {
- String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType());
- register(workflowMonitor, getObjectName(workflowBeanName));
- }
-
- private void unregisterAllWorkflowsMonitor() throws MalformedObjectNameException {
+ private void unregisterAllWorkflowsMonitor() {
synchronized (_perTypeWorkflowMonitorMap) {
Iterator<Map.Entry<String, WorkflowMonitor>> workflowIter =
_perTypeWorkflowMonitorMap.entrySet().iterator();
while (workflowIter.hasNext()) {
Map.Entry<String, WorkflowMonitor> workflowEntry = workflowIter.next();
- unregister(getObjectName(getWorkflowBeanName(workflowEntry.getKey())));
+ workflowEntry.getValue().unregister();
workflowIter.remove();
}
}
@@ -969,18 +963,8 @@
* @return per-instance resource bean name
*/
protected String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
- return String.format("%s,%s", clusterBeanName(),
- new PerInstanceResourceMonitor.BeanName(instanceName, resourceName).toString());
- }
-
- /**
- * Build workflow per type bean name
- * "cluster={clusterName},workflowType={workflowType},
- * @param workflowType The workflow type
- * @return per workflow type bean name
- */
- protected String getWorkflowBeanName(String workflowType) {
- return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY, workflowType);
+ return new PerInstanceResourceMonitor.BeanName(_clusterName, instanceName, resourceName)
+ .toString();
}
/**
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
deleted file mode 100644
index 68341a0..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * Job monitor MBean for jobs, which are shared among jobs with the same type.
- */
-public interface JobMonitorMBean extends SensorNameProvider {
-
- /**
- * Get number of the succeeded jobs
- * @return
- */
- public long getSuccessfulJobCount();
-
- /**
- * Get number of failed jobs
- * @return
- */
- public long getFailedJobCount();
-
- /**
- * Get number of the aborted jobs
- * @return
- */
- public long getAbortedJobCount();
-
- /**
- * Get number of existing jobs registered
- * @return
- */
- public long getExistingJobGauge();
-
- /**
- * Get numbers of queued jobs, which are not running jobs
- * @return
- */
- public long getQueuedJobGauge();
-
- /**
- * Get numbers of running jobs
- * @return
- */
- public long getRunningJobGauge();
-
- /**
- * Get maximum latency of jobs running time. It will be cleared every hour
- * @return
- */
- public long getMaximumJobLatencyGauge();
-
- /**
- * Get job latency counter.
- * @return
- */
- public long getJobLatencyCount();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
index 3bf5ea7..0b637af 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -19,10 +19,14 @@
* under the License.
*/
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
@@ -30,17 +34,28 @@
import org.apache.helix.HelixDefinedState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+public class PerInstanceResourceMonitor extends DynamicMBeanProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(PerInstanceResourceMonitor.class);
+ private static final String MBEAN_DESCRIPTION = "Per Instance Resource Monitor";
+
public static class BeanName {
private final String _instanceName;
private final String _resourceName;
+ private final String _clusterName;
- public BeanName(String instanceName, String resourceName) {
- if (instanceName == null || resourceName == null) {
- throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
- + ", resourceName: " + resourceName);
+ public BeanName(String clusterName, String instanceName, String resourceName) {
+ if (clusterName == null || instanceName == null || resourceName == null) {
+ throw new NullPointerException(
+ "Illegal beanName. clusterName: " + clusterName + ", instanceName: " + instanceName
+ + ", resourceName: " + resourceName);
}
+ _clusterName = clusterName;
_instanceName = instanceName;
_resourceName = resourceName;
}
@@ -53,6 +68,17 @@
return _resourceName;
}
+ public ObjectName objectName() {
+ try {
+ return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(),
+ new BeanName(_clusterName, _instanceName, _resourceName).toString()));
+ } catch (MalformedObjectNameException e) {
+ LOG.error("Failed to create object name for cluster: {}, instance: {}, resource: {}.",
+ _clusterName, _instanceName, _resourceName);
+ }
+ return null;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj == null || !(obj instanceof BeanName)) {
@@ -60,7 +86,8 @@
}
BeanName that = (BeanName) obj;
- return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+ return _clusterName.equals(that._clusterName) && _instanceName.equals(that._instanceName)
+ && _resourceName.equals(that._resourceName);
}
@Override
@@ -70,8 +97,9 @@
@Override
public String toString() {
- return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
- ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+ return String.format("%s=%s,%s=%s,%s=%s", ClusterStatusMonitor.CLUSTER_DN_KEY, _clusterName,
+ ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName, ClusterStatusMonitor.RESOURCE_DN_KEY,
+ _resourceName);
}
}
@@ -79,14 +107,15 @@
private List<String> _tags;
private final String _participantName;
private final String _resourceName;
- private long _partitions;
+ private SimpleDynamicMetric<Long> _partitions;
- public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+ public PerInstanceResourceMonitor(String clusterName, String participantName,
+ String resourceName) {
_clusterName = clusterName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
_participantName = participantName;
_resourceName = resourceName;
- _partitions = 0;
+ _partitions = new SimpleDynamicMetric("PartitionGauge", 0L);
}
@Override
@@ -100,11 +129,6 @@
return Joiner.on('|').skipNulls().join(_tags).toString();
}
- @Override
- public long getPartitionGauge() {
- return _partitions;
- }
-
public String getInstanceName() {
return _participantName;
}
@@ -131,13 +155,21 @@
int cnt = 0;
for (String state : stateMap.values()) {
// Skip DROPPED and initial state (e.g. OFFLINE)
- if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())
- || state.equalsIgnoreCase(stateModelDef.getInitialState())) {
+ if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name()) || state
+ .equalsIgnoreCase(stateModelDef.getInitialState())) {
continue;
}
cnt++;
}
- _partitions = cnt;
+ _partitions.updateValue(Long.valueOf(cnt));
}
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+ attributeList.add(_partitions);
+ doRegister(attributeList, MBEAN_DESCRIPTION,
+ new BeanName(_clusterName, _participantName, _resourceName).objectName());
+ return this;
+ }
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
deleted file mode 100644
index 085d500..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * A bean that describes the resource on each instance
- */
-public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
- /**
- * Get the number of partitions of the resource in best possible ideal state
- * for the instance
- * @return number of partitions
- */
- long getPartitionGauge();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
deleted file mode 100644
index 4bc4b30..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-public interface StateTransitionStatMonitorMBean extends SensorNameProvider {
- long getTotalStateTransitionGauge();
-
- long getTotalFailedTransitionGauge();
-
- long getTotalSuccessTransitionGauge();
-
- double getMeanTransitionLatency();
-
- double getMaxTransitionLatency();
-
- double getMinTransitionLatency();
-
- double getPercentileTransitionLatency(int percentage);
-
- double getMeanTransitionExecuteLatency();
-
- double getMaxTransitionExecuteLatency();
-
- double getMinTransitionExecuteLatency();
-
- double getPercentileTransitionExecuteLatency(int percentage);
-
- double getMeanTransitionMessageLatency();
-
- double getMaxTransitionMessageLatency();
-
- double getMinTransitionMessageLatency();
-
- double getPercentileTransitionMessageLatency(int percentage);
-
- void reset();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index c26d462..ba7ef5d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -19,88 +19,57 @@
* under the License.
*/
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
import org.apache.helix.task.TaskState;
-public class WorkflowMonitor implements WorkflowMonitorMBean {
+import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+
+public class WorkflowMonitor extends DynamicMBeanProvider {
+ private static final String MBEAN_DESCRIPTION = "Workflow Monitor";
private static final String WORKFLOW_KEY = "Workflow";
+ static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
private String _clusterName;
private String _workflowType;
- private long _successfulWorkflowCount;
- private long _failedWorkflowCount;
- private long _failedWorkflowGauge;
- private long _existingWorkflowGauge;
- private long _queuedWorkflowGauge;
- private long _runningWorkflowGauge;
- private long _totalWorkflowLatencyCount;
- private long _maximumWorkflowLatencyGauge;
- private long _lastResetTime;
-
+ private SimpleDynamicMetric<Long> _successfulWorkflowCount;
+ private SimpleDynamicMetric<Long> _failedWorkflowCount;
+ private SimpleDynamicMetric<Long> _failedWorkflowGauge;
+ private SimpleDynamicMetric<Long> _existingWorkflowGauge;
+ private SimpleDynamicMetric<Long> _queuedWorkflowGauge;
+ private SimpleDynamicMetric<Long> _runningWorkflowGauge;
+ private SimpleDynamicMetric<Long> _totalWorkflowLatencyCount;
+ private SimpleDynamicMetric<Long> _maximumWorkflowLatencyGauge;
+ private SimpleDynamicMetric<Long> _lastResetTime;
public WorkflowMonitor(String clusterName, String workflowType) {
_clusterName = clusterName;
_workflowType = workflowType;
- _successfulWorkflowCount = 0L;
- _failedWorkflowCount = 0L;
- _failedWorkflowGauge = 0L;
- _existingWorkflowGauge = 0L;
- _queuedWorkflowGauge = 0L;
- _runningWorkflowGauge = 0L;
- _totalWorkflowLatencyCount = 0L;
- _maximumWorkflowLatencyGauge = 0L;
- _lastResetTime = System.currentTimeMillis();
+ _successfulWorkflowCount = new SimpleDynamicMetric("SuccessfulWorkflowCount", 0L);
+ _failedWorkflowCount = new SimpleDynamicMetric("FailedWorkflowCount", 0L);
+ _failedWorkflowGauge = new SimpleDynamicMetric("FailedWorkflowGauge", 0L);
+ _existingWorkflowGauge = new SimpleDynamicMetric("ExistingWorkflowGauge", 0L);
+ _queuedWorkflowGauge = new SimpleDynamicMetric("QueuedWorkflowGauge", 0L);
+ _runningWorkflowGauge = new SimpleDynamicMetric("RunningWorkflowGauge", 0L);
+ _totalWorkflowLatencyCount = new SimpleDynamicMetric("TotalWorkflowLatencyCount", 0L);
+ _maximumWorkflowLatencyGauge = new SimpleDynamicMetric("MaximumWorkflowLatencyGauge", 0L);
+ _lastResetTime = new SimpleDynamicMetric("LastResetTime", System.currentTimeMillis());
}
@Override
- public long getSuccessfulWorkflowCount() {
- return _successfulWorkflowCount;
- }
-
- @Override
- public long getFailedWorkflowCount() {
- return _failedWorkflowCount;
- }
-
- @Override
- public long getFailedWorkflowGauge() {
- return _failedWorkflowGauge;
- }
-
- @Override
- public long getExistingWorkflowGauge() {
- return _existingWorkflowGauge;
- }
-
- @Override
- public long getQueuedWorkflowGauge() {
- return _queuedWorkflowGauge;
- }
-
- @Override
- public long getRunningWorkflowGauge() {
- return _runningWorkflowGauge;
- }
-
- @Override
- public long getWorkflowLatencyCount() {
- return _totalWorkflowLatencyCount;
- }
-
- @Override
- public long getMaximumWorkflowLatencyGauge() {
- return _maximumWorkflowLatencyGauge;
- }
-
- @Override public String getSensorName() {
+ public String getSensorName() {
return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType);
}
- public String getWorkflowType() {
- return _workflowType;
- }
-
/**
* Update workflow with transition state
* @param to The to state of a workflow
@@ -112,13 +81,15 @@
public void updateWorkflowCounters(TaskState to, long latency) {
if (to.equals(TaskState.FAILED)) {
- _failedWorkflowCount++;
+ incrementSimpleDynamicMetric(_failedWorkflowCount, 1);
} else if (to.equals(TaskState.COMPLETED)) {
- _successfulWorkflowCount++;
+ incrementSimpleDynamicMetric(_successfulWorkflowCount, 1);
// Only record latency larger than 0 and succeeded workflows
- _maximumWorkflowLatencyGauge = Math.max(_maximumWorkflowLatencyGauge, latency);
- _totalWorkflowLatencyCount += latency > 0 ? latency : 0;
+ incrementSimpleDynamicMetric(_maximumWorkflowLatencyGauge,
+ _maximumWorkflowLatencyGauge.getValue() > latency ? 0
+ : latency - _maximumWorkflowLatencyGauge.getValue());
+ incrementSimpleDynamicMetric(_totalWorkflowLatencyCount, latency > 0 ? latency : 0);
}
}
@@ -126,13 +97,13 @@
* Reset gauges
*/
public void resetGauges() {
- _failedWorkflowGauge = 0L;
- _existingWorkflowGauge = 0L;
- _runningWorkflowGauge = 0L;
- _queuedWorkflowGauge = 0L;
- if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
- _lastResetTime = System.currentTimeMillis();
- _maximumWorkflowLatencyGauge = 0;
+ _failedWorkflowGauge.updateValue(0L);
+ _existingWorkflowGauge.updateValue(0L);
+ _runningWorkflowGauge.updateValue(0L);
+ _queuedWorkflowGauge.updateValue(0L);
+ if (_lastResetTime.getValue() + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
+ _lastResetTime.updateValue(System.currentTimeMillis());
+ _maximumWorkflowLatencyGauge.updateValue(0L);
}
}
@@ -142,12 +113,59 @@
*/
public void updateWorkflowGauges(TaskState current) {
if (current == null || current.equals(TaskState.NOT_STARTED)) {
- _queuedWorkflowGauge++;
+ incrementSimpleDynamicMetric(_queuedWorkflowGauge);
} else if (current.equals(TaskState.IN_PROGRESS)) {
- _runningWorkflowGauge++;
+ incrementSimpleDynamicMetric(_runningWorkflowGauge);
} else if (current.equals(TaskState.FAILED)) {
- _failedWorkflowGauge++;
+ incrementSimpleDynamicMetric(_failedWorkflowGauge);
}
- _existingWorkflowGauge++;
+ incrementSimpleDynamicMetric(_existingWorkflowGauge);
+ }
+
+ // All the get functions are for testing purpose only.
+ public long getSuccessfulWorkflowCount() {
+ return _successfulWorkflowCount.getValue();
+ }
+
+ public long getFailedWorkflowCount() {
+ return _failedWorkflowCount.getValue();
+ }
+
+ public long getFailedWorkflowGauge() {
+ return _failedWorkflowGauge.getValue();
+ }
+
+ public long getExistingWorkflowGauge() {
+ return _existingWorkflowGauge.getValue();
+ }
+
+ public long getQueuedWorkflowGauge() {
+ return _queuedWorkflowGauge.getValue();
+ }
+
+ public long getRunningWorkflowGauge() {
+ return _runningWorkflowGauge.getValue();
+ }
+
+ @Override
+ public DynamicMBeanProvider register() throws JMException {
+ List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+ attributeList.add(_successfulWorkflowCount);
+ attributeList.add(_failedWorkflowCount);
+ attributeList.add(_failedWorkflowGauge);
+ attributeList.add(_existingWorkflowGauge);
+ attributeList.add(_queuedWorkflowGauge);
+ attributeList.add(_runningWorkflowGauge);
+ attributeList.add(_totalWorkflowLatencyCount);
+ attributeList.add(_maximumWorkflowLatencyGauge);
+ attributeList.add(_lastResetTime);
+ doRegister(attributeList, MBEAN_DESCRIPTION, getObjectName(_workflowType));
+ return this;
+ }
+
+ private ObjectName getObjectName(String workflowType) throws MalformedObjectNameException {
+ return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), String
+ .format("%s, %s=%s", String.format("%s=%s", CLUSTER_DN_KEY, _clusterName),
+ WORKFLOW_TYPE_DN_KEY, workflowType)));
}
}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
deleted file mode 100644
index 80f8088..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * Workflow monitor MBean for workflows, which are shared among workflows with the same type.
- */
-public interface WorkflowMonitorMBean extends SensorNameProvider {
-
- /**
- * Get number of succeeded workflows
- * @return
- */
- public long getSuccessfulWorkflowCount();
-
- /**
- * Get number of failed workflows
- * @return
- */
- public long getFailedWorkflowCount();
-
- /**
- * Get number of current failed workflows
- */
- public long getFailedWorkflowGauge();
-
- /**
- * Get number of current existing workflows
- * @return
- */
- public long getExistingWorkflowGauge();
-
- /**
- * Get number of queued but not started workflows
- * @return
- */
- public long getQueuedWorkflowGauge();
-
- /**
- * Get number of running workflows
- * @return
- */
- public long getRunningWorkflowGauge();
-
- /**
- * Get workflow latency count
- * @return
- */
- public long getWorkflowLatencyCount();
-
- /**
- * Get maximum workflow latency gauge. It will be reset in 1 hour.
- * @return
- */
- public long getMaximumWorkflowLatencyGauge();
-}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index 7637a58..d2a5f73 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -91,12 +91,11 @@
long cleanUpTime = System.currentTimeMillis();
Assert.assertTrue(cleanUpTime - finishTime >= workflowExpiry);
-
ObjectName objectName = getWorkflowMBeanObjectName(workflowName);
Assert.assertEquals((long) beanServer.getAttribute(objectName, "SuccessfulWorkflowCount"), 1);
Assert
.assertTrue((long) beanServer.getAttribute(objectName, "MaximumWorkflowLatencyGauge") > 0);
- Assert.assertTrue((long) beanServer.getAttribute(objectName, "WorkflowLatencyCount") > 0);
+ Assert.assertTrue((long) beanServer.getAttribute(objectName, "TotalWorkflowLatencyCount") > 0);
}
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
index 439425c..55dea1b 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
@@ -45,7 +45,7 @@
@Test
public void testRun() throws Exception {
WorkflowMonitor wm = new WorkflowMonitor(TEST_CLUSTER_NAME, TEST_WORKFLOW_TYPE);
- registerMbean(wm, getObjectName());
+ wm.register();
Set<ObjectInstance> existingInstances = beanServer.queryMBeans(
new ObjectName(MonitorDomainNames.ClusterStatus.name() + ":" + TEST_WORKFLOW_MBEAN_NAME),
null);