AMBARI-23427 : Metrics Collector stops after starting.
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 7cb317b..7535b50 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -337,25 +337,36 @@
double[] aggregates = AggregatorUtils.calculateAggregates(
metric.getMetricValues());
- byte[] uuid = metadataManagerInstance.getUuid(metric);
- if (uuid == null) {
- LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
- continue;
- }
- metricRecordStmt.setBytes(1, uuid);
- metricRecordStmt.setLong(2, metric.getStartTime());
- metricRecordStmt.setDouble(3, aggregates[0]);
- metricRecordStmt.setDouble(4, aggregates[1]);
- metricRecordStmt.setDouble(5, aggregates[2]);
- metricRecordStmt.setLong(6, (long) aggregates[3]);
- String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
- metricRecordStmt.setString(7, json);
+ if (aggregates[3] != 0.0) {
+ byte[] uuid = metadataManagerInstance.getUuid(metric);
+ if (uuid == null) {
+ LOG.error("Error computing UUID for metric. Cannot write metrics : " + metric.toString());
+ continue;
+ }
+ metricRecordStmt.setBytes(1, uuid);
+ metricRecordStmt.setLong(2, metric.getStartTime());
+ metricRecordStmt.setDouble(3, aggregates[0]);
+ metricRecordStmt.setDouble(4, aggregates[1]);
+ metricRecordStmt.setDouble(5, aggregates[2]);
+ metricRecordStmt.setLong(6, (long) aggregates[3]);
+ String json = TimelineUtils.dumpTimelineRecordtoJSON(metric.getMetricValues());
+ metricRecordStmt.setString(7, json);
- try {
- metricRecordStmt.executeUpdate();
- } catch (SQLException sql) {
- LOG.error("Failed on insert records to store.", sql);
+ try {
+ int rows = metricRecordStmt.executeUpdate();
+ if (metric.getMetricName().equals("TimelineMetricStoreWatcher.FakeMetric")) {
+ LOG.info("Inserted " + rows + " rows for TimelineMetricStoreWatcher.FakeMetric.");
+ }
+ } catch (SQLException sql) {
+ LOG.error("Failed on insert records to store.", sql);
+ }
+ } else {
+ LOG.debug("Discarding empty metric record for : [" + metric.getMetricName() + "," +
+ metric.getAppId() + "," +
+ metric.getHostName() + "," +
+ metric.getInstanceId() + "]");
}
+
}
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
index 0203f88..8465dc3 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/AbstractTimelineAggregator.java
@@ -406,7 +406,7 @@
try {
stmt = PhoenixTransactSQL.prepareGetMetricsSqlStmt(conn, condition);
- LOG.debug("Downsampler Query issued : " + condition.getStatement());
+ LOG.info("Start downsampling cycle...");
if (condition.doUpdate()) {
int rows = stmt.executeUpdate();
conn.commit();
@@ -414,7 +414,6 @@
} else {
rs = stmt.executeQuery();
}
- LOG.debug("Downsampler Query returned ...");
LOG.info("End Downsampling cycle.");
} catch (SQLException e) {
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
index beaff69..bd02a66 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/query/PhoenixTransactSQL.java
@@ -302,8 +302,7 @@
"INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) " +
"SELECT UUID, %s AS SERVER_TIME, " +
"ROUND(SUM(METRIC_SUM)/SUM(METRIC_COUNT),2), SUM(METRIC_COUNT), MAX(METRIC_MAX), MIN(METRIC_MIN) " +
- "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s " +
- "GROUP BY UUID";
+ "FROM %s WHERE%s SERVER_TIME > %s AND SERVER_TIME <= %s GROUP BY UUID";
/**
* Downsample host metrics.
@@ -320,9 +319,9 @@
* N - way parallel scan where N = number of regions.
*/
public static final String GET_AGGREGATED_APP_METRIC_GROUPBY_SQL = "UPSERT " +
- "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
- "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
- "SERVER_TIME <= %s GROUP BY UUID";
+ "INTO %s (UUID, SERVER_TIME, METRIC_SUM, METRIC_COUNT, METRIC_MAX, METRIC_MIN) SELECT UUID, %s AS SERVER_TIME, " +
+ "ROUND(AVG(METRIC_SUM),2), ROUND(AVG(%s)), MAX(METRIC_MAX), MIN(METRIC_MIN) FROM %s WHERE%s SERVER_TIME > %s AND " +
+ "SERVER_TIME <= %s GROUP BY UUID";
/**
* Downsample cluster metrics.
@@ -494,9 +493,6 @@
throw e;
}
- if (condition instanceof TopNCondition) {
- LOG.info(sb.toString());
- }
return stmt;
}
@@ -676,9 +672,6 @@
throw e;
}
- if (condition instanceof TopNCondition) {
- LOG.info(sb.toString());
- }
return stmt;
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
index 16d1bf2..a85764e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/HashBasedUuidGenStrategy.java
@@ -37,18 +37,25 @@
*/
@Override
public byte[] computeUuid(TimelineClusterMetric timelineClusterMetric, int maxLength) {
+
int metricNameUuidLength = 12;
+ String instanceId = timelineClusterMetric.getInstanceId();
+
+ if (StringUtils.isEmpty(instanceId)) {
+ metricNameUuidLength = 14;
+ }
+
String metricName = timelineClusterMetric.getMetricName();
//Compute the individual splits.
String[] splits = getIndidivualSplits(metricName);
/*
- Compute the ascii sum of every split in the metric name. (asciiSum += (int) splits[s].charAt(i))
- For the last split, use weighted sum instead of ascii sum. (asciiSum += ((i+1) * (int) splits[s].charAt(i)))
+ Compute the weighted ascii sum of every split in the metric name. (asciiSum += ((i+1) * (int) splits[s].charAt(i)))
These weighted sums are 'appended' to get the unique ID for metric name.
*/
StringBuilder splitSums = new StringBuilder();
+ long totalAsciiSum = 0L;
if (splits.length > 0) {
for (String split : splits) {
int asciiSum = 0;
@@ -56,30 +63,31 @@
asciiSum += ((i + 1) * (int) split.charAt(i)); //weighted sum for split.
}
splitSums.append(asciiSum); //Append the sum to the array of sums.
+ totalAsciiSum += asciiSum; //Accumulate parity sum across all splits.
}
}
+ String splitSumString = totalAsciiSum + splitSums.reverse().toString(); //Reverse and attach parity sum.
+ int splitLength = splitSumString.length();
+
//Compute a unique metric seed for the stemmed metric name
String stemmedMetric = stem(metricName);
long metricSeed = 100123456789L;
metricSeed += computeWeightedNumericalAsciiSum(stemmedMetric);
-
//Reverse the computed seed to get a metric UUID portion which is used optionally.
- byte[] metricUuidPortion = StringUtils.reverse(String.valueOf(metricSeed)).getBytes();
- String splitSumString = splitSums.toString();
- int splitLength = splitSumString.length();
+ byte[] metricSeedBytes = StringUtils.reverse(String.valueOf(metricSeed)).getBytes();
- //If splitSums length > required metric UUID length, use only the required length suffix substring of the splitSums as metric UUID.
- if (splitLength > metricNameUuidLength) {
- int pad = (int)(0.25 * splitLength);
- metricUuidPortion = ArrayUtils.addAll(ArrayUtils.subarray(splitSumString.getBytes(), splitLength - metricNameUuidLength + pad, splitLength)
- , ArrayUtils.subarray(metricUuidPortion, 0, pad));
- } else {
- //If splitSums is not enough for required metric UUID length, pad with the metric uuid portion.
- int pad = metricNameUuidLength - splitLength;
- metricUuidPortion = ArrayUtils.addAll(splitSumString.getBytes(), ArrayUtils.subarray(metricUuidPortion, 0, pad));
+ int seedLength = (int)(0.25 * metricNameUuidLength);
+ int sumLength = metricNameUuidLength - seedLength;
+ if (splitLength < sumLength) {
+ sumLength = splitLength;
+ seedLength = metricNameUuidLength - sumLength;
}
+ byte[] metricUuidPortion = ArrayUtils.addAll(
+ ArrayUtils.subarray(splitSumString.getBytes(), 0, sumLength)
+ , ArrayUtils.subarray(metricSeedBytes, 0, seedLength));
+
/*
For appId and instanceId the logic is similar. Use a seed integer to start with and compute ascii sum.
Based on required length, use a suffix of the computed uuid.
@@ -87,25 +95,25 @@
String appId = timelineClusterMetric.getAppId();
int appidSeed = 11;
for (int i = 0; i < appId.length(); i++) {
- appidSeed += ((i+1) * appId.charAt(i));
+ appidSeed += appId.charAt(i);
}
String appIdSeedStr = String.valueOf(appidSeed);
byte[] appUuidPortion = ArrayUtils.subarray(appIdSeedStr.getBytes(), appIdSeedStr.length() - 2, appIdSeedStr.length());
- String instanceId = timelineClusterMetric.getInstanceId();
- ByteBuffer buffer = ByteBuffer.allocate(4);
- byte[] instanceUuidPortion = new byte[2];
if (StringUtils.isNotEmpty(instanceId)) {
+ byte[] instanceUuidPortion = new byte[2];
+ ByteBuffer buffer = ByteBuffer.allocate(4);
int instanceIdSeed = 1489;
for (int i = 0; i < appId.length(); i++) {
- instanceIdSeed += ((i+1)* appId.charAt(i));
+ instanceIdSeed += ((i+1) * appId.charAt(i));
}
buffer.putInt(instanceIdSeed);
ArrayUtils.subarray(buffer.array(), 2, 4);
+ // Concatenate all UUIDs together (metric uuid + appId uuid + instanceId uuid)
+ return ArrayUtils.addAll(ArrayUtils.addAll(metricUuidPortion, appUuidPortion), instanceUuidPortion);
}
- // Concatenate all UUIDs together (metric uuid + appId uuid + instanceId uuid)
- return ArrayUtils.addAll(ArrayUtils.addAll(metricUuidPortion, appUuidPortion), instanceUuidPortion);
+ return ArrayUtils.addAll(metricUuidPortion, appUuidPortion);
}
/**
@@ -151,7 +159,7 @@
*/
private String stem(String metricName) {
String metric = metricName.toLowerCase();
- String regex = "[\\.\\_\\%\\-\\=]";
+ String regex = "[\\.\\_\\%\\-\\=\\/\\@\\(\\)\\[\\]\\:]";
String trimmedMetric = StringUtils.removePattern(metric, regex);
return trimmedMetric;
}
@@ -181,20 +189,27 @@
}
}
- private long computeWeightedNumericalAsciiSum(String value) {
+ public long computeWeightedNumericalAsciiSum(String value) {
int len = value.length();
long numericValue = 0;
- int sum = 0;
- for (int i = 0; i < len; i++) {
+ long sum = 0;
+ int numericCtr = 0;
+ for (int i = 0; i < len;) {
int ascii = value.charAt(i);
- if (48 <= ascii && ascii <= 57) {
- numericValue += numericValue * 10 + (ascii - 48);
+ if (48 <= ascii && ascii <= 57 && numericCtr < 4) {
+ numericValue = numericValue * 10 + (ascii - 48);
+ numericCtr++;
+ i++;
} else {
if (numericValue > 0) {
sum += numericValue;
numericValue = 0;
}
- sum += value.charAt(i);
+ if (numericCtr < 4) {
+ sum += value.charAt(i);
+ i++;
+ }
+ numericCtr = 0;
}
}
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.DAT b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.DAT
new file mode 100644
index 0000000..f5c181a
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/AMSSMOKETESTFAKE.DAT
@@ -0,0 +1 @@
+AMBARI_METRICS.SmokeTest.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat b/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat
index ad35c45..f9d2f27 100644
--- a/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/NAMENODE.dat
@@ -220,6 +220,8 @@
dfs.NNTopUserOpCounts.windowMs=60000.op=setReplication.user=ambari-qa.count
dfs.NNTopUserOpCounts.windowMs=60000.op=setTimes.TotalCount
dfs.NNTopUserOpCounts.windowMs=60000.op=setTimes.user=ams.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=create.user=hbase/ctr-e138-1518143905142-124809-01-000002.hwx.site@EXAMPLE.COM.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=delete.user=hbase/ctr-e138-1518143905142-124809-01-000003.hwx.site@EXAMPLE.COM.count
dfs.namenode.AddBlockOps
dfs.namenode.AllowSnapshotOps
dfs.namenode.BlockOpsBatched
diff --git a/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.DAT b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.DAT
new file mode 100644
index 0000000..af73d02
--- /dev/null
+++ b/ambari-metrics-timelineservice/src/main/resources/metrics_def/TIMELINE_METRIC_STORE_WATCHER.DAT
@@ -0,0 +1 @@
+TimelineMetricStoreWatcher.FakeMetric
\ No newline at end of file
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java
index d1b3f01..a4676e6 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/uuid/TimelineMetricUuidManagerTest.java
@@ -18,17 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.uuid;
-import static org.easymock.EasyMock.anyString;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.replay;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
-import org.easymock.EasyMock;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.io.BufferedReader;
@@ -69,11 +60,6 @@
Assert.assertTrue(uuid.length == 16);
String uuidStr = new String(uuid);
Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
- if (uuids.containsKey(uuidStr) ) {
- if (!uuids.containsValue(timelineClusterMetric)) {
- System.out.println("COLLISION : " + timelineClusterMetric.toString() + " = " + uuids.get(uuidStr));
- }
- }
uuids.put(uuidStr, timelineClusterMetric);
}
}
@@ -88,11 +74,7 @@
TimelineClusterMetric timelineClusterMetric = new TimelineClusterMetric("TestMetric", app, null, -1l);
byte[] uuid = strategy.computeUuid(timelineClusterMetric, 16);
String uuidStr = new String(uuid);
- if (uuids.containsKey(uuidStr) ) {
- if (!uuids.containsValue(timelineClusterMetric)) {
- System.out.println("COLLISION : " + timelineClusterMetric.toString() + " = " + uuids.get(uuidStr));
- }
- }
+ Assert.assertFalse(uuids.containsKey(uuidStr) && !uuids.containsValue(timelineClusterMetric));
uuids.put(uuidStr, timelineClusterMetric);
}
}
@@ -145,6 +127,7 @@
Map<String, Set<String>> metricSet = new HashMap<String, Set<String>>();
FileInputStream fstream = null;
+ Set<String> hbaseMetrics = new HashSet<>();
BufferedReader br = null;
String strLine;
for (String appId : apps) {
@@ -176,9 +159,14 @@
}
}
metricsForApp.add("live_hosts");
- metricSet.put(appId.contains("hbase") ? "hbase" : appId, metricsForApp);
+ if (appId.equals("master_hbase") || appId.equals("slave_hbase")) {
+ hbaseMetrics.addAll(metricsForApp);
+ } else {
+ metricSet.put(appId, metricsForApp);
+ }
System.out.println("Found " + metricsForApp.size() + " metrics for appId = " + appId);
}
+ metricSet.put("hbase", hbaseMetrics);
return metricSet;
}
}