[AMBARI-24171] Fix issues in AMS aggregation and writes. (#1603)

* [AMBARI-24171] Fix issues in AMS aggregation and writes.

* [AMBARI-24171] Fix issues in AMS aggregation and writes - 2
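
Summary of the fixes below:

* HBaseTimelineMetricsService: drop a stray break so that every entry of
  an aggregation result is copied into the aggregate map, not just the
  first one.
* PhoenixHBaseAccessor: catch NumberFormatException alongside SQLException
  on insert, log the unstorable metric, and continue with the rest of the
  batch; pass true instead of false to metadataManagerInstance.getUuid()
  when writing cluster aggregates.
* TimelineMetricConfiguration: initialize metricsConf at declaration and
  add a getTableTtl() lookup returning per-table retention in seconds.
* PhoenixTransactSQL: when choosing the best precision for a query, skip
  any precision whose table TTL means the requested start time predates
  the oldest retained data.
* Tests: switch hardcoded 2014 timestamps to now-relative windows and add
  coverage for skipping invalid (NaN) metric values.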
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
index 8dfd651..43a468c 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
@@ -455,7 +455,6 @@
     for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
       aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
       hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
-      break;
     }
     long timestamp = aggregationResult.getTimeInMilis();
     if (LOG.isDebugEnabled()) {
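
The removed break meant this loop only ever copied the first
TimelineMetricWithAggregatedValues into aggregateMap; the rest of the
aggregation result was silently dropped. A minimal sketch of the pre-fix
behavior (names as in the hunk above):

    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
      break; // exited after the first entry, losing all later ones
    }

With the break gone, every entry is recorded, and hostname is still taken
from the first entry that carries one.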
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index b6da012..182748d 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -358,8 +358,11 @@
 
             try {
               int rows = metricRecordStmt.executeUpdate();
-            } catch (SQLException sql) {
-              LOG.error("Failed on insert records to store.", sql);
+            } catch (SQLException | NumberFormatException ex) {
+              LOG.warn("Failed on insert records to store : " + ex.getMessage());
+              LOG.warn("Metric that cannot be stored : [" + metric.getMetricName() + "," + metric.getAppId() + "]" +
+                metric.getMetricValues().toString());
+              continue;
             }
 
             if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
@@ -1469,7 +1472,7 @@
         }
 
         rowCount++;
-        byte[] uuid =  metadataManagerInstance.getUuid(clusterMetric, false);
+        byte[] uuid =  metadataManagerInstance.getUuid(clusterMetric, true);
         if (uuid == null) {
           LOG.error("Error computing UUID for metric. Cannot write metrics : " + clusterMetric.toString());
           continue;
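
Two fixes in this file. First, the insert loop now also catches
NumberFormatException (judging by the new integration test below, this is
what surfaces when a metric carries a value such as NaN that Phoenix
cannot store), logs the offending metric, and moves on instead of letting
one bad value abort the whole batch. A hedged reconstruction of the
surrounding loop, which the hunk elides:

    // Loop structure assumed from context; metricRecordStmt and metric
    // are the names used in the patch.
    for (TimelineMetric metric : timelineMetrics.getMetrics()) {
      // ... bind metric fields to metricRecordStmt ...
      try {
        metricRecordStmt.executeUpdate();
      } catch (SQLException | NumberFormatException ex) {
        LOG.warn("Failed on insert records to store : " + ex.getMessage());
        continue; // skip only this record, keep writing the rest
      }
      // ... mutation-state bookkeeping continues here ...
    }

Second, cluster aggregate writes now call getUuid(clusterMetric, true)
rather than false; given the null check that follows, the flag appears to
control whether a missing UUID is generated on demand rather than the
write being skipped with an error.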
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
index eb09895..f7fd72a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
@@ -17,6 +17,16 @@
  */
 package org.apache.ambari.metrics.core.timeline;
 
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStream;
@@ -331,7 +341,7 @@
   public static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles";
 
   private Configuration hbaseConf;
-  private Configuration metricsConf;
+  private Configuration metricsConf = new Configuration();
   private Configuration metricsSslConf;
   private Configuration amsEnvConf;
   private volatile boolean isInitialized = false;
@@ -698,4 +708,34 @@
     return StringUtils.EMPTY;
   }
 
+  public long getTableTtl(String tableName) {
+
+    if (StringUtils.isEmpty(tableName)) {
+      return Long.MAX_VALUE;
+    }
+
+    switch (tableName) {
+
+      case METRICS_RECORD_TABLE_NAME:
+        return metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400);
+      case METRICS_AGGREGATE_MINUTE_TABLE_NAME:
+        return metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400);
+      case METRICS_AGGREGATE_HOURLY_TABLE_NAME:
+        return metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400);
+      case METRICS_AGGREGATE_DAILY_TABLE_NAME:
+        return metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400);
+      case METRICS_CLUSTER_AGGREGATE_TABLE_NAME:
+        return metricsConf.getInt(CLUSTER_SECOND_TABLE_TTL, 7 * 86400);
+      case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
+        return metricsConf.getInt(CLUSTER_MINUTE_TABLE_TTL, 30 * 86400);
+      case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
+        return metricsConf.getInt(CLUSTER_HOUR_TABLE_TTL, 365 * 86400);
+      case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
+        return metricsConf.getInt(CLUSTER_DAILY_TABLE_TTL, 730 * 86400);
+      case CONTAINER_METRICS_TABLE_NAME:
+        return metricsConf.getInt(CONTAINER_METRICS_TTL, 14 * 86400);
+      default:
+        return Long.MAX_VALUE;
+    }
+  }
 }
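
getTableTtl() centralizes each Phoenix table's configured retention, in
seconds: by default 1 day for the precision table, 7/30/365 days for the
host minute/hour/day aggregates, 7/30/365/730 days for the cluster
second/minute/hour/day aggregates, and 14 days for container metrics.
Long.MAX_VALUE is returned for an empty or unknown table name so such a
lookup never marks data as expired. Initializing metricsConf at its
declaration presumably keeps the method NPE-safe if it runs before the
service configuration is injected; the getInt() calls then simply yield
their defaults. A short usage sketch (constants as imported above):

    TimelineMetricConfiguration conf = TimelineMetricConfiguration.getInstance();
    long ttlSeconds = conf.getTableTtl(METRICS_RECORD_TABLE_NAME); // 86400 by default
    long oldestRetainedMs = System.currentTimeMillis() - ttlSeconds * 1000;
    // timestamps older than oldestRetainedMs are presumed expired from the table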
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index 42bdd60..6a1d03f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -24,6 +24,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -666,7 +667,12 @@
     for (; i < precisions.length; i++) {
       long rowsPerMetric = getRowCountForPrecision(precisions[i], range, CollectionUtils.isNotEmpty(hostNames));
       if ((rowsPerMetric * metricNames.size() * numHosts) <= PhoenixHBaseAccessor.RESULTSET_LIMIT) {
-        break;
+
+        long ttl = getTtlForPrecision(precisions[i], CollectionUtils.isNotEmpty(hostNames));
+        long currentTime = System.currentTimeMillis();
+        if (currentTime - ttl * 1000 <= condition.getStartTime()) {
+          break;
+        }
       }
     }
     if (i >= precisions.length) {
@@ -675,6 +681,40 @@
     return precisions[i];
   }
 
+  private static long getTtlForPrecision(Precision precision, boolean withHosts) {
+    TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
+
+    switch (precision) {
+      case SECONDS:
+        if (withHosts) {
+          return configuration.getTableTtl(METRICS_RECORD_TABLE_NAME);
+        } else {
+          return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+        }
+
+      case MINUTES:
+        if (withHosts) {
+          return configuration.getTableTtl(METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+        } else {
+          return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
+        }
+
+      case HOURS:
+        if (withHosts) {
+          return configuration.getTableTtl(METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+        } else {
+          return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+        }
+
+      default:
+        if (withHosts) {
+          return configuration.getTableTtl(METRICS_AGGREGATE_DAILY_TABLE_NAME);
+        } else {
+          return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME);
+        }
+    }
+  }
+
   public static PreparedStatement prepareGetLatestMetricSqlStmt(
     Connection connection, Condition condition) throws SQLException {
     return prepareGetLatestMetricSqlStmtHelper(connection, condition, GET_LATEST_METRIC_SQL, METRICS_RECORD_TABLE_NAME);
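
getBestPrecisionForCondition() used to accept the first precision whose
estimated row count fit under RESULTSET_LIMIT. It now additionally
requires that the precision's backing table still retains data at the
query's start time (currentTime - ttl * 1000 <= startTime, TTLs being in
seconds), falling through to a coarser precision otherwise. A worked
example under the default TTLs from getTableTtl():

    // Host-level query starting two days ago:
    long now = System.currentTimeMillis();
    long startTime = now - 2 * 24 * 60 * 60 * 1000L;
    // SECONDS maps to the precision table, ttl = 86400 s:
    //   now - 86400 * 1000L > startTime, so SECONDS is rejected
    // MINUTES maps to the host minute aggregate, ttl = 7 * 86400 s:
    //   now - 7 * 86400 * 1000L <= startTime, so MINUTES is chosen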
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
index dba5c39..deb3927 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
@@ -123,11 +123,11 @@
     long ctime = startTime;
     long minute = 60 * 1000;
 
-    TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+    TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local3",
       "disk_free", 1);
     hdb.insertMetricRecords(metrics1, true);
 
-    TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local1",
+    TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local3",
       "disk_free", 2);
     hdb.insertMetricRecords(metrics2, true);
 
@@ -139,11 +139,11 @@
 
     // WHEN
     List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_%"); }},
-      Collections.singletonList("local1"),
+      Collections.singletonList("local3"),
       "host", null);
     Condition condition = new DefaultCondition(uuids,
       new ArrayList<String>() {{ add("disk_free"); }},
-      Collections.singletonList("local1"),
+      Collections.singletonList("local3"),
       "host", null, startTime, endTime + 1000, Precision.MINUTES, null, false);
     TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
       singletonValueFunctionMap("disk_free"));
@@ -153,7 +153,7 @@
     TimelineMetric metric = timelineMetrics.getMetrics().get(0);
 
     assertEquals("disk_free", metric.getMetricName());
-    assertEquals("local1", metric.getHostName());
+    assertEquals("local3", metric.getHostName());
     assertEquals(1, metric.getMetricValues().size());
     Iterator<Map.Entry<Long, Double>> iterator = metric.getMetricValues().entrySet().iterator();
     assertEquals(1.5, iterator.next().getValue(), 0.00001);
@@ -361,6 +361,57 @@
   }
 
   @Test
+  public void testGetInvalidMetricRecords() throws IOException, SQLException {
+    // GIVEN
+    long startTime = System.currentTimeMillis();
+    long ctime = startTime;
+    long minute = 60 * 1000;
+
+    List<TimelineMetric> timelineMetricList = new ArrayList<>();
+
+    timelineMetricList.add(
+      MetricTestHelper.createTimelineMetric(ctime, "valid_metric", "h1", "test_app",
+        null, 1.0));
+    timelineMetricList.add(
+      MetricTestHelper.createTimelineMetric(ctime, "invalid_metric", "h1", "test_app",
+        null, Double.NaN));
+    TimelineMetrics timelineMetrics = new TimelineMetrics();
+    timelineMetrics.setMetrics(timelineMetricList);
+    hdb.insertMetricRecords(timelineMetrics, true);
+
+    // WHEN
+    long endTime = ctime + minute;
+    List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("valid_metric"); }},
+      Collections.singletonList("h1"),
+      "test_app", null);
+
+    Condition condition = new DefaultCondition(uuids,
+      new ArrayList<String>() {{ add("valid_metric"); }},
+      Collections.singletonList("h1"),
+      "test_app", null, startTime - 1000, endTime, Precision.SECONDS, null, true);
+    TimelineMetrics timelineMetricsFromStore = hdb.getMetricRecords(condition,
+      singletonValueFunctionMap("valid_metric"));
+
+    //THEN
+    assertEquals(1, timelineMetricsFromStore.getMetrics().size());
+    TimelineMetric metric = timelineMetricsFromStore.getMetrics().get(0);
+
+    assertEquals("valid_metric", metric.getMetricName());
+    assertEquals("h1", metric.getHostName());
+    assertEquals(4, metric.getMetricValues().size());
+
+    uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("invalid_metric"); }},
+      Collections.singletonList("h1"),
+      "test_app", null);
+    condition = new DefaultCondition(uuids,
+      new ArrayList<String>() {{ add("invalid_metric"); }},
+      Collections.singletonList("h1"),
+      "test_app", null, startTime, endTime, Precision.SECONDS, null, true);
+    timelineMetricsFromStore = hdb.getMetricRecords(condition, singletonValueFunctionMap("invalid_metric"));
+    assertTrue(timelineMetricsFromStore.getMetrics().isEmpty());
+  }
+
+  @Test
   public void testInitPoliciesAndTTL() throws Exception {
     Admin hBaseAdmin = hdb.getHBaseAdmin();
     int expectedPrecisionTtl = 2 * 86400;
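
The integration tests exercise both accessor fixes: the aggregate test
moves from host local1 to local3, presumably to isolate its data from
other tests in the suite that also write records for local1, and the new
testGetInvalidMetricRecords inserts one well-formed metric alongside one
whose value is Double.NaN, then asserts that only the valid metric can be
read back, confirming the NaN record was skipped by the new catch block
rather than stored or allowed to fail the batch.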
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
index 332779f..073b58c 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
@@ -167,7 +167,7 @@
     long hour = 60 * minute;
     long day = 24 * hour;
 
-    Long endTime = 1407959918000L;
+    Long endTime = System.currentTimeMillis();
     Long startTime = endTime - 200 * second;
 
     //SECONDS precision
@@ -289,7 +289,7 @@
     long minute = 60 * second;
     long hour = 60 * minute;
 
-    Long endTime = 1407959918000L;
+    Long endTime = System.currentTimeMillis();
     Long startTime = endTime - 200 * second;
     // SECONDS precision
     // 2 Metrics, 1 Host, Time = 200 seconds
@@ -349,7 +349,7 @@
     verify(connection, preparedStatement);
 
     // HOURS precision
-    startTime = endTime - 30 * 24 * hour;
+    startTime = endTime - 29 * 24 * hour;
     condition = new DefaultCondition(
       new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
       "a1", "i1", startTime, endTime, null, null, false);
@@ -472,9 +472,16 @@
       hosts.add("TestHost"+i);
     }
 
+    long second = 1000;
+    long minute = 60 * second;
+    long hour = 60 * minute;
+
+    Long endTime = System.currentTimeMillis();
+    Long startTime = endTime - hour;
+
     Condition condition = new DefaultCondition(
       metrics, hosts,
-      "a1", "i1", 1407950000L, 1407953600L, Precision.SECONDS, null, false);
+      "a1", "i1", startTime, endTime, Precision.SECONDS, null, false);
     Connection connection = createNiceMock(Connection.class);
     PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
     Capture<String> stmtCapture = new Capture<String>();
@@ -490,7 +497,7 @@
     //Check without passing precision. Should be OK!
     condition = new DefaultCondition(
       metrics, hosts,
-      "a1", "i1", 1407950000L, 1407953600L, null, null, false);
+      "a1", "i1", startTime, endTime, null, null, false);
     connection = createNiceMock(Connection.class);
     preparedStatement = createNiceMock(PreparedStatement.class);
     stmtCapture = new Capture<String>();
@@ -516,7 +523,7 @@
     }
     condition = new DefaultCondition(
       metrics, hosts,
-      "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+      "a1", "i1", endTime - 24*hour, endTime, null, null, false);
     connection = createNiceMock(Connection.class);
     preparedStatement = createNiceMock(PreparedStatement.class);
     stmtCapture = new Capture<String>();
@@ -538,7 +545,7 @@
     }
     condition = new DefaultCondition(
       metrics, hosts,
-      "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+      "a1", "i1", endTime - 24*hour, endTime, null, null, false);
     connection = createNiceMock(Connection.class);
     preparedStatement = createNiceMock(PreparedStatement.class);
     stmtCapture = new Capture<String>();
@@ -562,10 +569,9 @@
     for (int i = 0; i < numHosts; i++) {
       hosts.add("TestHost"+i);
     }
-    long endtime = 1407953600L;
     condition = new DefaultCondition(
       metrics, hosts,
-      "a1", "i1", endtime - 5 * 60 * 60, endtime, Precision.SECONDS, null, false);
+      "a1", "i1", endTime - 5 * hour, endTime, Precision.SECONDS, null, false);
     boolean exceptionThrown = false;
     boolean requestedSizeFoundInMessage = false;
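
These tests previously pinned endTime to fixed August 2014 epoch values
such as 1407959918000L. With the TTL guard in place, every table's
retention window has long since passed those timestamps, so no precision
would ever qualify; the tests therefore derive their windows from
System.currentTimeMillis(). The HOURS case shrinks from 30 to 29 days
because the hourly host table's default TTL is exactly 30 days, and a
full 30-day window would sit right on the retention boundary:

    long endTime = System.currentTimeMillis();
    long hour = 60 * 60 * 1000L;
    long hourlyTtlMs = 30L * 86400 * 1000;     // default HOST_HOUR_TABLE_TTL, in millis
    long startTime = endTime - 29 * 24 * hour; // one day inside the window
    assert endTime - hourlyTtlMs <= startTime; // HOURS stays eligible

The last case also replaces a leftover seconds-granularity literal
(1407953600L with offsets like 5 * 60 * 60) with the shared
millisecond-based endTime window.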