Merge remote-tracking branch 'remotes/origin/trunk' into branch-3.0-perf
diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index a4b87de..d9875ce 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -35,6 +35,7 @@
<properties>
<collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
<monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+ <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
<grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
<hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
<storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -243,6 +244,7 @@
<location>${collector.dir}/target/lib</location>
<excludes>
<exclude>*tests.jar</exclude>
+ <exclude>findbugs*.jar</exclude>
</excludes>
</source>
<source>
@@ -262,6 +264,7 @@
<exclude>bin/**</exclude>
<exclude>bin/*</exclude>
<exclude>lib/*tests.jar</exclude>
+ <exclude>lib/findbugs*.jar</exclude>
</excludes>
</source>
</sources>
@@ -599,6 +602,19 @@
</sources>
</mapping>
<mapping>
+ <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+ <sources>
+ <source>
+ <location>
+ ${aggregator.dir}/target/
+ </location>
+ <includes>
+ <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+ </includes>
+ </source>
+ </sources>
+ </mapping>
+ <mapping>
<directory>/etc/ambari-metrics-monitor/conf</directory>
<configuration>true</configuration>
</mapping>
@@ -744,6 +760,7 @@
<path>/var/run/ambari-metrics-grafana</path>
<path>/var/log/ambari-metrics-grafana</path>
<path>/var/lib/ambari-metrics-collector</path>
+ <path>/var/lib/ambari-metrics-monitor/lib</path>
<path>/var/lib/ambari-metrics-grafana</path>
<path>/usr/lib/ambari-metrics-hadoop-sink</path>
<path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1348,11 @@
<type>pom</type>
<optional>true</optional>
</dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-host-aggregator</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
diff --git a/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
index ab309a1..d015d31 100644
--- a/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
+++ b/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
@@ -64,6 +64,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${aggregator.dir}/conf/windows</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${monitor.dir}/conf/windows</directory>
<outputDirectory>/</outputDirectory>
<includes>
diff --git a/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 99a41c3..448fe62 100644
--- a/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -46,6 +46,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>${aggregator.dir}/conf/unix</directory>
+ <outputDirectory>conf</outputDirectory>
+ <includes>
+ <include>log4j.properties</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>${monitor.dir}/conf/unix</directory>
<outputDirectory>bin</outputDirectory>
<includes>
@@ -68,4 +75,4 @@
-</assembly>
\ No newline at end of file
+</assembly>
diff --git a/ambari-metrics-common/pom.xml b/ambari-metrics-common/pom.xml
index 62ae75f..cae9734 100644
--- a/ambari-metrics-common/pom.xml
+++ b/ambari-metrics-common/pom.xml
@@ -108,6 +108,10 @@
<pattern>org.jboss</pattern>
<shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.http</pattern>
+ <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.apache.http</shadedPattern>
+ </relocation>
</relocations>
</configuration>
</execution>
@@ -189,5 +193,10 @@
<artifactId>powermock-module-junit4</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ <version>4.2.5</version>
+ </dependency>
</dependencies>
</project>
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 46f32f9..337f640 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -30,6 +30,7 @@
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;
+import org.apache.http.HttpStatus;
import org.codehaus.jackson.map.AnnotationIntrospector;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -78,9 +79,16 @@
public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+ public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+ public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
+ public static final String INSTANCE_ID_PROPERTY = "instanceId";
+ public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
+ public static final String COOKIE = "Cookie";
+ private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+ private static final String NEGOTIATE = "Negotiate";
- protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
+ protected final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0);
public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20;
@@ -93,6 +101,7 @@
private long lastFailedZkRequestTime = 0l;
private SSLSocketFactory sslSocketFactory;
+ private AppCookieManager appCookieManager = null;
protected final Log LOG;
@@ -111,7 +120,7 @@
private volatile boolean isInitializedForHA = false;
@SuppressWarnings("all")
- private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+ private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3;
private final Gson gson = new Gson();
@@ -153,20 +162,40 @@
connection = connectUrl.startsWith("https") ?
getSSLConnection(connectUrl) : getConnection(connectUrl);
- connection.setRequestMethod("POST");
- connection.setRequestProperty("Content-Type", "application/json");
- connection.setRequestProperty("Connection", "Keep-Alive");
- connection.setConnectTimeout(timeout);
- connection.setReadTimeout(timeout);
- connection.setDoOutput(true);
-
- if (jsonData != null) {
- try (OutputStream os = connection.getOutputStream()) {
- os.write(jsonData.getBytes("UTF-8"));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("emitMetricsJson to " + connectUrl + ", " + jsonData);
+ }
+ AppCookieManager appCookieManager = getAppCookieManager();
+ String appCookie = appCookieManager.getCachedAppCookie(connectUrl);
+ if (appCookie != null) {
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Using cached app cookie for URL:" + connectUrl);
}
+ connection.setRequestProperty(COOKIE, appCookie);
}
- int statusCode = connection.getResponseCode();
+ int statusCode = emitMetricsJson(connection, timeout, jsonData);
+
+ if (statusCode == HttpStatus.SC_UNAUTHORIZED ) {
+ String wwwAuthHeader = connection.getHeaderField(WWW_AUTHENTICATE);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Received WWW-Authentication header:" + wwwAuthHeader + ", for URL:" + connectUrl);
+ }
+ if (wwwAuthHeader != null && wwwAuthHeader.trim().startsWith(NEGOTIATE)) {
+ appCookie = appCookieManager.getAppCookie(connectUrl, true);
+ if (appCookie != null) {
+ cleanupInputStream(connection.getInputStream());
+ connection = connectUrl.startsWith("https") ?
+ getSSLConnection(connectUrl) : getConnection(connectUrl);
+ connection.setRequestProperty(COOKIE, appCookie);
+ statusCode = emitMetricsJson(connection, timeout, jsonData);
+ }
+ } else {
+ // no supported authentication type found
+ // we would let the original response propagate
+ LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + connectUrl);
+ }
+ }
if (statusCode != 200) {
LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
@@ -209,6 +238,27 @@
}
}
+ private int emitMetricsJson(HttpURLConnection connection, int timeout, String jsonData) throws IOException {
+ connection.setRequestMethod("POST");
+ connection.setRequestProperty("Content-Type", "application/json");
+ connection.setRequestProperty("Connection", "Keep-Alive");
+ connection.setConnectTimeout(timeout);
+ connection.setReadTimeout(timeout);
+ connection.setDoOutput(true);
+
+ if (jsonData != null) {
+ try (OutputStream os = connection.getOutputStream()) {
+ os.write(jsonData.getBytes("UTF-8"));
+ }
+ }
+
+ int statusCode = connection.getResponseCode();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("emitMetricsJson: statusCode = " + statusCode);
+ }
+ return statusCode;
+ }
+
protected String getCurrentCollectorHost() {
String collectorHost;
// Get cached target
@@ -239,22 +289,47 @@
}
protected boolean emitMetrics(TimelineMetrics metrics) {
- String collectorHost = getCurrentCollectorHost();
- String connectUrl = getCollectorUri(collectorHost);
- String jsonData = null;
- LOG.debug("EmitMetrics connectUrl = " + connectUrl);
- try {
- jsonData = mapper.writeValueAsString(metrics);
- } catch (IOException e) {
- LOG.error("Unable to parse metrics", e);
+ String connectUrl;
+ boolean validCollectorHost = true;
+
+ if (isHostInMemoryAggregationEnabled()) {
+ connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+ } else {
+ String collectorHost = getCurrentCollectorHost();
+ if (collectorHost == null) {
+ validCollectorHost = false;
+ }
+ connectUrl = getCollectorUri(collectorHost);
}
- if (jsonData != null) {
- return emitMetricsJson(connectUrl, jsonData);
+
+ if (validCollectorHost) {
+ String jsonData = null;
+ LOG.debug("EmitMetrics connectUrl = " + connectUrl);
+ try {
+ jsonData = mapper.writeValueAsString(metrics);
+ } catch (IOException e) {
+ LOG.error("Unable to parse metrics", e);
+ }
+ if (jsonData != null) {
+ return emitMetricsJson(connectUrl, jsonData);
+ }
}
return false;
}
/**
+ * Get the associated app cookie manager.
+ *
+ * @return the app cookie manager
+ */
+ public synchronized AppCookieManager getAppCookieManager() {
+ if (appCookieManager == null) {
+ appCookieManager = new AppCookieManager();
+ }
+ return appCookieManager;
+ }
+
+ /**
* Cleans up and closes an input stream
* see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
* @param is the InputStream to clean up
@@ -560,4 +635,16 @@
* @return String "host1"
*/
abstract protected String getHostname();
+
+ /**
+ * Check if host in-memory aggregation is enabled
+ * @return
+ */
+ abstract protected boolean isHostInMemoryAggregationEnabled();
+
+ /**
+ * In memory aggregation port
+ * @return
+ */
+ abstract protected int getHostInMemoryAggregationPort();
}
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
new file mode 100644
index 0000000..c903e3d
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult {
+ protected Set<TimelineMetricWithAggregatedValues> result;
+ protected Long timeInMilis;
+
+ @Override
+ public String toString() {
+ return "AggregationResult{" +
+ "result=" + result +
+ ", timeInMilis=" + timeInMilis +
+ '}';
+ }
+
+ public AggregationResult() {
+ }
+
+ public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+ this.result = result;
+ this.timeInMilis = timeInMilis;
+ }
+ @XmlElement
+ public Set<TimelineMetricWithAggregatedValues> getResult() {
+ return result;
+ }
+
+ public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+ this.result = result;
+ }
+ @XmlElement
+ public Long getTimeInMilis() {
+ return timeInMilis;
+ }
+
+ public void setTimeInMilis(Long timeInMilis) {
+ this.timeInMilis = timeInMilis;
+ }
+}
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
new file mode 100644
index 0000000..bcba238
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Handles SPNego authentication as a client of hadoop service, caches
+ * hadoop.auth cookie returned by hadoop service on successful SPNego
+ * authentication. Refreshes hadoop.auth cookie on demand if the cookie has
+ * expired.
+ *
+ */
+public class AppCookieManager {
+
+ static final String HADOOP_AUTH = "hadoop.auth";
+ private static final String HADOOP_AUTH_EQ = "hadoop.auth=";
+ private static final String SET_COOKIE = "Set-Cookie";
+
+ private static final EmptyJaasCredentials EMPTY_JAAS_CREDENTIALS = new EmptyJaasCredentials();
+
+ private Map<String, String> endpointCookieMap = new ConcurrentHashMap<String, String>();
+ private static Log LOG = LogFactory.getLog(AppCookieManager.class);
+
+ /**
+ * Utility method to exercise AppCookieManager directly
+ * @param args element 0 of args should be a URL to hadoop service protected by SPNego
+ * @throws IOException in case of errors
+ */
+ public static void main(String[] args) throws IOException {
+ new AppCookieManager().getAppCookie(args[0], false);
+ }
+
+ public AppCookieManager() {
+ }
+
+ /**
+ * Returns hadoop.auth cookie, doing needed SPNego authentication
+ *
+ * @param endpoint
+ * the URL of the Hadoop service
+ * @param refresh
+ * flag indicating whether to refresh the cookie, if
+ * <code>true</code>, we do a new SPNego authentication and refresh
+ * the cookie even if the cookie already exists in local cache
+ * @return hadoop.auth cookie value
+ * @throws IOException
+ * in case of problem getting hadoop.auth cookie
+ */
+ public String getAppCookie(String endpoint, boolean refresh)
+ throws IOException {
+
+ HttpUriRequest outboundRequest = new HttpGet(endpoint);
+ URI uri = outboundRequest.getURI();
+ String scheme = uri.getScheme();
+ String host = uri.getHost();
+ int port = uri.getPort();
+ String path = uri.getPath();
+ if (!refresh) {
+ String appCookie = endpointCookieMap.get(endpoint);
+ if (appCookie != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("got cached cookie");
+ }
+ return appCookie;
+ }
+ }
+
+ clearAppCookie(endpoint);
+
+ DefaultHttpClient client = new DefaultHttpClient();
+ SPNegoSchemeFactory spNegoSF = new SPNegoSchemeFactory(/* stripPort */true);
+ client.getAuthSchemes().register(AuthPolicy.SPNEGO, spNegoSF);
+ client.getCredentialsProvider().setCredentials(
+ new AuthScope(/* host */null, /* port */-1, /* realm */null),
+ EMPTY_JAAS_CREDENTIALS);
+
+ String hadoopAuthCookie = null;
+ HttpResponse httpResponse = null;
+ try {
+ HttpHost httpHost = new HttpHost(host, port, scheme);
+ HttpRequest httpRequest = new HttpOptions(path);
+ httpResponse = client.execute(httpHost, httpRequest);
+ Header[] headers = httpResponse.getHeaders(SET_COOKIE);
+ if (LOG.isDebugEnabled()) {
+ for (Header header : headers) {
+ LOG.debug(header.getName() + " : " + header.getValue());
+ }
+ }
+ hadoopAuthCookie = getHadoopAuthCookieValue(headers);
+ if (hadoopAuthCookie == null) {
+ int statusCode = httpResponse.getStatusLine().getStatusCode();
+ HttpEntity entity = httpResponse.getEntity();
+ String responseBody = entity != null ? EntityUtils.toString(entity) : null;
+ LOG.error("SPNego authentication failed with statusCode = " + statusCode + ", responseBody = " + responseBody + ", can not get hadoop.auth cookie for URL: " + endpoint);
+ return null;
+ }
+ } finally {
+ if (httpResponse != null) {
+ HttpEntity entity = httpResponse.getEntity();
+ if (entity != null) {
+ entity.getContent().close();
+ }
+ }
+
+ }
+
+ hadoopAuthCookie = HADOOP_AUTH_EQ + quote(hadoopAuthCookie);
+ setAppCookie(endpoint, hadoopAuthCookie);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Successful SPNego authentication to URL:" + uri.toString());
+ }
+ return hadoopAuthCookie;
+ }
+
+
+ /**
+ * Returns the cached app cookie
+ * @param endpoint the hadoop end point we authenticate to
+ * @return the cached app cookie, can be null
+ */
+ public String getCachedAppCookie(String endpoint) {
+ return endpointCookieMap.get(endpoint);
+ }
+
+ /**
+ * Sets the cached app cookie cache
+ * @param endpoint the hadoop end point we authenticate to
+ * @param appCookie the app cookie
+ */
+ private void setAppCookie(String endpoint, String appCookie) {
+ endpointCookieMap.put(endpoint, appCookie);
+ }
+
+ /**
+ * Clears the cached app cookie
+ * @param endpoint the hadoop end point we authenticate to
+ */
+ private void clearAppCookie(String endpoint) {
+ endpointCookieMap.remove(endpoint);
+ }
+
+ static String quote(String s) {
+ return s == null ? s : "\"" + s + "\"";
+ }
+
+ static String getHadoopAuthCookieValue(Header[] headers) {
+ if (headers == null) {
+ return null;
+ }
+ for (Header header : headers) {
+ HeaderElement[] elements = header.getElements();
+ for (HeaderElement element : elements) {
+ String cookieName = element.getName();
+ if (cookieName.equals(HADOOP_AUTH)) {
+ if (element.getValue() != null) {
+ String trimmedVal = element.getValue().trim();
+ if (!trimmedVal.isEmpty()) {
+ return trimmedVal;
+ }
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+
+ private static class EmptyJaasCredentials implements Credentials {
+
+ public String getPassword() {
+ return null;
+ }
+
+ public Principal getUserPrincipal() {
+ return null;
+ }
+
+ }
+
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
similarity index 96%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
rename to ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
index 825ac25..84cba0e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
import org.apache.hadoop.classification.InterfaceAudience;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
similarity index 95%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
rename to ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
index 9c837b6..7ef2c1d 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
import org.codehaus.jackson.annotate.JsonCreator;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
similarity index 94%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
rename to ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
index 340ec75..e190913 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
import org.codehaus.jackson.annotate.JsonCreator;
@@ -54,7 +54,7 @@
this.numberOfSamples = numberOfSamples;
}
- public double getAvg() {
+ public double calculateAverage() {
return sum / numberOfSamples;
}
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
index 31044cc..e87f06e 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
@@ -35,7 +35,7 @@
}
public static Precision getPrecision(String precision) throws PrecisionFormatException {
- if (precision == null ) {
+ if (precision == null || precision.isEmpty()) {
return null;
}
try {
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 44c9d4a..edace52 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -45,7 +45,7 @@
private String type;
private String units;
private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
- private Map<String, String> metadata = new HashMap<>();
+ private HashMap<String, String> metadata = new HashMap<>();
// default
public TimelineMetric() {
@@ -151,11 +151,11 @@
}
@XmlElement(name = "metadata")
- public Map<String,String> getMetadata () {
+ public HashMap<String,String> getMetadata () {
return metadata;
}
- public void setMetadata (Map<String,String> metadata) {
+ public void setMetadata (HashMap<String,String> metadata) {
this.metadata = metadata;
}
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
new file mode 100644
index 0000000..626ac5f
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetricWithAggregatedValues {
+ private TimelineMetric timelineMetric;
+ private MetricHostAggregate metricAggregate;
+
+ public TimelineMetricWithAggregatedValues() {
+ }
+
+ public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+ timelineMetric = metric;
+ this.metricAggregate = metricAggregate;
+ }
+
+ @XmlElement
+ public MetricHostAggregate getMetricAggregate() {
+ return metricAggregate;
+ }
+ @XmlElement
+ public TimelineMetric getTimelineMetric() {
+ return timelineMetric;
+ }
+
+ public void setTimelineMetric(TimelineMetric timelineMetric) {
+ this.timelineMetric = timelineMetric;
+ }
+
+ public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+ this.metricAggregate = metricAggregate;
+ }
+
+ @Override
+ public String toString() {
+ return "TimelineMetricWithAggregatedValues{" +
+ "timelineMetric=" + timelineMetric +
+ ", metricAggregate=" + metricAggregate +
+ '}';
+ }
+}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
new file mode 100644
index 0000000..8355288
--- /dev/null
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.junit.Test;
+
+public class AppCookieManagerTest {
+
+ @Test
+ public void getCachedAppCookie() {
+ assertNull(new AppCookieManager().getCachedAppCookie("http://dummy"));
+ }
+
+ @Test
+ public void getHadoopAuthCookieValueWithNullHeaders() {
+ assertNull(AppCookieManager.getHadoopAuthCookieValue(null));
+ }
+
+ @Test
+ public void getHadoopAuthCookieValueWithEmptyHeaders() {
+ assertNull(AppCookieManager.getHadoopAuthCookieValue(new Header[0]));
+ }
+
+ @Test
+ public void getHadoopAuthCookieValueWithValidHeaders() {
+ Header[] headers = new Header[1];
+ headers[0] = new BasicHeader("Set-Cookie", AppCookieManager.HADOOP_AUTH + "=dummyvalue");
+ assertNotNull(AppCookieManager.getHadoopAuthCookieValue(headers));
+ }
+
+}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index 9b0cdbe..ce2cf79 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -90,6 +90,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index a393a96..f0174d5 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -192,5 +192,15 @@
protected String getHostname() {
return "h1";
}
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return true;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
}
}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 32fe32e..3be2162 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -45,7 +45,7 @@
public class HandleConnectExceptionTest {
private static final String COLLECTOR_URL = "collector";
private TestTimelineMetricsSink sink;
-
+
@Before
public void init(){
sink = new TestTimelineMetricsSink();
@@ -88,6 +88,17 @@
}
}
+ @Test
+ public void testEmitMetricsWithNullHost() {
+ TestTimelineMetricsSinkWithNullHost sinkWithNullHost = new TestTimelineMetricsSinkWithNullHost();
+
+ boolean success = sinkWithNullHost.emitMetrics(new TimelineMetrics());
+ Assert.assertFalse(success);
+
+ success = sinkWithNullHost.emitMetrics(new TimelineMetrics());
+ Assert.assertTrue(success);
+ }
+
private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
@Override
protected String getCollectorUri(String host) {
@@ -125,6 +136,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 61888;
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
@@ -136,4 +157,77 @@
}
}
+
+ private class TestTimelineMetricsSinkWithNullHost extends AbstractTimelineMetricsSink {
+
+ int ctr = 0;
+
+ @Override
+ protected String getCollectorUri(String host) {
+ return COLLECTOR_URL;
+ }
+
+ @Override
+ protected String getCollectorProtocol() {
+ return "http";
+ }
+
+ @Override
+ protected String getCollectorPort() {
+ return "2181";
+ }
+
+ @Override
+ protected int getTimeoutSeconds() {
+ return 10;
+ }
+
+ @Override
+ protected String getZookeeperQuorum() {
+ return "localhost:2181";
+ }
+
+ @Override
+ protected Collection<String> getConfiguredCollectorHosts() {
+ return Arrays.asList("localhost");
+ }
+
+ @Override
+ protected String getHostname() {
+ return "h1";
+ }
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 0;
+ }
+
+ @Override
+ public boolean emitMetrics(TimelineMetrics metrics) {
+ super.init();
+ return super.emitMetrics(metrics);
+ }
+
+ @Override
+ protected synchronized String findPreferredCollectHost() {
+ if (ctr == 0) {
+ ctr++;
+ return null;
+ } else {
+ return "localhost";
+ }
+ }
+
+ @Override
+ protected boolean emitMetricsJson(String connectUrl, String jsonData) {
+ return true;
+ }
+
+ }
+
}
diff --git a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 3fdf3f4..6277907 100644
--- a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -61,6 +61,11 @@
private final static String COUNTER_METRICS_PROPERTY = "counters";
private final Set<String> counterMetrics = new HashSet<String>();
private int timeoutSeconds = 10;
+ private boolean setInstanceId;
+ private String instanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
+
@Override
public void start() {
@@ -106,6 +111,11 @@
zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
port = configuration.getProperty(COLLECTOR_PORT, "6188");
+ setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+ instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+
+    hostInMemoryAggregationEnabled = Boolean.parseBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
// Initialize the collector write strategy
super.init();
@@ -158,6 +168,16 @@
return hostname;
}
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
public void setPollFrequency(long pollFrequency) {
this.pollFrequency = pollFrequency;
}
@@ -227,7 +247,11 @@
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostname);
- timelineMetric.setInstanceId(component);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId + component);
+ } else {
+ timelineMetric.setInstanceId(component);
+ }
timelineMetric.setAppId("FLUME_HANDLER");
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
diff --git a/ambari-metrics-grafana/ambari-metrics/datasource.js b/ambari-metrics-grafana/ambari-metrics/datasource.js
index 97de6e7..07e76af 100644
--- a/ambari-metrics-grafana/ambari-metrics/datasource.js
+++ b/ambari-metrics-grafana/ambari-metrics/datasource.js
@@ -84,7 +84,6 @@
* AMS Datasource Query
*/
AmbariMetricsDatasource.prototype.query = function (options) {
-
var emptyData = function (metric) {
var legend = metric.alias ? metric.alias : metric.metric;
return {
@@ -217,16 +216,21 @@
var getHostAppIdData = function(target) {
var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
+ var instanceId = typeof target.cluster == 'undefined' ? '' : '&instanceId=' + target.cluster;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.metric + metricTransform +
- metricAggregator + "&hostname=" + target.hosts + '&appId=' + target.app + '&startTime=' + from +
- '&endTime=' + to + precision + seriesAggregator }).then(
- getMetricsData(target)
+ metricAggregator + "&hostname=" + target.hosts + '&appId=' + target.app + instanceId + '&startTime=' + from +
+ '&endTime=' + to + precision + seriesAggregator }).then(
+ getMetricsData(target)
);
};
//Check if it's a templated dashboard.
+ var templatedClusters = templateSrv.variables.filter(function(o) { return o.name === "cluster"});
+ var templatedCluster = (_.isEmpty(templatedClusters)) ? '' : templatedClusters[0].options.filter(function(cluster)
+ { return cluster.selected; }).map(function(clusterName) { return clusterName.value; });
+
var templatedHosts = templateSrv.variables.filter(function(o) { return o.name === "hosts"});
var templatedHost = (_.isEmpty(templatedHosts)) ? '' : templatedHosts[0].options.filter(function(host)
{ return host.selected; }).map(function(hostName) { return hostName.value; });
@@ -236,20 +240,23 @@
var tComponent = _.isEmpty(tComponents) ? '' : tComponents[0].current.value;
var getServiceAppIdData = function(target) {
+ var tCluster = (_.isEmpty(templateSrv.variables))? templatedCluster : '';
+ var instanceId = typeof tCluster == 'undefined' ? '' : '&instanceId=' + tCluster;
var tHost = (_.isEmpty(templateSrv.variables)) ? templatedHost : target.templatedHost;
- var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.metric + metricTransform
- + metricAggregator + '&hostname=' + tHost + '&appId=' + target.app + '&startTime=' + from +
+ + metricAggregator + '&hostname=' + tHost + '&appId=' + target.app + instanceId + '&startTime=' + from +
'&endTime=' + to + precision + seriesAggregator }).then(
getMetricsData(target)
);
};
// To speed up querying on templatized dashboards.
var getAllHostData = function(target) {
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
@@ -265,28 +272,30 @@
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
var templatedComponent = (_.isEmpty(tComponent)) ? target.app : tComponent;
return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.metric + metricTransform
- + metricAggregator + '&hostname=' + target.templatedHost + '&appId=' + templatedComponent + '&startTime=' + from +
- '&endTime=' + to + precision + topN + seriesAggregator }).then(
+ + metricAggregator + '&hostname=' + target.templatedHost + '&appId=' + templatedComponent + instanceId
+ + '&startTime=' + from + '&endTime=' + to + precision + topN + seriesAggregator }).then(
allHostMetricsData(target)
);
};
var getYarnAppIdData = function(target) {
- var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.queue + metricTransform
- + metricAggregator + '&appId=resourcemanager&startTime=' + from +
+ + metricAggregator + '&appId=resourcemanager' + instanceId + '&startTime=' + from +
'&endTime=' + to + precision + seriesAggregator }).then(
getMetricsData(target)
);
};
var getHbaseAppIdData = function(target) {
- var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
+ var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
- return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.hbMetric + '&appId=hbase&startTime='
+ return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.hbMetric + instanceId + '&appId=hbase&startTime='
+ from + '&endTime=' + to + precision + seriesAggregator }).then(
allHostMetricsData(target)
);
@@ -295,22 +304,25 @@
var getKafkaAppIdData = function(target) {
var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
- return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.kbMetric + metricTransform
+ return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.kbMetric + metricTransform + instanceId
+ metricAggregator + '&appId=kafka_broker&startTime=' + from +
'&endTime=' + to + precision + seriesAggregator }).then(
getMetricsData(target)
);
};
var getNnAppIdData = function(target) {
+
var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
- return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.nnMetric + metricTransform
+ return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.nnMetric + metricTransform + instanceId
+ metricAggregator + '&appId=namenode&startTime=' + from + '&endTime=' + to + precision + seriesAggregator }).then(
allHostMetricsData(target)
);
@@ -318,12 +330,13 @@
// Storm Topology calls.
var getStormData = function(target) {
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
- return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sTopoMetric + metricTransform
+ return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sTopoMetric + metricTransform + instanceId
+ metricAggregator + '&appId=nimbus&startTime=' + from + '&endTime=' + to + precision + seriesAggregator }).then(
allHostMetricsData(target)
);
@@ -331,12 +344,13 @@
// Druid calls.
var getDruidData = function(target) {
+ var instanceId = typeof target.templatedCluster == 'undefined' ? '' : '&instanceId=' + target.templatedCluster;
var precision = target.precision === 'default' || typeof target.precision == 'undefined' ? '' : '&precision='
+ target.precision;
var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
- return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sDataSourceMetric + metricTransform
+ return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sDataSourceMetric + metricTransform + instanceId
+ metricAggregator + '&appId=druid&startTime=' + from + '&endTime=' + to + precision + seriesAggregator }).then(
allHostMetricsData(target)
);
@@ -471,7 +485,6 @@
target.sTopology = selectedTopology;
target.sComponent = selectedComponent;
target.sTopoMetric = target.metric.replace('*', target.sTopology).replace('*', target.sComponent);
- debugger;
return getStormData(target);
}));
}
@@ -507,14 +520,20 @@
}));
});
}
-
// To speed up querying on templatized dashboards.
- if (templateSrv.variables[1] && templateSrv.variables[1].name === "hosts") {
+ var indexOfHosts = -1;
+ for (var i = 0; i < templateSrv.variables.length; i++) {
+ if (templateSrv.variables[i].name == 'hosts') {
+ indexOfHosts = i;
+ }
+ }
+ if (indexOfHosts >= 0) {
var allHosts = templateSrv._values.hosts.lastIndexOf('}') > 0 ? templateSrv._values.hosts.slice(1,-1) :
templateSrv._values.hosts;
allHosts = templateSrv._texts.hosts === "All" ? '%' : allHosts;
metricsPromises.push(_.map(options.targets, function(target) {
- target.templatedHost = allHosts;
+ target.templatedHost = allHosts? allHosts : '';
+ target.templatedCluster = templatedCluster;
return getAllHostData(target);
}));
}
@@ -558,14 +577,19 @@
AmbariMetricsDatasource.prototype.metricFindQuery = function (query) {
var interpolated;
try {
- interpolated = templateSrv.replace(query);
+ interpolated = query.split('.')[0];
} catch (err) {
return $q.reject(err);
}
+ var templatedClusters = templateSrv.variables.filter(function(o) { return o.name === "cluster"});
+ var templatedCluster = (_.isEmpty(templatedClusters)) ? '' : templatedClusters[0].options.filter(function(cluster)
+ { return cluster.selected; }).map(function(clusterName) { return clusterName.value; });
+
var tComponents = _.isEmpty(templateSrv.variables) ? '' : templateSrv.variables.filter(function(variable)
{ return variable.name === "components"});
var tComponent = _.isEmpty(tComponents) ? '' : tComponents[0].current.value;
+
// Templated Variable for HBase Users
// It will search the cluster and populate the HBase Users.
if(interpolated === "hbase-users") {
@@ -837,58 +861,11 @@
});
}
- // Templated Variable that will populate all hosts on the cluster.
- // The variable needs to be set to "hosts".
- if (!tComponent){
- return this.doAmbariRequest({
- method: 'GET',
- url: '/ws/v1/timeline/metrics/' + interpolated
- })
- .then(function (results) {
- //Remove fakehostname from the list of hosts on the cluster.
- var fake = "fakehostname"; delete results.data[fake];
- return _.map(_.keys(results.data), function (hostName) {
- return {
- text: hostName,
- expandable: hostName.expandable ? true : false
- };
- });
- });
- } else {
- // Create a dropdown in templated dashboards for single components.
- // This will check for the component set and show hosts only for the
- // selected component.
- return this.doAmbariRequest({
- method: 'GET',
- url: '/ws/v1/timeline/metrics/hosts'
- })
- .then(function(results) {
- var compToHostMap = {};
- //Remove fakehostname from the list of hosts on the cluster.
- var fake = "fakehostname";
- delete results.data[fake];
- //Query hosts based on component name
- _.forEach(results.data, function(comp, hostName) {
- comp.forEach(function(component) {
- if (!compToHostMap[component]) {
- compToHostMap[component] = [];
- }
- compToHostMap[component].push(hostName);
- });
- });
- var compHosts = compToHostMap[tComponent];
- compHosts = _.map(compHosts, function(host) {
- return {
- text: host,
- expandable: host.expandable ? true : false
- };
- });
- compHosts = _.sortBy(compHosts, function(i) {
- return i.text.toLowerCase();
- });
- return $q.when(compHosts);
- });
- }
+ if (interpolated == 'hosts') {
+ return this.suggestHosts(tComponent, templatedCluster);
+ } else if (interpolated == 'cluster') {
+ return this.suggestClusters(tComponent)
+ }
};
/**
@@ -941,34 +918,47 @@
return $q.when(keys);
};
+ AmbariMetricsDatasource.prototype.suggestClusters = function(app) {
+ if (!app) { app = ''; }
+ return this.doAmbariRequest({
+ method: 'GET',
+ url: '/ws/v1/timeline/metrics/instances?' + 'appId=' + app
+ }).then(function(response) {
+ var clusters = [];
+ var data = response.data;
+ for (var cluster in data) {
+ if (data[cluster].hasOwnProperty(app)) {
+ clusters.push({text: cluster});
+ }
+ }
+ return $q.when(clusters);
+ });
+ };
+
/**
* AMS Datasource - Suggest Hosts.
*
* Query Hosts on the cluster.
*/
- AmbariMetricsDatasource.prototype.suggestHosts = function (query, app) {
- console.log(query);
- return this.doAmbariRequest({method: 'GET', url: '/ws/v1/timeline/metrics/hosts'})
- .then(function (results) {
- var compToHostMap = {};
- //Remove fakehostname from the list of hosts on the cluster.
- var fake = "fakehostname"; delete results.data[fake];
- //Query hosts based on component name
- _.forEach(results.data, function (comp, hostName) {
- comp.forEach(function (component) {
- if (!compToHostMap[component]){
- compToHostMap[component] = [];
- }
- compToHostMap[component].push(hostName);
- });
- });
- var compHosts = compToHostMap[app];
- compHosts = _.map(compHosts, function (h) {
- return {text: h};
- });
- compHosts = _.sortBy(compHosts, function (i) { return i.text.toLowerCase(); });
- return $q.when(compHosts);
- });
+ AmbariMetricsDatasource.prototype.suggestHosts = function (app, cluster) {
+ if (!app) { app = ''; }
+ if (!cluster) { cluster = ''; }
+ return this.doAmbariRequest({
+ method: 'GET',
+ url: '/ws/v1/timeline/metrics/instances?' + 'appId=' + app + '&instanceId=' + cluster
+ }).then(function (response) {
+ var hosts = [];
+ var data = response.data;
+ for (var cluster in data) {
+ var appHosts = data[cluster][app];
+ if (appHosts) {
+ for (var index in appHosts) {
+ hosts.push({text: appHosts[index]});
+ }
+ }
+ }
+ return $q.when(hosts);
+ });
};
/**
diff --git a/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html b/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html
index 3f322c1..7e78cc0 100644
--- a/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html
+++ b/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html
@@ -81,6 +81,18 @@
</a>
</li>
+ <li class="tight-form-item" style="width: 86px" ng-hide="dashboard.templating.list.length > 0">
+ Cluster
+ </li>
+ <li ng-hide="dashboard.templating.list.length > 0">
+ <input type="text" class="input-large tight-form-input" ng-model="target.cluster"
+ spellcheck='false' bs-typeahead="suggestClusters" placeholder="cluster name" data-min-length=0 data-items=100
+ ng-blur="targetBlur()">
+ </input>
+ <a bs-tooltip="target.errors.metric" style="color: rgb(229, 189, 28)" ng-show="target.errors.metric">
+ <i class="fa fa-warning"></i>
+ </a>
+ </li>
<li class="tight-form-item" style="width: 86px" ng-hide="dashboard.templating.list.length > 0">
Hosts
@@ -95,8 +107,6 @@
</a>
</li>
-
-
<li class="tight-form-item">
Aggregator
</li>
diff --git a/ambari-metrics-grafana/ambari-metrics/queryCtrl.js b/ambari-metrics-grafana/ambari-metrics/queryCtrl.js
index a26e7d0..02b5813 100644
--- a/ambari-metrics-grafana/ambari-metrics/queryCtrl.js
+++ b/ambari-metrics-grafana/ambari-metrics/queryCtrl.js
@@ -55,6 +55,7 @@
if (newValue === '') {
$scope.target.metric = '';
$scope.target.hosts = '';
+ $scope.target.cluster = '';
}
});
if (!$scope.target.downsampleAggregator) {
@@ -86,8 +87,14 @@
.then(callback);
};
+ $scope.suggestClusters = function(query, callback) {
+ $scope.datasource.suggestClusters($scope.target.app)
+ .then($scope.getTextValues)
+ .then(callback);
+ };
+
$scope.suggestHosts = function(query, callback) {
- $scope.datasource.suggestHosts(query, $scope.target.app)
+ $scope.datasource.suggestHosts($scope.target.app, $scope.target.cluster)
.then($scope.getTextValues)
.then(callback);
};
diff --git a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 8e0de03..a290ced 100644
--- a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -52,6 +52,7 @@
private TimelineMetricsCache metricsCache;
private String hostName = "UNKNOWN.example.com";
private String instanceId = null;
+ private boolean setInstanceId;
private String serviceName = "";
private Collection<String> collectorHosts;
private String collectorUri;
@@ -74,6 +75,8 @@
return t;
}
});
+ private int hostInMemoryAggregationPort;
+ private boolean hostInMemoryAggregationEnabled;
@Override
public void init(SubsetConfiguration conf) {
@@ -95,8 +98,8 @@
}
serviceName = getServiceName(conf);
- String inst = conf.getString("instanceId", "");
- instanceId = StringUtils.isEmpty(inst) ? null : inst;
+ instanceId = conf.getString(INSTANCE_ID_PROPERTY, null);
+ setInstanceId = conf.getBoolean(SET_INSTANCE_ID_PROPERTY, false);
LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
// Initialize the collector write strategy
@@ -106,7 +109,8 @@
protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
port = conf.getString(COLLECTOR_PORT, "6188");
-
+ hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY);
+ hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY);
if (collectorHosts.isEmpty()) {
LOG.error("No Metric collector configured.");
} else {
@@ -248,6 +252,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void putMetrics(MetricsRecord record) {
try {
String recordName = record.name();
@@ -307,9 +321,10 @@
int sbBaseLen = sb.length();
List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
- Map<String, String> metadata = null;
+ HashMap<String, String> metadata = null;
if (skipAggregation) {
- metadata = Collections.singletonMap("skipAggregation", "true");
+ metadata = new HashMap<>();
+ metadata.put("skipAggregation", "true");
}
long startTime = record.timestamp();
@@ -321,7 +336,9 @@
timelineMetric.setMetricName(name);
timelineMetric.setHostName(hostName);
timelineMetric.setAppId(serviceName);
- timelineMetric.setInstanceId(instanceId);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId);
+ }
timelineMetric.setStartTime(startTime);
timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
timelineMetric.getMetricValues().put(startTime, value.doubleValue());
diff --git a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 5777639..30c5c23 100644
--- a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -50,12 +50,15 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.TreeMap;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PORT;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOSTS_PROPERTY;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROTOCOL;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.INSTANCE_ID_PROPERTY;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.SET_INSTANCE_ID_PROPERTY;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
@@ -81,13 +84,17 @@
}
@Test
- @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class})
+ @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class, TimelineMetric.class, HadoopTimelineMetricsSink.class})
public void testPutMetrics() throws Exception {
HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
URL url = PowerMock.createNiceMock(URL.class);
InputStream is = IOUtils.toInputStream(gson.toJson(Collections.singletonList("localhost")));
+ TimelineMetric timelineMetric = PowerMock.createNiceMock(TimelineMetric.class);
+ expectNew(TimelineMetric.class).andReturn(timelineMetric).times(2);
+ expect(timelineMetric.getMetricValues()).andReturn(new TreeMap<Long, Double>()).anyTimes();
+ expect(timelineMetric.getMetricName()).andReturn("metricName").anyTimes();
expectNew(URL.class, anyString()).andReturn(url).anyTimes();
expect(url.openConnection()).andReturn(connection).anyTimes();
expect(connection.getInputStream()).andReturn(is).anyTimes();
@@ -106,6 +113,8 @@
expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes();
+ expect(conf.getBoolean(eq(SET_INSTANCE_ID_PROPERTY), eq(false))).andReturn(true).anyTimes();
+ expect(conf.getString(eq(INSTANCE_ID_PROPERTY), anyString())).andReturn("instanceId").anyTimes();
conf.setListDelimiter(eq(','));
expectLastCall().anyTimes();
@@ -145,6 +154,9 @@
expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
+ timelineMetric.setInstanceId(eq("instanceId"));
+ EasyMock.expectLastCall();
+
replay(conf, record, metric);
replayAll();
diff --git a/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics-host-aggregator/conf/unix/log4j.properties
new file mode 100644
index 0000000..d7ceedd
--- /dev/null
+++ b/ambari-metrics-host-aggregator/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
+
+
diff --git a/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics-host-aggregator/conf/windows/log4j.properties
new file mode 100644
index 0000000..d9aabab
--- /dev/null
+++ b/ambari-metrics-host-aggregator/conf/windows/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
diff --git a/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics-host-aggregator/pom.xml
new file mode 100644
index 0000000..24432dd
--- /dev/null
+++ b/ambari-metrics-host-aggregator/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>ambari-metrics</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>ambari-metrics-host-aggregator</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Ambari Metrics Host Aggregator</name>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-metrics-common</artifactId>
+ <version>2.0.0.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <version>2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-json</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.xml.bind</groupId>
+ <artifactId>jaxb-api</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-core</artifactId>
+ <version>1.11</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.7.1.2.3.4.0-3347</version>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-core</artifactId>
+ <version>1.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.jersey-test-framework</groupId>
+ <artifactId>jersey-test-framework-grizzly2</artifactId>
+ <version>1.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.2</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.6</version>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
new file mode 100644
index 0000000..1e5cc82
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
+
+/**
+ * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
+ */
+public class AggregatorApplication
+{
+ private static final int STOP_SECONDS_DELAY = 0;
+ private static final int JOIN_SECONDS_TIMEOUT = 5;
+ private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+ private Log LOG;
+ private final int webApplicationPort;
+ private final int rawPublishingInterval;
+ private final int aggregationInterval;
+ private Configuration configuration;
+ private Thread aggregatePublisherThread;
+ private Thread rawPublisherThread;
+ private TimelineMetricsHolder timelineMetricsHolder;
+ private HttpServer httpServer;
+
+ public AggregatorApplication(String hostname, String collectorHosts) {
+ LOG = LogFactory.getLog(this.getClass());
+ configuration = new Configuration(true);
+ initConfiguration();
+ configuration.set("timeline.metrics.collector.hosts", collectorHosts);
+ configuration.set("timeline.metrics.hostname", hostname);
+ configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
+ this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+ this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+ this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+ this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
+ try {
+ this.httpServer = createHttpServer();
+ } catch (IOException e) {
+ LOG.error("Exception while starting HTTP server. Exiting", e);
+ System.exit(1);
+ }
+ }
+
+ private String getZkQuorumFromConfiguration() {
+ String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+ String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
+ return getZkConnectionUrl(zkClientPort, zkServerHosts);
+ }
+
+ protected void initConfiguration() {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ if (classLoader == null) {
+ classLoader = getClass().getClassLoader();
+ }
+
+ URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+ if (amsResUrl == null) {
+ throw new IllegalStateException("Unable to initialize the metrics " +
+ "subsystem. No ams-site present in the classpath.");
+ }
+ LOG.info("Found metric service configuration: " + amsResUrl);
+
+ try {
+ configuration.addResource(amsResUrl.toURI().toURL());
+ } catch (Exception e) {
+ LOG.error("Couldn't init configuration. ", e);
+ System.exit(1);
+ }
+ }
+
+ protected String getHostName() {
+ String hostName = "localhost";
+ try {
+ hostName = InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ LOG.error(e);
+ }
+ return hostName;
+ }
+
+ protected URI getURI() {
+ URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+ LOG.info(String.format("Web server at %s", uri));
+ return uri;
+ }
+
+ protected HttpServer createHttpServer() throws IOException {
+ ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+ HashMap<String, Object> params = new HashMap<>();
+ params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+ resourceConfig.setPropertiesAndFeatures(params);
+ return HttpServerFactory.create(getURI(), resourceConfig);
+ }
+
+ private void startWebServer() {
+ LOG.info("Starting web server.");
+ this.httpServer.start();
+ }
+
+ private void startAggregatePublisherThread() {
+ LOG.info("Starting aggregated metrics publisher.");
+ AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
+ aggregatePublisherThread = new Thread(metricPublisher);
+ aggregatePublisherThread.start();
+ }
+
+ private void startRawPublisherThread() {
+ LOG.info("Starting raw metrics publisher.");
+ AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
+ rawPublisherThread = new Thread(metricPublisher);
+ rawPublisherThread.start();
+ }
+
+
+
+ private void stop() {
+ LOG.info("Stopping aggregator application");
+ aggregatePublisherThread.interrupt();
+ rawPublisherThread.interrupt();
+ httpServer.stop(STOP_SECONDS_DELAY);
+ LOG.info("Stopped web server.");
+ try {
+ LOG.info("Waiting for threads to join.");
+ aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+ rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+ LOG.info("Gracefully stopped Aggregator Application.");
+ } catch (InterruptedException e) {
+ LOG.error("Received exception during stop : ", e);
+
+ }
+
+ }
+
+ private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+ StringBuilder sb = new StringBuilder();
+ String[] quorumParts = zkQuorum.split(",");
+ String prefix = "";
+ for (String part : quorumParts) {
+ sb.append(prefix);
+ sb.append(part.trim());
+ if (!part.contains(":")) {
+ sb.append(":");
+ sb.append(zkClientPort);
+ }
+ prefix = ",";
+ }
+ return sb.toString();
+ }
+
+ public static void main( String[] args ) throws Exception {
+ if (args.length != 2) {
+ throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " +
+ "2nd - collector hosts separated with coma");
+ }
+
+ final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
+
+ app.startWebServerAndPublishersThreads();
+
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ app.stop();
+ }
+ });
+ }
+
+ private void startWebServerAndPublishersThreads() {
+ LOG.info("Starting aggregator application");
+ startAggregatePublisherThread();
+ startRawPublisherThread();
+ startWebServer();
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
new file mode 100644
index 0000000..b151209
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
+@Singleton
+@Path("/ws/v1/timeline")
+public class AggregatorWebService {
+ TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+ @GET
+ @Produces("text/json")
+ @Path("/metrics")
+ public Response getOkResponse() throws IOException {
+ return Response.ok().build();
+ }
+
+ @POST
+ @Produces(MediaType.TEXT_PLAIN)
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ @Path("/metrics")
+ public Response postMetrics(
+ TimelineMetrics metrics) {
+ metricsHolder.putMetricsForAggregationPublishing(metrics);
+ metricsHolder.putMetricsForRawPublishing(metrics);
+ return Response.ok().build();
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
new file mode 100644
index 0000000..03b6542
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton class with 2 guava caches for raw and aggregated metrics storing
+ */
+public class TimelineMetricsHolder {
+ private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+ private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+ private Cache<String, TimelineMetrics> aggregationMetricsCache;
+ private Cache<String, TimelineMetrics> rawMetricsCache;
+ private static TimelineMetricsHolder instance = null;
+ //to ensure no metric values are expired
+ private static final int EXPIRE_DELAY = 30;
+ ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+ ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+ private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+ this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+ this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+ }
+
+ public static synchronized TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+ if (instance == null) {
+ instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+ }
+ return instance;
+ }
+
+ /**
+ * Uses default expiration time for caches initialization if they are not initialized yet.
+ * @return
+ */
+ public static TimelineMetricsHolder getInstance() {
+ return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+ }
+
+ public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+ aggregationCacheLock.writeLock().lock();
+ aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
+ aggregationCacheLock.writeLock().unlock();
+ }
+
+ private String calculateCacheKey(TimelineMetrics timelineMetrics) {
+ List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+ if (metrics.size() > 0) {
+ return metrics.get(0).getAppId() + System.currentTimeMillis();
+ }
+ return String.valueOf(System.currentTimeMillis());
+ }
+
+ public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
+ return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+ }
+
+ public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+ rawCacheLock.writeLock().lock();
+ rawMetricsCache.put(calculateCacheKey(metrics), metrics);
+ rawCacheLock.writeLock().unlock();
+ }
+
+ public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
+ return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+ }
+
+ /**
+ * Returns values from cache and clears the cache
+ * @param cache
+ * @param lock
+ * @return
+ */
+ private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
+ lock.writeLock().lock();
+ Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+ cache.invalidateAll();
+ lock.writeLock().unlock();
+ return metricsMap;
+ }
+
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
new file mode 100644
index 0000000..5af115f
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Abstract class that runs a thread that publishes metrics data to AMS collector in specified intervals.
+ */
+public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
+
+ private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+ private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+ private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
+ private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
+ private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
+ private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
+ private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
+ private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
+ protected static String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+ protected int publishIntervalInSeconds;
+ private Log LOG;
+ protected TimelineMetricsHolder timelineMetricsHolder;
+ protected Configuration configuration;
+ private String collectorProtocol;
+ private String collectorPort;
+ private Collection<String> collectorHosts;
+ private String hostname;
+ private String zkQuorum;
+
+ public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
+ LOG = LogFactory.getLog(this.getClass());
+ this.configuration = configuration;
+ this.publishIntervalInSeconds = publishIntervalInSeconds;
+ this.timelineMetricsHolder = timelineMetricsHolder;
+ configure();
+ }
+
+ protected void configure() {
+ collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+ collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1];
+ collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+ zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
+ hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
+ // collectorHosts is already initialized above from the trimmed value of the same property
+ if (collectorHosts.isEmpty()) {
+ LOG.error("No Metric collector configured.");
+ } else {
+ if (collectorProtocol.contains("https")) {
+ String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
+ String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+ String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+ loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+ }
+ }
+ }
+
+ /**
+ * Publishes metrics to collector in specified intervals while not interrupted.
+ */
+ @Override
+ public void run() {
+ while (!Thread.currentThread().isInterrupted()) {
+ try {
+ Thread.sleep(this.publishIntervalInSeconds * 1000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt status so the isInterrupted() loop condition can terminate the thread
+ }
+ try {
+ processAndPublishMetrics(getMetricsFromCache());
+ } catch (Exception e) {
+ //ignore
+ }
+ }
+ }
+
+ /**
+ * Processes and sends metrics to collector.
+ * @param metricsFromCache
+ * @throws Exception
+ */
+ protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
+ if (metricsFromCache.size()==0) return;
+
+ LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+ emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache));
+ }
+
+ /**
+ * Returns metrics map. Source is based on implementation.
+ * @return
+ */
+ protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
+
+ /**
+ * Processes given metrics (aggregates or merges them) and converts them into json string that will be send to collector
+ * @param metricValues
+ * @return
+ */
+ protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
+
+ protected abstract String getPostUrl();
+
+ @Override
+ protected String getCollectorUri(String host) {
+ return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
+ }
+
+ @Override
+ protected String getCollectorProtocol() {
+ return collectorProtocol;
+ }
+
+ @Override
+ protected String getCollectorPort() {
+ return collectorPort;
+ }
+
+ @Override
+ protected int getTimeoutSeconds() {
+ return DEFAULT_POST_TIMEOUT_SECONDS;
+ }
+
+ @Override
+ protected String getZookeeperQuorum() {
+ return zkQuorum;
+ }
+
+ @Override
+ protected Collection<String> getConfiguredCollectorHosts() {
+ return collectorHosts;
+ }
+
+ @Override
+ protected String getHostname() {
+ return hostname;
+ }
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return false;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return 0;
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..c8dffab
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to collector on specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
+ private static String AGGREGATED_POST_PREFIX = "/aggregated";
+ private Log LOG;
+
+ public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+ super(timelineMetricsHolder, configuration, interval);
+ LOG = LogFactory.getLog(this.getClass());
+ }
+
+ /**
+ * get metrics map form @TimelineMetricsHolder
+ * @return
+ */
+ @Override
+ protected Map<String, TimelineMetrics> getMetricsFromCache() {
+ return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ }
+
+ /**
+ * Aggregates given metrics and converts them into json string that will be send to collector
+ * @param metricForAggregationValues
+ * @return
+ */
+ @Override
+ protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
+ HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+ for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+ for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+ if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
+ nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
+ }
+ nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
+ }
+ }
+ Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+ for (TimelineMetrics metrics : nameToMetricMap.values()) {
+ double sum = 0;
+ double max = Double.NEGATIVE_INFINITY;
+ double min = Double.POSITIVE_INFINITY;
+ int count = 0;
+ for (TimelineMetric metric : metrics.getMetrics()) {
+ for (Double value : metric.getMetricValues().values()) {
+ sum+=value;
+ max = Math.max(max, value);
+ min = Math.min(min, value);
+ count++;
+ }
+ }
+ TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+ tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+ metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+ }
+ String json = null;
+ try {
+ json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+ LOG.debug(json);
+ } catch (Exception e) {
+ LOG.error("Failed to convert result into json", e);
+ }
+
+ return json;
+ }
+
+ @Override
+ protected String getPostUrl() {
+ return BASE_POST_URL + AGGREGATED_POST_PREFIX;
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
new file mode 100644
index 0000000..89addb7
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Map;
+
+public class RawMetricsPublisher extends AbstractMetricPublisher {
+ private final Log LOG;
+
+ public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+ super(timelineMetricsHolder, configuration, interval);
+ LOG = LogFactory.getLog(this.getClass());
+ }
+
+
+ @Override
+ protected Map<String, TimelineMetrics> getMetricsFromCache() {
+ return timelineMetricsHolder.extractMetricsForRawPublishing();
+ }
+
+ @Override
+ protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
+ //merge everything in one TimelineMetrics object
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ for (TimelineMetrics metrics : metricValues.values()) {
+ for (TimelineMetric timelineMetric : metrics.getMetrics())
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ }
+ //map TimelineMetrics to json string
+ String json = null;
+ try {
+ json = mapper.writeValueAsString(timelineMetrics);
+ LOG.debug(json);
+ } catch (Exception e) {
+ LOG.error("Failed to convert result into json", e);
+ }
+ return json;
+ }
+
+ @Override
+ protected String getPostUrl() {
+ return BASE_POST_URL;
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
new file mode 100644
index 0000000..ea72d17
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.easymock.EasyMock.createMockBuilder;
+
+
+public class AggregatorApplicationTest {
+ @Test
+ public void testMainNotEnoughArguments() {
+ try {
+ AggregatorApplication.main(new String[0]);
+      Assert.fail("main() with 0 arguments should have thrown an exception");
+ } catch (Exception e) {
+ //expected
+ }
+ try {
+ AggregatorApplication.main(new String[1]);
+      Assert.fail("main() with 1 argument should have thrown an exception");
+ } catch (Exception e) {
+ //expected
+ }
+ }
+
+ @Test
+ public void testGetURI() {
+ AggregatorApplication aggregatorApplicationMock = createMockBuilder(AggregatorApplication.class)
+ .withConstructor("", "")
+ .addMockedMethod("createHttpServer")
+ .addMockedMethod("initConfiguration").createMock();
+
+ URI uri = aggregatorApplicationMock.getURI();
+ Assert.assertEquals("http://" + aggregatorApplicationMock.getHostName() + ":61888/", uri.toString());
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
new file mode 100644
index 0000000..736fd06
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.test.framework.JerseyTest;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
+import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.junit.Test;
+
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class AggregatorWebServiceTest extends JerseyTest {
+ public AggregatorWebServiceTest() {
+ super(new WebAppDescriptor.Builder(
+ "org.apache.hadoop.metrics2.host.aggregator")
+ .contextPath("jersey-guice-filter")
+ .servletPath("/")
+ .clientConfig(new DefaultClientConfig(JacksonJaxbJsonProvider.class))
+ .build());
+ }
+
+ @Override
+ public TestContainerFactory getTestContainerFactory() {
+ return new GrizzlyTestContainerFactory();
+ }
+
+ @Test
+ public void testOkResponse() {
+ WebResource r = resource();
+ ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics")
+ .accept("text/json")
+ .get(ClientResponse.class);
+ assertEquals(200, response.getStatus());
+ assertEquals("text/json", response.getType().toString());
+ }
+
+ @Test
+ public void testWrongPath() {
+ WebResource r = resource();
+ ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics").path("aggregated")
+ .accept("text/json")
+ .get(ClientResponse.class);
+ assertEquals(404, response.getStatus());
+ }
+
+
+ @Test
+ public void testMetricsPost() {
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ TimelineMetrics timelineMetrics = TimelineMetricsHolderTest.getTimelineMetricsWithAppID("appid");
+ WebResource r = resource();
+ ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics")
+ .accept(MediaType.TEXT_PLAIN)
+ .post(ClientResponse.class, timelineMetrics);
+ assertEquals(200, response.getStatus());
+ assertEquals(MediaType.TEXT_PLAIN, response.getType().toString());
+
+ Map<String, TimelineMetrics> aggregationMap = timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ Map<String, TimelineMetrics> rawMap = timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ Assert.assertEquals(1, aggregationMap.size());
+ Assert.assertEquals(1, rawMap.size());
+
+ Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+ Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+ Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+ Collection<String> rawCollectionKeys = rawMap.keySet();
+
+ for (String key : aggregationCollectionKeys) {
+ Assert.assertTrue(key.contains("appid"));
+ }
+
+ for (String key : rawCollectionKeys) {
+ Assert.assertTrue(key.contains("appid"));
+ }
+
+ Assert.assertEquals(1, aggregationCollection.size());
+ Assert.assertEquals(1, rawCollection.size());
+
+ TimelineMetrics aggregationTimelineMetrics = (TimelineMetrics) aggregationCollection.toArray()[0];
+ TimelineMetrics rawTimelineMetrics = (TimelineMetrics) rawCollection.toArray()[0];
+
+
+ Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+ Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+ Assert.assertEquals("appid", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+ Assert.assertEquals("appid", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+ aggregationMap = timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ rawMap = timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ //Cache should be empty after extraction
+ Assert.assertEquals(0, aggregationMap.size());
+ Assert.assertEquals(0, rawMap.size());
+ }
+
+
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
new file mode 100644
index 0000000..7d8ebf4
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+
+
+public class TimelineMetricsHolderTest {
+ private TimelineMetricsHolder timelineMetricsHolderInstance;
+
+ public void clearHolderSingleton() throws NoSuchFieldException, IllegalAccessException {
+ Class timelineMetricHolderClass = TimelineMetricsHolder.class;
+ Field field = timelineMetricHolderClass.getDeclaredField("instance");
+ field.setAccessible(true);
+        field.set(null, null);
+ }
+
+ @Test
+ public void testGetInstanceDefaultValues() throws Exception {
+ clearHolderSingleton();
+ Assert.assertNotNull(TimelineMetricsHolder.getInstance());
+ }
+
+ @Test
+ public void testGetInstanceWithParameters() throws Exception {
+ clearHolderSingleton();
+ Assert.assertNotNull(TimelineMetricsHolder.getInstance(1,2));
+ }
+
+ @Test
+ public void testCache() throws Exception {
+ clearHolderSingleton();
+ timelineMetricsHolderInstance = TimelineMetricsHolder.getInstance(4,4);
+ timelineMetricsHolderInstance.putMetricsForAggregationPublishing(getTimelineMetricsWithAppID("aggr"));
+ timelineMetricsHolderInstance.putMetricsForRawPublishing(getTimelineMetricsWithAppID("raw"));
+
+ Map<String, TimelineMetrics> aggregationMap = timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+ Map<String, TimelineMetrics> rawMap = timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+ Assert.assertEquals(1, aggregationMap.size());
+ Assert.assertEquals(1, rawMap.size());
+
+ Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+ Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+ Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+ Collection<String> rawCollectionKeys = rawMap.keySet();
+
+ for (String key : aggregationCollectionKeys) {
+ Assert.assertTrue(key.contains("aggr"));
+ }
+
+ for (String key : rawCollectionKeys) {
+ Assert.assertTrue(key.contains("raw"));
+ }
+
+ Assert.assertEquals(1, aggregationCollection.size());
+ Assert.assertEquals(1, rawCollection.size());
+
+ TimelineMetrics aggregationTimelineMetrics = (TimelineMetrics) aggregationCollection.toArray()[0];
+ TimelineMetrics rawTimelineMetrics = (TimelineMetrics) rawCollection.toArray()[0];
+
+
+ Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+ Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+ Assert.assertEquals("aggr", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+ Assert.assertEquals("raw", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+ aggregationMap = timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+ rawMap = timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+ //Cache should be empty after extraction
+ Assert.assertEquals(0, aggregationMap.size());
+ Assert.assertEquals(0, rawMap.size());
+ }
+
+ public static TimelineMetrics getTimelineMetricsWithAppID(String appId) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setAppId(appId);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ return timelineMetrics;
+ }
+}
\ No newline at end of file
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
new file mode 100644
index 0000000..a8ddbee
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class AbstractMetricPublisherTest {
+ @Test
+ public void testProcessAndPublishMetrics() throws Exception {
+ AbstractMetricPublisher publisherMock =
+ createMockBuilder(RawMetricsPublisher.class)
+ .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 60)
+ .addMockedMethod("processMetrics")
+ .addMockedMethod("getCollectorUri")
+ .addMockedMethod("emitMetricsJson")
+ .addMockedMethod("getCurrentCollectorHost").createStrictMock();
+
+ TimelineMetricsHolder.getInstance().putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+ expect(publisherMock.getCurrentCollectorHost()).andReturn("collectorhost").once();
+ expect(publisherMock.getCollectorUri(anyString())).andReturn("https://collectorhost:11/metrics").once();
+ expect(publisherMock.processMetrics(anyObject(Map.class))).andReturn("{metrics}").once();
+ expect(publisherMock.emitMetricsJson("https://collectorhost:11/metrics", "{metrics}")).andReturn(true).once();
+
+ replay(publisherMock);
+
+ publisherMock.processAndPublishMetrics(TimelineMetricsHolder.getInstance().extractMetricsForRawPublishing());
+
+ verify(publisherMock);
+ }
+
+ @Test
+ public void testRunAndStop() throws Exception {
+ AbstractMetricPublisher publisherMock = createMockBuilder(RawMetricsPublisher.class)
+ .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 1)
+ .addMockedMethod("processAndPublishMetrics").createStrictMock();
+ publisherMock.processAndPublishMetrics(anyObject(Map.class));
+ expectLastCall().times(1);
+
+
+ Thread t = createMockBuilder(Thread.class)
+ .withConstructor(publisherMock)
+ .addMockedMethod("isInterrupted").createStrictMock();
+ expect(t.isInterrupted()).andReturn(false).once();
+ expect(t.isInterrupted()).andReturn(true).once();
+
+ replay(publisherMock, t);
+
+ t.start();
+
+ Thread.sleep(2222);
+
+ verify(publisherMock, t);
+ }
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
new file mode 100644
index 0000000..3413052
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class AggregatedMetricsPublisherTest {
+
+ @Test
+ public void testProcessMetrics() throws Exception {
+ Configuration configuration = new Configuration();
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+ metric1App1Metrics.put(1L, 1d);
+ metric1App1Metrics.put(2L, 2d);
+ metric1App1Metrics.put(3L, 3d);
+ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+ TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+ metric2App2Metrics.put(1L, 4d);
+ metric2App2Metrics.put(2L, 5d);
+ metric2App2Metrics.put(3L, 6d);
+ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+ TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+ metric3App3Metrics.put(1L, 7d);
+ metric3App3Metrics.put(2L, 8d);
+ metric3App3Metrics.put(3L, 9d);
+
+ timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+ AggregatedMetricsPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+ String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
+ String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
+ String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
+ String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
+ Assert.assertNotNull(aggregatedJson);
+ Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1));
+ Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
+ Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
+ }
+
+ @Test
+ public void testGetPostUrl() {
+ Configuration configuration = new Configuration();
+ AggregatedMetricsPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ String actualURL = aggregatedMetricsPublisher.getPostUrl();
+ String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+ }
+
+ @Test
+ public void testGetCollectorUri() {
+ //default configuration
+ Configuration configuration = new Configuration();
+ AbstractMetricPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+ String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+
+ //https configuration
+ configuration = new Configuration();
+ configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+ aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+ expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+
+ //custom port configuration
+ configuration = new Configuration();
+ configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+ aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+ expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+ }
+
+ @Test
+ public void testGetMetricsFromCache() throws InterruptedException {
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
+ timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
+
+ Configuration configuration = new Configuration();
+ AggregatedMetricsPublisher aggregatedMetricsPublisher =
+ new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+ Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
+ Assert.assertNotNull(metricsFromCache);
+ Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+ Assert.assertNotNull(actualTimelineMetrics);
+ Assert.assertEquals(2, actualTimelineMetrics.size());
+
+ for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+ List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+ Assert.assertEquals(1, metrics.size());
+ Assert.assertTrue(metrics.get(0).getAppId().contains("aggr"));
+ }
+
+ }
+
+ TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricName);
+ timelineMetric.setAppId(appId);
+ timelineMetric.setMetricValues(metricValues);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ return timelineMetrics;
+ }
+}
\ No newline at end of file
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
new file mode 100644
index 0000000..60510d2
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class RawMetricsPublisherTest {
+ @Test
+ public void testProcessMetrics() throws Exception {
+ Configuration configuration = new Configuration();
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+ metric1App1Metrics.put(1L, 1d);
+ metric1App1Metrics.put(2L, 2d);
+ metric1App1Metrics.put(3L, 3d);
+ timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+ TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+ metric2App2Metrics.put(1L, 4d);
+ metric2App2Metrics.put(2L, 5d);
+ metric2App2Metrics.put(3L, 6d);
+ timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+ TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+ metric3App3Metrics.put(1L, 7d);
+ metric3App3Metrics.put(2L, 8d);
+ metric3App3Metrics.put(3L, 9d);
+
+ timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+ RawMetricsPublisher rawMetricsPublisher =
+ new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+ String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing());
+ String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
+ Assert.assertNotNull(rawJson);
+ Assert.assertEquals(expectedResult, rawJson);
+ }
+
+ @Test
+ public void testGetPostUrl() {
+ Configuration configuration = new Configuration();
+ RawMetricsPublisher rawMetricsPublisher =
+ new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ String actualURL = rawMetricsPublisher.getPostUrl();
+ String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+ }
+
+ @Test
+ public void testGetCollectorUri() {
+ //default configuration
+ Configuration configuration = new Configuration();
+ AbstractMetricPublisher rawMetricsPublisher =
+ new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ String actualURL = rawMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+ String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+
+ //https configuration
+ configuration = new Configuration();
+ configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+ rawMetricsPublisher =
+ new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ actualURL = rawMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+ expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+
+ //custom port configuration
+ configuration = new Configuration();
+ configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+ rawMetricsPublisher =
+ new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+ actualURL = rawMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+ expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics";
+ Assert.assertNotNull(actualURL);
+ Assert.assertEquals(expectedURL, actualURL);
+ }
+
+ @Test
+ public void testGetMetricsFromCache() throws InterruptedException {
+
+ TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
+ timelineMetricsHolder.extractMetricsForAggregationPublishing();
+ timelineMetricsHolder.extractMetricsForRawPublishing();
+
+ timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw1"));
+ timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr"));
+ timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw2"));
+
+ Configuration configuration = new Configuration();
+ RawMetricsPublisher rawMetricsPublisher =
+ new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+ Map<String, TimelineMetrics> metricsFromCache = rawMetricsPublisher.getMetricsFromCache();
+ Assert.assertNotNull(metricsFromCache);
+ Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+ Assert.assertNotNull(actualTimelineMetrics);
+ Assert.assertEquals(2, actualTimelineMetrics.size());
+
+ for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+ List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+ Assert.assertEquals(1, metrics.size());
+ Assert.assertTrue(metrics.get(0).getAppId().contains("raw"));
+ }
+
+ }
+
+ TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+ TimelineMetric timelineMetric = new TimelineMetric();
+ timelineMetric.setMetricName(metricName);
+ timelineMetric.setAppId(appId);
+ timelineMetric.setMetricValues(metricValues);
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
+ timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+ return timelineMetrics;
+ }
+}
diff --git a/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
index 967e133..4980b8e 100644
--- a/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
+++ b/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
@@ -24,7 +24,7 @@
PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
OK=0
NOTOK=1
@@ -98,23 +98,26 @@
exit 1
fi
+# Set log directory path
+if [[ -n "${AMS_MONITOR_LOG_DIR}" ]]; then
+ OUTFILE=${AMS_MONITOR_LOG_DIR}/ambari-metrics-monitor.out
+fi
+
#TODO decide if rebuild on each start (pretty quickly) to tolerate major node changes (like kernel update)
#build psutil
if [ ! "$(ls -A ${RESOURCE_MONITORING_DIR}/psutil/build)" ]; then
echo "Building psutil..."
dir=$(pwd)
cd "${RESOURCE_MONITORING_DIR}/psutil"
- ${PYTHON} "setup.py" "build"
+ # build psutil and redirect output to log file
+ echo "--------------------------Building psutil--------------------------" >> ${OUTFILE}
+ ${PYTHON} "setup.py" "build" >> ${OUTFILE}
+ echo "----------------------Finished building psutil---------------------" >> ${OUTFILE}
cd "${dir}"
else
echo "psutil build directory is not empty, continuing..."
fi
-# Set log directory path
-if [[ -n "${AMS_MONITOR_LOG_DIR}" ]]; then
- OUTFILE=${AMS_MONITOR_LOG_DIR}/ambari-metrics-monitor.out
-fi
-
# Set pid directory path
if [[ -n "${AMS_MONITOR_PID_DIR}" ]]; then
PIDFILE=${AMS_MONITOR_PID_DIR}/ambari-metrics-monitor.pid
diff --git a/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini b/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
index 7fe7397..38fff1e 100644
--- a/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
+++ b/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
@@ -27,6 +27,8 @@
[emitter]
send_interval = 60
+kinit_cmd = /usr/bin/kinit -kt /etc/security/keytabs/ams.monitor.keytab amsmon/localhost
+klist_cmd = /usr/bin/klist
[collector]
collector_sleep_interval = 5
diff --git a/ambari-metrics-host-monitoring/pom.xml b/ambari-metrics-host-monitoring/pom.xml
index d6c1fab..e8f8556 100644
--- a/ambari-metrics-host-monitoring/pom.xml
+++ b/ambari-metrics-host-monitoring/pom.xml
@@ -86,6 +86,7 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
<executions>
<execution>
<configuration>
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
new file mode 100644
index 0000000..ba05e9b
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -0,0 +1,112 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
+ def __init__(self, config, stop_handler):
+ threading.Thread.__init__(self)
+ self._config = config
+ self._stop_handler = stop_handler
+ self._aggregator_process = None
+ self._sleep_interval = config.get_collector_sleep_interval()
+ self.stopped = False
+
+ def run(self):
+ java_home = self._config.get_java_home()
+ collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+ jvm_agrs = self._config.get_aggregator_jvm_agrs()
+ config_dir = self._config.get_config_dir()
+ class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+ ams_log_file = "ambari-metrics-aggregator.log"
+ additional_classpath = ':{0}'.format(config_dir)
+ ams_log_dir = self._config.ams_monitor_log_dir()
+ hostname = self._config.get_hostname_config()
+ logger.info('Starting Aggregator thread.')
+ cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\
+ .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts)
+
+ logger.info("Executing : {0}".format(cmd))
+
+ self._aggregator_process = subprocess.Popen([cmd], stdout = None, stderr = None, shell = True)
+ while not self.stopped:
+ if 0 == self._stop_handler.wait(self._sleep_interval):
+ break
+ pass
+ self.stop()
+
+ def stop(self):
+ self.stopped = True
+ if self._aggregator_process :
+ logger.info('Stopping Aggregator thread.')
+ self._aggregator_process.terminate()
+ self._aggregator_process = None
+
+class AggregatorWatchdog(threading.Thread):
+ SLEEP_TIME = 30
+ CONNECTION_TIMEOUT = 5
+ AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+ def __init__(self, config, stop_handler):
+ threading.Thread.__init__(self)
+ self._config = config
+ self._stop_handler = stop_handler
+ self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+ self._is_ok = threading.Event()
+ self.set_is_ok(True)
+ self.stopped = False
+
+ def run(self):
+ logger.info('Starting Aggregator Watchdog thread.')
+ while not self.stopped:
+ if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+ break
+ try:
+ conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+ self.set_is_ok(True)
+ except (KeyboardInterrupt, SystemExit):
+ raise
+ except Exception, e:
+ self.set_is_ok(False)
+ continue
+ if conn.code != 200:
+ self.set_is_ok(False)
+ continue
+ conn.close()
+
+ def is_ok(self):
+ return self._is_ok.is_set()
+
+ def set_is_ok(self, value):
+ if value == False and self.is_ok() != value:
+ logger.warning("Watcher couldn't connect to aggregator.")
+ self._is_ok.clear()
+ else:
+ self._is_ok.set()
+
+
+ def stop(self):
+ logger.info('Stopping watcher thread.')
+ self.stopped = True
+
+
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index 0052808..34a6787 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -66,7 +66,7 @@
del self.app_metric_map[ app_id ]
pass
- def flatten(self, application_id = None, clear_once_flattened = False):
+ def flatten(self, application_id = None, clear_once_flattened = False, set_instanceid = False, instanceid = None):
"""
Return flatten dict to caller in json format.
Json format:
@@ -89,11 +89,14 @@
for appId, metrics in local_metric_map.iteritems():
for metricId, metricData in dict(metrics).iteritems():
# Create a timeline metric object
+ result_instanceid = ""
+ if set_instanceid:
+ result_instanceid = instanceid
timeline_metric = {
"hostname" : self.hostname,
"metricname" : metricId,
"appid" : "HOST",
- "instanceid" : "",
+ "instanceid" : result_instanceid,
"starttime" : self.get_start_time(appId, metricId),
"metrics" : metricData
}
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 5686c50..017ad24 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -30,6 +30,8 @@
# Abstraction for OS-dependent configuration defaults
#
class ConfigDefaults(object):
+ def get_config_dir(self):
+ pass
def get_config_file_path(self):
pass
def get_metric_file_path(self):
@@ -40,11 +42,14 @@
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class ConfigDefaultsWindows(ConfigDefaults):
def __init__(self):
+ self._CONFIG_DIR = "conf"
self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
self._METRIC_FILE_PATH = "conf\\ca.pem"
pass
+ def get_config_dir(self):
+ return self._CONFIG_DIR
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
@@ -55,11 +60,13 @@
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class ConfigDefaultsLinux(ConfigDefaults):
def __init__(self):
+ self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
pass
-
+ def get_config_dir(self):
+ return self._CONFIG_DIR
def get_config_file_path(self):
return self._CONFIG_FILE_PATH
def get_metric_file_path(self):
@@ -71,6 +78,7 @@
config = ConfigParser.RawConfigParser()
+CONFIG_DIR = configDefaults.get_config_dir()
CONFIG_FILE_PATH = configDefaults.get_config_file_path()
METRIC_FILE_PATH = configDefaults.get_metric_file_path()
CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -107,6 +115,8 @@
[emitter]
send_interval = 60
+kinit_cmd = /usr/bin/kinit -kt /etc/security/keytabs/ams.monitor.keytab amsmon/localhost
+klist_cmd = /usr/bin/klist
[collector]
collector_sleep_interval = 5
@@ -191,6 +201,8 @@
# No hostname script identified in the ambari agent conf
pass
pass
+ def get_config_dir(self):
+ return CONFIG_DIR
def getConfig(self):
return self.config
@@ -208,16 +220,26 @@
def get_send_interval(self):
return int(self.get("emitter", "send_interval", 60))
+ def get_kinit_cmd(self):
+ return self.get("emitter", "kinit_cmd")
+
+ def get_klist_cmd(self):
+ return self.get("emitter", "klist_cmd")
+
def get_collector_sleep_interval(self):
return int(self.get("collector", "collector_sleep_interval", 10))
def get_hostname_config(self):
return self.get("default", "hostname", None)
- def get_metrics_collector_hosts(self):
+ def get_metrics_collector_hosts_as_list(self):
hosts = self.get("default", "metrics_servers", "localhost")
return hosts.split(",")
+ def get_metrics_collector_hosts_as_string(self):
+ hosts = self.get("default", "metrics_servers", "localhost")
+ return hosts
+
def get_failover_strategy(self):
return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
@@ -239,9 +261,32 @@
def is_server_https_enabled(self):
return "true" == str(self.get("collector", "https_enabled")).lower()
+ def get_java_home(self):
+ return self.get("aggregation", "java_home")
+
+ def is_inmemory_aggregation_enabled(self):
+ return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+ def get_inmemory_aggregation_port(self):
+ return self.get("aggregation", "host_in_memory_aggregation_port")
+
+ def get_aggregator_jvm_agrs(self):
+ hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+ return hosts
+
+ def ams_monitor_log_dir(self):
+ hosts = self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+ return hosts
+
+ def is_set_instanceid(self):
+ return "true" == str(self.get("default", "set.instanceId", 'false')).lower()
+
def get_server_host(self):
return self.get("collector", "host")
+ def get_instanceid(self):
+ return self.get("default", "instanceid")
+
def get_server_port(self):
try:
return int(self.get("collector", "port"))
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index c0feed5..d161269 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -27,6 +27,9 @@
from metric_collector import MetricsCollector
from emitter import Emitter
from host_info import HostInfo
+from aggregator import Aggregator
+from aggregator import AggregatorWatchdog
+
logger = logging.getLogger()
@@ -50,11 +53,15 @@
self.initialize_events_cache()
self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
self._t = None
+ self.aggregator = None
+ self.aggregator_watchdog = None
def run(self):
logger.info('Running Controller thread: %s' % threading.currentThread().getName())
self.start_emitter()
+ if self.config.is_inmemory_aggregation_enabled():
+ self.start_aggregator_with_watchdog()
# Wake every 5 seconds to push events to the queue
while True:
@@ -62,6 +69,10 @@
logger.warn('Event Queue full!! Suspending further collections.')
else:
self.enqueque_events()
+ # restart aggregator if needed
+ if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok():
+ logger.warning("Aggregator is not available. Restarting aggregator.")
+ self.start_aggregator_with_watchdog()
pass
# Wait for the service stop event instead of sleeping blindly
if 0 == self._stop_handler.wait(self.sleep_interval):
@@ -75,6 +86,12 @@
# The emitter thread should have stopped by now, just ensure it has shut
# down properly
self.emitter.join(5)
+
+ if self.config.is_inmemory_aggregation_enabled():
+ self.aggregator.stop()
+ self.aggregator_watchdog.stop()
+ self.aggregator.join(5)
+ self.aggregator_watchdog.join(5)
pass
# TODO: Optimize to not use Timer class and use the Queue instead
@@ -115,3 +132,14 @@
def start_emitter(self):
self.emitter.start()
+
+ # Start aggregator and watcher threads
+ def start_aggregator_with_watchdog(self):
+ if self.aggregator:
+ self.aggregator.stop()
+ if self.aggregator_watchdog:
+ self.aggregator_watchdog.stop()
+ self.aggregator = Aggregator(self.config, self._stop_handler)
+ self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
+ self.aggregator.start()
+ self.aggregator_watchdog.start()
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index ba3f18e..f19434d 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -24,6 +24,7 @@
from security import CachedHTTPSConnection, CachedHTTPConnection
from blacklisted_set import BlacklistedSet
from config_reader import ROUND_ROBIN_FAILOVER_STRATEGY
+from spnego_kerberos_auth import SPNEGOKerberosAuth
logger = logging.getLogger()
@@ -31,6 +32,10 @@
AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
RETRY_SLEEP_INTERVAL = 5
MAX_RETRY_COUNT = 3
+ cookie_cached = {}
+ kinit_cmd = None
+ klist_cmd = None
+ spnego_krb_auth = None
"""
Wake up every send interval seconds and empty the application metric map.
"""
@@ -39,13 +44,25 @@
logger.debug('Initializing Emitter thread.')
self.lock = threading.Lock()
self.send_interval = config.get_send_interval()
+ self.kinit_cmd = config.get_kinit_cmd()
+ if self.kinit_cmd:
+ logger.debug(self.kinit_cmd)
+ self.klist_cmd = config.get_klist_cmd()
self.hostname = config.get_hostname_config()
self.hostname_hash = self.compute_hash(self.hostname)
self._stop_handler = stop_handler
self.application_metric_map = application_metric_map
self.collector_port = config.get_server_port()
- self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
+ self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
self.is_server_https_enabled = config.is_server_https_enabled()
+ self.set_instanceid = config.is_set_instanceid()
+ self.instanceid = config.get_instanceid()
+ self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
+
+ if self.is_inmemory_aggregation_enabled:
+ self.collector_port = config.get_inmemory_aggregation_port()
+ self.all_metrics_collector_hosts = ['localhost']
+ self.is_server_https_enabled = False
if self.is_server_https_enabled:
self.ca_certs = config.get_ca_certs()
@@ -63,6 +80,7 @@
self.submit_metrics()
except Exception, e:
logger.warn('Unable to emit events. %s' % str(e))
+ self.cookie_cached = {}
pass
#Wait for the service stop event instead of sleeping blindly
if 0 == self._stop_handler.wait(self.send_interval):
@@ -74,7 +92,7 @@
# This call will acquire lock on the map and clear contents before returning
# After configured number of retries the data will not be sent to the
# collector
- json_data = self.application_metric_map.flatten(None, True)
+ json_data = self.application_metric_map.flatten(None, True, set_instanceid=self.set_instanceid, instanceid=self.instanceid)
if json_data is None:
logger.info("Nothing to emit, resume waiting.")
return
@@ -100,17 +118,50 @@
headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
connection = self.get_connection(collector_host)
logger.debug("message to send: %s" % data)
+
+ try:
+ if self.cookie_cached[connection.host]:
+ headers["Cookie"] = self.cookie_cached[connection.host]
+ logger.debug("Cookie: %s" % self.cookie_cached[connection.host])
+ except Exception, e:
+ self.cookie_cached = {}
+ pass
+
retry_count = 0
while retry_count < self.MAX_RETRY_COUNT:
response = self.get_response_from_submission(connection, data, headers)
- if response and response.status == 200:
- return True
- else:
- logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
- retry_count += 1
- #Wait for the service stop event instead of sleeping blindly
- if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+ if response:
+ if response.status == 200:
return True
+ if response.status == 401:
+ self.cookie_cached = {}
+ auth_header = response.getheader('www-authenticate', None)
+ if auth_header == None:
+ logger.warn('www-authenticate header not found')
+ else:
+ self.spnego_krb_auth = SPNEGOKerberosAuth()
+ if self.spnego_krb_auth.get_negotiate_value(auth_header) == '':
+ response = self.spnego_krb_auth.authenticate_handshake(connection, "POST", self.AMS_METRICS_POST_URL, data, headers, self.kinit_cmd, self.klist_cmd)
+ if response:
+ logger.debug("response from authenticate_client: retcode = {0}, reason = {1}"
+ .format(response.status, response.reason))
+ logger.debug(str(response.read()))
+ if response.status == 200:
+ logger.debug("response headers: {0}".format(response.getheaders()))
+ logger.debug("cookie_cached: %s" % self.cookie_cached)
+ set_cookie_header = response.getheader('set-cookie', None)
+ if set_cookie_header and self.spnego_krb_auth:
+ set_cookie_val = self.spnego_krb_auth.get_hadoop_auth_cookie(set_cookie_header)
+ logger.debug("set_cookie: %s" % set_cookie_val)
+ if set_cookie_val:
+ self.cookie_cached[connection.host] = set_cookie_val
+ return True
+ #No response or failed
+ logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
+ retry_count += 1
+ #Wait for the service stop event instead of sleeping blindly
+ if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+ return True
pass
if retry_count >= self.MAX_RETRY_COUNT:
@@ -142,6 +193,7 @@
return response
except Exception, e:
logger.warn('Error sending metrics to server. %s' % str(e))
+ self.cookie_cached = {}
return None
def get_collector_host_shard(self):
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/krberr.py b/ambari-metrics-host-monitoring/src/main/python/core/krberr.py
new file mode 100644
index 0000000..c5e0163
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/krberr.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+"""
+Python Kerberos GSS APIs used by spnego_kerberos_auth.py.
+It is used as a placeholder for kerberos.py when that module is not available.
+"""
+
+class KrbError(Exception):
+ pass
+
+class GSSError(KrbError):
+ pass
+
+def authGSSClientInit(service):
+ pass
+
+def authGSSClientClean(context):
+ pass
+
+def authGSSClientStep(context, challenge):
+ pass
+
+def authGSSClientResponse(context):
+ pass
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/spnego_kerberos_auth.py b/ambari-metrics-host-monitoring/src/main/python/core/spnego_kerberos_auth.py
new file mode 100644
index 0000000..e49712f
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/spnego_kerberos_auth.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import httplib
+import os
+
+logger = logging.getLogger()
+try:
+ import kerberos
+except ImportError:
+ import krberr as kerberos
+ logger.warn('import kerberos exception: %s' % str(ImportError))
+pass
+
+class SPNEGOKerberosAuth:
+ def __init__(self):
+ self.krb_context = None
+
+ def authenticate_handshake (self, connection, method, service_url, body, headers, kinit_cmd, klist_cmd):
+ # kinit to ensure ticket valid
+ self.execute_kinit(kinit_cmd, klist_cmd)
+
+ try:
+ # Authenticate the client request
+ response = self.authenticate_client(connection, method, service_url, body, headers)
+
+ # Authenticate the response from the server
+ if response:
+ self.authenticate_server(response)
+ return response
+ finally:
+ # Clean the client context after the handshake
+ self.clean_client_context()
+ pass
+
+ def execute_kinit(self, kinit_cmd, klist_cmd):
+ exit_status = os.system(kinit_cmd)
+ logger.debug("kinit exit_status: {0}".format(exit_status))
+ logger.debug(os.system(klist_cmd))
+ return exit_status
+
+ def authenticate_client(self, connection, method, service_url, body, headers):
+ service = "HTTP@%s" % connection.host.lower()
+ logger.debug("connection: %s", connection)
+ logger.debug("service: %s", service)
+
+ auth_header = self.get_authorization_header(service)
+ logger.debug("Authorization: %s" % auth_header)
+
+ # Send 2nd HTTP request with authorization header
+ headers['Authorization'] = auth_header
+ try:
+ connection.request(method, service_url, body, headers)
+ response = connection.getresponse()
+ except Exception, e:
+ logger.warn('2nd HTTP request exception from server: %s' % str(e))
+ return None
+ pass
+ if response:
+ logger.debug("2nd HTTP response from server: retcode = {0}, reason = {1}"
+ .format(response.status, response.reason))
+ logger.debug(str(response.read()))
+ logger.debug("response headers: {0}".format(response.getheaders()))
+ return response
+
+ def get_authorization_header(self, service):
+ # Initialize the context object for client-side authentication with a service principal
+ try:
+ result, self.krb_context = kerberos.authGSSClientInit(service)
+ if result == -1:
+ logger.warn('authGSSClientInit result: {0}'.format(result))
+ return None
+ except kerberos.GSSError, e:
+ logger.warn('authGSSClientInit exception: %s' % str(e))
+ return None
+ pass
+
+ # Process the first client-side step with the context
+ try:
+ result = kerberos.authGSSClientStep(self.krb_context, "")
+ if result == -1:
+ logger.warn('authGSSClientStep result for authenticate client: {0}'.format(result))
+ return None
+ except kerberos.GSSError, e:
+ logger.warn('authGSSClientStep exception for authenticate client: %s' % str(e))
+ return None
+ pass
+
+ # Get the client response from the first client-side step
+ try:
+ negotiate_value = kerberos.authGSSClientResponse(self.krb_context)
+ logger.debug("authGSSClientResponse response:{0}".format(negotiate_value))
+ except kerberos.GSSError, e:
+ logger.warn('authGSSClientResponse exception: %s' % str(e))
+ return None
+ pass
+
+ # Build the authorization header
+ return "Negotiate %s" % negotiate_value
+
+ def authenticate_server(self, response):
+ auth_header = response.getheader('www-authenticate', None)
+ negotiate_value = self.get_negotiate_value(auth_header)
+ if negotiate_value == None:
+ logger.warn('www-authenticate header not found')
+
+ # Process the client-side step with the context and the negotiate value from 2nd HTTP response
+ try:
+ result = kerberos.authGSSClientStep(self.krb_context, negotiate_value)
+ if result == -1:
+ logger.warn('authGSSClientStep result for authenticate server: {0}'.format(result))
+ except kerberos.GSSError, e:
+ logger.warn('authGSSClientStep exception for authenticate server: %s' % str(e))
+ result = -1
+ pass
+ return result
+
+ def clean_client_context(self):
+ # Destroy the context for client-side authentication
+ try:
+ result = kerberos.authGSSClientClean(self.krb_context)
+ logger.debug("authGSSClientClean result:{0}".format(result))
+ except kerberos.GSSError, e:
+ logger.warn('authGSSClientClean exception: %s' % str(e))
+ result = -1
+ pass
+ return result
+
+ def get_hadoop_auth_cookie(self, set_cookie_header):
+ if set_cookie_header:
+ for field in set_cookie_header.split(";"):
+ if field.startswith('hadoop.auth='):
+ return field
+ else:
+ return None
+ return None
+
+ def get_negotiate_value(self, auth_header):
+ if auth_header:
+ for field in auth_header.split(","):
+ key, __, value = field.strip().partition(" ")
+ if key.lower() == "negotiate":
+ return value.strip()
+ else:
+ return None
+ return None
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index bfb6957..7a9fbec 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -117,7 +117,8 @@
def wait(self, timeout=None):
# Stop process when stop event received
- if self.stop_event.wait(timeout):
+ self.stop_event.wait(timeout)
+ if self.stop_event.isSet():
logger.info("Stop event received")
return 0
# Timeout
diff --git a/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics-host-monitoring/src/main/python/main.py
index d218015..53d27f8 100644
--- a/ambari-metrics-host-monitoring/src/main/python/main.py
+++ b/ambari-metrics-host-monitoring/src/main/python/main.py
@@ -21,7 +21,7 @@
import logging
import os
import sys
-
+import signal
from ambari_commons.os_utils import remove_file
from core.controller import Controller
@@ -73,6 +73,10 @@
if scmStatus is not None:
scmStatus.reportStarted()
+ # signal.pause() blocks the main thread until a signal (e.g. SIGTERM) arrives,
+ # which is needed for the registered handlers to fire reliably. TODO: investigate why and fix if possible
+ signal.pause()
+
#The controller thread finishes when the stop event is signaled
controller.join()
diff --git a/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
index 4056ae3..8b3eb66 100644
--- a/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
+++ b/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
@@ -27,6 +27,7 @@
from mock.mock import patch, MagicMock
from security import CachedHTTPConnection
from blacklisted_set import BlacklistedSet
+from spnego_kerberos_auth import SPNEGOKerberosAuth
if get_platform() != PLATFORM_WINDOWS:
os_distro_value = ('Suse','11','Final')
@@ -86,6 +87,29 @@
self.assertEqual(request_mock.call_count, 3)
self.assertUrlData(request_mock)
+ @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+ @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock())
+ @patch.object(SPNEGOKerberosAuth, "authenticate_handshake")
+ @patch.object(CachedHTTPConnection, "getresponse")
+ @patch.object(CachedHTTPConnection, "request")
+ def test_spnego_negotiation(self, request_mock, getresponse_mock, auth_mock):
+ request_mock.return_value = MagicMock()
+ getresponse_mock.return_value.status = 401
+ getresponse_mock.return_value.getheader.return_value = "Negotiate "
+
+ auth_mock.return_value.status = 200
+
+ stop_handler = bind_signal_handlers()
+
+ config = Configuration()
+ application_metric_map = ApplicationMetricMap("host","10.10.10.10")
+ application_metric_map.clear()
+ application_metric_map.put_metric("APP1", {"metric1":1}, 1)
+ emitter = Emitter(config, application_metric_map, stop_handler)
+ emitter.submit_metrics()
+
+
+ self.assertEqual(request_mock.call_count, 1)
def assertUrlData(self, request_mock):
self.assertEqual(len(request_mock.call_args), 2)
data = request_mock.call_args[0][2]
diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 9d492cb..e126016 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -70,6 +70,10 @@
private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY;
private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY;
private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
private static final String TIMELINE_DEFAULT_HOST = "localhost";
private static final String TIMELINE_DEFAULT_PORT = "6188";
private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -87,11 +91,15 @@
private TimelineMetricsCache metricsCache;
private int timeoutSeconds = 10;
private String zookeeperQuorum = null;
+ private boolean setInstanceId;
+ private String instanceId;
private String[] excludedMetricsPrefixes;
private String[] includedMetricsPrefixes;
// Local cache to avoid prefix matching everytime
private Set<String> excludedMetrics = new HashSet<>();
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -128,6 +136,17 @@
return hostname;
}
+
+ @Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
@@ -162,6 +181,11 @@
collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST));
metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
+ instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY, null);
+ setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY, false);
+
+ hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+ hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
if (metricCollectorProtocol.contains("https")) {
@@ -315,6 +339,9 @@
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostname);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId);
+ }
timelineMetric.setAppId(component);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
@@ -379,8 +406,10 @@
final String sanitizedName = sanitizeName(name);
try {
- cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
- populateMetricsList(context, MetricType.GAUGE, sanitizedName);
+ if (!isExcludedMetric(sanitizedName)) {
+ cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
+ populateMetricsList(context, MetricType.GAUGE, sanitizedName);
+ }
} catch (NumberFormatException ex) {
LOG.debug(ex.getMessage());
}
diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index e1ac48c..b05190c 100644
--- a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -84,6 +84,8 @@
properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+ properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
+ properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
props = new VerifiableProperties(properties);
}
@@ -120,6 +122,8 @@
properties.setProperty("kafka.timeline.metrics.truststore.path", "");
properties.setProperty("kafka.timeline.metrics.truststore.type", "");
properties.setProperty("kafka.timeline.metrics.truststore.password", "");
+ properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
+ properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
kafkaTimelineMetricsReporter.init(new VerifiableProperties(properties));
kafkaTimelineMetricsReporter.stopReporter();
verifyAll();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 9a55f10..d408e1a 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -50,9 +50,13 @@
private Collection<String> collectorHosts;
private String zkQuorum;
private String protocol;
+ private boolean setInstanceId;
+ private String instanceId;
private NimbusClient nimbusClient;
private String applicationId;
private int timeoutSeconds;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
public StormTimelineMetricsReporter() {
@@ -94,6 +98,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map conf) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -126,6 +140,12 @@
Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
DEFAULT_POST_TIMEOUT_SECONDS;
applicationId = cf.get(APP_ID).toString();
+ if (cf.containsKey(SET_INSTANCE_ID_PROPERTY)) {
+ setInstanceId = Boolean.parseBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
+ instanceId = cf.containsKey(INSTANCE_ID_PROPERTY) ? cf.get(INSTANCE_ID_PROPERTY).toString() : null;
+ }
+ hostInMemoryAggregationEnabled = cf.containsKey(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY) && Boolean.parseBoolean(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
+ hostInMemoryAggregationPort = cf.containsKey(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY) ? Integer.parseInt(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString()) : 61888;
collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
if (protocol.contains("https")) {
@@ -196,6 +216,9 @@
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostname);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId);
+ }
timelineMetric.setAppId(component);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 60c1427..ff72f24 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -59,6 +59,10 @@
private String port;
private String topologyName;
private String applicationId;
+ private boolean setInstanceId;
+ private String instanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -96,6 +100,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -122,6 +136,10 @@
protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
port = configuration.getProperty(COLLECTOR_PORT, "6188");
+ instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
+ setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
// Initialize the collector write strategy
super.init();
@@ -243,6 +261,9 @@
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostName);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId);
+ }
timelineMetric.setAppId(applicationId);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(
diff --git a/ambari-metrics-storm-sink/pom.xml b/ambari-metrics-storm-sink/pom.xml
index 612ad63..9e11539 100644
--- a/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics-storm-sink/pom.xml
@@ -31,7 +31,7 @@
<packaging>jar</packaging>
<properties>
- <storm.version>1.1.0-SNAPSHOT</storm.version>
+ <storm.version>1.1.0</storm.version>
</properties>
<build>
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 535fae0..5b75065 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -46,8 +46,12 @@
private Collection<String> collectorHosts;
private String zkQuorum;
private String protocol;
+ private boolean setInstanceId;
+ private String instanceId;
private String applicationId;
private int timeoutSeconds;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
public StormTimelineMetricsReporter() {
@@ -89,6 +93,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Object registrationArgument) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -115,6 +129,11 @@
Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS)) :
DEFAULT_POST_TIMEOUT_SECONDS;
applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
+ setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
+ instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
if (protocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
@@ -226,6 +245,9 @@
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostname);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId);
+ }
timelineMetric.setAppId(component);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index f58f549..4d5a229 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -68,6 +68,10 @@
private String port;
private String topologyName;
private String applicationId;
+ private String instanceId;
+ private boolean setInstanceId;
+ private boolean hostInMemoryAggregationEnabled;
+ private int hostInMemoryAggregationPort;
@Override
protected String getCollectorUri(String host) {
@@ -105,6 +109,16 @@
}
@Override
+ protected boolean isHostInMemoryAggregationEnabled() {
+ return hostInMemoryAggregationEnabled;
+ }
+
+ @Override
+ protected int getHostInMemoryAggregationPort() {
+ return hostInMemoryAggregationPort;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -133,6 +147,11 @@
protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
port = configuration.getProperty(COLLECTOR_PORT, "6188");
+ instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
+ setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
// Initialize the collector write strategy
super.init();
@@ -332,6 +351,9 @@
TimelineMetric timelineMetric = new TimelineMetric();
timelineMetric.setMetricName(attributeName);
timelineMetric.setHostName(hostName);
+ if (setInstanceId) {
+ timelineMetric.setInstanceId(instanceId);
+ }
timelineMetric.setAppId(applicationId);
timelineMetric.setStartTime(currentTimeMillis);
timelineMetric.setType(ClassUtils.getShortCanonicalName(
diff --git a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
index bd36429..2edac39 100644
--- a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
+++ b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
@@ -4,6 +4,15 @@
BytesReceivedLast5Minutes
BytesSentLast5Minutes
ChannelSize
+Counter.%.CacheMisses
+Counter.CacheHits
+Counter.CacheMisses
+Counter.ReadAllQuery
+Counter.ReadAllQuery.%
+Counter.ReadAllQuery.HostRoleCommandEntity
+DataModifyQuery
+DirectReadQuery
+DoesExistQuery
EventPutSuccessCount
EventTakeSuccessCount
FSDatasetState.org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.Capacity
@@ -13,8 +22,21 @@
FlowFilesReceivedLast5Minutes
FlowFilesSentLast5Minutes
Free Slots
+InsertObjectQuery
+ReadAllQuery
+ReadAllQuery.HostRoleCommandEntity
+ReadObjectQuery
Supervisors
TimelineMetricStoreWatcher.FakeMetric
+Timer.ObjectBuilding
+Timer.QueryPreparation
+Timer.ReadAllQuery
+Timer.ReadAllQuery.%
+Timer.ReadAllQuery.HostRoleCommandEntity
+Timer.RowFetch
+Timer.SqlGeneration
+Timer.SqlPrepare
+Timer.StatementExecute
Topologies
Total Executors
Total Slots
@@ -79,6 +101,18 @@
dfs.FSNamesystem.TransactionsSinceLastCheckpoint
dfs.FSNamesystem.TransactionsSinceLastLogRoll
dfs.FSNamesystem.UnderReplicatedBlocks
+dfs.NNTopUserOpCounts.windowMs=1500000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=300000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=60000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=60000.op=__%.user=%
dfs.datanode.BlocksRead
dfs.datanode.BlocksWritten
dfs.datanode.DatanodeNetworkErrors
@@ -94,6 +128,37 @@
disk_free
disk_total
disk_used
+druid/broker.*.%.query/time
+druid/broker.heap.jvm/mem/max
+druid/broker.heap.jvm/mem/used
+druid/broker.jvm/gc/time
+druid/coordinator.heap.jvm/mem/max
+druid/coordinator.heap.jvm/mem/used
+druid/coordinator.jvm/gc/time
+druid/historical.*.%.query/segment/time
+druid/historical.*.%.query/time
+druid/historical.*.%.query/wait/time
+druid/historical.heap.jvm/mem/max
+druid/historical.heap.jvm/mem/used
+druid/historical.jvm/gc/time
+druid/historical.segment/scan/pending
+druid/middlemanager.*.%.query/segment/time
+druid/middlemanager.*.%.query/time
+druid/middlemanager.*.%.query/wait/time
+druid/middlemanager.*.ingest/events/processed
+druid/middlemanager.*.ingest/events/thrownAway
+druid/middlemanager.*.ingest/events/unparseable
+druid/middlemanager.*.ingest/persists/count
+druid/middlemanager.*.ingest/persists/time
+druid/middlemanager.*.ingest/rows/output
+druid/middlemanager.heap.jvm/mem/max
+druid/middlemanager.heap.jvm/mem/used
+druid/middlemanager.jvm/gc/time
+druid/middlemanager.segment/scan/pending
+druid/overlord.*.segment/added/bytes
+druid/overlord.heap.jvm/mem/max
+druid/overlord.heap.jvm/mem/used
+druid/overlord.jvm/gc/time
executors.ExecutorMetrics.ExecutorAvailableFreeSlots
executors.ExecutorMetrics.ExecutorAvailableFreeSlotsPercent
executors.ExecutorMetrics.ExecutorCacheMemoryPerInstance
@@ -118,14 +183,14 @@
executors.ExecutorMetrics.ExecutorTotalRejectedRequests
executors.ExecutorMetrics.ExecutorTotalRequestsHandled
executors.ExecutorMetrics.ExecutorTotalSuccess
-gc.ConcurrentMarkSweep.count
-gc.ConcurrentMarkSweep.time
-gc.ParNew.count
-gc.ParNew.time
+filter.error.grok
+filter.error.keyvalue
+input.files.count
+input.files.read_bytes
+input.files.read_lines
io.IOMetrics.MaxDecodingTime
io.IOMetrics.PercentileDecodingTime_30s50thPercentileLatency
io.IOMetrics.PercentileDecodingTime_30s90thPercentileLatency
-io.IOMetrics.PercentileDecodingTime_30s95thPercentileLatency
io.IOMetrics.PercentileDecodingTime_30s99thPercentileLatency
ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Priority
ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Volume
@@ -151,6 +216,10 @@
jvm.JvmMetrics.ThreadsTerminated
jvm.JvmMetrics.ThreadsTimedWaiting
jvm.JvmMetrics.ThreadsWaiting
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferTotalCapacity
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferTotalCapacity
jvm.Master.JvmMetrics.ThreadsBlocked
jvm.Master.JvmMetrics.ThreadsNew
jvm.Master.JvmMetrics.ThreadsRunnable
@@ -177,8 +246,23 @@
jvm.RegionServer.JvmMetrics.ThreadsWaiting
jvm.daemon_thread_count
jvm.file_descriptor_usage
+jvm.gc.ConcurrentMarkSweep.count
+jvm.gc.ConcurrentMarkSweep.time
+jvm.gc.ParNew.count
+jvm.gc.ParNew.time
jvm.heap_usage
+jvm.memory.heap.committed
+jvm.memory.heap.max
+jvm.memory.heap.used
+jvm.memory.non-heap.committed
+jvm.memory.non-heap.max
+jvm.memory.non-heap.used
jvm.thread_count
+jvm.threads.blocked.count
+jvm.threads.count
+jvm.threads.daemon.count
+jvm.threads.deadlock.count
+jvm.threads.runnable.count
jvm.uptime
kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate
kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count
@@ -236,12 +320,8 @@
mem_shared
mem_total
mem_used
-memory.heap.committed
-memory.heap.max
-memory.heap.used
-memory.non-heap.committed
-memory.non-heap.max
-memory.non-heap.used
+output.solr.write_bytes
+output.solr.write_logs
pkts_in
pkts_out
proc_run
@@ -250,6 +330,24 @@
read_bytes
read_count
read_time
+regionserver.IO.FsPReadTime_75th_percentile
+regionserver.IO.FsPReadTime_95th_percentile
+regionserver.IO.FsPReadTime_99th_percentile
+regionserver.IO.FsPReadTime_max
+regionserver.IO.FsPReadTime_mean
+regionserver.IO.FsPReadTime_median
+regionserver.IO.FsReadTime_75th_percentile
+regionserver.IO.FsReadTime_95th_percentile
+regionserver.IO.FsReadTime_99th_percentile
+regionserver.IO.FsReadTime_max
+regionserver.IO.FsReadTime_mean
+regionserver.IO.FsWriteTime_75th_percentile
+regionserver.IO.FsWriteTime_95th_percentile
+regionserver.IO.FsWriteTime_99th_percentile
+regionserver.IO.FsWriteTime_max
+regionserver.IO.FsWriteTime_mean
+regionserver.IO.FsWriteTime_median
+regionserver.IO.fsChecksumFailureCount
regionserver.RegionServer.ProcessCallTime_75th_percentile
regionserver.RegionServer.ProcessCallTime_95th_percentile
regionserver.RegionServer.ProcessCallTime_99th_percentile
@@ -441,13 +539,42 @@
rpc.rpc.datanode.RpcSlowCalls
rpcdetailed.rpcdetailed.client.AddBlockAvgTime
rpcdetailed.rpcdetailed.client.AddBlockNumOps
+solr.admin.info.jvm.memory.used
+solr.admin.info.system.processCpuLoad
+solr.admin.mbeans.cache.documentCache.hitratio
+solr.admin.mbeans.cache.documentCache.size
+solr.admin.mbeans.cache.documentCache.warmupTime
+solr.admin.mbeans.cache.filterCache.hitratio
+solr.admin.mbeans.cache.filterCache.size
+solr.admin.mbeans.cache.filterCache.warmupTime
+solr.admin.mbeans.cache.queryResultCache.hitratio
+solr.admin.mbeans.cache.queryResultCache.size
+solr.admin.mbeans.cache.queryResultCache.warmupTime
+solr.admin.mbeans.queryHandler.browse.avgTimePerRequest
+solr.admin.mbeans.queryHandler.browse.requests
+solr.admin.mbeans.queryHandler.export.avgTimePerRequest
+solr.admin.mbeans.queryHandler.export.requests
+solr.admin.mbeans.queryHandler.get.avgTimePerRequest
+solr.admin.mbeans.queryHandler.get.requests
+solr.admin.mbeans.queryHandler.query.avgTimePerRequest
+solr.admin.mbeans.queryHandler.query.requests
+solr.admin.mbeans.queryHandler.select.15minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.5minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.75thPcRequestTime
+solr.admin.mbeans.queryHandler.select.95thPcRequestTime
+solr.admin.mbeans.queryHandler.select.999thPcRequestTime
+solr.admin.mbeans.queryHandler.select.99thPcRequestTime
+solr.admin.mbeans.queryHandler.select.avgRequestsPerSecond
+solr.admin.mbeans.queryHandler.select.avgTimePerRequest
+solr.admin.mbeans.queryHandler.select.medianRequestTime
+solr.admin.mbeans.queryHandler.select.requests
+solr.admin.mbeans.updateHandler.adds
+solr.admin.mbeans.updateHandler.deletesById
+solr.admin.mbeans.updateHandler.deletesByQuery
+solr.admin.mbeans.updateHandler.docsPending
+solr.admin.mbeans.updateHandler.errors
swap_free
swap_total
-threads.blocked.count
-threads.count
-threads.daemon.count
-threads.deadlock.count
-threads.runnable.count
topology.*.%.--ack-count.%
topology.*.%.--complete-latency.%
topology.*.%.--emit-count.%
diff --git a/ambari-metrics-timelineservice/conf/windows/metrics_whitelist b/ambari-metrics-timelineservice/conf/windows/metrics_whitelist
index bd36429..2edac39 100644
--- a/ambari-metrics-timelineservice/conf/windows/metrics_whitelist
+++ b/ambari-metrics-timelineservice/conf/windows/metrics_whitelist
@@ -4,6 +4,15 @@
BytesReceivedLast5Minutes
BytesSentLast5Minutes
ChannelSize
+Counter.%.CacheMisses
+Counter.CacheHits
+Counter.CacheMisses
+Counter.ReadAllQuery
+Counter.ReadAllQuery.%
+Counter.ReadAllQuery.HostRoleCommandEntity
+DataModifyQuery
+DirectReadQuery
+DoesExistQuery
EventPutSuccessCount
EventTakeSuccessCount
FSDatasetState.org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.Capacity
@@ -13,8 +22,21 @@
FlowFilesReceivedLast5Minutes
FlowFilesSentLast5Minutes
Free Slots
+InsertObjectQuery
+ReadAllQuery
+ReadAllQuery.HostRoleCommandEntity
+ReadObjectQuery
Supervisors
TimelineMetricStoreWatcher.FakeMetric
+Timer.ObjectBuilding
+Timer.QueryPreparation
+Timer.ReadAllQuery
+Timer.ReadAllQuery.%
+Timer.ReadAllQuery.HostRoleCommandEntity
+Timer.RowFetch
+Timer.SqlGeneration
+Timer.SqlPrepare
+Timer.StatementExecute
Topologies
Total Executors
Total Slots
@@ -79,6 +101,18 @@
dfs.FSNamesystem.TransactionsSinceLastCheckpoint
dfs.FSNamesystem.TransactionsSinceLastLogRoll
dfs.FSNamesystem.UnderReplicatedBlocks
+dfs.NNTopUserOpCounts.windowMs=1500000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=300000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=60000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=60000.op=__%.user=%
dfs.datanode.BlocksRead
dfs.datanode.BlocksWritten
dfs.datanode.DatanodeNetworkErrors
@@ -94,6 +128,37 @@
disk_free
disk_total
disk_used
+druid/broker.*.%.query/time
+druid/broker.heap.jvm/mem/max
+druid/broker.heap.jvm/mem/used
+druid/broker.jvm/gc/time
+druid/coordinator.heap.jvm/mem/max
+druid/coordinator.heap.jvm/mem/used
+druid/coordinator.jvm/gc/time
+druid/historical.*.%.query/segment/time
+druid/historical.*.%.query/time
+druid/historical.*.%.query/wait/time
+druid/historical.heap.jvm/mem/max
+druid/historical.heap.jvm/mem/used
+druid/historical.jvm/gc/time
+druid/historical.segment/scan/pending
+druid/middlemanager.*.%.query/segment/time
+druid/middlemanager.*.%.query/time
+druid/middlemanager.*.%.query/wait/time
+druid/middlemanager.*.ingest/events/processed
+druid/middlemanager.*.ingest/events/thrownAway
+druid/middlemanager.*.ingest/events/unparseable
+druid/middlemanager.*.ingest/persists/count
+druid/middlemanager.*.ingest/persists/time
+druid/middlemanager.*.ingest/rows/output
+druid/middlemanager.heap.jvm/mem/max
+druid/middlemanager.heap.jvm/mem/used
+druid/middlemanager.jvm/gc/time
+druid/middlemanager.segment/scan/pending
+druid/overlord.*.segment/added/bytes
+druid/overlord.heap.jvm/mem/max
+druid/overlord.heap.jvm/mem/used
+druid/overlord.jvm/gc/time
executors.ExecutorMetrics.ExecutorAvailableFreeSlots
executors.ExecutorMetrics.ExecutorAvailableFreeSlotsPercent
executors.ExecutorMetrics.ExecutorCacheMemoryPerInstance
@@ -118,14 +183,14 @@
executors.ExecutorMetrics.ExecutorTotalRejectedRequests
executors.ExecutorMetrics.ExecutorTotalRequestsHandled
executors.ExecutorMetrics.ExecutorTotalSuccess
-gc.ConcurrentMarkSweep.count
-gc.ConcurrentMarkSweep.time
-gc.ParNew.count
-gc.ParNew.time
+filter.error.grok
+filter.error.keyvalue
+input.files.count
+input.files.read_bytes
+input.files.read_lines
io.IOMetrics.MaxDecodingTime
io.IOMetrics.PercentileDecodingTime_30s50thPercentileLatency
io.IOMetrics.PercentileDecodingTime_30s90thPercentileLatency
-io.IOMetrics.PercentileDecodingTime_30s95thPercentileLatency
io.IOMetrics.PercentileDecodingTime_30s99thPercentileLatency
ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Priority
ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Volume
@@ -151,6 +216,10 @@
jvm.JvmMetrics.ThreadsTerminated
jvm.JvmMetrics.ThreadsTimedWaiting
jvm.JvmMetrics.ThreadsWaiting
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferTotalCapacity
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferTotalCapacity
jvm.Master.JvmMetrics.ThreadsBlocked
jvm.Master.JvmMetrics.ThreadsNew
jvm.Master.JvmMetrics.ThreadsRunnable
@@ -177,8 +246,23 @@
jvm.RegionServer.JvmMetrics.ThreadsWaiting
jvm.daemon_thread_count
jvm.file_descriptor_usage
+jvm.gc.ConcurrentMarkSweep.count
+jvm.gc.ConcurrentMarkSweep.time
+jvm.gc.ParNew.count
+jvm.gc.ParNew.time
jvm.heap_usage
+jvm.memory.heap.committed
+jvm.memory.heap.max
+jvm.memory.heap.used
+jvm.memory.non-heap.committed
+jvm.memory.non-heap.max
+jvm.memory.non-heap.used
jvm.thread_count
+jvm.threads.blocked.count
+jvm.threads.count
+jvm.threads.daemon.count
+jvm.threads.deadlock.count
+jvm.threads.runnable.count
jvm.uptime
kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate
kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count
@@ -236,12 +320,8 @@
mem_shared
mem_total
mem_used
-memory.heap.committed
-memory.heap.max
-memory.heap.used
-memory.non-heap.committed
-memory.non-heap.max
-memory.non-heap.used
+output.solr.write_bytes
+output.solr.write_logs
pkts_in
pkts_out
proc_run
@@ -250,6 +330,24 @@
read_bytes
read_count
read_time
+regionserver.IO.FsPReadTime_75th_percentile
+regionserver.IO.FsPReadTime_95th_percentile
+regionserver.IO.FsPReadTime_99th_percentile
+regionserver.IO.FsPReadTime_max
+regionserver.IO.FsPReadTime_mean
+regionserver.IO.FsPReadTime_median
+regionserver.IO.FsReadTime_75th_percentile
+regionserver.IO.FsReadTime_95th_percentile
+regionserver.IO.FsReadTime_99th_percentile
+regionserver.IO.FsReadTime_max
+regionserver.IO.FsReadTime_mean
+regionserver.IO.FsWriteTime_75th_percentile
+regionserver.IO.FsWriteTime_95th_percentile
+regionserver.IO.FsWriteTime_99th_percentile
+regionserver.IO.FsWriteTime_max
+regionserver.IO.FsWriteTime_mean
+regionserver.IO.FsWriteTime_median
+regionserver.IO.fsChecksumFailureCount
regionserver.RegionServer.ProcessCallTime_75th_percentile
regionserver.RegionServer.ProcessCallTime_95th_percentile
regionserver.RegionServer.ProcessCallTime_99th_percentile
@@ -441,13 +539,42 @@
rpc.rpc.datanode.RpcSlowCalls
rpcdetailed.rpcdetailed.client.AddBlockAvgTime
rpcdetailed.rpcdetailed.client.AddBlockNumOps
+solr.admin.info.jvm.memory.used
+solr.admin.info.system.processCpuLoad
+solr.admin.mbeans.cache.documentCache.hitratio
+solr.admin.mbeans.cache.documentCache.size
+solr.admin.mbeans.cache.documentCache.warmupTime
+solr.admin.mbeans.cache.filterCache.hitratio
+solr.admin.mbeans.cache.filterCache.size
+solr.admin.mbeans.cache.filterCache.warmupTime
+solr.admin.mbeans.cache.queryResultCache.hitratio
+solr.admin.mbeans.cache.queryResultCache.size
+solr.admin.mbeans.cache.queryResultCache.warmupTime
+solr.admin.mbeans.queryHandler.browse.avgTimePerRequest
+solr.admin.mbeans.queryHandler.browse.requests
+solr.admin.mbeans.queryHandler.export.avgTimePerRequest
+solr.admin.mbeans.queryHandler.export.requests
+solr.admin.mbeans.queryHandler.get.avgTimePerRequest
+solr.admin.mbeans.queryHandler.get.requests
+solr.admin.mbeans.queryHandler.query.avgTimePerRequest
+solr.admin.mbeans.queryHandler.query.requests
+solr.admin.mbeans.queryHandler.select.15minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.5minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.75thPcRequestTime
+solr.admin.mbeans.queryHandler.select.95thPcRequestTime
+solr.admin.mbeans.queryHandler.select.999thPcRequestTime
+solr.admin.mbeans.queryHandler.select.99thPcRequestTime
+solr.admin.mbeans.queryHandler.select.avgRequestsPerSecond
+solr.admin.mbeans.queryHandler.select.avgTimePerRequest
+solr.admin.mbeans.queryHandler.select.medianRequestTime
+solr.admin.mbeans.queryHandler.select.requests
+solr.admin.mbeans.updateHandler.adds
+solr.admin.mbeans.updateHandler.deletesById
+solr.admin.mbeans.updateHandler.deletesByQuery
+solr.admin.mbeans.updateHandler.docsPending
+solr.admin.mbeans.updateHandler.errors
swap_free
swap_total
-threads.blocked.count
-threads.count
-threads.daemon.count
-threads.deadlock.count
-threads.runnable.count
topology.*.%.--ack-count.%
topology.*.%.--complete-latency.%
topology.*.%.--emit-count.%
diff --git a/ambari-metrics-timelineservice/pom.xml b/ambari-metrics-timelineservice/pom.xml
index f9d7e19..a5eb572 100644
--- a/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics-timelineservice/pom.xml
@@ -568,13 +568,13 @@
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
- <version>1.7.2</version>
+ <version>1.7.20</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
- <version>1.7.2</version>
+ <version>1.7.20</version>
</dependency>
<dependency>
@@ -658,8 +658,8 @@
<!-- Dependency in order to annotate unit tests with a category. -->
<dependency>
- <groupId>utility</groupId>
- <artifactId>utility</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-utility</artifactId>
<version>1.0.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index fa095a0..2342bd8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -19,14 +19,18 @@
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
import org.apache.hadoop.service.AbstractService;
@@ -40,6 +44,7 @@
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
@@ -51,6 +56,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -60,6 +66,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -75,6 +82,7 @@
private TimelineMetricMetadataManager metricMetadataManager;
private Integer defaultTopNHostsLimit;
private MetricCollectorHAController haController;
+ private boolean containerMetricsDisabled = false;
/**
* Construct the service.
@@ -150,10 +158,14 @@
scheduleAggregatorThread(dailyClusterAggregator);
// Start the minute host aggregator
- TimelineMetricAggregator minuteHostAggregator =
- TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
- hBaseAccessor, metricsConf, haController);
- scheduleAggregatorThread(minuteHostAggregator);
+ if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+ LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+ } else {
+ TimelineMetricAggregator minuteHostAggregator =
+ TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+ hBaseAccessor, metricsConf, haController);
+ scheduleAggregatorThread(minuteHostAggregator);
+ }
// Start the hourly host aggregator
TimelineMetricAggregator hourlyHostAggregator =
@@ -177,7 +189,7 @@
LOG.info("Started watchdog for timeline metrics store with initial " +
"delay = " + initDelay + ", delay = " + delay);
}
-
+ containerMetricsDisabled = configuration.isContainerMetricsDisabled();
isInitialized = true;
}
@@ -300,8 +312,12 @@
if (prevTime != null) {
step = currTime - prevTime;
diff = currVal - prevVal;
- Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
- timeValueEntry.setValue(rate);
+ if (diff < 0) {
+ it.remove(); //Discard calculating rate when the metric counter has been reset.
+ } else {
+ Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
+ timeValueEntry.setValue(rate);
+ }
} else {
it.remove();
}
@@ -352,6 +368,12 @@
@Override
public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
throws SQLException, IOException {
+
+ if (containerMetricsDisabled) {
+ LOG.debug("Ignoring submitted container metrics according to configuration. Values will not be stored.");
+ return new TimelinePutResponse();
+ }
+
hBaseAccessor.insertContainerMetrics(metrics);
return new TimelinePutResponse();
}
@@ -388,8 +410,66 @@
}
@Override
- public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
- return metricMetadataManager.getHostedInstanceCache();
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+ for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+ aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+ }
+ hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+
+ return new TimelinePutResponse();
+ }
+
+ @Override
+ public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
+ throws SQLException, IOException {
+
+ Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
+ Map<String, Set<String>> instanceHosts = new HashMap<>();
+ if (configuration.getTimelineMetricsMultipleClusterSupport()) {
+ instanceHosts = metricMetadataManager.getHostedInstanceCache();
+ }
+
+ Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
+
+ if (MapUtils.isEmpty(instanceHosts)) {
+ Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+ for (String host : hostedApps.keySet()) {
+ for (String app : hostedApps.get(host)) {
+ if (!appHostMap.containsKey(app)) {
+ appHostMap.put(app, new HashSet<String>());
+ }
+ appHostMap.get(app).add(host);
+ }
+ }
+ instanceAppHosts.put("", appHostMap);
+ } else {
+ for (String instance : instanceHosts.keySet()) {
+
+ if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
+ continue;
+ }
+ Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+ instanceAppHosts.put(instance, appHostMap);
+
+ Set<String> hostsWithInstance = instanceHosts.get(instance);
+ for (String host : hostsWithInstance) {
+ for (String app : hostedApps.get(host)) {
+ if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
+ continue;
+ }
+
+ if (!appHostMap.containsKey(app)) {
+ appHostMap.put(app, new HashSet<String>());
+ }
+ appHostMap.get(app).add(host);
+ }
+ }
+ }
+ }
+
+ return instanceAppHosts;
}
@Override
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 65bbc4c..3b2a119 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -31,6 +31,8 @@
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -40,8 +42,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
@@ -787,7 +787,9 @@
metadataManager.putIfModifiedHostedAppsMetadata(
tm.getHostName(), tm.getAppId());
- metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
+ if (!tm.getAppId().equals("FLUME_HANDLER")) {
+ metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
+ }
}
if (!acceptMetric) {
iterator.remove();
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0d5042f..899928a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -191,6 +191,9 @@
public static final String TIMELINE_SERVICE_RPC_ADDRESS =
"timeline.metrics.service.rpc.address";
+ public static final String TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS =
+ "timeline.metrics.service.container.metrics.disabled";
+
public static final String CLUSTER_AGGREGATOR_APP_IDS =
"timeline.metrics.service.cluster.aggregator.appIds";
@@ -251,9 +254,15 @@
public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
"timeline.metrics.aggregate.tables.durability";
+ public static final String TIMELINE_METRICS_WHITELIST_ENABLED =
+ "timeline.metrics.whitelisting.enabled";
+
public static final String TIMELINE_METRICS_WHITELIST_FILE =
"timeline.metrics.whitelist.file";
+ public static final String TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT =
+ "/etc/ambari-metrics-collector/conf/metrics_whitelist";
+
public static final String TIMELINE_METRIC_METADATA_FILTERS =
"timeline.metrics.service.metadata.filters";
@@ -290,12 +299,17 @@
public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
"timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
+ public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
+ "timeline.metrics.support.multiple.clusters";
+
public static final String HOST_APP_ID = "HOST";
public static final String DEFAULT_INSTANCE_PORT = "12001";
public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
+ public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
private Configuration hbaseConf;
private Configuration metricsConf;
private Configuration amsEnvConf;
@@ -438,6 +452,13 @@
return 3;
}
+ public boolean getTimelineMetricsMultipleClusterSupport() {
+ if (metricsConf != null) {
+ return Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS, "false"));
+ }
+ return false;
+ }
+
public String getTimelineServiceRpcAddress() {
String defaultRpcAddress = "0.0.0.0:60200";
if (metricsConf != null) {
@@ -495,4 +516,19 @@
return whitelist;
}
+
+ public boolean isContainerMetricsDisabled() {
+ try {
+ return metricsConf != null && Boolean.parseBoolean(metricsConf.get(TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS, "false"));
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ public boolean isWhitelistingEnabled() {
+ if (metricsConf != null) {
+ return Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_WHITELIST_ENABLED, "false"));
+ }
+ return false;
+ }
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index 121a8ae..d052d54 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -80,6 +81,7 @@
*/
Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
+ TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
/**
* Returns all hosts that have written metrics with the apps on the host
* @return { hostname : [ appIds ] }
@@ -94,7 +96,7 @@
* @throws SQLException
* @throws IOException
*/
- Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException;
+ Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException;
/**
* Return a list of known live collector nodes
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
index 65d54c0..7b03b30 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -19,10 +19,10 @@
import java.util.Map;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
/**
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
index 1446ec2..63cc510 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
@@ -18,17 +18,14 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_BLACKLIST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_WHITELIST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.HashSet;
@@ -36,9 +33,12 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_BLACKLIST;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_WHITELIST;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
public class TimelineMetricsFilter {
@@ -67,8 +67,8 @@
whitelistedApps = new HashSet<>();
amshbaseWhitelist = new HashSet<>();
- String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, "");
- if (!StringUtils.isEmpty(whitelistFile)) {
+ if (configuration.isWhitelistingEnabled()) {
+ String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
readMetricWhitelistFromFile(whitelistFile);
}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 44aca03..9eaf456 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -21,6 +21,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 02677b9..ba16b43 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
@@ -91,6 +93,7 @@
MetricHostAggregate hostAggregate = null;
Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+ int perMetricCount = 0;
while (rs.next()) {
TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
@@ -106,14 +109,20 @@
currentMetric.setTimestamp(endTime);
hostAggregate = new MetricHostAggregate();
hostAggregateMap.put(currentMetric, hostAggregate);
+ perMetricCount++;
}
if (existingMetric.equalsExceptTime(currentMetric)) {
// Recalculate totals with current metric
updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-
+ perMetricCount++;
} else {
- // Switched over to a new metric - save existing
+ // Switched over to a new metric - save new metric
+
+ hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
+ hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount));
+ perMetricCount = 1;
+
hostAggregate = new MetricHostAggregate();
currentMetric.setTimestamp(endTime);
updateAggregatesFromHost(hostAggregate, currentHostAggregate);
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index a5a3499..34b1f9b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -38,6 +38,7 @@
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 0ea9c08..a17433b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index b5f49fb..672f85f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
index 53e6304..a06f4e8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
@@ -123,20 +124,41 @@
admin = new ZKHelixAdmin(zkConnectUrl);
// create cluster
LOG.info("Creating zookeeper cluster node: " + clusterName);
- admin.addCluster(clusterName, false);
+ boolean clusterAdded = admin.addCluster(clusterName, false);
+ LOG.info("Was cluster added successfully? " + clusterAdded);
// Adding host to the cluster
- List<String> nodes = Collections.EMPTY_LIST;
- try {
- nodes = admin.getInstancesInCluster(clusterName);
- } catch (ZkNoNodeException ex) {
- LOG.warn("Child znode under /" + CLUSTER_NAME + " not found.Recreating the cluster.");
- admin.addCluster(clusterName, true);
+ boolean success = false;
+ int tries = 5;
+ int sleepTimeInSeconds = 5;
+
+ for (int i = 0; i < tries && !success; i++) {
+ try {
+ List<String> nodes = admin.getInstancesInCluster(clusterName);
+ if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+ LOG.info("Adding participant instance " + instanceConfig);
+ admin.addInstance(clusterName, instanceConfig);
+ }
+ success = true;
+ } catch (HelixException | ZkNoNodeException ex) {
+ LOG.warn("Helix Cluster not yet setup fully.");
+ if (i < tries - 1) {
+ LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
+ TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
+ } else {
+ LOG.error(ex);
+ }
+ }
}
- if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
- LOG.info("Adding participant instance " + instanceConfig);
- admin.addInstance(clusterName, instanceConfig);
+ if (!success) {
+ LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
+ admin.addCluster(clusterName, true);
+ List<String> nodes = admin.getInstancesInCluster(clusterName);
+ if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+ LOG.info("Adding participant instance " + instanceConfig);
+ admin.addInstance(clusterName, instanceConfig);
+ }
}
// Add a state model
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 6278c59..50cfb08 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
@@ -285,6 +286,36 @@
}
}
+ /**
+ * Store the given metrics into the timeline store, and return errors that
+ * happened during storing.
+ */
+ @Path("/metrics/aggregated")
+ @POST
+ @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+ public TimelinePutResponse postAggregatedMetrics(
+ @Context HttpServletRequest req,
+ @Context HttpServletResponse res,
+ AggregationResult metrics) {
+
+ init(res);
+ if (metrics == null) {
+ return new TimelinePutResponse();
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Storing aggregated metrics: " +
+ TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
+ }
+
+ return timelineMetricStore.putHostAggregatedMetrics(metrics);
+ } catch (Exception e) {
+ LOG.error("Error saving metrics.", e);
+ throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+ }
+ }
+
@Path("/containermetrics")
@POST
@Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
@@ -415,14 +446,16 @@
@GET
@Path("/metrics/instances")
@Produces({ MediaType.APPLICATION_JSON })
- public Map<String, Set<String>> getClusterHostsMetadata(
+ public Map<String, Map<String, Set<String>>> getClusterHostsMetadata(
@Context HttpServletRequest req,
- @Context HttpServletResponse res
+ @Context HttpServletResponse res,
+ @QueryParam("appId") String appId,
+ @QueryParam("instanceId") String instanceId
) {
init(res);
try {
- return timelineMetricStore.getInstanceHostsMetadata();
+ return timelineMetricStore.getInstanceHostsMetadata(instanceId, appId);
} catch (Exception e) {
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
index aae1d4b..70dd583 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
@@ -95,22 +95,26 @@
@Test
public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
Map<Long, Double> metricValues = new TreeMap<>();
- metricValues.put(1454016368371L, 1011.25);
- metricValues.put(1454016428371L, 1011.25);
- metricValues.put(1454016488371L, 1011.25);
- metricValues.put(1454016548371L, 1011.25);
- metricValues.put(1454016608371L, 1011.25);
- metricValues.put(1454016668371L, 1011.25);
- metricValues.put(1454016728371L, 1011.25);
+ metricValues.put(1454000000000L, 1.0);
+ metricValues.put(1454000001000L, 6.0);
+ metricValues.put(1454000002000L, 0.0);
+ metricValues.put(1454000003000L, 3.0);
+ metricValues.put(1454000004000L, 4.0);
+ metricValues.put(1454000005000L, 7.0);
// Calculate rate
Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), false);
-    // Make sure rate is zero
+    // Verify computed rates: first point and non-increasing points are dropped
- for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
- Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
- + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
- }
+ Assert.assertTrue(rates.size() == 4);
+
+ Assert.assertFalse(rates.containsKey(1454000000000L));
+ Assert.assertFalse(rates.containsKey(1454000002000L));
+
+ Assert.assertEquals(rates.get(1454000001000L), 5.0);
+ Assert.assertEquals(rates.get(1454000003000L), 3.0);
+ Assert.assertEquals(rates.get(1454000004000L), 1.0);
+ Assert.assertEquals(rates.get(1454000005000L), 3.0);
}
@Test
@@ -119,14 +123,14 @@
metricValues.put(1454016368371L, 1011.25);
metricValues.put(1454016428371L, 1010.25);
metricValues.put(1454016488371L, 1012.25);
- metricValues.put(1454016548371L, 1010.25);
- metricValues.put(1454016608371L, 1010.25);
+ metricValues.put(1454016548371L, 1015.25);
+ metricValues.put(1454016608371L, 1020.25);
Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), true);
- Assert.assertTrue(rates.size()==4);
- Assert.assertTrue(rates.containsValue(-1.0));
+ Assert.assertTrue(rates.size() == 3);
Assert.assertTrue(rates.containsValue(2.0));
- Assert.assertTrue(rates.containsValue(0.0));
+ Assert.assertTrue(rates.containsValue(3.0));
+ Assert.assertTrue(rates.containsValue(5.0));
}
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0087fd9..d5baaef 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -26,12 +26,12 @@
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
index 37ec134..7eeb9c4 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
@@ -17,9 +17,9 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import java.util.Arrays;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index a910cc2..d668178 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -22,11 +22,11 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index 44f48e8..3009163 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
.timeline;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +34,7 @@
assertThat(aggregate.getSum()).isEqualTo(3.0);
assertThat(aggregate.getMin()).isEqualTo(1.0);
assertThat(aggregate.getMax()).isEqualTo(2.0);
- assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+ assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2);
}
@Test
@@ -50,7 +50,7 @@
assertThat(aggregate.getSum()).isEqualTo(12.0);
assertThat(aggregate.getMin()).isEqualTo(0.5);
assertThat(aggregate.getMax()).isEqualTo(7.5);
- assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+ assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
}
static MetricHostAggregate createAggregate (Double sum, Double min,
@@ -63,4 +63,4 @@
aggregate.setNumberOfSamples(samplesCount);
return aggregate;
}
-}
\ No newline at end of file
+}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index b40481d..8abcd83 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -92,12 +93,17 @@
}
@Override
+ public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+ return null;
+ }
+
+ @Override
public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
return Collections.emptyMap();
}
@Override
- public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
+ public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException {
return Collections.emptyMap();
}
@@ -105,4 +111,5 @@
public List<String> getLiveInstances() {
return Collections.emptyList();
}
+
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
index fa0cfe9..53f6f6c 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.Precision;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import java.util.Collections;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java
index 81da5c8..a308248 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java
@@ -61,6 +61,7 @@
Configuration metricsConf = new Configuration();
TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+ expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes();
replay(configuration);
URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat");
@@ -170,6 +171,8 @@
whitelist.add("regionserver.Server.Delete_mean");
expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once();
+ expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes();
+
replay(configuration);
TimelineMetricsFilter.initializeMetricFilter(configuration);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index 590f82a..07fd85d 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -20,13 +20,13 @@
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
@@ -659,14 +659,14 @@
while (rs.next()) {
if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
- assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
- assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
} else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
- assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
- assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+ assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
+ assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9873643..75b3f91 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
@@ -124,14 +125,14 @@
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else {
fail("Unexpected entry");
@@ -198,7 +199,7 @@
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
}
}
}
@@ -260,7 +261,7 @@
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
assertEquals(12 * 15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
}
}
}
@@ -309,14 +310,14 @@
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else if ("mem_free".equals(currentMetric.getMetricName())) {
assertEquals(2.0, currentHostAggregate.getMax());
assertEquals(0.0, currentHostAggregate.getMin());
assertEquals(20, currentHostAggregate.getNumberOfSamples());
assertEquals(15.0, currentHostAggregate.getSum());
- assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+ assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
count++;
} else {
fail("Unexpected entry");
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 78db11d..6541b2c 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -31,6 +31,7 @@
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
diff --git a/pom.xml b/pom.xml
index 2d88912..a14b8fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
<version>2.0.0.0-SNAPSHOT</version>
<packaging>pom</packaging>
<modules>
- <module>../utility</module>
+ <module>../ambari-utility</module>
<module>ambari-metrics-common</module>
<module>ambari-metrics-hadoop-sink</module>
<module>ambari-metrics-flume-sink</module>
@@ -33,6 +33,7 @@
<module>ambari-metrics-host-monitoring</module>
<module>ambari-metrics-grafana</module>
<module>ambari-metrics-assembly</module>
+ <module>ambari-metrics-host-aggregator</module>
</modules>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -117,11 +118,6 @@
<build>
<plugins>
<plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <version>2.19</version>
- </plugin>
- <plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
@@ -146,63 +142,6 @@
<failIfNoMatch>false</failIfNoMatch>
</configuration>
</execution>
- </executions>
- </plugin>
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptors>
- <descriptor>../ambari-project/src/main/assemblies/empty.xml</descriptor>
- </descriptors>
- </configuration>
- <executions>
- <execution>
- <id>build-tarball</id>
- <phase>none</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skip>${skipSurefireTests}</skip>
-
- <!-- Each profile in the top-level pom.xml defines which test group categories to run. -->
- <groups>${testcase.groups}</groups>
- </configuration>
- </plugin>
- <plugin>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.2</version>
- <configuration>
- <source>1.7</source>
- <target>1.7</target>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-clean-plugin</artifactId>
- <configuration>
- <filesets>
- <fileset>
- <directory>${basedir}</directory>
- <followSymlinks>false</followSymlinks>
- <includes>
- <include>**/*.pyc</include>
- </includes>
- </fileset>
- </filesets>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.codehaus.mojo</groupId>
- <artifactId>build-helper-maven-plugin</artifactId>
- <version>1.8</version>
- <executions>
<execution>
<id>parse-package-version</id>
<goals>
@@ -234,6 +173,58 @@
</executions>
</plugin>
<plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>../ambari-project/src/main/assemblies/empty.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>build-tarball</id>
+ <phase>none</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.19</version>
+ <configuration>
+ <skip>${skipSurefireTests}</skip>
+
+ <!-- Each profile in the top-level pom.xml defines which test group categories to run. -->
+ <groups>${testcase.groups}</groups>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.2</version>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-clean-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <filesets>
+ <fileset>
+ <directory>${basedir}</directory>
+ <followSymlinks>false</followSymlinks>
+ <includes>
+ <include>**/*.pyc</include>
+ </includes>
+ </fileset>
+ </filesets>
+ </configuration>
+ </plugin>
+ <plugin>
<!--Stub execution on direct plugin call - workaround for ambari rpm build process-->
<groupId>org.codehaus.mojo</groupId>
<artifactId>rpm-maven-plugin</artifactId>
@@ -281,6 +272,7 @@
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
+ <version>0.11</version>
<configuration>
<excludes>
<exclude>pass.txt</exclude>
@@ -304,8 +296,8 @@
<dependencies>
<!-- Dependency in order to annotate unit tests with a category. -->
<dependency>
- <groupId>utility</groupId>
- <artifactId>utility</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-utility</artifactId>
<version>1.0.0.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
@@ -319,11 +311,38 @@
</properties>
<dependencies>
<dependency>
- <groupId>utility</groupId>
- <artifactId>utility</artifactId>
+ <groupId>org.apache.ambari</groupId>
+ <artifactId>ambari-utility</artifactId>
<version>1.0.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>sign-artifacts</id>
+ <activation>
+ <property>
+ <name>performRelease</name>
+ <value>true</value>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-gpg-plugin</artifactId>
+ <version>1.6</version>
+ <executions>
+ <execution>
+ <id>sign-artifacts</id>
+ <phase>verify</phase>
+ <goals>
+ <goal>sign</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
</profiles>
</project>