AMBARI-23427 : Metrics Collector stops after starting.
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 7cb317b..7535b50 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -337,25 +337,36 @@
           double[] aggregates = AggregatorUtils.calculateAggregates(
                   metric.getMetricValues());
 
-          byte[] uuid = metadataManagerInstance.getUuid(metric);
-          if (uuid == null) {
-            LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
-            continue;
-          }
-          metricRecordStmt.setBytes(1, uuid);
-          metricRecordStmt.setLong(2, metric.getStartTime());
-          metricRecordStmt.setDouble(3, aggregates[0]);
-          metricRecordStmt.setDouble(4, aggregates[1]);
-          metricRecordStmt.setDouble(5, aggregates[2]);
-          metricRecordStmt.setLong(6, (long) aggregates[3]);
-          String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
-          metricRecordStmt.setString(7, json);
+          if (aggregates[3] != 0.0) {
+            byte[] uuid = metadataManagerInstance.getUuid(metric);
+            if (uuid == null) {
+              LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+              continue;
+            }
+            metricRecordStmt.setBytes(1, uuid);
+            metricRecordStmt.setLong(2, metric.getStartTime());
+            metricRecordStmt.setDouble(3, aggregates[0]);
+            metricRecordStmt.setDouble(4, aggregates[1]);
+            metricRecordStmt.setDouble(5, aggregates[2]);
+            metricRecordStmt.setLong(6, (long) aggregates[3]);
+            String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+            metricRecordStmt.setString(7, json);
 
-          try {
-            metricRecordStmt.executeUpdate();
-          } catch (SQLException sql) {
-            LOG.error("Failed on insert records to store.", sql);
+            try {
+              int rows = metricRecordStmt.executeUpdate();
+              if (metric.getMetricName().equals("TimelineMetricStoreWatcher.FakeMetric")) {
+                LOG.info("Inserted " + rows + " rows for TimelineMetricStoreWatcher.FakeMetric.");
+              }
+            } catch (SQLException sql) {
+              LOG.error("Failed on insert records to store.", sql);
+            }
+          } else {
+            LOG.debug("Discarding empty metric record for : [" + metric.getMetricName() + "," +
+              metric.getAppId() + "," +
+              metric.getHostName() + "," +
+              metric.getInstanceId() + "]");
           }
+
         }
       }
 
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 0203f88..8465dc3 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -406,7 +406,7 @@
     try {
       stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
 
-      LOG.debug("Downsampler Query issued : " + condition.getStatement());
+      LOG.info("Start downsampling cycle...");
       if (condition.doUpdate()) {
         int rows = stmt.executeUpdate();
         conn.commit();
@@ -414,7 +414,6 @@
       } else {
         rs = stmt.executeQuery();
       }
-      LOG.debug("Downsampler Query returned ...");
       LOG.info("End Downsampling cycle.");
 
     } catch (SQLException e) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index beaff69..bd02a66 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -302,8 +302,7 @@
     "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
     "SELECT UUID, %s AS SERVER_TIME, " +
     "ROUND(SUM(METRIC_SUM)/SUM(METRIC_COUNT),2), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
-    "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " +
-    "GROUP BY UUID";
+    "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY UUID";
 
   /**
    * Downsample host metrics.
@@ -320,9 +319,9 @@
    * N - way parallel scan where N = number of regions.
    */
   public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " +
-         "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
-         "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
-         "SERVER_TIME <= %s GROUP BY UUID";
+    "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
+    "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
+    "SERVER_TIME <= %s GROUP BY UUID";
 
   /**
    * Downsample cluster metrics.
@@ -494,9 +493,6 @@
       throw e;
     }
 
-    if (condition instanceof TopNCondition) {
-      LOG.info(sb.toString());
-    }
     return stmt;
   }
 
@@ -676,9 +672,6 @@
       throw e;
     }
 
-    if (condition instanceof TopNCondition) {
-      LOG.info(sb.toString());
-    }
     return stmt;
   }
 
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
index 16d1bf2..a85764e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
@@ -37,18 +37,25 @@
    */
   @Override
   public byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength) {
+
     int metricNameUuidLength = 12;
+    String instanceId = timelineClusterMetric.getInstanceId();
+
+    if ((StringUtils.isEmpty(instanceId))) {
+      metricNameUuidLength = 14;
+    }
+
     String metricName = timelineClusterMetric.getMetricName();
 
     //Compute the individual splits.
     String[] splits = getIndidivualSplits(metricName);
 
     /*
-    Compute the ascii sum of every split in the metric name. (asciiSum += (int) splits[s].charAt(i))
-    For the last split, use weighted sum instead of ascii sum. (asciiSum += ((i+1) * (int) splits[s].charAt(i)))
+    Compute the weighted ascii sum of every split in the metric name. (asciiSum += (int) splits[s].charAt(i))
     These weighted sums are 'appended' to get the unique ID for metric name.
      */
     StringBuilder splitSums = new StringBuilder();
+    long totalAsciiSum = 0l;
     if (splits.length > 0) {
       for (String split : splits) {
         int asciiSum = 0;
@@ -56,30 +63,31 @@
           asciiSum += ((i + 1) * (int) split.charAt(i)); //weighted sum for split.
         }
         splitSums.append(asciiSum); //Append the sum to the array of sums.
+        totalAsciiSum += asciiSum; //Parity Sum
       }
     }
 
+    String splitSumString = totalAsciiSum + splitSums.reverse().toString(); //Reverse and attach parity sum.
+    int splitLength = splitSumString.length();
+
     //Compute a unique metric seed for the stemmed metric name
     String stemmedMetric = stem(metricName);
     long metricSeed = 100123456789L;
     metricSeed += computeWeightedNumericalAsciiSum(stemmedMetric);
-
     //Reverse the computed seed to get a metric UUID portion which is used optionally.
-    byte[] metricUuidPortion = StringUtils.reverse(String.valueOf(metricSeed)).getBytes();
-    String splitSumString = splitSums.toString();
-    int splitLength = splitSumString.length();
+    byte[] metricSeedBytes = StringUtils.reverse(String.valueOf(metricSeed)).getBytes();
 
-    //If splitSums length > required metric UUID length, use only the required length suffix substring of the splitSums as metric UUID.
-    if (splitLength > metricNameUuidLength) {
-      int pad = (int)(0.25 * splitLength);
-      metricUuidPortion = ArrayUtils.addAll(ArrayUtils.subarray(splitSumString.getBytes(), splitLength - metricNameUuidLength + pad, splitLength)
-        , ArrayUtils.subarray(metricUuidPortion, 0, pad));
-    } else {
-      //If splitSums is not enough for required metric UUID length, pad with the metric uuid portion.
-      int pad = metricNameUuidLength - splitLength;
-      metricUuidPortion = ArrayUtils.addAll(splitSumString.getBytes(), ArrayUtils.subarray(metricUuidPortion, 0, pad));
+    int seedLength = (int)(0.25 * metricNameUuidLength);
+    int sumLength = metricNameUuidLength - seedLength;
+    if (splitLength < sumLength) {
+      sumLength = splitLength;
+      seedLength = metricNameUuidLength - sumLength;
     }
 
+    byte[] metricUuidPortion = ArrayUtils.addAll(
+      ArrayUtils.subarray(splitSumString.getBytes(), 0, sumLength)
+      , ArrayUtils.subarray(metricSeedBytes, 0, seedLength));
+
     /*
       For appId and instanceId the logic is similar. Use a seed integer to start with and compute ascii sum.
       Based on required length, use a suffix of the computed uuid.
@@ -87,25 +95,25 @@
     String appId = timelineClusterMetric.getAppId();
     int appidSeed = 11;
     for (int i = 0; i < appId.length(); i++) {
-      appidSeed += ((i+1) * appId.charAt(i));
+      appidSeed += appId.charAt(i);
     }
     String appIdSeedStr = String.valueOf(appidSeed);
     byte[] appUuidPortion = ArrayUtils.subarray(appIdSeedStr.getBytes(), appIdSeedStr.length() - 2, appIdSeedStr.length());
 
-    String instanceId = timelineClusterMetric.getInstanceId();
-    ByteBuffer buffer = ByteBuffer.allocate(4);
-    byte[] instanceUuidPortion = new byte[2];
     if (StringUtils.isNotEmpty(instanceId)) {
+      byte[] instanceUuidPortion = new byte[2];
+      ByteBuffer buffer = ByteBuffer.allocate(4);
       int instanceIdSeed = 1489;
       for (int i = 0; i < appId.length(); i++) {
-        instanceIdSeed += ((i+1)* appId.charAt(i));
+        instanceIdSeed += ((i+1) * appId.charAt(i));
       }
       buffer.putInt(instanceIdSeed);
       ArrayUtils.subarray(buffer.array(), 2, 4);
+      // Concatenate all UUIDs together (metric uuid + appId uuid + instanceId uuid)
+      return ArrayUtils.addAll(ArrayUtils.addAll(metricUuidPortion, appUuidPortion), instanceUuidPortion);
     }
 
-    // Concatenate all UUIDs together (metric uuid + appId uuid + instanceId uuid)
-    return ArrayUtils.addAll(ArrayUtils.addAll(metricUuidPortion, appUuidPortion), instanceUuidPortion);
+    return ArrayUtils.addAll(metricUuidPortion, appUuidPortion);
   }
 
   /**
@@ -151,7 +159,7 @@
    */
   private String stem(String metricName) {
     String metric = metricName.toLowerCase();
-    String regex = "[\\.\\_\\%\\-\\=]";
+    String regex = "[\\.\\_\\%\\-\\=\\/\\@\\(\\)\\[\\]\\:]";
     String trimmedMetric = StringUtils.removePattern(metric, regex);
     return trimmedMetric;
   }
@@ -181,20 +189,27 @@
     }
   }
 
-  private long computeWeightedNumericalAsciiSum(String value) {
+  public long computeWeightedNumericalAsciiSum(String value) {
     int len = value.length();
     long numericValue = 0;
-    int sum = 0;
-    for (int i = 0; i < len; i++) {
+    long sum = 0;
+    int numericCtr = 0;
+    for (int i = 0; i < len;) {
       int ascii = value.charAt(i);
-      if (48 <= ascii && ascii <= 57) {
-        numericValue += numericValue * 10 + (ascii - 48);
+      if (48 <= ascii && ascii <= 57 && numericCtr < 4) {
+        numericValue = numericValue * 10 + (ascii - 48);
+        numericCtr++;
+        i++;
       } else {
         if (numericValue > 0) {
           sum += numericValue;
           numericValue = 0;
         }
-        sum += value.charAt(i);
+        if (numericCtr < 4) {
+          sum += value.charAt(i);
+          i++;
+        }
+        numericCtr = 0;
       }
     }
 
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.DAT b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.DAT
new file mode 100644
index 0000000..f5c181a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.DAT
@@ -0,0 +1 @@
+AMBARI_METRICS.SmokeTest.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat
index ad35c45..f9d2f27 100644
--- a/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat
@@ -220,6 +220,8 @@
 dfs.NNTopUserOpCounts.windowMs=60000.op=setReplication.user=ambari-qa.count
 dfs.NNTopUserOpCounts.windowMs=60000.op=setTimes.TotalCount
 dfs.NNTopUserOpCounts.windowMs=60000.op=setTimes.user=ams.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=create.user=hbase/ctr-e138-1518143905142-124809-01-000002.hwx.site@EXAMPLE.COM.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=delete.user=hbase/ctr-e138-1518143905142-124809-01-000003.hwx.site@EXAMPLE.COM.count
 dfs.namenode.AddBlockOps
 dfs.namenode.AllowSnapshotOps
 dfs.namenode.BlockOpsBatched
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.DAT b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.DAT
new file mode 100644
index 0000000..af73d02
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.DAT
@@ -0,0 +1 @@
+TimelineMetricStoreWatcher.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java
index d1b3f01..a4676e6 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java
@@ -18,17 +18,8 @@
 
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid;
 
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import org.easymock.EasyMock;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.BufferedReader;
@@ -69,11 +60,6 @@
         Assert.assertTrue(uuid.length == 16);
         String uuidStr = new String(uuid);
         Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
-        if (uuids.containsKey(uuidStr) ) {
-          if (!uuids.containsValue(timelineClusterMetric)) {
-            System.out.println("COLLISION : " + timelineClusterMetric.toString() + " = " + uuids.get(uuidStr));
-          }
-        }
         uuids.put(uuidStr, timelineClusterMetric);
       }
     }
@@ -88,11 +74,7 @@
       TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("TestMetric", app, null, -1l);
       byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
       String uuidStr = new String(uuid);
-      if (uuids.containsKey(uuidStr) ) {
-        if (!uuids.containsValue(timelineClusterMetric)) {
-          System.out.println("COLLISION : " + timelineClusterMetric.toString() + " = " + uuids.get(uuidStr));
-        }
-      }
+      Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
       uuids.put(uuidStr, timelineClusterMetric);
     }
   }
@@ -145,6 +127,7 @@
 
     Map<String, Set<String>> metricSet = new HashMap<String, Set<String>>();
     FileInputStream fstream = null;
+    Set<String> hbaseMetrics = new HashSet<>();
     BufferedReader br = null;
     String strLine;
     for (String appId : apps) {
@@ -176,9 +159,14 @@
         }
       }
       metricsForApp.add("live_hosts");
-      metricSet.put(appId.contains("hbase") ? "hbase" : appId, metricsForApp);
+      if (appId.equals("master_hbase") || appId.equals("slave_hbase")) {
+        hbaseMetrics.addAll(metricsForApp);
+      } else {
+        metricSet.put(appId, metricsForApp);
+      }
       System.out.println("Found " + metricsForApp.size() + " metrics for appId = " + appId);
     }
+    metricSet.put("hbase", hbaseMetrics);
     return metricSet;
   }
 }