[AMBARI-24171] Fix issues in AMS aggregation and writes. (#1603)
* [AMBARI-24171] Fix issues in AMS aggregation and writes.
* [AMBARI-24171] Fix issues in AMS aggregation and writes - 2
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
index 8dfd651..43a468c 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/HBaseTimelineMetricsService.java
@@ -455,7 +455,6 @@
for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
hostname = hostname == null ? entry.getTimelineMetric().getHostName() : hostname;
- break;
}
long timestamp = aggregationResult.getTimeInMilis();
if (LOG.isDebugEnabled()) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
index b6da012..182748d 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/PhoenixHBaseAccessor.java
@@ -358,8 +358,11 @@
try {
int rows = metricRecordStmt.executeUpdate();
- } catch (SQLException sql) {
- LOG.error("Failed on insert records to store.", sql);
+ } catch (SQLException | NumberFormatException ex) {
+ LOG.warn("Failed on insert records to store : " + ex.getMessage());
+ LOG.warn("Metric that cannot be stored : [" + metric.getMetricName() + "," + metric.getAppId() + "]" +
+ metric.getMetricValues().toString());
+ continue;
}
if (rowCount >= PHOENIX_MAX_MUTATION_STATE_SIZE - 1) {
@@ -1469,7 +1472,7 @@
}
rowCount++;
- byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, false);
+ byte[] uuid = metadataManagerInstance.getUuid(clusterMetric, true);
if (uuid == null) {
LOG.error("Error computing UUID for metric. Cannot write metrics : " + clusterMetric.toString());
continue;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
index eb09895..f7fd72a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/TimelineMetricConfiguration.java
@@ -17,6 +17,16 @@
*/
package org.apache.ambari.metrics.core.timeline;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.CONTAINER_METRICS_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_CLUSTER_AGGREGATE_TABLE_NAME;
+import static org.apache.ambari.metrics.core.timeline.query.PhoenixTransactSQL.METRICS_RECORD_TABLE_NAME;
+
import java.io.BufferedReader;
import java.io.File;
import java.io.InputStream;
@@ -331,7 +341,7 @@
public static final String BLOCKING_STORE_FILES_KEY = "hbase.hstore.blockingStoreFiles";
private Configuration hbaseConf;
- private Configuration metricsConf;
+ private Configuration metricsConf = new Configuration();
private Configuration metricsSslConf;
private Configuration amsEnvConf;
private volatile boolean isInitialized = false;
@@ -698,4 +708,34 @@
return StringUtils.EMPTY;
}
+ public long getTableTtl(String tableName) {
+
+ if (StringUtils.isEmpty(tableName)) {
+ return Long.MAX_VALUE;
+ }
+
+ switch (tableName) {
+
+ case METRICS_RECORD_TABLE_NAME:
+ return metricsConf.getInt(PRECISION_TABLE_TTL, 1 * 86400);
+ case METRICS_AGGREGATE_MINUTE_TABLE_NAME:
+ return metricsConf.getInt(HOST_MINUTE_TABLE_TTL, 7 * 86400);
+ case METRICS_AGGREGATE_HOURLY_TABLE_NAME:
+ return metricsConf.getInt(HOST_HOUR_TABLE_TTL, 30 * 86400);
+ case METRICS_AGGREGATE_DAILY_TABLE_NAME:
+ return metricsConf.getInt(HOST_DAILY_TABLE_TTL, 365 * 86400);
+ case METRICS_CLUSTER_AGGREGATE_TABLE_NAME:
+ return metricsConf.getInt(CLUSTER_SECOND_TABLE_TTL, 7 * 86400);
+ case METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME:
+ return metricsConf.getInt(CLUSTER_MINUTE_TABLE_TTL, 30 * 86400);
+ case METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME:
+ return metricsConf.getInt(CLUSTER_HOUR_TABLE_TTL, 365 * 86400);
+ case METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME:
+ return metricsConf.getInt(CLUSTER_DAILY_TABLE_TTL, 730 * 86400);
+ case CONTAINER_METRICS_TABLE_NAME:
+ return metricsConf.getInt(CONTAINER_METRICS_TTL, 14 * 86400);
+ default:
+ return Long.MAX_VALUE;
+ }
+ }
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
index 42bdd60..6a1d03f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/ambari/metrics/core/timeline/query/PhoenixTransactSQL.java
@@ -24,6 +24,7 @@
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import org.apache.ambari.metrics.core.timeline.TimelineMetricConfiguration;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -666,7 +667,12 @@
for (; i < precisions.length; i++) {
long rowsPerMetric = getRowCountForPrecision(precisions[i], range, CollectionUtils.isNotEmpty(hostNames));
if ((rowsPerMetric * metricNames.size() * numHosts) <= PhoenixHBaseAccessor.RESULTSET_LIMIT) {
- break;
+
+ long ttl = getTtlForPrecision(precisions[i], CollectionUtils.isNotEmpty(hostNames));
+ long currentTime = System.currentTimeMillis();
+ if (currentTime - ttl * 1000 <= condition.getStartTime()) {
+ break;
+ }
}
}
if (i >= precisions.length) {
@@ -675,6 +681,41 @@
return precisions[i];
}
+ private static long getTtlForPrecision(Precision precision, boolean withHosts) {
+ TimelineMetricConfiguration configuration = TimelineMetricConfiguration.getInstance();
+
+ switch (precision) {
+ case SECONDS:
+ if (withHosts) {
+ return configuration.getTableTtl(METRICS_RECORD_TABLE_NAME);
+ } else {
+ return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_TABLE_NAME);
+ }
+
+ case MINUTES:
+ if (withHosts) {
+ return configuration.getTableTtl(METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+ } else {
+ return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_MINUTE_TABLE_NAME);
+ }
+
+ case HOURS:
+ if (withHosts) {
+ return configuration.getTableTtl(METRICS_AGGREGATE_HOURLY_TABLE_NAME);
+ } else {
+ return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_HOURLY_TABLE_NAME);
+ }
+
+ default:
+ if (withHosts) {
+ return configuration.getTableTtl(METRICS_AGGREGATE_DAILY_TABLE_NAME);
+ } else {
+ return configuration.getTableTtl(METRICS_CLUSTER_AGGREGATE_DAILY_TABLE_NAME);
+ }
+ }
+ }
+
+
public static PreparedStatement prepareGetLatestMetricSqlStmt(
Connection connection, Condition condition) throws SQLException {
return prepareGetLatestMetricSqlStmtHelper(connection, condition, GET_LATEST_METRIC_SQL, METRICS_RECORD_TABLE_NAME);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
index dba5c39..deb3927 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/ITPhoenixHBaseAccessor.java
@@ -52,6 +52,7 @@
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
@@ -123,11 +124,11 @@
long ctime = startTime;
long minute = 60 * 1000;
- TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local1",
+ TimelineMetrics metrics1 = MetricTestHelper.prepareSingleTimelineMetric(ctime, "local3",
"disk_free", 1);
hdb.insertMetricRecords(metrics1, true);
- TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local1",
+ TimelineMetrics metrics2 = MetricTestHelper.prepareSingleTimelineMetric(ctime + minute, "local3",
"disk_free", 2);
hdb.insertMetricRecords(metrics2, true);
@@ -139,11 +140,11 @@
// WHEN
List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("disk_%"); }},
- Collections.singletonList("local1"),
+ Collections.singletonList("local3"),
"host", null);
Condition condition = new DefaultCondition(uuids,
new ArrayList<String>() {{ add("disk_free"); }},
- Collections.singletonList("local1"),
+ Collections.singletonList("local3"),
"host", null, startTime, endTime + 1000, Precision.MINUTES, null, false);
TimelineMetrics timelineMetrics = hdb.getMetricRecords(condition,
singletonValueFunctionMap("disk_free"));
@@ -153,7 +154,7 @@
TimelineMetric metric = timelineMetrics.getMetrics().get(0);
assertEquals("disk_free", metric.getMetricName());
- assertEquals("local1", metric.getHostName());
+ assertEquals("local3", metric.getHostName());
assertEquals(1, metric.getMetricValues().size());
Iterator<Map.Entry<Long, Double>> iterator = metric.getMetricValues().entrySet().iterator();
assertEquals(1.5, iterator.next().getValue(), 0.00001);
@@ -361,6 +362,60 @@
}
@Test
+ public void testGetInvalidMetricRecords() throws IOException, SQLException {
+ // GIVEN
+ long startTime = System.currentTimeMillis();
+ long ctime = startTime;
+ long minute = 60 * 1000;
+
+ List<TimelineMetric> timelineMetricList = new ArrayList<>();
+
+ timelineMetricList.add(
+ MetricTestHelper.createTimelineMetric(ctime, "valid_metric", "h1", "test_app",
+ null, 1.0));
+ timelineMetricList.add(
+ MetricTestHelper.createTimelineMetric(ctime, "invalid_metric", "h1", "test_app",
+ null, Double.NaN));
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.setMetrics(timelineMetricList);
+ hdb.insertMetricRecords(timelineMetrics, true);
+
+ // WHEN
+ long endTime = ctime + minute;
+ List<byte[]> uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("valid_metric"); }},
+ Collections.singletonList("h1"),
+ "test_app", null);
+
+ Condition condition = new DefaultCondition(uuids,
+ new ArrayList<String>() {{ add("valid_metric"); }},
+ Collections.singletonList("h1"),
+ "test_app", null, startTime - 1000, endTime, Precision.SECONDS, null, true);
+ TimelineMetrics timelineMetricsFromStore = hdb.getMetricRecords(condition,
+ singletonValueFunctionMap("valid_metric"));
+
+ //THEN
+ assertEquals(1, timelineMetricsFromStore.getMetrics().size());
+ TimelineMetric metric = timelineMetricsFromStore.getMetrics().get(0);
+
+ assertEquals("valid_metric", metric.getMetricName());
+ assertEquals("h1", metric.getHostName());
+ assertEquals(4, metric.getMetricValues().size());
+
+
+ uuids = metadataManager.getUuidsForGetMetricQuery(new ArrayList<String>() {{ add("invalid_metric"); }},
+ Collections.singletonList("h1"),
+ "test_app", null);
+ condition = new DefaultCondition(uuids,
+ new ArrayList<String>() {{ add("invalid_metric"); }},
+ Collections.singletonList("h1"),
+ "test_app", null, startTime, endTime, Precision.SECONDS, null, true);
+ timelineMetricsFromStore = hdb.getMetricRecords(condition, singletonValueFunctionMap("invalid_metric"));
+ assertTrue(timelineMetricsFromStore.getMetrics().isEmpty());
+
+ }
+
+
+ @Test
public void testInitPoliciesAndTTL() throws Exception {
Admin hBaseAdmin = hdb.getHBaseAdmin();
int expectedPrecisionTtl = 2 * 86400;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
index 332779f..073b58c 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/ambari/metrics/core/timeline/TestPhoenixTransactSQL.java
@@ -167,7 +167,7 @@
long hour = 60 * minute;
long day = 24 * hour;
- Long endTime = 1407959918000L;
+ Long endTime = System.currentTimeMillis();
Long startTime = endTime - 200 * second;
//SECONDS precision
@@ -289,7 +289,7 @@
long minute = 60 * second;
long hour = 60 * minute;
- Long endTime = 1407959918000L;
+ Long endTime = System.currentTimeMillis();
Long startTime = endTime - 200 * second;
// SECONDS precision
// 2 Metrics, 1 Host, Time = 200 seconds
@@ -349,7 +349,7 @@
verify(connection, preparedStatement);
// HOURS precision
- startTime = endTime - 30 * 24 * hour;
+ startTime = endTime - 29 * 24 * hour;
condition = new DefaultCondition(
new ArrayList<>(Arrays.asList("cpu_user", "mem_free")), Collections.singletonList("h1"),
"a1", "i1", startTime, endTime, null, null, false);
@@ -472,9 +472,16 @@
hosts.add("TestHost"+i);
}
+ long second = 1000;
+ long minute = 60 * second;
+ long hour = 60 * minute;
+
+ Long endTime = System.currentTimeMillis();
+ Long startTime = endTime - hour;
+
Condition condition = new DefaultCondition(
metrics, hosts,
- "a1", "i1", 1407950000L, 1407953600L, Precision.SECONDS, null, false);
+ "a1", "i1", startTime, endTime, Precision.SECONDS, null, false);
Connection connection = createNiceMock(Connection.class);
PreparedStatement preparedStatement = createNiceMock(PreparedStatement.class);
Capture<String> stmtCapture = new Capture<String>();
@@ -490,7 +497,7 @@
//Check without passing precision. Should be OK!
condition = new DefaultCondition(
metrics, hosts,
- "a1", "i1", 1407950000L, 1407953600L, null, null, false);
+ "a1", "i1", startTime, endTime, null, null, false);
connection = createNiceMock(Connection.class);
preparedStatement = createNiceMock(PreparedStatement.class);
stmtCapture = new Capture<String>();
@@ -516,7 +523,7 @@
}
condition = new DefaultCondition(
metrics, hosts,
- "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+ "a1", "i1", endTime - 24*hour, endTime, null, null, false);
connection = createNiceMock(Connection.class);
preparedStatement = createNiceMock(PreparedStatement.class);
stmtCapture = new Capture<String>();
@@ -538,7 +545,7 @@
}
condition = new DefaultCondition(
metrics, hosts,
- "a1", "i1", 1407867200L, 1407953600L, null, null, false);
+ "a1", "i1", endTime - 24*hour, endTime, null, null, false);
connection = createNiceMock(Connection.class);
preparedStatement = createNiceMock(PreparedStatement.class);
stmtCapture = new Capture<String>();
@@ -562,10 +569,9 @@
for (int i = 0; i < numHosts; i++) {
hosts.add("TestHost"+i);
}
- long endtime = 1407953600L;
condition = new DefaultCondition(
metrics, hosts,
- "a1", "i1", endtime - 5 * 60 * 60, endtime, Precision.SECONDS, null, false);
+ "a1", "i1", endTime - 5 * hour, endTime, Precision.SECONDS, null, false);
boolean exceptionThrown = false;
boolean requestedSizeFoundInMessage = false;