Change Workflow monitor and PerInstanceResource monitor from static m… (#1732)

- Deprecate static metrics
- Add dynamic metrics and modify corresponding calling functions and tests

Co-authored-by: Meng Zhang <mnzhang@mnzhang-mn2.linkedin.biz>
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 1ffd7ec..a9c1811 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -64,7 +64,6 @@
   public static final String RESOURCE_DN_KEY = "resourceName";
   static final String INSTANCE_DN_KEY = "instanceName";
   static final String MESSAGE_QUEUE_DN_KEY = "messageQueue";
-  static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
   static final String JOB_TYPE_DN_KEY = "jobType";
   static final String DEFAULT_WORKFLOW_JOB_TYPE = "DEFAULT";
   public static final String DEFAULT_TAG = "DEFAULT";
@@ -459,7 +458,7 @@
         for (String instance : instanceStateMap.keySet()) {
           String state = instanceStateMap.get(instance);
           PerInstanceResourceMonitor.BeanName beanName =
-              new PerInstanceResourceMonitor.BeanName(instance, resource);
+              new PerInstanceResourceMonitor.BeanName(_clusterName, instance, resource);
           if (!beanMap.containsKey(beanName)) {
             beanMap.put(beanName, new HashMap<Partition, String>());
           }
@@ -492,7 +491,7 @@
       }
       try {
         registerPerInstanceResources(monitorsToRegister);
-      } catch (MalformedObjectNameException e) {
+      } catch (JMException e) {
         LOG.error("Fail to register per-instance resource with MBean server: " + toRegister, e);
       }
       // Update existing beans
@@ -710,8 +709,8 @@
       if (!_perTypeWorkflowMonitorMap.containsKey(workflowType)) {
         WorkflowMonitor monitor = new WorkflowMonitor(_clusterName, workflowType);
         try {
-          registerWorkflow(monitor);
-        } catch (MalformedObjectNameException e) {
+          monitor.register();
+        } catch (JMException e) {
           LOG.error("Failed to register object for workflow type : " + workflowType, e);
         }
         _perTypeWorkflowMonitorMap.put(workflowType, monitor);
@@ -874,15 +873,15 @@
   }
 
   private void registerPerInstanceResources(Collection<PerInstanceResourceMonitor> monitors)
-      throws MalformedObjectNameException {
+      throws JMException {
     synchronized (_perInstanceResourceMonitorMap) {
       for (PerInstanceResourceMonitor monitor : monitors) {
         String instanceName = monitor.getInstanceName();
         String resourceName = monitor.getResourceName();
-        String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
-        register(monitor, getObjectName(beanName));
+        monitor.register();
         _perInstanceResourceMonitorMap
-            .put(new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+            .put(new PerInstanceResourceMonitor.BeanName(_clusterName, instanceName, resourceName),
+                monitor);
       }
     }
   }
@@ -898,26 +897,21 @@
       throws MalformedObjectNameException {
     synchronized (_perInstanceResourceMonitorMap) {
       for (PerInstanceResourceMonitor.BeanName beanName : beanNames) {
-        unregister(getObjectName(
-            getPerInstanceResourceBeanName(beanName.instanceName(), beanName.resourceName())));
+        if (_perInstanceResourceMonitorMap.get(beanName) != null) {
+          _perInstanceResourceMonitorMap.get(beanName).unregister();
+        }
       }
       _perInstanceResourceMonitorMap.keySet().removeAll(beanNames);
     }
   }
 
-  private void registerWorkflow(WorkflowMonitor workflowMonitor)
-      throws MalformedObjectNameException {
-    String workflowBeanName = getWorkflowBeanName(workflowMonitor.getWorkflowType());
-    register(workflowMonitor, getObjectName(workflowBeanName));
-  }
-
-  private void unregisterAllWorkflowsMonitor() throws MalformedObjectNameException {
+  private void unregisterAllWorkflowsMonitor() {
     synchronized (_perTypeWorkflowMonitorMap) {
       Iterator<Map.Entry<String, WorkflowMonitor>> workflowIter =
           _perTypeWorkflowMonitorMap.entrySet().iterator();
       while (workflowIter.hasNext()) {
         Map.Entry<String, WorkflowMonitor> workflowEntry = workflowIter.next();
-        unregister(getObjectName(getWorkflowBeanName(workflowEntry.getKey())));
+        workflowEntry.getValue().unregister();
         workflowIter.remove();
       }
     }
@@ -969,18 +963,8 @@
    * @return per-instance resource bean name
    */
   protected String getPerInstanceResourceBeanName(String instanceName, String resourceName) {
-    return String.format("%s,%s", clusterBeanName(),
-        new PerInstanceResourceMonitor.BeanName(instanceName, resourceName).toString());
-  }
-
-  /**
-   * Build workflow per type bean name
-   * "cluster={clusterName},workflowType={workflowType},
-   * @param workflowType The workflow type
-   * @return per workflow type bean name
-   */
-  protected String getWorkflowBeanName(String workflowType) {
-    return String.format("%s, %s=%s", clusterBeanName(), WORKFLOW_TYPE_DN_KEY, workflowType);
+    return new PerInstanceResourceMonitor.BeanName(_clusterName, instanceName, resourceName)
+        .toString();
   }
 
   /**
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
deleted file mode 100644
index 68341a0..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/JobMonitorMBean.java
+++ /dev/null
@@ -1,76 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * Job monitor MBean for jobs, which are shared among jobs with the same type.
- */
-public interface JobMonitorMBean extends SensorNameProvider {
-
-  /**
-   * Get number of the succeeded jobs
-   * @return
-   */
-  public long getSuccessfulJobCount();
-
-  /**
-   * Get number of failed jobs
-   * @return
-   */
-  public long getFailedJobCount();
-
-  /**
-   * Get number of the aborted jobs
-   * @return
-   */
-  public long getAbortedJobCount();
-
-  /**
-   * Get number of existing jobs registered
-   * @return
-   */
-  public long getExistingJobGauge();
-
-  /**
-   * Get numbers of queued jobs, which are not running jobs
-   * @return
-   */
-  public long getQueuedJobGauge();
-
-  /**
-   * Get numbers of running jobs
-   * @return
-   */
-  public long getRunningJobGauge();
-
-  /**
-   * Get maximum latency of jobs running time. It will be cleared every hour
-   * @return
-   */
-  public long getMaximumJobLatencyGauge();
-
-  /**
-   * Get job latency counter.
-   * @return
-   */
-  public long getJobLatencyCount();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
index 3bf5ea7..0b637af 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitor.java
@@ -19,10 +19,14 @@
  * under the License.
  */
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
 
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -30,17 +34,28 @@
 import org.apache.helix.HelixDefinedState;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class PerInstanceResourceMonitor implements PerInstanceResourceMonitorMBean {
+public class PerInstanceResourceMonitor extends DynamicMBeanProvider {
+  private static final Logger LOG = LoggerFactory.getLogger(PerInstanceResourceMonitor.class);
+  private static final String MBEAN_DESCRIPTION = "Per Instance Resource Monitor";
+
   public static class BeanName {
     private final String _instanceName;
     private final String _resourceName;
+    private final String _clusterName;
 
-    public BeanName(String instanceName, String resourceName) {
-      if (instanceName == null || resourceName == null) {
-        throw new NullPointerException("Illegal beanName. instanceName: " + instanceName
-            + ", resourceName: " + resourceName);
+    public BeanName(String clusterName, String instanceName, String resourceName) {
+      if (clusterName == null || instanceName == null || resourceName == null) {
+        throw new NullPointerException(
+            "Illegal beanName. clusterName: " + clusterName + ", instanceName: " + instanceName
+                + ", resourceName: " + resourceName);
       }
+      _clusterName = clusterName;
       _instanceName = instanceName;
       _resourceName = resourceName;
     }
@@ -53,6 +68,17 @@
       return _resourceName;
     }
 
+    public ObjectName objectName() {
+      try {
+        return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(),
+            new BeanName(_clusterName, _instanceName, _resourceName).toString()));
+      } catch (MalformedObjectNameException e) {
+        LOG.error("Failed to create object name for cluster: {}, instance: {}, resource: {}.",
+            _clusterName, _instanceName, _resourceName);
+      }
+      return null;
+    }
+
     @Override
     public boolean equals(Object obj) {
       if (obj == null || !(obj instanceof BeanName)) {
@@ -60,7 +86,8 @@
       }
 
       BeanName that = (BeanName) obj;
-      return _instanceName.equals(that._instanceName) && _resourceName.equals(that._resourceName);
+      return _clusterName.equals(that._clusterName) && _instanceName.equals(that._instanceName)
+          && _resourceName.equals(that._resourceName);
     }
 
     @Override
@@ -70,8 +97,9 @@
 
     @Override
     public String toString() {
-      return String.format("%s=%s,%s=%s", ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName,
-          ClusterStatusMonitor.RESOURCE_DN_KEY, _resourceName);
+      return String.format("%s=%s,%s=%s,%s=%s", ClusterStatusMonitor.CLUSTER_DN_KEY, _clusterName,
+          ClusterStatusMonitor.INSTANCE_DN_KEY, _instanceName, ClusterStatusMonitor.RESOURCE_DN_KEY,
+          _resourceName);
     }
   }
 
@@ -79,14 +107,15 @@
   private List<String> _tags;
   private final String _participantName;
   private final String _resourceName;
-  private long _partitions;
+  private SimpleDynamicMetric<Long> _partitions;
 
-  public PerInstanceResourceMonitor(String clusterName, String participantName, String resourceName) {
+  public PerInstanceResourceMonitor(String clusterName, String participantName,
+      String resourceName) {
     _clusterName = clusterName;
     _tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
     _participantName = participantName;
     _resourceName = resourceName;
-    _partitions = 0;
+    _partitions = new SimpleDynamicMetric("PartitionGauge", 0L);
   }
 
   @Override
@@ -100,11 +129,6 @@
     return Joiner.on('|').skipNulls().join(_tags).toString();
   }
 
-  @Override
-  public long getPartitionGauge() {
-    return _partitions;
-  }
-
   public String getInstanceName() {
     return _participantName;
   }
@@ -131,13 +155,21 @@
     int cnt = 0;
     for (String state : stateMap.values()) {
       // Skip DROPPED and initial state (e.g. OFFLINE)
-      if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name())
-          || state.equalsIgnoreCase(stateModelDef.getInitialState())) {
+      if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.name()) || state
+          .equalsIgnoreCase(stateModelDef.getInitialState())) {
         continue;
       }
       cnt++;
     }
-    _partitions = cnt;
+    _partitions.updateValue(Long.valueOf(cnt));
   }
 
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_partitions);
+    doRegister(attributeList, MBEAN_DESCRIPTION,
+        new BeanName(_clusterName, _participantName, _resourceName).objectName());
+    return this;
+  }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
deleted file mode 100644
index 085d500..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/PerInstanceResourceMonitorMBean.java
+++ /dev/null
@@ -1,34 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * A bean that describes the resource on each instance
- */
-public interface PerInstanceResourceMonitorMBean extends SensorNameProvider {
-  /**
-   * Get the number of partitions of the resource in best possible ideal state
-   * for the instance
-   * @return number of partitions
-   */
-  long getPartitionGauge();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
deleted file mode 100644
index 4bc4b30..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/StateTransitionStatMonitorMBean.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-public interface StateTransitionStatMonitorMBean extends SensorNameProvider {
-  long getTotalStateTransitionGauge();
-
-  long getTotalFailedTransitionGauge();
-
-  long getTotalSuccessTransitionGauge();
-
-  double getMeanTransitionLatency();
-
-  double getMaxTransitionLatency();
-
-  double getMinTransitionLatency();
-
-  double getPercentileTransitionLatency(int percentage);
-
-  double getMeanTransitionExecuteLatency();
-
-  double getMaxTransitionExecuteLatency();
-
-  double getMinTransitionExecuteLatency();
-
-  double getPercentileTransitionExecuteLatency(int percentage);
-
-  double getMeanTransitionMessageLatency();
-
-  double getMaxTransitionMessageLatency();
-
-  double getMinTransitionMessageLatency();
-
-  double getPercentileTransitionMessageLatency(int percentage);
-
-  void reset();
-}
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
index c26d462..ba7ef5d 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitor.java
@@ -19,88 +19,57 @@
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.List;
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 import org.apache.helix.task.TaskState;
 
-public class WorkflowMonitor implements WorkflowMonitorMBean {
+import static org.apache.helix.monitoring.mbeans.ClusterStatusMonitor.CLUSTER_DN_KEY;
+
+public class WorkflowMonitor extends DynamicMBeanProvider {
+  private static final String MBEAN_DESCRIPTION = "Workflow Monitor";
   private static final String WORKFLOW_KEY = "Workflow";
+  static final String WORKFLOW_TYPE_DN_KEY = "workflowType";
   private static final long DEFAULT_RESET_INTERVAL_MS = 60 * 60 * 1000; // 1 hour
 
   private String _clusterName;
   private String _workflowType;
 
-  private long _successfulWorkflowCount;
-  private long _failedWorkflowCount;
-  private long _failedWorkflowGauge;
-  private long _existingWorkflowGauge;
-  private long _queuedWorkflowGauge;
-  private long _runningWorkflowGauge;
-  private long _totalWorkflowLatencyCount;
-  private long _maximumWorkflowLatencyGauge;
-  private long _lastResetTime;
-
+  private SimpleDynamicMetric<Long> _successfulWorkflowCount;
+  private SimpleDynamicMetric<Long> _failedWorkflowCount;
+  private SimpleDynamicMetric<Long> _failedWorkflowGauge;
+  private SimpleDynamicMetric<Long> _existingWorkflowGauge;
+  private SimpleDynamicMetric<Long> _queuedWorkflowGauge;
+  private SimpleDynamicMetric<Long> _runningWorkflowGauge;
+  private SimpleDynamicMetric<Long> _totalWorkflowLatencyCount;
+  private SimpleDynamicMetric<Long> _maximumWorkflowLatencyGauge;
+  private SimpleDynamicMetric<Long> _lastResetTime;
 
   public WorkflowMonitor(String clusterName, String workflowType) {
     _clusterName = clusterName;
     _workflowType = workflowType;
-    _successfulWorkflowCount = 0L;
-    _failedWorkflowCount = 0L;
-    _failedWorkflowGauge = 0L;
-    _existingWorkflowGauge = 0L;
-    _queuedWorkflowGauge = 0L;
-    _runningWorkflowGauge = 0L;
-    _totalWorkflowLatencyCount = 0L;
-    _maximumWorkflowLatencyGauge = 0L;
-    _lastResetTime = System.currentTimeMillis();
+    _successfulWorkflowCount = new SimpleDynamicMetric("SuccessfulWorkflowCount", 0L);
+    _failedWorkflowCount = new SimpleDynamicMetric("FailedWorkflowCount", 0L);
+    _failedWorkflowGauge = new SimpleDynamicMetric("FailedWorkflowGauge", 0L);
+    _existingWorkflowGauge = new SimpleDynamicMetric("ExistingWorkflowGauge", 0L);
+    _queuedWorkflowGauge = new SimpleDynamicMetric("QueuedWorkflowGauge", 0L);
+    _runningWorkflowGauge = new SimpleDynamicMetric("RunningWorkflowGauge", 0L);
+    _totalWorkflowLatencyCount = new SimpleDynamicMetric("TotalWorkflowLatencyCount", 0L);
+    _maximumWorkflowLatencyGauge = new SimpleDynamicMetric("MaximumWorkflowLatencyGauge", 0L);
+    _lastResetTime = new SimpleDynamicMetric("LastResetTime", System.currentTimeMillis());
   }
 
   @Override
-  public long getSuccessfulWorkflowCount() {
-    return _successfulWorkflowCount;
-  }
-
-  @Override
-  public long getFailedWorkflowCount() {
-    return _failedWorkflowCount;
-  }
-
-  @Override
-  public long getFailedWorkflowGauge() {
-    return _failedWorkflowGauge;
-  }
-
-  @Override
-  public long getExistingWorkflowGauge() {
-    return _existingWorkflowGauge;
-  }
-
-  @Override
-  public long getQueuedWorkflowGauge() {
-    return _queuedWorkflowGauge;
-  }
-
-  @Override
-  public long getRunningWorkflowGauge() {
-    return _runningWorkflowGauge;
-  }
-
-  @Override
-  public long getWorkflowLatencyCount() {
-    return _totalWorkflowLatencyCount;
-  }
-
-  @Override
-  public long getMaximumWorkflowLatencyGauge() {
-    return _maximumWorkflowLatencyGauge;
-  }
-
-  @Override public String getSensorName() {
+  public String getSensorName() {
     return String.format("%s.%s.%s", _clusterName, WORKFLOW_KEY, _workflowType);
   }
 
-  public String getWorkflowType() {
-    return _workflowType;
-  }
-
   /**
    * Update workflow with transition state
    * @param to The to state of a workflow
@@ -112,13 +81,15 @@
 
   public void updateWorkflowCounters(TaskState to, long latency) {
     if (to.equals(TaskState.FAILED)) {
-      _failedWorkflowCount++;
+      incrementSimpleDynamicMetric(_failedWorkflowCount, 1);
     } else if (to.equals(TaskState.COMPLETED)) {
-      _successfulWorkflowCount++;
+      incrementSimpleDynamicMetric(_successfulWorkflowCount, 1);
 
       // Only record latency larger than 0 and succeeded workflows
-      _maximumWorkflowLatencyGauge = Math.max(_maximumWorkflowLatencyGauge, latency);
-      _totalWorkflowLatencyCount += latency > 0 ? latency : 0;
+      incrementSimpleDynamicMetric(_maximumWorkflowLatencyGauge,
+          _maximumWorkflowLatencyGauge.getValue() > latency ? 0
+              : latency - _maximumWorkflowLatencyGauge.getValue());
+      incrementSimpleDynamicMetric(_totalWorkflowLatencyCount, latency > 0 ? latency : 0);
     }
   }
 
@@ -126,13 +97,13 @@
    * Reset gauges
    */
   public void resetGauges() {
-    _failedWorkflowGauge = 0L;
-    _existingWorkflowGauge = 0L;
-    _runningWorkflowGauge = 0L;
-    _queuedWorkflowGauge = 0L;
-    if (_lastResetTime + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
-      _lastResetTime = System.currentTimeMillis();
-      _maximumWorkflowLatencyGauge = 0;
+    _failedWorkflowGauge.updateValue(0L);
+    _existingWorkflowGauge.updateValue(0L);
+    _runningWorkflowGauge.updateValue(0L);
+    _queuedWorkflowGauge.updateValue(0L);
+    if (_lastResetTime.getValue() + DEFAULT_RESET_INTERVAL_MS < System.currentTimeMillis()) {
+      _lastResetTime.updateValue(System.currentTimeMillis());
+      _maximumWorkflowLatencyGauge.updateValue(0L);
     }
   }
 
@@ -142,12 +113,59 @@
    */
   public void updateWorkflowGauges(TaskState current) {
     if (current == null || current.equals(TaskState.NOT_STARTED)) {
-      _queuedWorkflowGauge++;
+      incrementSimpleDynamicMetric(_queuedWorkflowGauge);
     } else if (current.equals(TaskState.IN_PROGRESS)) {
-      _runningWorkflowGauge++;
+      incrementSimpleDynamicMetric(_runningWorkflowGauge);
     } else if (current.equals(TaskState.FAILED)) {
-      _failedWorkflowGauge++;
+      incrementSimpleDynamicMetric(_failedWorkflowGauge);
     }
-    _existingWorkflowGauge++;
+    incrementSimpleDynamicMetric(_existingWorkflowGauge);
+  }
+
+  // All the get functions are for testing purpose only.
+  public long getSuccessfulWorkflowCount() {
+    return _successfulWorkflowCount.getValue();
+  }
+
+  public long getFailedWorkflowCount() {
+    return _failedWorkflowCount.getValue();
+  }
+
+  public long getFailedWorkflowGauge() {
+    return _failedWorkflowGauge.getValue();
+  }
+
+  public long getExistingWorkflowGauge() {
+    return _existingWorkflowGauge.getValue();
+  }
+
+  public long getQueuedWorkflowGauge() {
+    return _queuedWorkflowGauge.getValue();
+  }
+
+  public long getRunningWorkflowGauge() {
+    return _runningWorkflowGauge.getValue();
+  }
+
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_successfulWorkflowCount);
+    attributeList.add(_failedWorkflowCount);
+    attributeList.add(_failedWorkflowGauge);
+    attributeList.add(_existingWorkflowGauge);
+    attributeList.add(_queuedWorkflowGauge);
+    attributeList.add(_runningWorkflowGauge);
+    attributeList.add(_totalWorkflowLatencyCount);
+    attributeList.add(_maximumWorkflowLatencyGauge);
+    attributeList.add(_lastResetTime);
+    doRegister(attributeList, MBEAN_DESCRIPTION, getObjectName(_workflowType));
+    return this;
+  }
+
+  private ObjectName getObjectName(String workflowType) throws MalformedObjectNameException {
+    return new ObjectName(String.format("%s:%s", MonitorDomainNames.ClusterStatus.name(), String
+        .format("%s, %s=%s", String.format("%s=%s", CLUSTER_DN_KEY, _clusterName),
+            WORKFLOW_TYPE_DN_KEY, workflowType)));
   }
 }
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
deleted file mode 100644
index 80f8088..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/WorkflowMonitorMBean.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-/**
- * Workflow monitor MBean for workflows, which are shared among workflows with the same type.
- */
-public interface WorkflowMonitorMBean extends SensorNameProvider {
-
-  /**
-   * Get number of succeeded workflows
-   * @return
-   */
-  public long getSuccessfulWorkflowCount();
-
-  /**
-   * Get number of failed workflows
-   * @return
-   */
-  public long getFailedWorkflowCount();
-
-  /**
-   * Get number of current failed workflows
-   */
-  public long getFailedWorkflowGauge();
-
-  /**
-   * Get number of current existing workflows
-   * @return
-   */
-  public long getExistingWorkflowGauge();
-
-  /**
-   * Get number of queued but not started workflows
-   * @return
-   */
-  public long getQueuedWorkflowGauge();
-
-  /**
-   * Get number of running workflows
-   * @return
-   */
-  public long getRunningWorkflowGauge();
-
-  /**
-   * Get workflow latency count
-   * @return
-   */
-  public long getWorkflowLatencyCount();
-
-  /**
-   * Get maximum workflow latency gauge. It will be reset in 1 hour.
-   * @return
-   */
-  public long getMaximumWorkflowLatencyGauge();
-}
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
index 7637a58..d2a5f73 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestWorkflowTermination.java
@@ -91,12 +91,11 @@
     long cleanUpTime = System.currentTimeMillis();
     Assert.assertTrue(cleanUpTime - finishTime >= workflowExpiry);
 
-
     ObjectName objectName = getWorkflowMBeanObjectName(workflowName);
     Assert.assertEquals((long) beanServer.getAttribute(objectName, "SuccessfulWorkflowCount"), 1);
     Assert
         .assertTrue((long) beanServer.getAttribute(objectName, "MaximumWorkflowLatencyGauge") > 0);
-    Assert.assertTrue((long) beanServer.getAttribute(objectName, "WorkflowLatencyCount") > 0);
+    Assert.assertTrue((long) beanServer.getAttribute(objectName, "TotalWorkflowLatencyCount") > 0);
 
   }
 
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
index 439425c..55dea1b 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/TestWorkflowMonitor.java
@@ -45,7 +45,7 @@
   @Test
   public void testRun() throws Exception {
     WorkflowMonitor wm = new WorkflowMonitor(TEST_CLUSTER_NAME, TEST_WORKFLOW_TYPE);
-    registerMbean(wm, getObjectName());
+    wm.register();
     Set<ObjectInstance> existingInstances = beanServer.queryMBeans(
         new ObjectName(MonitorDomainNames.ClusterStatus.name() + ":" + TEST_WORKFLOW_MBEAN_NAME),
         null);