STORM-3673 remove v1 built in metrics in favor of TaskMetrics (#3323)

diff --git a/docs/Metrics.md b/docs/Metrics.md
index 7f6ba47..561f2f0 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -139,23 +139,18 @@
 
 In general all of these tuple count metrics are randomly sub-sampled unless otherwise stated.  This means that the counts you see both on the UI and from the built in metrics are not necessarily exact.  In fact by default we sample only 5% of the events and estimate the total number of events from that.  The sampling percentage is configurable per topology through the `topology.stats.sample.rate` config.  Setting it to 1.0 will make the counts exact, but be aware that the more events we sample the slower your topology will run (as the metrics are counted in the same code path as tuples are processed).  This is why we have a 5% sample rate as the default.
 
-The tuple counting metrics are generally reported to the metrics consumers as maps unless explicitly stated otherwise.  They break down each count for finer grained reporting.
-The keys to these maps fall into two categories `"${stream_name}"` or `"${upstream_component}:${stream_name}"`.  The former is used for all spout metrics and for outgoing bolt metrics (`__emit-count` and `__transfer-count`).  The latter is used for bolt metrics that deal with incoming tuples.
+The tuple counting metric names end with either `"${stream_name}"` or `"${upstream_component}:${stream_name}"`.  The former is used for all spout metrics and for outgoing bolt metrics (`__emit-count` and `__transfer-count`).  The latter is used for bolt metrics that deal with incoming tuples.
 
 So for a word count topology the count bolt might show something like the following for the `__ack-count` metric
 
 ```
-{
-    "split:default": 80080
-}
+    "__ack-count-split:default": 80080
 ```
 
 But the spout instead would show something like the following for the `__ack-count` metric.
 
 ```
-{
-    "default": 12500
-}
+    "__ack-count-default": 12500
 ```
 
 
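Illustration (a reader-side sketch, not part of this change): the renamed per-stream counters can be read back from a plain Dropwizard `MetricRegistry`, the registry type wrapped by `StormMetricRegistry`. The registry setup and counter values below are hypothetical and simply mirror the documentation examples above.

```java
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import java.util.Map;

public class MetricNameDemo {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        // Names mirror the examples above: bolt metrics carry "component:stream",
        // spout metrics carry just the stream name.
        registry.counter("__ack-count-split:default").inc(80080);
        registry.counter("__ack-count-default").inc(12500);
        for (Map.Entry<String, Counter> e : registry.getCounters().entrySet()) {
            System.out.println(e.getKey() + " = " + e.getValue().getCount());
        }
    }
}
```
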
diff --git a/docs/Performance.md b/docs/Performance.md
index 9738fef..229ec15 100644
--- a/docs/Performance.md
+++ b/docs/Performance.md
@@ -106,7 +106,7 @@
 
 
 #### Built-in wait strategies:
-These wait strategies are availabe for use with all of the above mentioned wait situations.
+These wait strategies are available for use with all of the above mentioned wait situations.
 
 - **ProgressiveWaitStrategy** : This strategy can be used for Bolt Wait or Backpressure Wait situations. Set the strategy to 'org.apache.storm.policy.WaitStrategyProgressive' to
 select this wait strategy. This is a dynamic wait strategy that enters into progressively deeper states of CPU conservation if the Backpressure Wait or Bolt Wait situations persist.
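
Illustration (not part of this patch): a minimal sketch of selecting the progressive wait strategy for bolts in topology configuration. The fully-qualified class name is quoted from the text above; the `Config.TOPOLOGY_BOLT_WAIT_STRATEGY` key is an assumption based on the Storm 2.x `Config` class and should be verified against it.

```java
import java.util.Map;
import org.apache.storm.Config;

public class WaitStrategyConfigDemo {
    public static Map<String, Object> buildConf() {
        Config conf = new Config();
        // Assumed config key; the strategy class name is quoted from the docs above.
        conf.put(Config.TOPOLOGY_BOLT_WAIT_STRATEGY,
                 "org.apache.storm.policy.WaitStrategyProgressive");
        return conf;
    }
}
```
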
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/Task.java b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
index 59f2547..45a1f2d 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/Task.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/Task.java
@@ -38,7 +38,6 @@
 import org.apache.storm.grouping.LoadAwareCustomStreamGrouping;
 import org.apache.storm.hooks.ITaskHook;
 import org.apache.storm.hooks.info.EmitInfo;
-import org.apache.storm.metrics2.StormMetricRegistry;
 import org.apache.storm.metrics2.TaskMetrics;
 import org.apache.storm.spout.ShellSpout;
 import org.apache.storm.stats.CommonStats;
@@ -89,7 +88,8 @@
         this.taskObject = mkTaskObject();
         this.debug = topoConf.containsKey(Config.TOPOLOGY_DEBUG) && (Boolean) topoConf.get(Config.TOPOLOGY_DEBUG);
         this.addTaskHooks();
-        this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId, workerData.getMetricRegistry());
+        this.taskMetrics = new TaskMetrics(this.workerTopologyContext, this.componentId, this.taskId,
+                workerData.getMetricRegistry(), topoConf);
     }
 
     private static HashMap<String, ArrayList<LoadAwareCustomStreamGrouping>> getGroupersPerStream(
@@ -129,9 +129,11 @@
 
         try {
             if (emitSampler.getAsBoolean()) {
-                executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream));
+                executorStats.emittedTuple(stream);
+                this.taskMetrics.emittedTuple(stream);
                 if (null != outTaskId) {
-                    executorStats.transferredTuples(stream, 1, this.taskMetrics.getTransferred(stream));
+                    executorStats.transferredTuples(stream, 1);
+                    this.taskMetrics.transferredTuples(stream, 1);
                 }
             }
         } catch (Exception e) {
@@ -169,8 +171,10 @@
         }
         try {
             if (emitSampler.getAsBoolean()) {
-                executorStats.emittedTuple(stream, this.taskMetrics.getEmitted(stream));
-                executorStats.transferredTuples(stream, outTasks.size(), this.taskMetrics.getTransferred(stream));
+                executorStats.emittedTuple(stream);
+                this.taskMetrics.emittedTuple(stream);
+                executorStats.transferredTuples(stream, outTasks.size());
+                this.taskMetrics.transferredTuples(stream, outTasks.size());
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
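
Note on the emit path above: it now feeds two mechanisms, the sampled `executorStats` buckets and the per-stream V2 counters in `TaskMetrics`. A back-of-the-envelope sketch (assuming `ConfigUtils.samplingRate` returns the reciprocal of `topology.stats.sample.rate`, e.g. 20 for the default 0.05) shows why incrementing by that reciprocal on each sampled emit approximates the true count:

```java
public class SamplingMath {
    public static void main(String[] args) {
        double sampleRate = 0.05;                   // topology.stats.sample.rate
        int reciprocal = (int) (1.0 / sampleRate);  // 20 (assumed ConfigUtils.samplingRate)
        long actualEmits = 100_000;
        long sampledEmits = Math.round(actualEmits * sampleRate);  // ~5,000 sampled events
        long reportedCount = sampledEmits * reciprocal;            // ~100,000 reported
        System.out.println(reportedCount);
    }
}
```
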
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
deleted file mode 100644
index 658f275..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinBoltMetrics.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.metric.internal.MultiCountStatAndMetric;
-import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
-import org.apache.storm.stats.BoltExecutorStats;
-
-public class BuiltinBoltMetrics extends BuiltinMetrics {
-    private final MultiCountStatAndMetric ackCount;
-    private final MultiCountStatAndMetric failCount;
-    private final MultiCountStatAndMetric emitCount;
-    private final MultiCountStatAndMetric executeCount;
-    private final MultiCountStatAndMetric transferCount;
-    private final MultiLatencyStatAndMetric executeLatency;
-    private final MultiLatencyStatAndMetric processLatency;
-
-    public BuiltinBoltMetrics(BoltExecutorStats stats) {
-        this.ackCount = stats.getAcked();
-        this.failCount = stats.getFailed();
-        this.emitCount = stats.getEmitted();
-        this.executeCount = stats.getExecuted();
-        this.transferCount = stats.getTransferred();
-        this.executeLatency = stats.getExecuteLatencies();
-        this.processLatency = stats.getProcessLatencies();
-
-        this.metricMap.put("ack-count", ackCount);
-        this.metricMap.put("fail-count", failCount);
-        this.metricMap.put("emit-count", emitCount);
-        this.metricMap.put("transfer-count", transferCount);
-        this.metricMap.put("execute-count", executeCount);
-        this.metricMap.put("process-latency", processLatency);
-        this.metricMap.put("execute-latency", executeLatency);
-    }
-
-    public MultiCountStatAndMetric getAckCount() {
-        return ackCount;
-    }
-
-    public MultiCountStatAndMetric getFailCount() {
-        return failCount;
-    }
-
-    public MultiCountStatAndMetric getEmitCount() {
-        return emitCount;
-    }
-
-    public MultiCountStatAndMetric getTransferCount() {
-        return transferCount;
-    }
-
-    public MultiCountStatAndMetric getExecuteCount() {
-        return executeCount;
-    }
-
-    public MultiLatencyStatAndMetric getExecuteLatency() {
-        return executeLatency;
-    }
-
-    public MultiLatencyStatAndMetric getProcessLatency() {
-        return processLatency;
-    }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java b/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
deleted file mode 100644
index 7bc048e..0000000
--- a/storm-client/src/jvm/org/apache/storm/daemon/metrics/BuiltinSpoutMetrics.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.daemon.metrics;
-
-import org.apache.storm.metric.internal.MultiCountStatAndMetric;
-import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
-import org.apache.storm.stats.SpoutExecutorStats;
-
-public class BuiltinSpoutMetrics extends BuiltinMetrics {
-    private final MultiCountStatAndMetric ackCount;
-    private final MultiCountStatAndMetric failCount;
-    private final MultiCountStatAndMetric emitCount;
-    private final MultiCountStatAndMetric transferCount;
-    private final MultiLatencyStatAndMetric completeLatency;
-
-    public BuiltinSpoutMetrics(SpoutExecutorStats stats) {
-        this.ackCount = stats.getAcked();
-        this.failCount = stats.getFailed();
-        this.emitCount = stats.getEmitted();
-        this.transferCount = stats.getTransferred();
-        this.completeLatency = stats.getCompleteLatencies();
-
-        this.metricMap.put("ack-count", ackCount);
-        this.metricMap.put("fail-count", failCount);
-        this.metricMap.put("emit-count", emitCount);
-        this.metricMap.put("transfer-count", transferCount);
-        this.metricMap.put("complete-latency", completeLatency);
-    }
-
-    public MultiCountStatAndMetric getAckCount() {
-        return ackCount;
-    }
-
-    public MultiCountStatAndMetric getFailCount() {
-        return failCount;
-    }
-
-    public MultiCountStatAndMetric getEmitCount() {
-        return emitCount;
-    }
-
-    public MultiCountStatAndMetric getTransferCount() {
-        return transferCount;
-    }
-
-    public MultiLatencyStatAndMetric getCompleteLatency() {
-        return completeLatency;
-    }
-}
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
index f944e7e..bc9d6b0 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltExecutor.java
@@ -23,12 +23,9 @@
 import org.apache.storm.ICredentialsListener;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.Task;
-import org.apache.storm.daemon.metrics.BuiltinBoltMetrics;
-import org.apache.storm.daemon.metrics.BuiltinMetrics;
 import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.Executor;
-import org.apache.storm.generated.Credentials;
 import org.apache.storm.generated.NodeInfo;
 import org.apache.storm.hooks.info.BoltExecuteInfo;
 import org.apache.storm.messaging.IConnection;
@@ -37,7 +34,6 @@
 import org.apache.storm.policy.IWaitStrategy.WaitSituation;
 import org.apache.storm.policy.WaitStrategyPark;
 import org.apache.storm.security.auth.IAutoCredentials;
-import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.stats.BoltExecutorStats;
 import org.apache.storm.stats.ClientStatsUtil;
 import org.apache.storm.task.IBolt;
@@ -46,7 +42,6 @@
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.JCQueue.ExitCondition;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
@@ -65,7 +60,6 @@
     private final IWaitStrategy consumeWaitStrategy;       // employed when no incoming data
     private final IWaitStrategy backPressureWaitStrategy;  // employed when outbound path is congested
     private final BoltExecutorStats stats;
-    private final BuiltinMetrics builtInMetrics;
     private BoltOutputCollectorImpl outputCollector;
 
     public BoltExecutor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials) {
@@ -82,7 +76,6 @@
         this.backPressureWaitStrategy.prepare(topoConf, WaitSituation.BACK_PRESSURE_WAIT);
         this.stats = new BoltExecutorStats(ConfigUtils.samplingRate(this.getTopoConf()),
                                            ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
-        this.builtInMetrics = new BuiltinBoltMetrics(stats);
     }
 
     private static IWaitStrategy makeSystemBoltWaitStrategy() {
@@ -117,7 +110,6 @@
             }
             IBolt boltObject = (IBolt) taskData.getTaskObject();
             TopologyContext userContext = taskData.getUserContext();
-            builtInMetrics.registerAll(topoConf, userContext);
             if (boltObject instanceof ICredentialsListener) {
                 ((ICredentialsListener) boltObject).setCredentials(credentials);
             }
@@ -236,6 +228,8 @@
             }
             if (delta >= 0) {
                 stats.boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
+                Task task = idToTask.get(taskId - idToTaskBase);
+                task.getTaskMetrics().boltExecuteTuple(tuple.getSourceComponent(), tuple.getSourceStreamId(), delta);
             }
         }
     }
diff --git a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
index da9af50..cf3e7c6 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/bolt/BoltOutputCollectorImpl.java
@@ -139,8 +139,8 @@
             boltAckInfo.applyOn(task.getUserContext());
         }
         if (delta >= 0) {
-            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
-                                               task.getTaskMetrics().getAcked(input.getSourceStreamId()));
+            executor.getStats().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
+            task.getTaskMetrics().boltAckedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta);
         }
     }
 
@@ -161,8 +161,8 @@
         BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
         boltFailInfo.applyOn(task.getUserContext());
         if (delta >= 0) {
-            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId(), delta,
-                                                task.getTaskMetrics().getFailed(input.getSourceStreamId()));
+            executor.getStats().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId());
+            task.getTaskMetrics().boltFailedTuple(input.getSourceComponent(), input.getSourceStreamId());
         }
     }
 
diff --git a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
index 23d0958..bd284c4 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/spout/SpoutExecutor.java
@@ -23,19 +23,14 @@
 import org.apache.storm.daemon.Acker;
 import org.apache.storm.daemon.StormCommon;
 import org.apache.storm.daemon.Task;
-import org.apache.storm.daemon.metrics.BuiltinMetrics;
-import org.apache.storm.daemon.metrics.BuiltinMetricsUtil;
-import org.apache.storm.daemon.metrics.BuiltinSpoutMetrics;
 import org.apache.storm.daemon.metrics.SpoutThrottlingMetrics;
 import org.apache.storm.daemon.worker.WorkerState;
 import org.apache.storm.executor.Executor;
 import org.apache.storm.executor.TupleInfo;
-import org.apache.storm.generated.Credentials;
 import org.apache.storm.hooks.info.SpoutAckInfo;
 import org.apache.storm.hooks.info.SpoutFailInfo;
 import org.apache.storm.policy.IWaitStrategy;
 import org.apache.storm.policy.IWaitStrategy.WaitSituation;
-import org.apache.storm.shade.com.google.common.collect.ImmutableMap;
 import org.apache.storm.spout.ISpout;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.stats.ClientStatsUtil;
@@ -43,7 +38,6 @@
 import org.apache.storm.tuple.AddressedTuple;
 import org.apache.storm.tuple.TupleImpl;
 import org.apache.storm.utils.ConfigUtils;
-import org.apache.storm.utils.JCQueue;
 import org.apache.storm.utils.MutableLong;
 import org.apache.storm.utils.ObjectReader;
 import org.apache.storm.utils.ReflectionUtils;
@@ -65,7 +59,6 @@
     private final SpoutThrottlingMetrics spoutThrottlingMetrics;
     private final boolean hasAckers;
     private final SpoutExecutorStats stats;
-    private final BuiltinMetrics builtInMetrics;
     SpoutOutputCollectorImpl spoutOutputCollector;
     private Integer maxSpoutPending;
     private List<ISpout> spouts;
@@ -87,7 +80,6 @@
         this.spoutThrottlingMetrics = new SpoutThrottlingMetrics();
         this.stats = new SpoutExecutorStats(
             ConfigUtils.samplingRate(this.getTopoConf()), ObjectReader.getInt(this.getTopoConf().get(Config.NUM_STAT_BUCKETS)));
-        this.builtInMetrics = new BuiltinSpoutMetrics(stats);
     }
 
     @Override
@@ -139,8 +131,6 @@
             SpoutOutputCollector outputCollector = new SpoutOutputCollector(spoutOutputCollector);
             this.outputCollectors.add(outputCollector);
 
-            builtInMetrics.registerAll(topoConf, taskData.getUserContext());
-
             if (spoutObject instanceof ICredentialsListener) {
                 ((ICredentialsListener) spoutObject).setCredentials(credentials);
             }
@@ -339,8 +329,8 @@
                 new SpoutAckInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
             }
             if (hasAckers && timeDelta != null) {
-                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta,
-                                                    taskData.getTaskMetrics().getAcked(tupleInfo.getStream()));
+                executor.getStats().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
+                taskData.getTaskMetrics().spoutAckedTuple(tupleInfo.getStream(), timeDelta);
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
@@ -357,8 +347,8 @@
             spout.fail(tupleInfo.getMessageId());
             new SpoutFailInfo(tupleInfo.getMessageId(), taskId, timeDelta).applyOn(taskData.getUserContext());
             if (timeDelta != null) {
-                executor.getStats().spoutFailedTuple(tupleInfo.getStream(), timeDelta,
-                                                     taskData.getTaskMetrics().getFailed(tupleInfo.getStream()));
+                executor.getStats().spoutFailedTuple(tupleInfo.getStream());
+                taskData.getTaskMetrics().spoutFailedTuple(tupleInfo.getStream());
             }
         } catch (Exception e) {
             throw Utils.wrapInRuntime(e);
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java b/storm-client/src/jvm/org/apache/storm/metric/internal/CountStat.java
similarity index 87%
rename from storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java
rename to storm-client/src/jvm/org/apache/storm/metric/internal/CountStat.java
index 31c51fd..31ce455 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/CountStatAndMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/CountStat.java
@@ -16,12 +16,11 @@
 import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicLong;
-import org.apache.storm.metric.api.IMetric;
 
 /**
- * Acts as a Count Metric, but also keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time.
+ * Keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time.
  */
-public class CountStatAndMetric implements IMetric {
+public class CountStat {
     private final AtomicLong currentBucket;
     //10 min values
     private final int tmSize;
@@ -49,7 +48,7 @@
      *
      * @param numBuckets the number of buckets to divide the time periods into.
      */
-    public CountStatAndMetric(int numBuckets) {
+    public CountStat(int numBuckets) {
         this(numBuckets, -1);
     }
 
@@ -59,7 +58,7 @@
      * @param numBuckets the number of buckets to divide the time periods into.
      * @param startTime  if positive the simulated time to start the from.
      */
-    CountStatAndMetric(int numBuckets, long startTime) {
+    CountStat(int numBuckets, long startTime) {
         numBuckets = Math.max(numBuckets, 2);
         //We want to capture the full time range, so the target size is as
         // if we had one bucket less, then we do
@@ -97,23 +96,6 @@
         currentBucket.addAndGet(count);
     }
 
-
-    @Override
-    public synchronized Object getValueAndReset() {
-        return getValueAndReset(System.currentTimeMillis());
-    }
-
-    @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
-    synchronized Object getValueAndReset(long now) {
-        long value = currentBucket.getAndSet(0);
-        long timeSpent = now - bucketStart;
-        long ret = value + exactExtra;
-        bucketStart = now;
-        exactExtra = 0;
-        rotateBuckets(value, timeSpent);
-        return ret;
-    }
-
     synchronized void rotateSched(long now) {
         long value = currentBucket.getAndSet(0);
         long timeSpent = now - bucketStart;
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java b/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStat.java
similarity index 93%
rename from storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java
rename to storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStat.java
index ddaadb7..a1d3859 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStatAndMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/LatencyStat.java
@@ -21,13 +21,12 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TimerTask;
-import org.apache.storm.metric.api.IMetric;
 import org.apache.storm.utils.Utils;
 
 /**
- * Acts as a Latency Metric, but also keeps track of approximate latency for the last 10 mins, 3 hours, 1 day, and all time.
+ * Keeps track of approximate latency for the last 10 mins, 3 hours, 1 day, and all time.
  */
-public class LatencyStatAndMetric implements IMetric {
+public class LatencyStat {
     //The current lat and count buckets are protected by a different lock
     // from the other buckets.  This is to reduce the lock contention
     // When doing complex calculations.  Never grab the instance object lock
@@ -66,7 +65,7 @@
      *
      * @param numBuckets the number of buckets to divide the time periods into.
      */
-    public LatencyStatAndMetric(int numBuckets) {
+    public LatencyStat(int numBuckets) {
         this(numBuckets, -1);
     }
 
@@ -76,7 +75,7 @@
      * @param numBuckets the number of buckets to divide the time periods into.
      * @param startTime  if positive the simulated time to start the from.
      */
-    LatencyStatAndMetric(int numBuckets, long startTime) {
+    LatencyStat(int numBuckets, long startTime) {
         numBuckets = Math.max(numBuckets, 2);
         //We want to capture the full time range, so the target size is as
         // if we had one bucket less, then we do
@@ -123,11 +122,8 @@
         }
     }
 
-    @Override
-    public synchronized Object getValueAndReset() {
-        return getValueAndReset(System.currentTimeMillis());
-    }
-
+    // This is still used by a unit test.  Removing this method caused a difference
+    // in reported values for the test that needs to be debugged before removal.
     synchronized Object getValueAndReset(long now) {
         long lat;
         long count;
@@ -143,7 +139,7 @@
         long exactExtraCountSum = count + exactExtraCount;
         @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
         double ret = Utils.zeroIfNaNOrInf(
-            ((double) (lat + exactExtraLat)) / exactExtraCountSum);
+                ((double) (lat + exactExtraLat)) / exactExtraCountSum);
         bucketStart = now;
         exactExtraLat = 0;
         exactExtraCount = 0;
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
similarity index 68%
rename from storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java
rename to storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
index 8e0b093..25fa270 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStatAndMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiCountStat.java
@@ -16,31 +16,30 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.metric.api.IMetric;
 
 /**
- * Acts as a MultiCount Metric, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for the same keys
+ * Acts as a MultiCount Stat, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time, for the same keys.
  */
-public class MultiCountStatAndMetric<T> implements IMetric {
+public class MultiCountStat<T> {
     private final int numBuckets;
-    private ConcurrentHashMap<T, CountStatAndMetric> counts = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<T, CountStat> counts = new ConcurrentHashMap<>();
 
     /**
      * Constructor.
      *
      * @param numBuckets the number of buckets to divide the time periods into.
      */
-    public MultiCountStatAndMetric(int numBuckets) {
+    public MultiCountStat(int numBuckets) {
         this.numBuckets = numBuckets;
     }
 
-    CountStatAndMetric get(T key) {
-        CountStatAndMetric c = counts.get(key);
+    CountStat get(T key) {
+        CountStat c = counts.get(key);
         if (c == null) {
             synchronized (this) {
                 c = counts.get(key);
                 if (c == null) {
-                    c = new CountStatAndMetric(numBuckets);
+                    c = new CountStat(numBuckets);
                     counts.put(key, c);
                 }
             }
@@ -67,23 +66,9 @@
         return key.toString();
     }
 
-    @Override
-    public Object getValueAndReset() {
-        Map<String, Long> ret = new HashMap<String, Long>();
-        for (Map.Entry<T, CountStatAndMetric> entry : counts.entrySet()) {
-            String key = keyToString(entry.getKey());
-            //There could be collisions if keyToString returns only part of a result.
-            Long val = (Long) entry.getValue().getValueAndReset();
-            Long other = ret.get(key);
-            val += other == null ? 0L : other;
-            ret.put(key, val);
-        }
-        return ret;
-    }
-
     public Map<String, Map<T, Long>> getTimeCounts() {
         Map<String, Map<T, Long>> ret = new HashMap<>();
-        for (Map.Entry<T, CountStatAndMetric> entry : counts.entrySet()) {
+        for (Map.Entry<T, CountStat> entry : counts.entrySet()) {
             T key = entry.getKey();
             Map<String, Long> toFlip = entry.getValue().getTimeCounts();
             for (Map.Entry<String, Long> subEntry : toFlip.entrySet()) {
@@ -100,7 +85,7 @@
     }
 
     public void close() {
-        for (CountStatAndMetric cc : counts.values()) {
+        for (CountStat cc : counts.values()) {
             cc.close();
         }
     }
diff --git a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
similarity index 61%
rename from storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java
rename to storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
index eae373c..83c4f7a 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStatAndMetric.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/internal/MultiLatencyStat.java
@@ -13,35 +13,33 @@
 package org.apache.storm.metric.internal;
 
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.metric.api.IMetric;
 
 /**
- * Acts as a Latnecy Metric for multiple keys, but keeps track of approximate counts for the last 10 mins, 3 hours, 1 day, and all time. for
+ * Keeps track of approximate latency for the last 10 mins, 3 hours, 1 day, and all time, for
  * the same keys
  */
-public class MultiLatencyStatAndMetric<T> implements IMetric {
+public class MultiLatencyStat<T> {
     private final int numBuckets;
-    private ConcurrentHashMap<T, LatencyStatAndMetric> lat = new ConcurrentHashMap<>();
+    private ConcurrentHashMap<T, LatencyStat> lat = new ConcurrentHashMap<>();
 
     /**
      * Constructor.
      *
      * @param numBuckets the number of buckets to divide the time periods into.
      */
-    public MultiLatencyStatAndMetric(int numBuckets) {
+    public MultiLatencyStat(int numBuckets) {
         this.numBuckets = numBuckets;
     }
 
-    LatencyStatAndMetric get(T key) {
-        LatencyStatAndMetric c = lat.get(key);
+    LatencyStat get(T key) {
+        LatencyStat c = lat.get(key);
         if (c == null) {
             synchronized (this) {
                 c = lat.get(key);
                 if (c == null) {
-                    c = new LatencyStatAndMetric(numBuckets);
+                    c = new LatencyStat(numBuckets);
                     lat.put(key, c);
                 }
             }
@@ -58,30 +56,9 @@
         get(key).record(latency);
     }
 
-    protected String keyToString(T key) {
-        if (key instanceof List) {
-            //This is a bit of a hack.  If it is a list, then it is [component, stream]
-            //we want to format this as component:stream
-            List<String> lk = (List<String>) key;
-            return lk.get(0) + ":" + lk.get(1);
-        }
-        return key.toString();
-    }
-
-    @Override
-    public Object getValueAndReset() {
-        Map<String, Double> ret = new HashMap<String, Double>();
-        for (Map.Entry<T, LatencyStatAndMetric> entry : lat.entrySet()) {
-            String key = keyToString(entry.getKey());
-            Double val = (Double) entry.getValue().getValueAndReset();
-            ret.put(key, val);
-        }
-        return ret;
-    }
-
     public Map<String, Map<T, Double>> getTimeLatAvg() {
         Map<String, Map<T, Double>> ret = new HashMap<>();
-        for (Map.Entry<T, LatencyStatAndMetric> entry : lat.entrySet()) {
+        for (Map.Entry<T, LatencyStat> entry : lat.entrySet()) {
             T key = entry.getKey();
             Map<String, Double> toFlip = entry.getValue().getTimeLatAvg();
             for (Map.Entry<String, Double> subEntry : toFlip.entrySet()) {
@@ -98,7 +75,7 @@
     }
 
     public void close() {
-        for (LatencyStatAndMetric l : lat.values()) {
+        for (LatencyStat l : lat.values()) {
             l.close();
         }
     }
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
new file mode 100644
index 0000000..650fdee
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/RollingAverageGauge.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import com.codahale.metrics.Gauge;
+
+public class RollingAverageGauge implements Gauge<Double> {
+    private long[] samples = new long[3];
+    private int index = 0;
+
+    @Override
+    public Double getValue() {
+        synchronized (this) {
+            long total = samples[0] + samples[1] + samples[2];
+            return total / 3.0;
+        }
+    }
+
+    public void addValue(long value) {
+        synchronized (this) {
+            samples[index] = value;
+            index = (index + 1) % 3;
+        }
+    }
+}
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 42179c9..43cfcc7 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -74,6 +74,14 @@
         return gauge;
     }
 
+    public <T> Gauge<T> gauge(String name, Gauge<T> gauge, String topologyId, String componentId,
+                              String streamId, Integer taskId, Integer port) {
+        MetricNames metricNames = workerMetricName(name, topologyId, componentId, streamId, taskId, port);
+        gauge = registry.register(metricNames.getLongName(), gauge);
+        saveMetricTaskIdMapping(taskId, metricNames, gauge, taskIdGauges);
+        return gauge;
+    }
+
     public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId, String streamId) {
         MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, streamId, taskId, context.getThisWorkerPort());
         Meter meter = registry.meter(metricNames.getLongName());
@@ -156,6 +164,10 @@
     private static <T extends Metric> void saveMetricTaskIdMapping(Integer taskId, MetricNames names, T metric, Map<Integer,
             Map<String, T>> taskIdMetrics) {
         Map<String, T> metrics = taskIdMetrics.computeIfAbsent(taskId, (tid) -> new HashMap<>());
+        if (metrics.get(names.getV2TickName()) != null) {
+            LOG.warn("Adding duplicate short metric for {} with long name {}, only the last metric "
+                    + "will be reported during the V2 metrics tick.", names.getV2TickName(), names.longName);
+        }
         metrics.put(names.getV2TickName(), metric);
     }
 
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index e792d62..d52190f 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -13,69 +13,131 @@
 package org.apache.storm.metrics2;
 
 import com.codahale.metrics.Counter;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.storm.task.WorkerTopologyContext;
+import org.apache.storm.utils.ConfigUtils;
 
 public class TaskMetrics {
-    private static final String METRIC_NAME_ACKED = "acked";
-    private static final String METRIC_NAME_FAILED = "failed";
-    private static final String METRIC_NAME_EMITTED = "emitted";
-    private static final String METRIC_NAME_TRANSFERRED = "transferred";
+    private static final String METRIC_NAME_ACKED = "__ack-count";
+    private static final String METRIC_NAME_FAILED = "__fail-count";
+    private static final String METRIC_NAME_EMITTED = "__emit-count";
+    private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
+    private static final String METRIC_NAME_EXECUTED = "__execute-count";
+    private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
+    private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
+    private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
 
-    private final ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, Counter> counters = new ConcurrentHashMap<>();
+    private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
 
     private final String topologyId;
     private final String componentId;
     private final Integer taskId;
     private final Integer workerPort;
     private final StormMetricRegistry metricRegistry;
+    private final int samplingRate;
 
-    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, StormMetricRegistry metricRegistry) {
+
+    public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
+                       StormMetricRegistry metricRegistry, Map<String, Object> topoConf) {
         this.metricRegistry = metricRegistry;
         this.topologyId = context.getStormId();
         this.componentId = componentId;
         this.taskId = taskid;
         this.workerPort = context.getThisWorkerPort();
+        this.samplingRate = ConfigUtils.samplingRate(topoConf);
     }
 
-    public Counter getAcked(String streamId) {
-        Counter c = this.ackedByStream.get(streamId);
+    public void spoutAckedTuple(String streamId, long latencyMs) {
+        String metricName = METRIC_NAME_ACKED + "-" + streamId;
+        Counter c = this.getCounter(metricName, streamId);
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
+        RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, streamId);
+        gauge.addValue(latencyMs);
+    }
+
+    public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
+        String key = sourceComponentId + ":" + sourceStreamId;
+        String metricName = METRIC_NAME_ACKED + "-" + key;
+        Counter c = this.getCounter(metricName, sourceStreamId);
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
+        RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
+        gauge.addValue(latencyMs);
+    }
+
+    public void spoutFailedTuple(String streamId) {
+        String key = streamId;
+        String metricName = METRIC_NAME_FAILED + "-" + key;
+        Counter c = this.getCounter(metricName, streamId);
+        c.inc(this.samplingRate);
+    }
+
+    public void boltFailedTuple(String sourceComponentId, String sourceStreamId) {
+        String key = sourceComponentId + ":" + sourceStreamId;
+        String metricName = METRIC_NAME_FAILED + "-" + key;
+        Counter c = this.getCounter(metricName, sourceStreamId);
+        c.inc(this.samplingRate);
+    }
+
+    public void emittedTuple(String streamId) {
+        String key = streamId;
+        String metricName = METRIC_NAME_EMITTED + "-" + key;
+        Counter c = this.getCounter(metricName, streamId);
+        c.inc(this.samplingRate);
+    }
+
+    public void transferredTuples(String streamId, int amount) {
+        String key = streamId;
+        String metricName = METRIC_NAME_TRANSFERRED + "-" + key;
+        Counter c = this.getCounter(metricName, streamId);
+        c.inc(amount * this.samplingRate);
+    }
+
+    public void boltExecuteTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
+        String key = sourceComponentId + ":" + sourceStreamId;
+        String metricName = METRIC_NAME_EXECUTED + "-" + key;
+        Counter c = this.getCounter(metricName, sourceStreamId);
+        c.inc(this.samplingRate);
+
+        metricName = METRIC_NAME_EXECUTE_LATENCY + "-" + key;
+        RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
+        gauge.addValue(latencyMs);
+    }
+
+    private Counter getCounter(String metricName, String streamId) {
+        Counter c = this.counters.get(metricName);
         if (c == null) {
-            c = metricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
-            this.ackedByStream.put(streamId, c);
+            synchronized (this) {
+                c = this.counters.get(metricName);
+                if (c == null) {
+                    c = metricRegistry.counter(metricName, this.topologyId, this.componentId,
+                            this.taskId, this.workerPort, streamId);
+                    this.counters.put(metricName, c);
+                }
+            }
         }
         return c;
     }
 
-    public Counter getFailed(String streamId) {
-        Counter c = this.failedByStream.get(streamId);
-        if (c == null) {
-            c = metricRegistry.counter(METRIC_NAME_FAILED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
-            this.failedByStream.put(streamId, c);
+    private RollingAverageGauge getRollingAverageGauge(String metricName, String streamId) {
+        RollingAverageGauge gauge = this.gauges.get(metricName);
+        if (gauge == null) {
+            synchronized (this) {
+                gauge = this.gauges.get(metricName);
+                if (gauge == null) {
+                    gauge = new RollingAverageGauge();
+                    metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
+                            streamId, this.taskId, this.workerPort);
+                    this.gauges.put(metricName, gauge);
+                }
+            }
         }
-        return c;
-    }
-
-    public Counter getEmitted(String streamId) {
-        Counter c = this.emittedByStream.get(streamId);
-        if (c == null) {
-            c = metricRegistry.counter(METRIC_NAME_EMITTED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
-            this.emittedByStream.put(streamId, c);
-        }
-        return c;
-    }
-
-    public Counter getTransferred(String streamId) {
-        Counter c = this.transferredByStream.get(streamId);
-        if (c == null) {
-            c = metricRegistry.counter(
-                METRIC_NAME_TRANSFERRED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId);
-            this.transferredByStream.put(streamId, c);
-        }
-        return c;
+        return gauge;
     }
 }
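
Note on the rewritten `TaskMetrics` above: metric names are built as `<metric>-<stream>` for spout-side streams and `<metric>-<component>:<stream>` for incoming bolt streams, and each counter increment is scaled by the sampling reciprocal. A standalone sketch of that arithmetic (hypothetical numbers, chosen to match the `__ack-count-split:default` example in Metrics.md):

```java
public class TaskMetricsNamingDemo {
    public static void main(String[] args) {
        int samplingRate = 20;  // assumed ConfigUtils.samplingRate(...) for the default 0.05
        String name = "__ack-count" + "-" + "split" + ":" + "default";
        long counter = 0;
        for (int sampledAcks = 0; sampledAcks < 4004; sampledAcks++) {
            counter += samplingRate;  // mirrors Counter.inc(samplingRate) per sampled ack
        }
        System.out.println(name + " = " + counter);  // __ack-count-split:default = 80080
    }
}
```
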
diff --git a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
index 26b2d74..030b0d7 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/BoltExecutorStats.java
@@ -12,37 +12,36 @@
 
 package org.apache.storm.stats;
 
-import com.codahale.metrics.Counter;
 import java.util.List;
 import org.apache.storm.generated.BoltStats;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
-import org.apache.storm.metric.internal.MultiCountStatAndMetric;
-import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.metric.internal.MultiCountStat;
+import org.apache.storm.metric.internal.MultiLatencyStat;
 import org.apache.storm.shade.com.google.common.collect.Lists;
 
 @SuppressWarnings("unchecked")
 public class BoltExecutorStats extends CommonStats {
-    MultiCountStatAndMetric executedStats;
-    MultiLatencyStatAndMetric processLatencyStats;
-    MultiLatencyStatAndMetric executeLatencyStats;
+    MultiCountStat executedStats;
+    MultiLatencyStat processLatencyStats;
+    MultiLatencyStat executeLatencyStats;
 
     public BoltExecutorStats(int rate, int numStatBuckets) {
         super(rate, numStatBuckets);
-        this.executedStats = new MultiCountStatAndMetric(numStatBuckets);
-        this.processLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets);
-        this.executeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets);
+        this.executedStats = new MultiCountStat(numStatBuckets);
+        this.processLatencyStats = new MultiLatencyStat(numStatBuckets);
+        this.executeLatencyStats = new MultiLatencyStat(numStatBuckets);
     }
 
-    public MultiCountStatAndMetric getExecuted() {
+    public MultiCountStat getExecuted() {
         return executedStats;
     }
 
-    public MultiLatencyStatAndMetric getProcessLatencies() {
+    public MultiLatencyStat getProcessLatencies() {
         return processLatencyStats;
     }
 
-    public MultiLatencyStatAndMetric getExecuteLatencies() {
+    public MultiLatencyStat getExecuteLatencies() {
         return executeLatencyStats;
     }
 
@@ -60,17 +59,15 @@
         this.getExecuteLatencies().record(key, latencyMs);
     }
 
-    public void boltAckedTuple(String component, String stream, long latencyMs, Counter ackedCounter) {
+    public void boltAckedTuple(String component, String stream, long latencyMs) {
         List key = Lists.newArrayList(component, stream);
         this.getAcked().incBy(key, this.rate);
-        ackedCounter.inc(this.rate);
         this.getProcessLatencies().record(key, latencyMs);
     }
 
-    public void boltFailedTuple(String component, String stream, long latencyMs, Counter failedCounter) {
+    public void boltFailedTuple(String component, String stream) {
         List key = Lists.newArrayList(component, stream);
         this.getFailed().incBy(key, this.rate);
-        failedCounter.inc(this.rate);
     }
 
     @Override
diff --git a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
index ed56685..320ab57 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/CommonStats.java
@@ -12,33 +12,32 @@
 
 package org.apache.storm.stats;
 
-import com.codahale.metrics.Counter;
 import java.util.Map;
 import org.apache.storm.generated.ExecutorStats;
-import org.apache.storm.metric.internal.MultiCountStatAndMetric;
-import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.metric.internal.MultiCountStat;
+import org.apache.storm.metric.internal.MultiLatencyStat;
 
 @SuppressWarnings("unchecked")
 public abstract class CommonStats {
     protected final int rate;
-    private final MultiCountStatAndMetric emittedStats;
-    private final MultiCountStatAndMetric transferredStats;
-    private final MultiCountStatAndMetric ackedStats;
-    private final MultiCountStatAndMetric failedStats;
+    private final MultiCountStat emittedStats;
+    private final MultiCountStat transferredStats;
+    private final MultiCountStat ackedStats;
+    private final MultiCountStat failedStats;
 
     public CommonStats(int rate, int numStatBuckets) {
         this.rate = rate;
-        this.emittedStats = new MultiCountStatAndMetric(numStatBuckets);
-        this.transferredStats = new MultiCountStatAndMetric(numStatBuckets);
-        this.ackedStats = new MultiCountStatAndMetric(numStatBuckets);
-        this.failedStats = new MultiCountStatAndMetric(numStatBuckets);
+        this.emittedStats = new MultiCountStat(numStatBuckets);
+        this.transferredStats = new MultiCountStat(numStatBuckets);
+        this.ackedStats = new MultiCountStat(numStatBuckets);
+        this.failedStats = new MultiCountStat(numStatBuckets);
     }
 
-    public MultiCountStatAndMetric getFailed() {
+    public MultiCountStat getFailed() {
         return failedStats;
     }
 
-    public MultiCountStatAndMetric getAcked() {
+    public MultiCountStat getAcked() {
         return ackedStats;
     }
 
@@ -46,22 +45,20 @@
         return this.rate;
     }
 
-    public MultiCountStatAndMetric getEmitted() {
+    public MultiCountStat getEmitted() {
         return emittedStats;
     }
 
-    public MultiCountStatAndMetric getTransferred() {
+    public MultiCountStat getTransferred() {
         return transferredStats;
     }
 
-    public void emittedTuple(String stream, Counter emittedCounter) {
+    public void emittedTuple(String stream) {
         this.getEmitted().incBy(stream, this.rate);
-        emittedCounter.inc(this.rate);
     }
 
-    public void transferredTuples(String stream, int amount, Counter transferredCounter) {
+    public void transferredTuples(String stream, int amount) {
         this.getTransferred().incBy(stream, this.rate * amount);
-        transferredCounter.inc(amount);
     }
 
     public void cleanupStats() {
@@ -71,11 +68,11 @@
         failedStats.close();
     }
 
-    protected Map<String, Map<String, Long>> valueStat(MultiCountStatAndMetric metric) {
+    protected Map<String, Map<String, Long>> valueStat(MultiCountStat metric) {
         return metric.getTimeCounts();
     }
 
-    protected Map<String, Map<String, Double>> valueStat(MultiLatencyStatAndMetric metric) {
+    protected Map<String, Map<String, Double>> valueStat(MultiLatencyStat metric) {
         return metric.getTimeLatAvg();
     }
 
diff --git a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
index f0f4c30..e13de34 100644
--- a/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
+++ b/storm-client/src/jvm/org/apache/storm/stats/SpoutExecutorStats.java
@@ -12,22 +12,21 @@
 
 package org.apache.storm.stats;
 
-import com.codahale.metrics.Counter;
 import org.apache.storm.generated.ExecutorSpecificStats;
 import org.apache.storm.generated.ExecutorStats;
 import org.apache.storm.generated.SpoutStats;
-import org.apache.storm.metric.internal.MultiLatencyStatAndMetric;
+import org.apache.storm.metric.internal.MultiLatencyStat;
 
 @SuppressWarnings("unchecked")
 public class SpoutExecutorStats extends CommonStats {
-    private final MultiLatencyStatAndMetric completeLatencyStats;
+    private final MultiLatencyStat completeLatencyStats;
 
     public SpoutExecutorStats(int rate, int numStatBuckets) {
         super(rate, numStatBuckets);
-        this.completeLatencyStats = new MultiLatencyStatAndMetric(numStatBuckets);
+        this.completeLatencyStats = new MultiLatencyStat(numStatBuckets);
     }
 
-    public MultiLatencyStatAndMetric getCompleteLatencies() {
+    public MultiLatencyStat getCompleteLatencies() {
         return completeLatencyStats;
     }
 
@@ -37,15 +36,13 @@
         super.cleanupStats();
     }
 
-    public void spoutAckedTuple(String stream, long latencyMs, Counter ackedCounter) {
+    public void spoutAckedTuple(String stream, long latencyMs) {
         this.getAcked().incBy(stream, this.rate);
-        ackedCounter.inc(this.rate);
         this.getCompleteLatencies().record(stream, latencyMs);
     }
 
-    public void spoutFailedTuple(String stream, long latencyMs, Counter failedCounter) {
+    public void spoutFailedTuple(String stream) {
         this.getFailed().incBy(stream, this.rate);
-        failedCounter.inc(this.rate);
     }
 
     @Override
diff --git a/storm-client/test/jvm/org/apache/storm/metric/internal/CountStatAndMetricTest.java b/storm-client/test/jvm/org/apache/storm/metric/internal/CountStatTest.java
similarity index 86%
rename from storm-client/test/jvm/org/apache/storm/metric/internal/CountStatAndMetricTest.java
rename to storm-client/test/jvm/org/apache/storm/metric/internal/CountStatTest.java
index 6cd8826..58e88ba 100644
--- a/storm-client/test/jvm/org/apache/storm/metric/internal/CountStatAndMetricTest.java
+++ b/storm-client/test/jvm/org/apache/storm/metric/internal/CountStatTest.java
@@ -18,9 +18,9 @@
 import org.junit.Test;
 
 /**
- * Unit test for CountStatAndMetric
+ * Unit test for CountStat
  */
-public class CountStatAndMetricTest extends TestCase {
+public class CountStatTest extends TestCase {
     final long TEN_MIN = 10 * 60 * 1000;
     final long THIRTY_SEC = 30 * 1000;
     final long THREE_HOUR = 3 * 60 * 60 * 1000;
@@ -29,7 +29,7 @@
     @Test
     public void testBasic() {
         long time = 0l;
-        CountStatAndMetric count = new CountStatAndMetric(10, time);
+        CountStat count = new CountStat(10, time);
         while (time < TEN_MIN) {
             //For this part of the test we interleave the different rotation types.
             count.incBy(50);
@@ -37,7 +37,6 @@
             count.rotateSched(time);
             count.incBy(50);
             time += THIRTY_SEC / 2;
-            assertEquals(100l, ((Long) count.getValueAndReset(time)).longValue());
         }
 
         long val = 100 * TEN_MIN / THIRTY_SEC;
@@ -51,7 +50,6 @@
         while (time < THREE_HOUR) {
             count.incBy(100);
             time += THIRTY_SEC;
-            assertEquals(100l, ((Long) count.getValueAndReset(time)).longValue());
         }
 
         val = 100 * THREE_HOUR / THIRTY_SEC;
@@ -65,7 +63,6 @@
         while (time < ONE_DAY) {
             count.incBy(100);
             time += THIRTY_SEC;
-            assertEquals(100l, ((Long) count.getValueAndReset(time)).longValue());
         }
 
         val = 100 * ONE_DAY / THIRTY_SEC;
diff --git a/storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatAndMetricTest.java b/storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatTest.java
similarity index 94%
rename from storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatAndMetricTest.java
rename to storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatTest.java
index f2f566f..08030c1 100644
--- a/storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatAndMetricTest.java
+++ b/storm-client/test/jvm/org/apache/storm/metric/internal/LatencyStatTest.java
@@ -17,9 +17,9 @@
 import org.junit.Test;
 
 /**
- * Unit test for LatencyStatAndMetric
+ * Unit test for LatencyStat
  */
-public class LatencyStatAndMetricTest extends TestCase {
+public class LatencyStatTest extends TestCase {
     final long TEN_MIN = 10 * 60 * 1000;
     final long THIRTY_SEC = 30 * 1000;
     final long THREE_HOUR = 3 * 60 * 60 * 1000;
@@ -28,7 +28,7 @@
     @Test
     public void testBasic() {
         long time = 0l;
-        LatencyStatAndMetric lat = new LatencyStatAndMetric(10, time);
+        LatencyStat lat = new LatencyStat(10, time);
         while (time < TEN_MIN) {
             lat.record(100);
             time += THIRTY_SEC;
diff --git a/storm-client/test/jvm/org/apache/storm/metrics2/RollingAverageGaugeTest.java b/storm-client/test/jvm/org/apache/storm/metrics2/RollingAverageGaugeTest.java
new file mode 100644
index 0000000..1985b19
--- /dev/null
+++ b/storm-client/test/jvm/org/apache/storm/metrics2/RollingAverageGaugeTest.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metrics2;
+
+import org.junit.Assert;
+import org.junit.Test;
+
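+/**
+ * Unit test for RollingAverageGauge
+ */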
+public class RollingAverageGaugeTest {
+
+    @Test
+    public void testAverage() {
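+        // The expected values below assume the gauge averages a rolling window of the
+        // three most recent samples, with unused slots counting as zero: e.g. after
+        // adding 30, 30, 90 the window is [30, 30, 90] and the average is 50.0.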
+        RollingAverageGauge gauge = new RollingAverageGauge();
+        Assert.assertEquals(0.0, gauge.getValue(), 0.001);
+        gauge.addValue(30);
+        Assert.assertEquals(10.0, gauge.getValue(), 0.001);
+        gauge.addValue(30);
+        Assert.assertEquals(20.0, gauge.getValue(), 0.001);
+        gauge.addValue(30);
+        Assert.assertEquals(30.0, gauge.getValue(), 0.001);
+        gauge.addValue(90);
+        Assert.assertEquals(50.0, gauge.getValue(), 0.001);
+        gauge.addValue(0);
+        Assert.assertEquals(40.0, gauge.getValue(), 0.001);
+    }
+}
\ No newline at end of file
diff --git a/storm-core/test/clj/org/apache/storm/metrics_test.clj b/storm-core/test/clj/org/apache/storm/metrics_test.clj
index c779ced..2689a7a 100644
--- a/storm-core/test/clj/org/apache/storm/metrics_test.clj
+++ b/storm-core/test/clj/org/apache/storm/metrics_test.clj
@@ -238,143 +238,4 @@
       )))
 
 
-(deftest test-builtin-metrics-1
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                           TOPOLOGY-STATS-SAMPLE-RATE 1.0
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60})))]
-    (let [feeder (FeederSpout. ["field1"])
-          topology (Thrift/buildTopology
-                    {"myspout" (Thrift/prepareSpoutDetails feeder)}
-                    {"mybolt" (Thrift/prepareBoltDetails
-                                {(Utils/getGlobalStreamId "myspout" nil)
-                                 (Thrift/prepareShuffleGrouping)}
-                                acking-bolt)})]
-      (.submitTopology cluster "metrics-tester" {} topology)
-      
-      (.feed feeder ["a"] 1)
-      (.advanceClusterTime cluster 61)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 1 1 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 1 1 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 1 1 cluster)            
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 1 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 1 1 cluster)
-
-      (.advanceClusterTime cluster 120)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 1 3 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 1 3 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 1 3 cluster)
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 3 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 1 3 cluster)
-
-      (.feed feeder ["b"] 1)
-      (.feed feeder ["c"] 1)
-      (.advanceClusterTime cluster 60)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 3 4 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 3 4 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 4 cluster)      
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 3 4 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 4 cluster))))
-
-
-(deftest test-builtin-metrics-2
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                           TOPOLOGY-STATS-SAMPLE-RATE 1.0
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5})))]
-    (let [feeder (FeederSpout. ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (Thrift/buildTopology
-                    {"myspout" (Thrift/prepareSpoutDetails feeder)}
-                    {"mybolt" (Thrift/prepareBoltDetails
-                                {(Utils/getGlobalStreamId "myspout" nil)
-                                 (Thrift/prepareShuffleGrouping)}
-                                ack-every-other)})]
-      (.submitTopology cluster
-                             "metrics-tester"
-                             {}
-                             topology)
-      
-      (.feed feeder ["a"] 1)
-      (.advanceClusterTime cluster 6)
-      (assert-metric-running-sum! "myspout" "__fail-count/default" 0 0 cluster)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 1 1 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 1 1 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 1 1 cluster)            
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 1 cluster)     
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 1 1 cluster)
-      (assert-acked tracker 1)
-
-      (.feed feeder ["b"] 2)      
-      (.advanceClusterTime cluster 5)
-      (assert-metric-running-sum! "myspout" "__fail-count/default" 0 0 cluster)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 1 2 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 2 2 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 2 2 cluster)                  
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 2 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 2 2 cluster)
-
-      (.advanceClusterTime cluster 15)      
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 1 5 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 2 5 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 2 5 cluster)
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 1 5 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 2 5 cluster)
-      
-      (.feed feeder ["c"] 3)            
-      (.advanceClusterTime cluster 15)      
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 2 8 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 3 8 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 8 cluster)
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 8 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 8 cluster))))
-
-(deftest test-builtin-metrics-3
-  (with-open [cluster (.build (doto (LocalCluster$Builder.)
-                                (.withSimulatedTime)
-                                (.withDaemonConf {TOPOLOGY-METRICS-CONSUMER-REGISTER
-                           [{"class" "org.apache.storm.metric.FakeMetricConsumer"}]
-                           TOPOLOGY-STATS-SAMPLE-RATE 1.0
-                           TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5
-                           TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true})))]
-    (let [feeder (FeederSpout. ["field1"])
-          tracker (AckFailMapTracker.)
-          _ (.setAckFailDelegate feeder tracker)
-          topology (Thrift/buildTopology
-                     {"myspout" (Thrift/prepareSpoutDetails feeder)}
-                     {"mybolt" (Thrift/prepareBoltDetails
-                                 {(Utils/getGlobalStreamId "myspout" nil)
-                                  (Thrift/prepareGlobalGrouping)}
-                                 ack-every-other)})]
-      (.submitTopology cluster
-                             "timeout-tester"
-                             {TOPOLOGY-MESSAGE-TIMEOUT-SECS 10}
-                             topology)
-      (.feed feeder ["a"] 1)
-      (.feed feeder ["b"] 2)
-      (.feed feeder ["c"] 3)
-      (.advanceClusterTime cluster 9)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 2 1 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 3 1 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 1 cluster)
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 1 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 1 cluster)
-      (assert-acked tracker 1 3)
-      
-      (is (not (.isFailed tracker 2)))
-      (.advanceClusterTime cluster 30)
-      (assert-failed tracker 2)
-      (assert-metric-running-sum! "myspout" "__fail-count/default" 1 1 cluster)
-      (assert-metric-running-sum! "myspout" "__ack-count/default" 2 4 cluster)
-      (assert-metric-running-sum! "myspout" "__emit-count/default" 3 4 cluster)
-      (assert-metric-running-sum! "myspout" "__transfer-count/default" 3 4 cluster)
-      (assert-metric-running-sum! "mybolt" "__ack-count/myspout:default" 2 4 cluster)
-      (assert-metric-running-sum! "mybolt" "__execute-count/myspout:default" 3 4 cluster))))
-
-