HBASE-14743 Add metrics around HeapMemoryManager. (Reid Chan)

Change-Id: I7305f7b7034b216930b5fb5c57de9ba5eabf96d8

Signed-off-by: Apekshit Sharma <appy@apache.org>
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManagerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManagerSource.java
new file mode 100644
index 0000000..532c5af
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManagerSource.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+
+/**
+ * This interface will be implemented by a MetricsSource that will export metrics from
+ * HeapMemoryManager in RegionServer into the hadoop metrics system.
+ */
+public interface MetricsHeapMemoryManagerSource extends BaseSource {
+  /**
+   * The name of the metrics
+   */
+  String METRICS_NAME = "Memory";
+
+  /**
+   * The name of the metrics context that metrics will be under.
+   */
+  String METRICS_CONTEXT = "regionserver";
+
+  /**
+   * Description
+   */
+  String METRICS_DESCRIPTION = "Metrics about HBase RegionServer's memory";
+
+  /**
+   * The name of the metrics context that metrics will be under in jmx
+   */
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  /**
+   * Update/Set the blocked flush count histogram/gauge
+   * @param blockedFlushCount the number of blocked flush since last tuning.
+   */
+  void updateBlockedFlushCount(long blockedFlushCount);
+
+  /**
+   * Update/Set the unblocked flush count histogram/gauge
+   * @param unblockedFlushCount the number of unblocked flush since last tuning.
+   */
+  void updateUnblockedFlushCount(long unblockedFlushCount);
+
+  /**
+   * Set the current blockcache size used gauge
+   * @param blockCacheSize the current memory usage in blockcache, in bytes.
+   */
+  void setCurBlockCacheSizeGauge(long blockCacheSize);
+
+  /**
+   * Set the current global memstore size used gauge
+   * @param memStoreSize the current memory usage in memstore, in bytes.
+   */
+  void setCurMemStoreSizeGauge(long memStoreSize);
+
+  /**
+   * Update the increase/decrease memstore size histogram
+   * @param memStoreDeltaSize the tuning result of memstore.
+   */
+  void updateMemStoreDeltaSizeHistogram(int memStoreDeltaSize);
+
+  /**
+   * Update the increase/decrease blockcache size histogram
+   * @param blockCacheDeltaSize the tuning result of blockcache.
+   */
+  void updateBlockCacheDeltaSizeHistogram(int blockCacheDeltaSize);
+
+  /**
+   * Increase the counter for tuner neither expanding memstore global size limit nor expanding
+   * blockcache max size.
+   */
+  void increaseTunerDoNothingCounter();
+
+  /**
+   * Increase the counter for heap occupancy percent above low watermark
+   */
+  void increaseAboveHeapOccupancyLowWatermarkCounter();
+
+  // Histograms
+  String BLOCKED_FLUSH_NAME = "blockedFlushes";
+  String BLOCKED_FLUSH_DESC = "Histogram for the number of blocked flushes in the memstore";
+  String UNBLOCKED_FLUSH_NAME = "unblockedFlushes";
+  String UNBLOCKED_FLUSH_DESC = "Histogram for the number of unblocked flushes in the memstore";
+  String INC_MEMSTORE_TUNING_NAME = "increaseMemStoreSize";
+  String INC_MEMSTORE_TUNING_DESC =
+      "Histogram for the heap memory tuner expanding memstore global size limit in bytes";
+  String DEC_MEMSTORE_TUNING_NAME = "decreaseMemStoreSize";
+  String DEC_MEMSTORE_TUNING_DESC =
+      "Histogram for the heap memory tuner shrinking memstore global size limit in bytes";
+  String INC_BLOCKCACHE_TUNING_NAME = "increaseBlockCacheSize";
+  String INC_BLOCKCACHE_TUNING_DESC =
+      "Histogram for the heap memory tuner expanding blockcache max heap size in bytes";
+  String DEC_BLOCKCACHE_TUNING_NAME = "decreaseBlockCacheSize";
+  String DEC_BLOCKCACHE_TUNING_DESC =
+      "Histogram for the heap memory tuner shrinking blockcache max heap size in bytes";
+
+  // Gauges
+  String BLOCKED_FLUSH_GAUGE_NAME = "blockedFlushGauge";
+  String BLOCKED_FLUSH_GAUGE_DESC = "Gauge for the blocked flush count before tuning";
+  String UNBLOCKED_FLUSH_GAUGE_NAME = "unblockedFlushGauge";
+  String UNBLOCKED_FLUSH_GAUGE_DESC = "Gauge for the unblocked flush count before tuning";
+  String MEMSTORE_SIZE_GAUGE_NAME = "memStoreSize";
+  String MEMSTORE_SIZE_GAUGE_DESC = "Global MemStore used in bytes by the RegionServer";
+  String BLOCKCACHE_SIZE_GAUGE_NAME = "blockCacheSize";
+  String BLOCKCACHE_SIZE_GAUGE_DESC = "BlockCache used in bytes by the RegionServer";
+
+  // Counters
+  String DO_NOTHING_COUNTER_NAME = "tunerDoNothingCounter";
+  String DO_NOTHING_COUNTER_DESC =
+      "The number of times that tuner neither expands memstore global size limit nor expands blockcache max size";
+  String ABOVE_HEAP_LOW_WATERMARK_COUNTER_NAME = "aboveHeapOccupancyLowWaterMarkCounter";
+  String ABOVE_HEAP_LOW_WATERMARK_COUNTER_DESC =
+      "The number of times that heap occupancy percent is above low watermark";
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
index f0289f7..71739fe 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
@@ -54,4 +54,10 @@
    * @return A metrics table aggregate source
    */
   MetricsTableAggregateSource getTableAggregate();
+
+  /**
+   * Get a MetricsHeapMemoryManagerSource
+   * @return A metrics heap memory manager source
+   */
+  MetricsHeapMemoryManagerSource getHeapMemoryManager();
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManagerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManagerSourceImpl.java
new file mode 100644
index 0000000..1aa1911
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManagerSourceImpl.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+/**
+ * Hadoop2 implementation of MetricsHeapMemoryManagerSource. Implements BaseSource through
+ * BaseSourceImpl, following the pattern
+ */
+@InterfaceAudience.Private
+public class MetricsHeapMemoryManagerSourceImpl extends BaseSourceImpl implements
+    MetricsHeapMemoryManagerSource {
+
+  private final MetricHistogram blockedFlushHistogram;
+  private final MetricHistogram unblockedFlushHistogram;
+  private final MetricHistogram incMemStoreSizeHistogram;
+  private final MetricHistogram decMemStoreSizeHistogram;
+  private final MetricHistogram incBlockCacheSizeHistogram;
+  private final MetricHistogram decBlockCacheSizeHistogram;
+
+  private final MutableGaugeLong blockedFlushGauge;
+  private final MutableGaugeLong unblockedFlushGauge;
+  private final MutableGaugeLong memStoreSizeGauge;
+  private final MutableGaugeLong blockCacheSizeGauge;
+
+  private final MutableFastCounter doNothingCounter;
+  private final MutableFastCounter aboveHeapOccupancyLowWatermarkCounter;
+
+  public MetricsHeapMemoryManagerSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+  }
+
+  public MetricsHeapMemoryManagerSourceImpl(String metricsName, String metricsDescription,
+      String metricsContext, String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+
+    // Histograms
+    blockedFlushHistogram = getMetricsRegistry()
+        .newSizeHistogram(BLOCKED_FLUSH_NAME, BLOCKED_FLUSH_DESC);
+    unblockedFlushHistogram = getMetricsRegistry()
+        .newSizeHistogram(UNBLOCKED_FLUSH_NAME, UNBLOCKED_FLUSH_DESC);
+    incMemStoreSizeHistogram = getMetricsRegistry()
+        .newSizeHistogram(INC_MEMSTORE_TUNING_NAME, INC_MEMSTORE_TUNING_DESC);
+    decMemStoreSizeHistogram = getMetricsRegistry()
+        .newSizeHistogram(DEC_MEMSTORE_TUNING_NAME, DEC_MEMSTORE_TUNING_DESC);
+    incBlockCacheSizeHistogram = getMetricsRegistry()
+        .newSizeHistogram(INC_BLOCKCACHE_TUNING_NAME, INC_BLOCKCACHE_TUNING_DESC);
+    decBlockCacheSizeHistogram = getMetricsRegistry()
+        .newSizeHistogram(DEC_BLOCKCACHE_TUNING_NAME, DEC_BLOCKCACHE_TUNING_DESC);
+
+    // Gauges
+    blockedFlushGauge = getMetricsRegistry()
+        .newGauge(BLOCKED_FLUSH_GAUGE_NAME, BLOCKED_FLUSH_GAUGE_DESC, 0L);
+    unblockedFlushGauge = getMetricsRegistry()
+        .newGauge(UNBLOCKED_FLUSH_GAUGE_NAME, UNBLOCKED_FLUSH_GAUGE_DESC, 0L);
+    memStoreSizeGauge = getMetricsRegistry()
+        .newGauge(MEMSTORE_SIZE_GAUGE_NAME, MEMSTORE_SIZE_GAUGE_DESC, 0L);
+    blockCacheSizeGauge = getMetricsRegistry()
+        .newGauge(BLOCKCACHE_SIZE_GAUGE_NAME, BLOCKCACHE_SIZE_GAUGE_DESC, 0L);
+
+    // Counters
+    doNothingCounter = getMetricsRegistry()
+        .newCounter(DO_NOTHING_COUNTER_NAME, DO_NOTHING_COUNTER_DESC, 0L);
+    aboveHeapOccupancyLowWatermarkCounter = getMetricsRegistry()
+        .newCounter(ABOVE_HEAP_LOW_WATERMARK_COUNTER_NAME,
+          ABOVE_HEAP_LOW_WATERMARK_COUNTER_DESC, 0L);
+  }
+
+  @Override
+  public void updateBlockedFlushCount(long blockedFlushCount) {
+    if (blockedFlushCount > 0) {
+      blockedFlushHistogram.add(blockedFlushCount);
+      blockedFlushGauge.set(blockedFlushCount);
+    }
+  }
+
+  @Override
+  public void updateUnblockedFlushCount(long unblockedFlushCount) {
+    if (unblockedFlushCount > 0) {
+      unblockedFlushHistogram.add(unblockedFlushCount);
+      unblockedFlushGauge.set(unblockedFlushCount);
+    }
+  }
+
+  @Override
+  public void setCurBlockCacheSizeGauge(long blockcacheSize) {
+    blockCacheSizeGauge.set(blockcacheSize);
+  }
+
+  @Override
+  public void setCurMemStoreSizeGauge(long memstoreSize) {
+    memStoreSizeGauge.set(memstoreSize);
+  }
+
+  @Override
+  public void updateMemStoreDeltaSizeHistogram(int memStoreDeltaSize) {
+    if (memStoreDeltaSize >= 0) {
+      incMemStoreSizeHistogram.add(memStoreDeltaSize);
+    } else if (memStoreDeltaSize < 0) {
+      decMemStoreSizeHistogram.add(-memStoreDeltaSize);
+    }
+  }
+
+  @Override
+  public void updateBlockCacheDeltaSizeHistogram(int blockCacheDeltaSize) {
+    if (blockCacheDeltaSize >= 0) {
+      incBlockCacheSizeHistogram.add(blockCacheDeltaSize);
+    } else if (blockCacheDeltaSize < 0) {
+      decBlockCacheSizeHistogram.add(-blockCacheDeltaSize);
+    }
+  }
+
+  @Override
+  public void increaseTunerDoNothingCounter() {
+    doNothingCounter.incr();
+  }
+
+  @Override
+  public void increaseAboveHeapOccupancyLowWatermarkCounter() {
+    aboveHeapOccupancyLowWatermarkCounter.incr();
+  }
+}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
index 7a9b9c0..3b26ab0 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
@@ -15,7 +15,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -30,6 +29,7 @@
     private Object aggLock = new Object();
     private MetricsRegionAggregateSourceImpl aggImpl;
     private MetricsTableAggregateSourceImpl tblAggImpl;
+    private MetricsHeapMemoryManagerSourceImpl heapMemMngImpl;
   }
 
   private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
@@ -52,6 +52,16 @@
   }
 
   @Override
+  public synchronized MetricsHeapMemoryManagerSource getHeapMemoryManager() {
+    synchronized (FactoryStorage.INSTANCE.aggLock) {
+      if (FactoryStorage.INSTANCE.heapMemMngImpl == null) {
+        FactoryStorage.INSTANCE.heapMemMngImpl = new MetricsHeapMemoryManagerSourceImpl();
+      }
+      return FactoryStorage.INSTANCE.heapMemMngImpl;
+    }
+  }
+
+  @Override
   public synchronized MetricsRegionServerSource createServer(MetricsRegionServerWrapper regionServerWrapper) {
     return new MetricsRegionServerSourceImpl(regionServerWrapper);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
index 77a9186..f90125e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HeapMemoryManager.java
@@ -85,6 +85,8 @@
 
   private long maxHeapSize = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
 
+  private MetricsHeapMemoryManager metricsHeapMemoryManager;
+
   public static HeapMemoryManager create(Configuration conf, FlushRequester memStoreFlusher,
                 Server server, RegionServerAccounting regionServerAccounting) {
     BlockCache blockCache = CacheConfig.instantiateBlockCache(conf);
@@ -108,6 +110,7 @@
       HBASE_RS_HEAP_MEMORY_TUNER_DEFAULT_PERIOD);
     this.heapOccupancyLowWatermark = conf.getFloat(HConstants.HEAP_OCCUPANCY_LOW_WATERMARK_KEY,
       HConstants.DEFAULT_HEAP_OCCUPANCY_LOW_WATERMARK);
+    metricsHeapMemoryManager = new MetricsHeapMemoryManager();
   }
 
   private boolean doInit(Configuration conf) {
@@ -201,7 +204,6 @@
     // The thread is Daemon. Just interrupting the ongoing process.
     LOG.info("Stoping HeapMemoryTuner chore.");
     this.heapMemTunerChore.cancel(true);
-    
   }
 
   // Used by the test cases.
@@ -245,6 +247,7 @@
             " is above heap occupancy alarm watermark (" + heapOccupancyLowWatermark + ")");
           alarming = true;
         }
+        metricsHeapMemoryManager.increaseAboveHeapOccupancyLowWatermarkCounter();
         triggerNow();
         try {
           // Need to sleep ourselves since we've told the chore's sleeper
@@ -273,17 +276,24 @@
       // while remaining in the limits
       long curEvictCount;
       long curCacheMisCount;
+      long blockedFlushCnt;
+      long unblockedFlushCnt;
       curEvictCount = blockCache.getStats().getEvictedCount();
       tunerContext.setEvictCount(curEvictCount - evictCount);
       evictCount = curEvictCount;
       curCacheMisCount = blockCache.getStats().getMissCachingCount();
       tunerContext.setCacheMissCount(curCacheMisCount-cacheMissCount);
       cacheMissCount = curCacheMisCount;
-      tunerContext.setBlockedFlushCount(blockedFlushCount.getAndSet(0));
-      tunerContext.setUnblockedFlushCount(unblockedFlushCount.getAndSet(0));
-      tunerContext.setCurBlockCacheUsed((float)blockCache.getCurrentSize() / maxHeapSize);
-      tunerContext.setCurMemStoreUsed(
-                 (float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
+      blockedFlushCnt = blockedFlushCount.getAndSet(0);
+      tunerContext.setBlockedFlushCount(blockedFlushCnt);
+      metricsHeapMemoryManager.updateBlockedFlushCount(blockedFlushCnt);
+      unblockedFlushCnt = unblockedFlushCount.getAndSet(0);
+      tunerContext.setUnblockedFlushCount(unblockedFlushCnt);
+      metricsHeapMemoryManager.updateUnblockedFlushCount(unblockedFlushCnt);
+      tunerContext.setCurBlockCacheUsed((float) blockCache.getCurrentSize() / maxHeapSize);
+      metricsHeapMemoryManager.setCurBlockCacheSizeGauge(blockCache.getCurrentSize());
+      tunerContext.setCurMemStoreUsed((float)regionServerAccounting.getGlobalMemstoreSize() / maxHeapSize);
+      metricsHeapMemoryManager.setCurMemStoreSizeGauge(regionServerAccounting.getGlobalMemstoreSize());
       tunerContext.setCurBlockCacheSize(blockCachePercent);
       tunerContext.setCurMemStoreSize(globalMemStorePercent);
       TunerResult result = null;
@@ -327,6 +337,12 @@
               + blockCacheSize);
           // TODO can adjust the value so as not exceed 80%. Is that correct? may be.
         } else {
+          int memStoreDeltaSize =
+              (int) ((memstoreSize - globalMemStorePercent) * CONVERT_TO_PERCENTAGE);
+          int blockCacheDeltaSize =
+              (int) ((blockCacheSize - blockCachePercent) * CONVERT_TO_PERCENTAGE);
+          metricsHeapMemoryManager.updateMemStoreDeltaSizeHistogram(memStoreDeltaSize);
+          metricsHeapMemoryManager.updateBlockCacheDeltaSizeHistogram(blockCacheDeltaSize);
           long newBlockCacheSize = (long) (maxHeapSize * blockCacheSize);
           long newMemstoreSize = (long) (maxHeapSize * memstoreSize);
           LOG.info("Setting block cache heap size to " + newBlockCacheSize
@@ -336,8 +352,11 @@
           globalMemStorePercent = memstoreSize;
           memStoreFlusher.setGlobalMemstoreLimit(newMemstoreSize);
         }
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("No changes made by HeapMemoryTuner.");
+      } else {
+        metricsHeapMemoryManager.increaseTunerDoNothingCounter();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No changes made by HeapMemoryTuner.");
+        }
       }
     }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java
new file mode 100644
index 0000000..f670779
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsHeapMemoryManager.java
@@ -0,0 +1,109 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is for maintaining the various regionserver's heap memory manager statistics and
+ * publishing them through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+public class MetricsHeapMemoryManager {
+  private final MetricsHeapMemoryManagerSource source;
+
+  public MetricsHeapMemoryManager() {
+    this(CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+        .getHeapMemoryManager());
+  }
+
+  public MetricsHeapMemoryManager(MetricsHeapMemoryManagerSource source) {
+    this.source = source;
+  }
+
+  public MetricsHeapMemoryManagerSource getMetricsSource() {
+    return source;
+  }
+
+  /**
+   * Update/Set the blocked flush count histogram/gauge
+   * @param blockedFlushCount the number of blocked memstore flush since last tuning.
+   */
+  public void updateBlockedFlushCount(final long blockedFlushCount) {
+    source.updateBlockedFlushCount(blockedFlushCount);
+  }
+
+  /**
+   * Update/Set the unblocked flush count histogram/gauge
+   * @param unblockedFlushCount the number of unblocked memstore flush since last tuning.
+   */
+  public void updateUnblockedFlushCount(final long unblockedFlushCount) {
+    source.updateUnblockedFlushCount(unblockedFlushCount);
+  }
+
+  /**
+   * Set the current blockcache size used gauge
+   * @param blockCacheSize the current memory usage in blockcache, in bytes.
+   */
+  public void setCurBlockCacheSizeGauge(final long blockCacheSize) {
+    source.setCurBlockCacheSizeGauge(blockCacheSize);
+  }
+
+  /**
+   * Set the current global memstore size used gauge
+   * @param memStoreSize the current memory usage in memstore, in bytes.
+   */
+  public void setCurMemStoreSizeGauge(final long memStoreSize) {
+    source.setCurMemStoreSizeGauge(memStoreSize);
+  }
+
+  /**
+   * Update the increase/decrease memstore size histogram
+   * @param memStoreDeltaSize the tuning result of memstore.
+   */
+  public void updateMemStoreDeltaSizeHistogram(final int memStoreDeltaSize) {
+    source.updateMemStoreDeltaSizeHistogram(memStoreDeltaSize);
+  }
+
+  /**
+   * Update the increase/decrease blockcache size histogram
+   * @param blockCacheDeltaSize the tuning result of blockcache.
+   */
+  public void updateBlockCacheDeltaSizeHistogram(final int blockCacheDeltaSize) {
+    source.updateBlockCacheDeltaSizeHistogram(blockCacheDeltaSize);
+  }
+
+  /**
+   * Increase the counter for tuner neither expanding memstore global size limit nor expanding
+   * blockcache max size.
+   */
+  public void increaseTunerDoNothingCounter() {
+    source.increaseTunerDoNothingCounter();
+  }
+
+  /**
+   * Increase the counter for heap occupancy percent above low watermark
+   */
+  public void increaseAboveHeapOccupancyLowWatermarkCounter() {
+    source.increaseAboveHeapOccupancyLowWatermarkCounter();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsHeapMemoryManager.java
new file mode 100644
index 0000000..44f4961
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsHeapMemoryManager.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Unit test version of rs metrics tests.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestMetricsHeapMemoryManager {
+  public static MetricsAssertHelper HELPER = CompatibilitySingletonFactory
+      .getInstance(MetricsAssertHelper.class);
+
+  private MetricsHeapMemoryManager hmm;
+  private MetricsHeapMemoryManagerSource source;
+
+  @Before
+  public void setUp() {
+    hmm = new MetricsHeapMemoryManager();
+    source = hmm.getMetricsSource();
+  }
+
+  @Test
+  public void testConstuctor() {
+    assertNotNull("There should be a hadoop1/hadoop2 metrics source", source);
+  }
+
+  @Test
+  public void testCounter() {
+    for (int i = 0; i < 10; i++) {
+      hmm.increaseAboveHeapOccupancyLowWatermarkCounter();
+    }
+    for (int i = 0; i < 11; i++) {
+      hmm.increaseTunerDoNothingCounter();
+    }
+
+    HELPER.assertCounter("aboveHeapOccupancyLowWaterMarkCounter", 10L, source);
+    HELPER.assertCounter("tunerDoNothingCounter", 11L, source);
+  }
+
+  @Test
+  public void testGauge() {
+    hmm.updateBlockedFlushCount(200);
+    hmm.updateUnblockedFlushCount(50);
+    hmm.setCurMemStoreSizeGauge(256 * 1024 * 1024);
+    hmm.setCurBlockCacheSizeGauge(100 * 1024 * 1024);
+
+    HELPER.assertGauge("blockedFlushGauge", 200, source);
+    HELPER.assertGauge("unblockedFlushGauge", 50, source);
+    HELPER.assertGauge("memStoreSize", 256 * 1024 * 1024, source);
+    HELPER.assertGauge("blockCacheSize", 100 * 1024 * 1024, source);
+  }
+}