[AMBARI-24041] AMS truncates metric response quietly. (#1474)

* [AMBARI-24041] AMS truncates metric response quietly.

* [AMBARI-24041] AMS truncates metric response quietly. - 2

* [AMBARI-24041] AMS truncates metric response quietly. - 3
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index 6f8a1de..600114b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -316,6 +316,7 @@
     Connection conn = null;
     PreparedStatement metricRecordStmt = null;
     List<TimelineMetric> transientMetrics = new ArrayList<>();
+    int rowCount = 0;
 
     try {
       conn = getConnection();
@@ -339,6 +340,7 @@
                   metric.getMetricValues());
 
           if (aggregates[3] != 0.0) {
+            rowCount++;
             byte[] uuid = metadataManagerInstance.getUuid(metric, true);
             if (uuid == null) {
               LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
@@ -358,6 +360,12 @@
             } catch (SQLException sql) {
               LOG.error("Failed on insert records to store.", sql);
             }
+
+            if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
+              conn.commit();
+              rowCount = 0;
+            }
+
           } else {
             LOG.debug("Discarding empty metric record for : [" + metric.getMetricName() + "," +
               metric.getAppId() + "," +
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
index d3bdd9e..f12a597 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/aggregators/AbstractTimelineAggregator.java
@@ -271,8 +271,10 @@
 
       LOG.debug("Query issued @: " + new Date());
       if (condition.doUpdate()) {
+        conn.setAutoCommit(true);
         int rows = stmt.executeUpdate();
         conn.commit();
+        conn.setAutoCommit(false);
         LOG.info(rows + " row(s) updated in aggregation.");
 
         //TODO : Fix downsampling after UUID change.
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index af24deb..539bb17 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -508,6 +508,8 @@
       String query;
       if (condition.getPrecision() == null) {
         condition.setPrecision(getBestPrecisionForCondition(condition));
+      } else {
+        condition.setNoLimit();
       }
       switch (condition.getPrecision()) {
         case DAYS:
@@ -703,6 +705,8 @@
     String queryStmt;
     if (condition.getPrecision() == null) {
       condition.setPrecision(getBestPrecisionForCondition(condition));
+    } else {
+      condition.setNoLimit();
     }
     switch (condition.getPrecision()) {
       case DAYS:
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
index 3960751..332779f 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
@@ -155,6 +155,7 @@
     PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition);
     String stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_MINUTE_UUID"));
+    Assert.assertNull(condition.getLimit());
     verify(connection, preparedStatement);
   }
 
@@ -220,6 +221,7 @@
     PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition);
     stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_HOURLY_UUID"));
+    Assert.assertNotNull(condition.getLimit());
     Assert.assertEquals(Precision.HOURS, condition.getPrecision());
     verify(connection, preparedStatement);
 
@@ -257,6 +259,7 @@
     PhoenixTransactSQL.prepareGetAggregateSqlStmt(connection, condition);
     String stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_AGGREGATE_HOURLY_UUID"));
+    Assert.assertNull(condition.getLimit());
     verify(connection, preparedStatement);
   }
 
@@ -275,6 +278,7 @@
     PhoenixTransactSQL.prepareGetMetricsSqlStmt(connection, condition);
     String stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_MINUTE_UUID"));
+    Assert.assertNull(condition.getLimit());
     verify(connection, preparedStatement);
   }
 
@@ -322,6 +326,7 @@
     stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_UUID"));
     Assert.assertEquals(Precision.SECONDS, condition.getPrecision());
+    Assert.assertNotNull(condition.getLimit());
     verify(connection, preparedStatement);
 
     // MINUTES precision
@@ -340,6 +345,7 @@
     stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_MINUTE_UUID"));
     Assert.assertEquals(Precision.MINUTES, condition.getPrecision());
+    Assert.assertNotNull(condition.getLimit());
     verify(connection, preparedStatement);
 
     // HOURS precision
@@ -357,6 +363,7 @@
     stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_HOURLY_UUID"));
     Assert.assertEquals(Precision.HOURS, condition.getPrecision());
+    Assert.assertNotNull(condition.getLimit());
     verify(connection, preparedStatement);
 
     // DAYS precision
@@ -374,6 +381,7 @@
     stmt = stmtCapture.getValue();
     Assert.assertTrue(stmt.contains("FROM METRIC_RECORD_DAILY_UUID"));
     Assert.assertEquals(Precision.DAYS, condition.getPrecision());
+    Assert.assertNotNull(condition.getLimit());
     verify(connection, preparedStatement);
 
   }