Merge remote-tracking branch 'remotes/origin/trunk' into branch-3.0-perf
diff --git a/ambari-metrics-assembly/pom.xml b/ambari-metrics-assembly/pom.xml
index a4b87de..d9875ce 100644
--- a/ambari-metrics-assembly/pom.xml
+++ b/ambari-metrics-assembly/pom.xml
@@ -35,6 +35,7 @@
   <properties>
     <collector.dir>${project.basedir}/../ambari-metrics-timelineservice</collector.dir>
     <monitor.dir>${project.basedir}/../ambari-metrics-host-monitoring</monitor.dir>
+    <aggregator.dir>${project.basedir}/../ambari-metrics-host-aggregator</aggregator.dir>
     <grafana.dir>${project.basedir}/../ambari-metrics-grafana</grafana.dir>
     <hadoop-sink.dir>${project.basedir}/../ambari-metrics-hadoop-sink</hadoop-sink.dir>
     <storm-sink.dir>${project.basedir}/../ambari-metrics-storm-sink</storm-sink.dir>
@@ -243,6 +244,7 @@
                           <location>${collector.dir}/target/lib</location>
                           <excludes>
                             <exclude>*tests.jar</exclude>
+                            <exclude>findbugs*.jar</exclude>
                           </excludes>
                         </source>
                         <source>
@@ -262,6 +264,7 @@
                             <exclude>bin/**</exclude>
                             <exclude>bin/*</exclude>
                             <exclude>lib/*tests.jar</exclude>
+                            <exclude>lib/findbugs*.jar</exclude>
                           </excludes>
                         </source>
                       </sources>
@@ -599,6 +602,19 @@
                       </sources>
                     </mapping>
                     <mapping>
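+                      <!-- Bundle the host aggregator jar with the monitor package. -->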
+                      <directory>/var/lib/ambari-metrics-monitor/lib</directory>
+                      <sources>
+                        <source>
+                          <location>
+                            ${aggregator.dir}/target/
+                          </location>
+                          <includes>
+                            <include>ambari-metrics-host-aggregator-${project.version}.jar</include>
+                          </includes>
+                        </source>
+                      </sources>
+                    </mapping>
+                    <mapping>
                       <directory>/etc/ambari-metrics-monitor/conf</directory>
                       <configuration>true</configuration>
                     </mapping>
@@ -744,6 +760,7 @@
                     <path>/var/run/ambari-metrics-grafana</path>
                     <path>/var/log/ambari-metrics-grafana</path>
                     <path>/var/lib/ambari-metrics-collector</path>
+                    <path>/var/lib/ambari-metrics-monitor/lib</path>
                     <path>/var/lib/ambari-metrics-grafana</path>
                     <path>/usr/lib/ambari-metrics-hadoop-sink</path>
                     <path>/usr/lib/ambari-metrics-kafka-sink</path>
@@ -1331,6 +1348,11 @@
       <type>pom</type>
       <optional>true</optional>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-metrics-host-aggregator</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
 
diff --git a/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml b/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
index ab309a1..d015d31 100644
--- a/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
+++ b/ambari-metrics-assembly/src/main/assembly/monitor-windows.xml
@@ -64,6 +64,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>${aggregator.dir}/conf/windows</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>${monitor.dir}/conf/windows</directory>
       <outputDirectory>/</outputDirectory>
       <includes>
diff --git a/ambari-metrics-assembly/src/main/assembly/monitor.xml b/ambari-metrics-assembly/src/main/assembly/monitor.xml
index 99a41c3..448fe62 100644
--- a/ambari-metrics-assembly/src/main/assembly/monitor.xml
+++ b/ambari-metrics-assembly/src/main/assembly/monitor.xml
@@ -46,6 +46,13 @@
       </includes>
     </fileSet>
     <fileSet>
+      <directory>${aggregator.dir}/conf/unix</directory>
+      <outputDirectory>conf</outputDirectory>
+      <includes>
+        <include>log4j.properties</include>
+      </includes>
+    </fileSet>
+    <fileSet>
       <directory>${monitor.dir}/conf/unix</directory>
       <outputDirectory>bin</outputDirectory>
       <includes>
@@ -68,4 +75,4 @@
 
 
 
-</assembly>
\ No newline at end of file
+</assembly>
diff --git a/ambari-metrics-common/pom.xml b/ambari-metrics-common/pom.xml
index 62ae75f..cae9734 100644
--- a/ambari-metrics-common/pom.xml
+++ b/ambari-metrics-common/pom.xml
@@ -108,6 +108,10 @@
                   <pattern>org.jboss</pattern>
                   <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.jboss</shadedPattern>
                 </relocation>
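+                <!-- Relocate the bundled httpclient (used by AppCookieManager) so it cannot clash with a consumer's own httpclient. -->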
+                <relocation>
+                  <pattern>org.apache.http</pattern>
+                  <shadedPattern>org.apache.hadoop.metrics2.sink.relocated.apache.http</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
@@ -189,5 +193,10 @@
       <artifactId>powermock-module-junit4</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.httpcomponents</groupId>
+      <artifactId>httpclient</artifactId>
+      <version>4.2.5</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 46f32f9..337f640 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricCollectorUnavailableException;
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardHostnameHashingStrategy;
 import org.apache.hadoop.metrics2.sink.timeline.availability.MetricSinkWriteShardStrategy;
+import org.apache.http.HttpStatus;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
@@ -78,9 +79,16 @@
   public static final String SSL_KEYSTORE_PATH_PROPERTY = "truststore.path";
   public static final String SSL_KEYSTORE_TYPE_PROPERTY = "truststore.type";
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
+  public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
+  public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
+  public static final String INSTANCE_ID_PROPERTY = "instanceId";
+  public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
+  public static final String COOKIE = "Cookie";
+  private static final String WWW_AUTHENTICATE = "WWW-Authenticate";
+  private static final String NEGOTIATE = "Negotiate";
 
-  protected static final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
+  protected final AtomicInteger failedCollectorConnectionsCounter = new AtomicInteger(0);
   public static int NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS = 100;
   protected static final AtomicInteger nullCollectorCounter = new AtomicInteger(0);
   public static int NUMBER_OF_NULL_COLLECTOR_EXCEPTIONS = 20;
@@ -93,6 +101,7 @@
   private long lastFailedZkRequestTime = 0l;
 
   private SSLSocketFactory sslSocketFactory;
+  private AppCookieManager appCookieManager = null;
 
   protected final Log LOG;
 
@@ -111,7 +120,7 @@
   private volatile boolean isInitializedForHA = false;
 
   @SuppressWarnings("all")
-  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 5;
+  private final int RETRY_COUNT_BEFORE_COLLECTOR_FAILOVER = 3;
 
   private final Gson gson = new Gson();
 
@@ -153,20 +162,40 @@
       connection = connectUrl.startsWith("https") ?
           getSSLConnection(connectUrl) : getConnection(connectUrl);
 
-      connection.setRequestMethod("POST");
-      connection.setRequestProperty("Content-Type", "application/json");
-      connection.setRequestProperty("Connection", "Keep-Alive");
-      connection.setConnectTimeout(timeout);
-      connection.setReadTimeout(timeout);
-      connection.setDoOutput(true);
-
-      if (jsonData != null) {
-        try (OutputStream os = connection.getOutputStream()) {
-          os.write(jsonData.getBytes("UTF-8"));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("emitMetricsJson to " + connectUrl + ", " + jsonData);
+      }
+      AppCookieManager appCookieManager = getAppCookieManager();
+      String appCookie = appCookieManager.getCachedAppCookie(connectUrl);
+      if (appCookie != null) {
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Using cached app cookie for URL:" + connectUrl);
         }
+        connection.setRequestProperty(COOKIE, appCookie);
       }
 
-      int statusCode = connection.getResponseCode();
+      int statusCode = emitMetricsJson(connection, timeout, jsonData);
+
+      if (statusCode == HttpStatus.SC_UNAUTHORIZED) {
+        String wwwAuthHeader = connection.getHeaderField(WWW_AUTHENTICATE);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("Received WWW-Authenticate header:" + wwwAuthHeader + ", for URL:" + connectUrl);
+        }
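+        // On a Negotiate challenge, refresh the hadoop.auth cookie via SPNego and retry the POST once.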
+        if (wwwAuthHeader != null && wwwAuthHeader.trim().startsWith(NEGOTIATE)) {
+          appCookie = appCookieManager.getAppCookie(connectUrl, true);
+          if (appCookie != null) {
+            cleanupInputStream(connection.getInputStream());
+            connection = connectUrl.startsWith("https") ?
+                getSSLConnection(connectUrl) : getConnection(connectUrl);
+            connection.setRequestProperty(COOKIE, appCookie);
+            statusCode = emitMetricsJson(connection, timeout, jsonData);
+          }
+        } else {
+          // no supported authentication type found
+          // we would let the original response propagate
+          LOG.error("Unsupported WWW-Authentication header:" + wwwAuthHeader+ ", for URL:" + connectUrl);
+        }
+      }
 
       if (statusCode != 200) {
         LOG.info("Unable to POST metrics to collector, " + connectUrl + ", " +
@@ -209,6 +238,27 @@
     }
   }
 
+  private int emitMetricsJson(HttpURLConnection connection, int timeout, String jsonData) throws IOException {
+    connection.setRequestMethod("POST");
+    connection.setRequestProperty("Content-Type", "application/json");
+    connection.setRequestProperty("Connection", "Keep-Alive");
+    connection.setConnectTimeout(timeout);
+    connection.setReadTimeout(timeout);
+    connection.setDoOutput(true);
+
+    if (jsonData != null) {
+      try (OutputStream os = connection.getOutputStream()) {
+        os.write(jsonData.getBytes("UTF-8"));
+      }
+    }
+
+    int statusCode = connection.getResponseCode();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("emitMetricsJson: statusCode = " + statusCode);
+    }
+    return statusCode;
+  }
+
   protected String getCurrentCollectorHost() {
     String collectorHost;
     // Get cached target
@@ -239,22 +289,47 @@
   }
 
   protected boolean emitMetrics(TimelineMetrics metrics) {
-    String collectorHost = getCurrentCollectorHost();
-    String connectUrl = getCollectorUri(collectorHost);
-    String jsonData = null;
-    LOG.debug("EmitMetrics connectUrl = "  + connectUrl);
-    try {
-      jsonData = mapper.writeValueAsString(metrics);
-    } catch (IOException e) {
-      LOG.error("Unable to parse metrics", e);
+    String connectUrl;
+    boolean validCollectorHost = true;
+
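+    // With host in-memory aggregation enabled, metrics are posted to the local aggregator instead of the remote collector.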
+    if (isHostInMemoryAggregationEnabled()) {
+      connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+    } else {
+      String collectorHost = getCurrentCollectorHost();
+      if (collectorHost == null) {
+        validCollectorHost = false;
+      }
+      connectUrl = getCollectorUri(collectorHost);
     }
-    if (jsonData != null) {
-      return emitMetricsJson(connectUrl, jsonData);
+
+    if (validCollectorHost) {
+      String jsonData = null;
+      LOG.debug("EmitMetrics connectUrl = "  + connectUrl);
+      try {
+        jsonData = mapper.writeValueAsString(metrics);
+      } catch (IOException e) {
+        LOG.error("Unable to parse metrics", e);
+      }
+      if (jsonData != null) {
+        return emitMetricsJson(connectUrl, jsonData);
+      }
     }
     return false;
   }
 
   /**
+   * Get the associated app cookie manager.
+   *
+   * @return the app cookie manager
+   */
+  public synchronized AppCookieManager getAppCookieManager() {
+    if (appCookieManager == null) {
+      appCookieManager = new AppCookieManager();
+    }
+    return appCookieManager;
+  }
+
+  /**
    * Cleans up and closes an input stream
    * see http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
    * @param is the InputStream to clean up
@@ -560,4 +635,16 @@
    * @return String "host1"
    */
   abstract protected String getHostname();
+
+  /**
+   * Check if host in-memory aggregation is enabled
+   * @return
+   */
+  abstract protected boolean isHostInMemoryAggregationEnabled();
+
+  /**
+   * In memory aggregation port
+   * @return
+   */
+  abstract protected int getHostInMemoryAggregationPort();
 }
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
new file mode 100644
index 0000000..c903e3d
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregationResult.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.Set;
+
+@XmlRootElement(name="AggregationResult")
+public class AggregationResult {
+    protected Set<TimelineMetricWithAggregatedValues> result;
+    protected Long timeInMilis;
+
+    @Override
+    public String toString() {
+        return "AggregationResult{" +
+                "result=" + result +
+                ", timeInMilis=" + timeInMilis +
+                '}';
+    }
+
+    public AggregationResult() {
+    }
+
+    public AggregationResult(Set<TimelineMetricWithAggregatedValues> result, Long timeInMilis) {
+        this.result = result;
+        this.timeInMilis = timeInMilis;
+    }
+
+    @XmlElement
+    public Set<TimelineMetricWithAggregatedValues> getResult() {
+        return result;
+    }
+
+    public void setResult(Set<TimelineMetricWithAggregatedValues> result) {
+        this.result = result;
+    }
+
+    @XmlElement
+    public Long getTimeInMilis() {
+        return timeInMilis;
+    }
+
+    public void setTimeInMilis(Long timeInMilis) {
+        this.timeInMilis = timeInMilis;
+    }
+}
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
new file mode 100644
index 0000000..bcba238
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManager.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.Principal;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.Header;
+import org.apache.http.HeaderElement;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpHost;
+import org.apache.http.HttpRequest;
+import org.apache.http.HttpResponse;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.Credentials;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpOptions;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.client.params.AuthPolicy;
+import org.apache.http.impl.auth.SPNegoSchemeFactory;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * Handles SPNego authentication as a client of a Hadoop service and caches
+ * the hadoop.auth cookie returned by the service on successful SPNego
+ * authentication. Refreshes the hadoop.auth cookie on demand if the cookie
+ * has expired.
+ */
+public class AppCookieManager {
+
+  static final String HADOOP_AUTH = "hadoop.auth";
+  private static final String HADOOP_AUTH_EQ = "hadoop.auth=";
+  private static final String SET_COOKIE = "Set-Cookie";
+
+  private static final EmptyJaasCredentials EMPTY_JAAS_CREDENTIALS = new EmptyJaasCredentials();
+
+  private Map<String, String> endpointCookieMap = new ConcurrentHashMap<String, String>();
+  private static Log LOG = LogFactory.getLog(AppCookieManager.class);
+
+  /**
+   * Utility method to exercise AppCookieManager directly
+   * @param args element 0 of args should be a URL to a Hadoop service protected by SPNego
+   * @throws IOException in case of errors
+   */
+  public static void main(String[] args) throws IOException {
+    new AppCookieManager().getAppCookie(args[0], false);
+  }
+
+  public AppCookieManager() {
+  }
+
+  /**
+   * Returns hadoop.auth cookie, doing needed SPNego authentication
+   *
+   * @param endpoint
+   *          the URL of the Hadoop service
+   * @param refresh
+   *          flag indicating whether to refresh the cookie; if
+   *          <code>true</code>, a new SPNego authentication is performed and
+   *          the cookie is refreshed even if it already exists in the local cache
+   * @return hadoop.auth cookie value
+   * @throws IOException
+   *           in case of problem getting hadoop.auth cookie
+   */
+  public String getAppCookie(String endpoint, boolean refresh)
+      throws IOException {
+
+    HttpUriRequest outboundRequest = new HttpGet(endpoint);
+    URI uri = outboundRequest.getURI();
+    String scheme = uri.getScheme();
+    String host = uri.getHost();
+    int port = uri.getPort();
+    String path = uri.getPath();
+    if (!refresh) {
+      String appCookie = endpointCookieMap.get(endpoint);
+      if (appCookie != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("got cached cookie");
+        }
+        return appCookie;
+      }
+    }
+
+    clearAppCookie(endpoint);
+
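+    // Register the SPNego auth scheme; empty credentials defer to the process's JAAS/Kerberos login context.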
+    DefaultHttpClient client = new DefaultHttpClient();
+    SPNegoSchemeFactory spNegoSF = new SPNegoSchemeFactory(/* stripPort */true);
+    client.getAuthSchemes().register(AuthPolicy.SPNEGO, spNegoSF);
+    client.getCredentialsProvider().setCredentials(
+        new AuthScope(/* host */null, /* port */-1, /* realm */null),
+        EMPTY_JAAS_CREDENTIALS);
+
+    String hadoopAuthCookie = null;
+    HttpResponse httpResponse = null;
+    try {
+      HttpHost httpHost = new HttpHost(host, port, scheme);
+      HttpRequest httpRequest = new HttpOptions(path);
+      httpResponse = client.execute(httpHost, httpRequest);
+      Header[] headers = httpResponse.getHeaders(SET_COOKIE);
+      if (LOG.isDebugEnabled()) {
+        for (Header header : headers) {
+          LOG.debug(header.getName() + " : " + header.getValue());
+        }
+      }
+      hadoopAuthCookie = getHadoopAuthCookieValue(headers);
+      if (hadoopAuthCookie == null) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        HttpEntity entity = httpResponse.getEntity();
+        String responseBody = entity != null ? EntityUtils.toString(entity) : null;
+        LOG.error("SPNego authentication failed with statusCode = " + statusCode + ", responseBody = " + responseBody + ", can not get hadoop.auth cookie for URL: " + endpoint);
+        return null;
+      }
+    } finally {
+      if (httpResponse != null) {
+        HttpEntity entity = httpResponse.getEntity();
+        if (entity != null) {
+          entity.getContent().close();
+        }
+      }
+
+    }
+
+    hadoopAuthCookie = HADOOP_AUTH_EQ + quote(hadoopAuthCookie);
+    setAppCookie(endpoint, hadoopAuthCookie);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Successful SPNego authentication to URL:" + uri.toString());
+    }
+    return hadoopAuthCookie;
+  }
+
+
+  /**
+   * Returns the cached app cookie.
+   * @param endpoint the Hadoop endpoint we authenticate to
+   * @return the cached app cookie, or null if none is cached
+   */
+  public String getCachedAppCookie(String endpoint) {
+    return endpointCookieMap.get(endpoint);
+  }
+
+  /**
+   * Caches the app cookie for an endpoint.
+   * @param endpoint the Hadoop endpoint we authenticate to
+   * @param appCookie the app cookie
+   */
+  private void setAppCookie(String endpoint, String appCookie) {
+    endpointCookieMap.put(endpoint, appCookie);
+  }
+
+  /**
+   * Clears the cached app cookie.
+   * @param endpoint the Hadoop endpoint we authenticate to
+   */
+  private void clearAppCookie(String endpoint) {
+    endpointCookieMap.remove(endpoint);
+  }
+
+  static String quote(String s) {
+    return s == null ? s : "\"" + s + "\"";
+  }
+
+  static String getHadoopAuthCookieValue(Header[] headers) {
+    if (headers == null) {
+      return null;
+    }
+    for (Header header : headers) {
+      HeaderElement[] elements = header.getElements();
+      for (HeaderElement element : elements) {
+        String cookieName = element.getName();
+        if (cookieName.equals(HADOOP_AUTH)) {
+          if (element.getValue() != null) {
+            String trimmedVal = element.getValue().trim();
+            if (!trimmedVal.isEmpty()) {
+              return trimmedVal;
+            }
+          }
+        }
+      }
+    }
+    return null;
+  }
+
+
+  private static class EmptyJaasCredentials implements Credentials {
+
+    public String getPassword() {
+      return null;
+    }
+
+    public Principal getUserPrincipal() {
+      return null;
+    }
+
+  }
+
+}
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
similarity index 96%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
rename to ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
index 825ac25..84cba0e 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricAggregate.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricAggregate.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.apache.hadoop.classification.InterfaceAudience;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
similarity index 95%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
rename to ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
index 9c837b6..7ef2c1d 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricClusterAggregate.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricClusterAggregate.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.codehaus.jackson.annotate.JsonCreator;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
similarity index 94%
rename from ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
rename to ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
index 340ec75..e190913 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/MetricHostAggregate.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/MetricHostAggregate.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
+package org.apache.hadoop.metrics2.sink.timeline;
 
 
 import org.codehaus.jackson.annotate.JsonCreator;
@@ -54,7 +54,7 @@
     this.numberOfSamples = numberOfSamples;
   }
 
-  public double getAvg() {
+  public double calculateAverage() {
     return sum / numberOfSamples;
   }
 
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
index 31044cc..e87f06e 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/Precision.java
@@ -35,7 +35,7 @@
   }
 
   public static Precision getPrecision(String precision) throws PrecisionFormatException {
-    if (precision == null ) {
+    if (precision == null || precision.isEmpty()) {
       return null;
     }
     try {
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
index 44c9d4a..edace52 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetric.java
@@ -45,7 +45,7 @@
   private String type;
   private String units;
   private TreeMap<Long, Double> metricValues = new TreeMap<Long, Double>();
-  private Map<String, String> metadata = new HashMap<>();
+  private HashMap<String, String> metadata = new HashMap<>();
 
   // default
   public TimelineMetric() {
@@ -151,11 +151,11 @@
   }
 
   @XmlElement(name = "metadata")
-  public Map<String,String> getMetadata () {
+  public HashMap<String,String> getMetadata () {
     return metadata;
   }
 
-  public void setMetadata (Map<String,String> metadata) {
+  public void setMetadata (HashMap<String,String> metadata) {
     this.metadata = metadata;
   }
 
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
new file mode 100644
index 0000000..626ac5f
--- /dev/null
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/TimelineMetricWithAggregatedValues.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+
+@XmlRootElement(name = "TimelineMetricWithAggregatedValues")
+@XmlAccessorType(XmlAccessType.NONE)
+public class TimelineMetricWithAggregatedValues {
+    private TimelineMetric timelineMetric;
+    private MetricHostAggregate metricAggregate;
+
+    public TimelineMetricWithAggregatedValues() {
+    }
+
+    public TimelineMetricWithAggregatedValues(TimelineMetric metric, MetricHostAggregate metricAggregate) {
+        timelineMetric = metric;
+        this.metricAggregate = metricAggregate;
+    }
+
+    @XmlElement
+    public MetricHostAggregate getMetricAggregate() {
+        return metricAggregate;
+    }
+
+    @XmlElement
+    public TimelineMetric getTimelineMetric() {
+        return timelineMetric;
+    }
+
+    public void setTimelineMetric(TimelineMetric timelineMetric) {
+        this.timelineMetric = timelineMetric;
+    }
+
+    public void setMetricAggregate(MetricHostAggregate metricAggregate) {
+        this.metricAggregate = metricAggregate;
+    }
+
+    @Override
+    public String toString() {
+        return "TimelineMetricWithAggregatedValues{" +
+                "timelineMetric=" + timelineMetric +
+                ", metricAggregate=" + metricAggregate +
+                '}';
+    }
+}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
new file mode 100644
index 0000000..8355288
--- /dev/null
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AppCookieManagerTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import org.apache.http.Header;
+import org.apache.http.message.BasicHeader;
+import org.junit.Test;
+
+public class AppCookieManagerTest {
+
+  @Test
+  public void getCachedAppCookie() {
+    assertNull(new AppCookieManager().getCachedAppCookie("http://dummy"));
+  }
+
+  @Test
+  public void getHadoopAuthCookieValueWithNullHeaders() {
+    assertNull(AppCookieManager.getHadoopAuthCookieValue(null));
+  }
+
+  @Test
+  public void getHadoopAuthCookieValueWithEmptyHeaders() {
+    assertNull(AppCookieManager.getHadoopAuthCookieValue(new Header[0]));
+  }
+
+  @Test
+  public void getHadoopAuthCookieValueWithValidHeaders() {
+    Header[] headers = new Header[1];
+    headers[0] = new BasicHeader("Set-Cookie", AppCookieManager.HADOOP_AUTH + "=dummyvalue");
+    assertNotNull(AppCookieManager.getHadoopAuthCookieValue(headers));
+  }
+
+}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index 9b0cdbe..ce2cf79 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -90,6 +90,16 @@
     }
 
     @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index a393a96..f0174d5 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -192,5 +192,15 @@
     protected String getHostname() {
       return "h1";
     }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return true;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
   }
 }
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 32fe32e..3be2162 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -45,7 +45,7 @@
 public class HandleConnectExceptionTest {
   private static final String COLLECTOR_URL = "collector";
   private TestTimelineMetricsSink sink;
-  
+
   @Before
   public void init(){
     sink = new TestTimelineMetricsSink();
@@ -88,6 +88,17 @@
     }
   }
 
+  @Test
+  public void testEmitMetricsWithNullHost() {
+    TestTimelineMetricsSinkWithNullHost sinkWithNullHost = new TestTimelineMetricsSinkWithNullHost();
+
+    boolean success = sinkWithNullHost.emitMetrics(new TimelineMetrics());
+    Assert.assertFalse(success);
+
+    success = sinkWithNullHost.emitMetrics(new TimelineMetrics());
+    Assert.assertTrue(success);
+  }
+
   private class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
     @Override
     protected String getCollectorUri(String host) {
@@ -125,6 +136,16 @@
     }
 
     @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 61888;
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);
@@ -136,4 +157,77 @@
     }
 
   }
+
+  private class TestTimelineMetricsSinkWithNullHost extends AbstractTimelineMetricsSink {
+
+    int ctr = 0;
+
+    @Override
+    protected String getCollectorUri(String host) {
+      return COLLECTOR_URL;
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+      return "http";
+    }
+
+    @Override
+    protected String getCollectorPort() {
+      return "2181";
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+      return "localhost:2181";
+    }
+
+    @Override
+    protected Collection<String> getConfiguredCollectorHosts() {
+      return Arrays.asList("localhost");
+    }
+
+    @Override
+    protected String getHostname() {
+      return "h1";
+    }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+      return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+      return 0;
+    }
+
+    @Override
+    public boolean emitMetrics(TimelineMetrics metrics) {
+      super.init();
+      return super.emitMetrics(metrics);
+    }
+
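+    // Simulates an unavailable collector: the first host lookup returns null, subsequent lookups return "localhost".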
+    @Override
+    protected synchronized String findPreferredCollectHost() {
+      if (ctr == 0) {
+        ctr++;
+        return null;
+      } else {
+        return "localhost";
+      }
+    }
+
+    @Override
+    protected boolean emitMetricsJson(String connectUrl, String jsonData) {
+      return true;
+    }
+
+  }
+
 }
diff --git a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 3fdf3f4..6277907 100644
--- a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -61,6 +61,11 @@
   private final static String COUNTER_METRICS_PROPERTY = "counters";
   private final Set<String> counterMetrics = new HashSet<String>();
   private int timeoutSeconds = 10;
+  private boolean setInstanceId;
+  private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
+
 
   @Override
   public void start() {
@@ -106,6 +111,11 @@
     zookeeperQuorum = configuration.getProperty("zookeeper.quorum");
     protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
+    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
+
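+    // Host in-memory aggregation settings; parse the configured values rather than JVM system properties.
+    // Assumption: default to disabled and port 61888 (the aggregator port used elsewhere in this change) when unset.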
+    hostInMemoryAggregationEnabled = Boolean.parseBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.parseInt(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
     // Initialize the collector write strategy
     super.init();
 
@@ -158,6 +168,16 @@
     return hostname;
   }
 
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }
@@ -227,7 +247,11 @@
       TimelineMetric timelineMetric = new TimelineMetric();
       timelineMetric.setMetricName(attributeName);
       timelineMetric.setHostName(hostname);
-      timelineMetric.setInstanceId(component);
+      if (setInstanceId) {
+        timelineMetric.setInstanceId(instanceId + component);
+      } else {
+        timelineMetric.setInstanceId(component);
+      }
       timelineMetric.setAppId("FLUME_HANDLER");
       timelineMetric.setStartTime(currentTimeMillis);
       timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
diff --git a/ambari-metrics-grafana/ambari-metrics/datasource.js b/ambari-metrics-grafana/ambari-metrics/datasource.js
index 97de6e7..07e76af 100644
--- a/ambari-metrics-grafana/ambari-metrics/datasource.js
+++ b/ambari-metrics-grafana/ambari-metrics/datasource.js
@@ -84,7 +84,6 @@
          * AMS Datasource  Query
          */
         AmbariMetricsDatasource.prototype.query = function (options) {
-
           var emptyData = function (metric) {
             var legend = metric.alias ? metric.alias : metric.metric;
             return {
@@ -217,16 +216,21 @@
           var getHostAppIdData = function(target) {
             var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision=' 
             + target.precision;
+            var instanceId = typeof target.cluster == 'undefined'  ? '' : '&instanceId=' + target.cluster;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
             return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.metric + metricTransform +
-                metricAggregator + "&hostname=" + target.hosts + '&appId=' + target.app + '&startTime=' + from +
-                '&endTime=' + to + precision + seriesAggregator }).then(
-                getMetricsData(target)
+            metricAggregator + "&hostname=" + target.hosts + '&appId=' + target.app + instanceId + '&startTime=' + from +
+            '&endTime=' + to + precision + seriesAggregator }).then(
+              getMetricsData(target)
             );
           };
           //Check if it's a templated dashboard.
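+          // Resolve the currently selected value(s) of the "cluster" template variable, if one is defined.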
+          var templatedClusters = templateSrv.variables.filter(function(o) { return o.name === "cluster"});
+          var templatedCluster = (_.isEmpty(templatedClusters)) ? '' : templatedClusters[0].options.filter(function(cluster)
+            { return cluster.selected; }).map(function(clusterName) { return clusterName.value; });
+
           var templatedHosts = templateSrv.variables.filter(function(o) { return o.name === "hosts"});
           var templatedHost = (_.isEmpty(templatedHosts)) ? '' : templatedHosts[0].options.filter(function(host)
             { return host.selected; }).map(function(hostName) { return hostName.value; });
@@ -236,20 +240,23 @@
           var tComponent = _.isEmpty(tComponents) ? '' : tComponents[0].current.value;
 
           var getServiceAppIdData = function(target) {
+            var tCluster = (_.isEmpty(templateSrv.variables)) ? templatedCluster : '';
+            var instanceId = typeof tCluster == 'undefined'  ? '' : '&instanceId=' + tCluster;
             var tHost = (_.isEmpty(templateSrv.variables)) ? templatedHost : target.templatedHost;
-            var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision=' 
+            var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
             return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.metric + metricTransform
-              + metricAggregator + '&hostname=' + tHost + '&appId=' + target.app + '&startTime=' + from +
+              + metricAggregator + '&hostname=' + tHost + '&appId=' + target.app + instanceId + '&startTime=' + from +
               '&endTime=' + to + precision + seriesAggregator }).then(
               getMetricsData(target)
             );
           };
           // To speed up querying on templatized dashboards.
           var getAllHostData = function(target) {
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
             var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
@@ -265,28 +272,30 @@
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
             var templatedComponent = (_.isEmpty(tComponent)) ? target.app : tComponent;
             return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.metric + metricTransform
-              + metricAggregator + '&hostname=' + target.templatedHost + '&appId=' + templatedComponent + '&startTime=' + from +
-              '&endTime=' + to + precision + topN + seriesAggregator }).then(
+              + metricAggregator + '&hostname=' + target.templatedHost + '&appId=' + templatedComponent + instanceId
+              + '&startTime=' + from + '&endTime=' + to + precision + topN + seriesAggregator }).then(
               allHostMetricsData(target)
             );
           };
           var getYarnAppIdData = function(target) {
-            var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision=' 
+            var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
             return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.queue + metricTransform
-              + metricAggregator + '&appId=resourcemanager&startTime=' + from +
+              + metricAggregator + '&appId=resourcemanager' + instanceId + '&startTime=' + from +
               '&endTime=' + to + precision + seriesAggregator }).then(
               getMetricsData(target)
             );
           };
           var getHbaseAppIdData = function(target) {
-            var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
+            var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
-            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.hbMetric + '&appId=hbase&startTime='
+            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.hbMetric + instanceId + '&appId=hbase&startTime='
             + from + '&endTime=' + to + precision + seriesAggregator }).then(
               allHostMetricsData(target)
             );
@@ -295,22 +304,25 @@
           var getKafkaAppIdData = function(target) {
             var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
-            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.kbMetric + metricTransform
+            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.kbMetric + metricTransform + instanceId
               + metricAggregator + '&appId=kafka_broker&startTime=' + from +
               '&endTime=' + to + precision + seriesAggregator }).then(
               getMetricsData(target)
             );
           };
           var getNnAppIdData = function(target) {
+
             var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
-            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.nnMetric + metricTransform
+            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.nnMetric + metricTransform + instanceId
             + metricAggregator + '&appId=namenode&startTime=' + from + '&endTime=' + to + precision + seriesAggregator }).then(
               allHostMetricsData(target)
             );
@@ -318,12 +330,13 @@
 
           // Storm Topology calls.
           var getStormData = function(target) {
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
             var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
-            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sTopoMetric + metricTransform
+            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sTopoMetric + metricTransform + instanceId
                 + metricAggregator + '&appId=nimbus&startTime=' + from + '&endTime=' + to + precision + seriesAggregator }).then(
                 allHostMetricsData(target)
             );
@@ -331,12 +344,13 @@
 
           // Druid calls.
           var getDruidData = function(target) {
+            var instanceId = typeof target.templatedCluster == 'undefined'  ? '' : '&instanceId=' + target.templatedCluster;
             var precision = target.precision === 'default' || typeof target.precision == 'undefined'  ? '' : '&precision='
             + target.precision;
             var metricAggregator = target.aggregator === "none" ? '' : '._' + target.aggregator;
             var metricTransform = !target.transform || target.transform === "none" ? '' : '._' + target.transform;
             var seriesAggregator = !target.seriesAggregator || target.seriesAggregator === "none" ? '' : '&seriesAggregateFunction=' + target.seriesAggregator;
-            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sDataSourceMetric + metricTransform
+            return self.doAmbariRequest({ url: '/ws/v1/timeline/metrics?metricNames=' + target.sDataSourceMetric + metricTransform + instanceId
                           + metricAggregator + '&appId=druid&startTime=' + from + '&endTime=' + to + precision + seriesAggregator }).then(
                           allHostMetricsData(target)
             );
@@ -471,7 +485,6 @@
                 target.sTopology = selectedTopology;
                 target.sComponent = selectedComponent;
                 target.sTopoMetric = target.metric.replace('*', target.sTopology).replace('*', target.sComponent);
-                debugger;
                   return getStormData(target);
               }));
             }
@@ -507,14 +520,20 @@
                 }));
               });
             }
-
             // To speed up querying on templatized dashboards.
-            if (templateSrv.variables[1] && templateSrv.variables[1].name === "hosts") {
+            var indexOfHosts = -1;
+            for (var i = 0; i < templateSrv.variables.length; i++) {
+              if (templateSrv.variables[i].name == 'hosts') {
+                indexOfHosts = i;
+              }
+            }
+            if (indexOfHosts >= 0) {
               var allHosts = templateSrv._values.hosts.lastIndexOf('}') > 0 ? templateSrv._values.hosts.slice(1,-1) :
               templateSrv._values.hosts;
               allHosts = templateSrv._texts.hosts === "All" ? '%' : allHosts;
               metricsPromises.push(_.map(options.targets, function(target) {
-                  target.templatedHost = allHosts;
+                  target.templatedHost = allHosts ? allHosts : '';
+                  target.templatedCluster = templatedCluster;
                   return getAllHostData(target);
               }));
             }
@@ -558,14 +577,19 @@
         AmbariMetricsDatasource.prototype.metricFindQuery = function (query) {
           var interpolated;
           try {
-            interpolated = templateSrv.replace(query);
+            interpolated = query.split('.')[0];
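+            // Use only the first dot-separated token of the query as the lookup key.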
           } catch (err) {
             return $q.reject(err);
           }
+          var templatedClusters = templateSrv.variables.filter(function(o) { return o.name === "cluster"});
+          var templatedCluster = _.isEmpty(templatedClusters) ? '' : templatedClusters[0].options
+            .filter(function(cluster) { return cluster.selected; })
+            .map(function(cluster) { return cluster.value; });
+
           var tComponents = _.isEmpty(templateSrv.variables) ? '' : templateSrv.variables.filter(function(variable) 
             { return variable.name === "components"});
           var tComponent = _.isEmpty(tComponents) ? '' : tComponents[0].current.value;
 
           // Templated Variable for HBase Users
           // It will search the cluster and populate the HBase Users.
           if(interpolated === "hbase-users") {
@@ -837,58 +861,11 @@
               });
           }
 
-          // Templated Variable that will populate all hosts on the cluster.
-          // The variable needs to be set to "hosts".
-          if (!tComponent){
-                  return this.doAmbariRequest({
-                        method: 'GET',
-                        url: '/ws/v1/timeline/metrics/' + interpolated
-                      })
-                      .then(function (results) {
-                        //Remove fakehostname from the list of hosts on the cluster.
-                        var fake = "fakehostname"; delete results.data[fake];
-                        return _.map(_.keys(results.data), function (hostName) {
-                          return {
-                                text: hostName,
-                                expandable: hostName.expandable ? true : false
-                              };
-                        });
-                      });
-          } else {
-            // Create a dropdown in templated dashboards for single components.
-            // This will check for the component set and show hosts only for the
-            // selected component.
-            return this.doAmbariRequest({
-                method: 'GET',
-                url: '/ws/v1/timeline/metrics/hosts'
-              })
-              .then(function(results) {
-                var compToHostMap = {};
-                //Remove fakehostname from the list of hosts on the cluster.
-                var fake = "fakehostname";
-                delete results.data[fake];
-                //Query hosts based on component name
-                _.forEach(results.data, function(comp, hostName) {
-                  comp.forEach(function(component) {
-                    if (!compToHostMap[component]) {
-                      compToHostMap[component] = [];
-                    }
-                    compToHostMap[component].push(hostName);
-                  });
-                });
-                var compHosts = compToHostMap[tComponent];
-                compHosts = _.map(compHosts, function(host) {
-                  return {
-                    text: host,
-                    expandable: host.expandable ? true : false
-                  };
-                });
-                compHosts = _.sortBy(compHosts, function(i) {
-                  return i.text.toLowerCase();
-                });
-                return $q.when(compHosts);
-              });
-           }
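+          // Templated variable queries are dispatched on the first dot-separated
+          // token, so e.g. "hosts" populates the hosts dropdown and "cluster"
+          // populates the cluster dropdown.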
+          if (interpolated === 'hosts') {
+            return this.suggestHosts(tComponent, templatedCluster);
+          } else if (interpolated === 'cluster') {
+            return this.suggestClusters(tComponent);
+          }
         };
 
         /**
@@ -941,34 +918,47 @@
           return $q.when(keys);
         };
 
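+        // Both suggestion calls below expect the instances endpoint to return a
+        // map of the form { clusterName: { appId: [hostNames...] } }.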
+        AmbariMetricsDatasource.prototype.suggestClusters = function(app) {
+          if (!app) { app = ''; }
+          return this.doAmbariRequest({
+            method: 'GET',
+            url: '/ws/v1/timeline/metrics/instances?appId=' + app
+          }).then(function(response) {
+              var clusters = [];
+              var data = response.data;
+              for (var cluster in data) {
+                if (data[cluster].hasOwnProperty(app)) {
+                  clusters.push({text: cluster});
+                }
+              }
+              return $q.when(clusters);
+          });
+        };
+
         /**
          * AMS Datasource - Suggest Hosts.
          *
          * Query Hosts on the cluster.
          */
-        AmbariMetricsDatasource.prototype.suggestHosts = function (query, app) {
-          console.log(query);
-          return this.doAmbariRequest({method: 'GET', url: '/ws/v1/timeline/metrics/hosts'})
-            .then(function (results) {
-              var compToHostMap = {};
-              //Remove fakehostname from the list of hosts on the cluster.
-              var fake = "fakehostname"; delete results.data[fake];
-              //Query hosts based on component name
-              _.forEach(results.data, function (comp, hostName) {
-                comp.forEach(function (component) {
-                  if (!compToHostMap[component]){
-                    compToHostMap[component] = [];
-                  }
-                  compToHostMap[component].push(hostName);
-                });
-              });
-              var compHosts = compToHostMap[app];
-              compHosts = _.map(compHosts, function (h) {
-                return {text: h};
-              });
-              compHosts = _.sortBy(compHosts, function (i) { return i.text.toLowerCase(); });
-              return $q.when(compHosts);
-            });
+        AmbariMetricsDatasource.prototype.suggestHosts = function (app, cluster) {
+          if (!app) { app = ''; }
+          if (!cluster) { cluster = ''; }
+          return this.doAmbariRequest({
+            method: 'GET',
+            url: '/ws/v1/timeline/metrics/instances?appId=' + app + '&instanceId=' + cluster
+          }).then(function (response) {
+            var hosts = [];
+            var data = response.data;
+            for (var clusterName in data) {
+              var appHosts = data[clusterName][app];
+              if (appHosts) {
+                for (var index in appHosts) {
+                  hosts.push({text: appHosts[index]});
+                }
+              }
+            }
+            return $q.when(hosts);
+          });
         };
 
         /**
diff --git a/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html b/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html
index 3f322c1..7e78cc0 100644
--- a/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html
+++ b/ambari-metrics-grafana/ambari-metrics/partials/query.editor.html
@@ -81,6 +81,18 @@
             </a>
         </li>
 
+        <li class="tight-form-item" style="width: 86px" ng-hide="dashboard.templating.list.length > 0">
+            Cluster
+        </li>
+        <li ng-hide="dashboard.templating.list.length > 0">
+            <input type="text" class="input-large tight-form-input" ng-model="target.cluster"
+                   spellcheck='false' bs-typeahead="suggestClusters" placeholder="cluster name" data-min-length=0 data-items=100
+                   ng-blur="targetBlur()">
+            </input>
+            <a bs-tooltip="target.errors.metric" style="color: rgb(229, 189, 28)" ng-show="target.errors.metric">
+                <i class="fa fa-warning"></i>
+            </a>
+        </li>
 
         <li class="tight-form-item" style="width: 86px" ng-hide="dashboard.templating.list.length > 0">
             Hosts
@@ -95,8 +107,6 @@
             </a>
         </li>
 
-
-
         <li class="tight-form-item">
             Aggregator
         </li>
diff --git a/ambari-metrics-grafana/ambari-metrics/queryCtrl.js b/ambari-metrics-grafana/ambari-metrics/queryCtrl.js
index a26e7d0..02b5813 100644
--- a/ambari-metrics-grafana/ambari-metrics/queryCtrl.js
+++ b/ambari-metrics-grafana/ambari-metrics/queryCtrl.js
@@ -55,6 +55,7 @@
             if (newValue === '') {
               $scope.target.metric = '';
               $scope.target.hosts = '';
+              $scope.target.cluster = '';
             }
           });
           if (!$scope.target.downsampleAggregator) {
@@ -86,8 +87,14 @@
             .then(callback);
         };
 
+        $scope.suggestClusters = function(query, callback) {
+          $scope.datasource.suggestClusters($scope.target.app)
+            .then($scope.getTextValues)
+            .then(callback);
+        };
+
         $scope.suggestHosts = function(query, callback) {
-          $scope.datasource.suggestHosts(query, $scope.target.app)
+          $scope.datasource.suggestHosts($scope.target.app, $scope.target.cluster)
             .then($scope.getTextValues)
             .then(callback);
         };
diff --git a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index 8e0de03..a290ced 100644
--- a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -52,6 +52,7 @@
   private TimelineMetricsCache metricsCache;
   private String hostName = "UNKNOWN.example.com";
   private String instanceId = null;
+  private boolean setInstanceId;
   private String serviceName = "";
   private Collection<String> collectorHosts;
   private String collectorUri;
@@ -74,6 +75,8 @@
       return t;
     }
   });
+  private int hostInMemoryAggregationPort;
+  private boolean hostInMemoryAggregationEnabled;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -95,8 +98,8 @@
     }
 
     serviceName = getServiceName(conf);
-    String inst = conf.getString("instanceId", "");
-    instanceId = StringUtils.isEmpty(inst) ? null : inst;
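+    // instanceId is read here but only attached to outgoing metrics when
+    // SET_INSTANCE_ID_PROPERTY is enabled (see putMetrics below).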
+    instanceId = conf.getString(INSTANCE_ID_PROPERTY, null);
+    setInstanceId = conf.getBoolean(SET_INSTANCE_ID_PROPERTY, false);
 
     LOG.info("Identified hostname = " + hostName + ", serviceName = " + serviceName);
     // Initialize the collector write strategy
@@ -106,7 +109,8 @@
     protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
     collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
     port = conf.getString(COLLECTOR_PORT, "6188");
-
+    hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY);
+    hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY);
     if (collectorHosts.isEmpty()) {
       LOG.error("No Metric collector configured.");
     } else {
@@ -248,6 +252,16 @@
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void putMetrics(MetricsRecord record) {
     try {
       String recordName = record.name();
@@ -307,9 +321,10 @@
 
       int sbBaseLen = sb.length();
       List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
-      Map<String, String> metadata = null;
+      HashMap<String, String> metadata = null;
       if (skipAggregation) {
-        metadata = Collections.singletonMap("skipAggregation", "true");
+        metadata = new HashMap<>();
+        metadata.put("skipAggregation", "true");
       }
       long startTime = record.timestamp();
 
@@ -321,7 +336,9 @@
         timelineMetric.setMetricName(name);
         timelineMetric.setHostName(hostName);
         timelineMetric.setAppId(serviceName);
-        timelineMetric.setInstanceId(instanceId);
+        if (setInstanceId) {
+          timelineMetric.setInstanceId(instanceId);
+        }
         timelineMetric.setStartTime(startTime);
         timelineMetric.setType(metric.type() != null ? metric.type().name() : null);
         timelineMetric.getMetricValues().put(startTime, value.doubleValue());
diff --git a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 5777639..30c5c23 100644
--- a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -50,12 +50,15 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.TreeMap;
 
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PORT;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_HOSTS_PROPERTY;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.COLLECTOR_PROTOCOL;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.INSTANCE_ID_PROPERTY;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.SET_INSTANCE_ID_PROPERTY;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
@@ -81,13 +84,17 @@
   }
 
   @Test
-  @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class})
+  @PrepareForTest({URL.class, OutputStream.class, AbstractTimelineMetricsSink.class, HttpURLConnection.class, TimelineMetric.class, HadoopTimelineMetricsSink.class})
   public void testPutMetrics() throws Exception {
     HadoopTimelineMetricsSink sink = new HadoopTimelineMetricsSink();
 
     HttpURLConnection connection = PowerMock.createNiceMock(HttpURLConnection.class);
     URL url = PowerMock.createNiceMock(URL.class);
     InputStream is = IOUtils.toInputStream(gson.toJson(Collections.singletonList("localhost")));
+    TimelineMetric timelineMetric = PowerMock.createNiceMock(TimelineMetric.class);
+    expectNew(TimelineMetric.class).andReturn(timelineMetric).times(2);
+    expect(timelineMetric.getMetricValues()).andReturn(new TreeMap<Long, Double>()).anyTimes();
+    expect(timelineMetric.getMetricName()).andReturn("metricName").anyTimes();
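+    // Mocking TimelineMetric lets this test verify further below that
+    // setInstanceId("instanceId") is invoked when the sink is configured to set it.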
     expectNew(URL.class, anyString()).andReturn(url).anyTimes();
     expect(url.openConnection()).andReturn(connection).anyTimes();
     expect(connection.getInputStream()).andReturn(is).anyTimes();
@@ -106,6 +113,8 @@
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes();
+    expect(conf.getBoolean(eq(SET_INSTANCE_ID_PROPERTY), eq(false))).andReturn(true).anyTimes();
+    expect(conf.getString(eq(INSTANCE_ID_PROPERTY), anyString())).andReturn("instanceId").anyTimes();
 
     conf.setListDelimiter(eq(','));
     expectLastCall().anyTimes();
@@ -145,6 +154,9 @@
 
     expect(record.metrics()).andReturn(Arrays.asList(metric)).anyTimes();
 
+    timelineMetric.setInstanceId(eq("instanceId"));
+    EasyMock.expectLastCall();
+
     replay(conf, record, metric);
     replayAll();
 
diff --git a/ambari-metrics-host-aggregator/conf/unix/log4j.properties b/ambari-metrics-host-aggregator/conf/unix/log4j.properties
new file mode 100644
index 0000000..d7ceedd
--- /dev/null
+++ b/ambari-metrics-host-aggregator/conf/unix/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/var/log/ambari-metrics-monitor/ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
diff --git a/ambari-metrics-host-aggregator/conf/windows/log4j.properties b/ambari-metrics-host-aggregator/conf/windows/log4j.properties
new file mode 100644
index 0000000..d9aabab
--- /dev/null
+++ b/ambari-metrics-host-aggregator/conf/windows/log4j.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Define some default values that can be overridden by system properties
+# Root logger option
+log4j.rootLogger=INFO,file
+
+# Direct log messages to a log file
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=\\var\\log\\ambari-metrics-monitor\\ambari-metrics-aggregator.log
+log4j.appender.file.MaxFileSize=80MB
+log4j.appender.file.MaxBackupIndex=60
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L - %m%n
diff --git a/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics-host-aggregator/pom.xml
new file mode 100644
index 0000000..24432dd
--- /dev/null
+++ b/ambari-metrics-host-aggregator/pom.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>ambari-metrics</artifactId>
+        <groupId>org.apache.ambari</groupId>
+        <version>2.0.0.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>ambari-metrics-host-aggregator</artifactId>
+    <packaging>jar</packaging>
+
+    <name>Ambari Metrics Host Aggregator</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
+        <dependency>
+              <groupId>org.apache.ambari</groupId>
+              <artifactId>ambari-metrics-common</artifactId>
+              <version>2.0.0.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.servlet</groupId>
+            <artifactId>servlet-api</artifactId>
+            <version>2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.xml.bind</groupId>
+            <artifactId>jaxb-api</artifactId>
+            <version>2.2.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+            <version>1.11</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>2.7.1.2.3.4.0-3347</version>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey.jersey-test-framework</groupId>
+            <artifactId>jersey-test-framework-core</artifactId>
+            <version>1.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey.jersey-test-framework</groupId>
+            <artifactId>jersey-test-framework-grizzly2</artifactId>
+            <version>1.11</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>1.6</version>
+                <configuration>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                    <filters>
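+                        <!-- Strip jar signature files: signatures from the original
+                             jars become invalid once classes are repacked into the shaded jar. -->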
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                </configuration>
+
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
new file mode 100644
index 0000000..1e5cc82
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.api.container.httpserver.HttpServerFactory;
+import com.sun.jersey.api.core.PackagesResourceConfig;
+import com.sun.jersey.api.core.ResourceConfig;
+import com.sun.net.httpserver.HttpServer;
+
+import javax.ws.rs.core.UriBuilder;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URL;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
+import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
+
+/**
+ * Web application with two publisher threads that process received metrics and submit the results to the collector.
+ */
+public class AggregatorApplication
+{
+    private static final int STOP_SECONDS_DELAY = 0;
+    private static final int JOIN_SECONDS_TIMEOUT = 5;
+    private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+    private Log LOG;
+    private final int webApplicationPort;
+    private final int rawPublishingInterval;
+    private final int aggregationInterval;
+    private Configuration configuration;
+    private Thread aggregatePublisherThread;
+    private Thread rawPublisherThread;
+    private TimelineMetricsHolder timelineMetricsHolder;
+    private HttpServer httpServer;
+
+    public AggregatorApplication(String hostname, String collectorHosts) {
+        LOG = LogFactory.getLog(this.getClass());
+        configuration = new Configuration(true);
+        initConfiguration();
+        configuration.set("timeline.metrics.collector.hosts", collectorHosts);
+        configuration.set("timeline.metrics.hostname", hostname);
+        configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
+        this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
+        this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
+        this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+        this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
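+        // The publishing intervals double as the holder's cache expiry times, so
+        // entries live long enough for the matching publisher thread to drain them.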
+        try {
+            this.httpServer = createHttpServer();
+        } catch (IOException e) {
+            LOG.error("Exception while starting HTTP server. Exiting", e);
+            System.exit(1);
+        }
+    }
+
+    private String getZkQuorumFromConfiguration() {
+        String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
+        String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
+        return getZkConnectionUrl(zkClientPort, zkServerHosts);
+    }
+
+    protected void initConfiguration() {
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        if (classLoader == null) {
+            classLoader = getClass().getClassLoader();
+        }
+
+        URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
+        if (amsResUrl == null) {
+            throw new IllegalStateException("Unable to initialize the metrics " +
+                    "subsystem. No ams-site present in the classpath.");
+        }
+        LOG.info("Found metric service configuration: " + amsResUrl);
+
+        try {
+            configuration.addResource(amsResUrl.toURI().toURL());
+        } catch (Exception e) {
+            LOG.error("Couldn't init configuration. ", e);
+            System.exit(1);
+        }
+    }
+
+    protected String getHostName() {
+        String hostName = "localhost";
+        try {
+            hostName = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (UnknownHostException e) {
+            LOG.error(e);
+        }
+        return hostName;
+    }
+
+    protected URI getURI() {
+        URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+        LOG.info(String.format("Web server at %s", uri));
+        return uri;
+    }
+
+    protected HttpServer createHttpServer() throws IOException {
+        ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
+        HashMap<String, Object> params = new HashMap<>();
+        params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
+        resourceConfig.setPropertiesAndFeatures(params);
+        return HttpServerFactory.create(getURI(), resourceConfig);
+    }
+
+    private void startWebServer() {
+        LOG.info("Starting web server.");
+        this.httpServer.start();
+    }
+
+    private void startAggregatePublisherThread() {
+        LOG.info("Starting aggregated metrics publisher.");
+        AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
+        aggregatePublisherThread = new Thread(metricPublisher);
+        aggregatePublisherThread.start();
+    }
+
+    private void startRawPublisherThread() {
+        LOG.info("Starting raw metrics publisher.");
+        AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
+        rawPublisherThread = new Thread(metricPublisher);
+        rawPublisherThread.start();
+    }
+
+    private void stop() {
+        LOG.info("Stopping aggregator application");
+        aggregatePublisherThread.interrupt();
+        rawPublisherThread.interrupt();
+        httpServer.stop(STOP_SECONDS_DELAY);
+        LOG.info("Stopped web server.");
+        try {
+            LOG.info("Waiting for threads to join.");
+            aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+            rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
+            LOG.info("Gracefully stopped Aggregator Application.");
+        } catch (InterruptedException e) {
+            LOG.error("Received exception during stop : ", e);
+
+        }
+
+    }
+
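+    // Builds the ZooKeeper connection string, appending the client port to any
+    // host that lacks an explicit one; e.g. (illustrative values) a quorum of
+    // "zk1,zk2:2182" with client port "2181" yields "zk1:2181,zk2:2182".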
+    private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
+        StringBuilder sb = new StringBuilder();
+        String[] quorumParts = zkQuorum.split(",");
+        String prefix = "";
+        for (String part : quorumParts) {
+            sb.append(prefix);
+            sb.append(part.trim());
+            if (!part.contains(":")) {
+                sb.append(":");
+                sb.append(zkClientPort);
+            }
+            prefix = ",";
+        }
+        return sb.toString();
+    }
+
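+    // A hypothetical invocation (jar path illustrative; ams-site.xml must be on the classpath):
+    //   java -cp ambari-metrics-host-aggregator.jar:conf \
+    //       org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication <hostname> collector1,collector2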
+    public static void main( String[] args ) throws Exception {
+        if (args.length != 2) {
+            throw new Exception("This jar should be executed with 2 arguments : 1st - current host name, " +
+                    "2nd - collector hosts separated with coma");
+        }
+
+        final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
+
+        app.startWebServerAndPublisherThreads();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            public void run() {
+                app.stop();
+            }
+        });
+    }
+
+    private void startWebServerAndPublisherThreads() {
+        LOG.info("Starting aggregator application");
+        startAggregatePublisherThread();
+        startRawPublisherThread();
+        startWebServer();
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
new file mode 100644
index 0000000..b151209
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebService.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.sun.jersey.spi.resource.Singleton;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+
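+/**
+ * REST endpoint the monitor sinks post metrics to. A hypothetical probe of a
+ * running instance (host, port and payload are illustrative):
+ *
+ *   curl -X POST -H "Content-Type: application/json" \
+ *        -d '{"metrics":[]}' http://localhost:61888/ws/v1/timeline/metrics
+ */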
+@Singleton
+@Path("/ws/v1/timeline")
+public class AggregatorWebService {
+    private final TimelineMetricsHolder metricsHolder = TimelineMetricsHolder.getInstance();
+
+    @GET
+    @Produces("text/json")
+    @Path("/metrics")
+    public Response getOkResponse() throws IOException {
+        return Response.ok().build();
+    }
+
+    @POST
+    @Produces(MediaType.TEXT_PLAIN)
+    @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+    @Path("/metrics")
+    public Response postMetrics(
+            TimelineMetrics metrics) {
+        metricsHolder.putMetricsForAggregationPublishing(metrics);
+        metricsHolder.putMetricsForRawPublishing(metrics);
+        return Response.ok().build();
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
new file mode 100644
index 0000000..03b6542
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolder.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * Singleton that holds two Guava caches buffering raw and aggregated metrics until they are published.
+ */
+public class TimelineMetricsHolder {
+    private static final int DEFAULT_RAW_CACHE_EXPIRE_TIME = 60;
+    private static final int DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME = 300;
+    private Cache<String, TimelineMetrics> aggregationMetricsCache;
+    private Cache<String, TimelineMetrics> rawMetricsCache;
+    private static TimelineMetricsHolder instance = null;
+    // extra time added to the expiry so values are not evicted before a publisher drains them
+    private static final int EXPIRE_DELAY = 30;
+    ReadWriteLock aggregationCacheLock = new ReentrantReadWriteLock();
+    ReadWriteLock rawCacheLock = new ReentrantReadWriteLock();
+
+    private TimelineMetricsHolder(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        this.rawMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(rawCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+        this.aggregationMetricsCache = CacheBuilder.newBuilder().expireAfterWrite(aggregationCacheExpireTime + EXPIRE_DELAY, TimeUnit.SECONDS).build();
+    }
+
+    public static TimelineMetricsHolder getInstance(int rawCacheExpireTime, int aggregationCacheExpireTime) {
+        if (instance == null) {
+            instance = new TimelineMetricsHolder(rawCacheExpireTime, aggregationCacheExpireTime);
+        }
+        return instance;
+    }
+
+    /**
+     * Uses the default expiration times if the caches are not initialized yet.
+     * @return the singleton instance
+     */
+    public static TimelineMetricsHolder getInstance() {
+        return getInstance(DEFAULT_RAW_CACHE_EXPIRE_TIME, DEFAULT_AGGREGATION_CACHE_EXPIRE_TIME);
+    }
+
+    public void putMetricsForAggregationPublishing(TimelineMetrics timelineMetrics) {
+        aggregationCacheLock.writeLock().lock();
+        try {
+            aggregationMetricsCache.put(calculateCacheKey(timelineMetrics), timelineMetrics);
+        } finally {
+            aggregationCacheLock.writeLock().unlock();
+        }
+    }
+
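+    // Cache keys only need to be unique per put; the first metric's appId plus
+    // the current timestamp (e.g. "datanode1501234567890") is enough for that.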
+    private String calculateCacheKey(TimelineMetrics timelineMetrics) {
+        List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+        if (!metrics.isEmpty()) {
+            return metrics.get(0).getAppId() + System.currentTimeMillis();
+        }
+        return String.valueOf(System.currentTimeMillis());
+    }
+
+    public Map<String, TimelineMetrics> extractMetricsForAggregationPublishing() {
+        return extractMetricsFromCacheWithLock(aggregationMetricsCache, aggregationCacheLock);
+    }
+
+    public void putMetricsForRawPublishing(TimelineMetrics metrics) {
+        rawCacheLock.writeLock().lock();
+        try {
+            rawMetricsCache.put(calculateCacheKey(metrics), metrics);
+        } finally {
+            rawCacheLock.writeLock().unlock();
+        }
+    }
+
+    public Map<String, TimelineMetrics> extractMetricsForRawPublishing() {
+        return extractMetricsFromCacheWithLock(rawMetricsCache, rawCacheLock);
+    }
+
+    /**
+     * Returns the cached values and clears the cache.
+     * @param cache cache to drain
+     * @param lock lock guarding the cache
+     * @return snapshot of the cache contents taken before invalidation
+     */
+    private Map<String, TimelineMetrics> extractMetricsFromCacheWithLock(Cache<String, TimelineMetrics> cache, ReadWriteLock lock) {
+        lock.writeLock().lock();
+        try {
+            Map<String, TimelineMetrics> metricsMap = new TreeMap<>(cache.asMap());
+            cache.invalidateAll();
+            return metricsMap;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
new file mode 100644
index 0000000..5af115f
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Base class for publisher threads that push metrics data to the AMS collector at a fixed interval.
+ */
+public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
+
+    private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+    private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+    private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
+    private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
+    private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
+    private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
+    private static final String PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY = "timeline.metrics.zk.quorum";
+    private static final String PUBLISHER_HOSTNAME_PROPERTY = "timeline.metrics.hostname";
+    protected static final String BASE_POST_URL = "%s://%s:%s/ws/v1/timeline/metrics";
+    protected int publishIntervalInSeconds;
+    private Log LOG;
+    protected TimelineMetricsHolder timelineMetricsHolder;
+    protected Configuration configuration;
+    private String collectorProtocol;
+    private String collectorPort;
+    private Collection<String> collectorHosts;
+    private String hostname;
+    private String zkQuorum;
+
+    public AbstractMetricPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int publishIntervalInSeconds) {
+        LOG = LogFactory.getLog(this.getClass());
+        this.configuration = configuration;
+        this.publishIntervalInSeconds = publishIntervalInSeconds;
+        this.timelineMetricsHolder = timelineMetricsHolder;
+        configure();
+    }
+
+    protected void configure() {
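+        // All values below come from ams-site.xml; the literals are fall-back
+        // defaults applied when a property is absent.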
+        collectorProtocol = configuration.get(AMS_SITE_HTTP_POLICY_PROPERTY, "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
+        collectorPort = configuration.getTrimmed(AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY, "0.0.0.0:6188").split(":")[1];
+        collectorHosts = parseHostsStringIntoCollection(configuration.getTrimmed(PUBLISHER_COLLECTOR_HOSTS_PROPERTY, ""));
+        zkQuorum = configuration.get(PUBLISHER_ZOOKEEPER_QUORUM_PROPERTY, "");
+        hostname = configuration.get(PUBLISHER_HOSTNAME_PROPERTY, "localhost");
+        if (collectorHosts.isEmpty()) {
+            LOG.error("No Metric collector configured.");
+        } else {
+            if (collectorProtocol.contains("https")) {
+                String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
+                String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
+                String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+                loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
+            }
+        }
+    }
+
+    /**
+     * Publishes metrics to the collector at the configured interval until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                Thread.sleep(this.publishIntervalInSeconds * 1000);
+            } catch (InterruptedException e) {
+                // Restore the interrupt flag so the loop condition can see it and exit.
+                Thread.currentThread().interrupt();
+                continue;
+            }
+            try {
+                processAndPublishMetrics(getMetricsFromCache());
+            } catch (Exception e) {
+                // Log and keep going; one failed publish should not kill the publisher thread.
+                LOG.error("Couldn't process and publish metrics.", e);
+            }
+            }
+        }
+    }
+
+    /**
+     * Processes the given metrics and sends them to the collector.
+     * @param metricsFromCache metrics drained from the holder cache
+     * @throws Exception if publishing fails
+     */
+    protected void processAndPublishMetrics(Map<String, TimelineMetrics> metricsFromCache) throws Exception {
+        if (metricsFromCache.isEmpty()) {
+            return;
+        }
+
+        LOG.info(String.format("Preparing %s timeline metrics for publishing", metricsFromCache.size()));
+        emitMetricsJson(getCollectorUri(getCurrentCollectorHost()), processMetrics(metricsFromCache));
+    }
+
+    /**
+     * Returns the metrics map; the source depends on the implementation.
+     * @return metrics keyed by cache key
+     */
+    protected abstract Map<String,TimelineMetrics> getMetricsFromCache();
+
+    /**
+     * Processes the given metrics (aggregates or merges them) and converts them into the JSON string that will be sent to the collector.
+     * @param metricValues metrics to process
+     * @return JSON payload for the collector
+     */
+    protected abstract String processMetrics(Map<String, TimelineMetrics> metricValues);
+
+    protected abstract String getPostUrl();
+
+    @Override
+    protected String getCollectorUri(String host) {
+        return String.format(getPostUrl(), getCollectorProtocol(), host, getCollectorPort());
+    }
+
+    @Override
+    protected String getCollectorProtocol() {
+        return collectorProtocol;
+    }
+
+    @Override
+    protected String getCollectorPort() {
+        return collectorPort;
+    }
+
+    @Override
+    protected int getTimeoutSeconds() {
+        return DEFAULT_POST_TIMEOUT_SECONDS;
+    }
+
+    @Override
+    protected String getZookeeperQuorum() {
+        return zkQuorum;
+    }
+
+    @Override
+    protected Collection<String> getConfiguredCollectorHosts() {
+        return collectorHosts;
+    }
+
+    @Override
+    protected String getHostname() {
+        return hostname;
+    }
+
+    @Override
+    protected boolean isHostInMemoryAggregationEnabled() {
+        return false;
+    }
+
+    @Override
+    protected int getHostInMemoryAggregationPort() {
+        return 0;
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
new file mode 100644
index 0000000..c8dffab
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Thread that aggregates and publishes metrics to the collector at the specified interval.
+ */
+public class AggregatedMetricsPublisher extends AbstractMetricPublisher {
+    private static final String AGGREGATED_POST_PREFIX = "/aggregated";
+    private Log LOG;
+
+    public AggregatedMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+        super(timelineMetricsHolder, configuration, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+    /**
+     * Gets the metrics map from the {@link TimelineMetricsHolder} aggregation cache.
+     * @return metrics staged for aggregation
+     */
+    @Override
+    protected Map<String, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForAggregationPublishing();
+    }
+
+    /**
+     * Aggregates the given metrics and converts them into the JSON string that will be sent to the collector.
+     * @param metricForAggregationValues metrics to aggregate
+     * @return JSON payload with the aggregated values
+     */
+    @Override
+    protected String processMetrics(Map<String, TimelineMetrics> metricForAggregationValues) {
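+        // Group the cached metrics by metric name, then fold every datapoint of a
+        // group into a single sum/max/min/count aggregate for the collector.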
+        HashMap<String, TimelineMetrics> nameToMetricMap = new HashMap<>();
+        for (TimelineMetrics timelineMetrics : metricForAggregationValues.values()) {
+            for (TimelineMetric timelineMetric : timelineMetrics.getMetrics()) {
+                if (!nameToMetricMap.containsKey(timelineMetric.getMetricName())) {
+                    nameToMetricMap.put(timelineMetric.getMetricName(), new TimelineMetrics());
+                }
+                nameToMetricMap.get(timelineMetric.getMetricName()).addOrMergeTimelineMetric(timelineMetric);
+            }
+        }
+        Set<TimelineMetricWithAggregatedValues> metricAggregateMap = new HashSet<>();
+        for (TimelineMetrics metrics : nameToMetricMap.values()) {
+            double sum = 0;
+            double max = Double.NEGATIVE_INFINITY;
+            double min = Double.POSITIVE_INFINITY;
+            int count = 0;
+            for (TimelineMetric metric : metrics.getMetrics()) {
+                for (Double value : metric.getMetricValues().values()) {
+                    sum += value;
+                    max = Math.max(max, value);
+                    min = Math.min(min, value);
+                    count++;
+                }
+            }
+            TimelineMetric tmpMetric = new TimelineMetric(metrics.getMetrics().get(0));
+            tmpMetric.setMetricValues(new TreeMap<Long, Double>());
+            metricAggregateMap.add(new TimelineMetricWithAggregatedValues(tmpMetric, new MetricHostAggregate(sum, count, 0d, max, min)));
+        }
+        String json = null;
+        try {
+            json = mapper.writeValueAsString(new AggregationResult(metricAggregateMap, System.currentTimeMillis()));
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+
+        return json;
+    }
+
+    @Override
+    protected String getPostUrl() {
+        return BASE_POST_URL + AGGREGATED_POST_PREFIX;
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
new file mode 100644
index 0000000..89addb7
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+
+import java.util.Map;
+
+public class RawMetricsPublisher extends AbstractMetricPublisher {
+    private final Log LOG;
+
+    public RawMetricsPublisher(TimelineMetricsHolder timelineMetricsHolder, Configuration configuration, int interval) {
+        super(timelineMetricsHolder, configuration, interval);
+        LOG = LogFactory.getLog(this.getClass());
+    }
+
+
+    @Override
+    protected Map<String, TimelineMetrics> getMetricsFromCache() {
+        return timelineMetricsHolder.extractMetricsForRawPublishing();
+    }
+
+    @Override
+    protected String processMetrics(Map<String, TimelineMetrics> metricValues) {
+        //merge everything in one TimelineMetrics object
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        for (TimelineMetrics metrics : metricValues.values()) {
+            for (TimelineMetric timelineMetric : metrics.getMetrics())
+                timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        }
+        //map TimelineMetrics to json string
+        String json = null;
+        try {
+            json = mapper.writeValueAsString(timelineMetrics);
+            LOG.debug(json);
+        } catch (Exception e) {
+            LOG.error("Failed to convert result into json", e);
+        }
+        return json;
+    }
+
+    @Override
+    protected String getPostUrl() {
+        return BASE_POST_URL;
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
new file mode 100644
index 0000000..ea72d17
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplicationTest.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.easymock.EasyMock.createMockBuilder;
+
+
+public class AggregatorApplicationTest {
+    @Test
+    public void testMainNotEnoughArguments() {
+        try {
+            AggregatorApplication.main(new String[0]);
+            Assert.fail("An exception was expected for zero arguments");
+        } catch (Exception e) {
+            //expected
+        }
+        try {
+            AggregatorApplication.main(new String[1]);
+            Assert.fail("An exception was expected for one argument");
+        } catch (Exception e) {
+            //expected
+        }
+    }
+
+    @Test
+    public void testGetURI() {
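+        // Partial mock: createHttpServer and initConfiguration are stubbed so
+        // getURI() can be exercised without binding a real HTTP server.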
+        AggregatorApplication aggregatorApplicationMock = createMockBuilder(AggregatorApplication.class)
+                .withConstructor("", "")
+                .addMockedMethod("createHttpServer")
+                .addMockedMethod("initConfiguration").createMock();
+
+        URI uri = aggregatorApplicationMock.getURI();
+        Assert.assertEquals("http://" + aggregatorApplicationMock.getHostName() + ":61888/", uri.toString());
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
new file mode 100644
index 0000000..736fd06
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorWebServiceTest.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.host.aggregator;
+
+
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.test.framework.JerseyTest;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import com.sun.jersey.test.framework.spi.container.TestContainerFactory;
+import com.sun.jersey.test.framework.spi.container.grizzly2.GrizzlyTestContainerFactory;
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.codehaus.jackson.jaxrs.JacksonJaxbJsonProvider;
+import org.junit.Test;
+
+
+import javax.ws.rs.core.MediaType;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+
+public class AggregatorWebServiceTest extends JerseyTest {
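+    // Boots the aggregator's Jersey resources in an in-process Grizzly test
+    // container so the REST endpoints are exercised over real HTTP.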
+    public AggregatorWebServiceTest() {
+        super(new WebAppDescriptor.Builder(
+                "org.apache.hadoop.metrics2.host.aggregator")
+                .contextPath("jersey-guice-filter")
+                .servletPath("/")
+                .clientConfig(new DefaultClientConfig(JacksonJaxbJsonProvider.class))
+                .build());
+    }
+
+    @Override
+    public TestContainerFactory getTestContainerFactory() {
+        return new GrizzlyTestContainerFactory();
+    }
+
+    @Test
+    public void testOkResponse() {
+        WebResource r = resource();
+        ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics")
+                .accept("text/json")
+                .get(ClientResponse.class);
+        assertEquals(200, response.getStatus());
+        assertEquals("text/json", response.getType().toString());
+    }
+
+    @Test
+    public void testWrongPath() {
+        WebResource r = resource();
+        ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics").path("aggregated")
+                .accept("text/json")
+                .get(ClientResponse.class);
+        assertEquals(404, response.getStatus());
+    }
+
+
+    @Test
+    public void testMetricsPost() {
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TimelineMetrics timelineMetrics = TimelineMetricsHolderTest.getTimelineMetricsWithAppID("appid");
+        WebResource r = resource();
+        ClientResponse response = r.path("ws").path("v1").path("timeline").path("metrics")
+                .accept(MediaType.TEXT_PLAIN)
+                .post(ClientResponse.class, timelineMetrics);
+        assertEquals(200, response.getStatus());
+        assertEquals(MediaType.TEXT_PLAIN, response.getType().toString());
+
+        Map<String, TimelineMetrics> aggregationMap =  timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        Map<String, TimelineMetrics> rawMap =  timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        Assert.assertEquals(1, aggregationMap.size());
+        Assert.assertEquals(1, rawMap.size());
+
+        Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+        Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+        Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+        Collection<String> rawCollectionKeys = rawMap.keySet();
+
+        for (String key : aggregationCollectionKeys) {
+            Assert.assertTrue(key.contains("appid"));
+        }
+
+        for (String key : rawCollectionKeys) {
+            Assert.assertTrue(key.contains("appid"));
+        }
+
+        Assert.assertEquals(1, aggregationCollection.size());
+        Assert.assertEquals(1, rawCollection.size());
+
+        TimelineMetrics aggregationTimelineMetrics = (TimelineMetrics) aggregationCollection.toArray()[0];
+        TimelineMetrics rawTimelineMetrics = (TimelineMetrics) rawCollection.toArray()[0];
+
+
+        Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+        Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+        Assert.assertEquals("appid", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+        Assert.assertEquals("appid", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+        aggregationMap =  timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        rawMap =  timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        //Cache should be empty after extraction
+        Assert.assertEquals(0, aggregationMap.size());
+        Assert.assertEquals(0, rawMap.size());
+    }
+
+
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
new file mode 100644
index 0000000..7d8ebf4
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/host/aggregator/TimelineMetricsHolderTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.host.aggregator;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collection;
+import java.util.Map;
+
+
+public class TimelineMetricsHolderTest {
+    private TimelineMetricsHolder timelineMetricsHolderInstance;
+
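+    // Resets the private static "instance" field via reflection so each test
+    // starts from a fresh TimelineMetricsHolder singleton.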
+    public void clearHolderSingleton() throws NoSuchFieldException, IllegalAccessException {
+        Class timelineMetricHolderClass = TimelineMetricsHolder.class;
+        Field field = timelineMetricHolderClass.getDeclaredField("instance");
+        field.setAccessible(true);
+        field.set(null, null);
+    }
+
+    @Test
+    public void testGetInstanceDefaultValues() throws Exception {
+        clearHolderSingleton();
+        Assert.assertNotNull(TimelineMetricsHolder.getInstance());
+    }
+
+    @Test
+    public void testGetInstanceWithParameters() throws Exception {
+        clearHolderSingleton();
+        Assert.assertNotNull(TimelineMetricsHolder.getInstance(1,2));
+    }
+
+    @Test
+    public void testCache() throws Exception {
+        clearHolderSingleton();
+        timelineMetricsHolderInstance = TimelineMetricsHolder.getInstance(4,4);
+        timelineMetricsHolderInstance.putMetricsForAggregationPublishing(getTimelineMetricsWithAppID("aggr"));
+        timelineMetricsHolderInstance.putMetricsForRawPublishing(getTimelineMetricsWithAppID("raw"));
+
+        Map<String, TimelineMetrics> aggregationMap =  timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+        Map<String, TimelineMetrics> rawMap =  timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+        Assert.assertEquals(1, aggregationMap.size());
+        Assert.assertEquals(1, rawMap.size());
+
+        Collection<TimelineMetrics> aggregationCollection = aggregationMap.values();
+        Collection<TimelineMetrics> rawCollection = rawMap.values();
+
+        Collection<String> aggregationCollectionKeys = aggregationMap.keySet();
+        Collection<String> rawCollectionKeys = rawMap.keySet();
+
+        for (String key : aggregationCollectionKeys) {
+            Assert.assertTrue(key.contains("aggr"));
+        }
+
+        for (String key : rawCollectionKeys) {
+            Assert.assertTrue(key.contains("raw"));
+        }
+
+        Assert.assertEquals(1, aggregationCollection.size());
+        Assert.assertEquals(1, rawCollection.size());
+
+        TimelineMetrics aggregationTimelineMetrics = (TimelineMetrics) aggregationCollection.toArray()[0];
+        TimelineMetrics rawTimelineMetrics = (TimelineMetrics) rawCollection.toArray()[0];
+
+
+        Assert.assertEquals(1, aggregationTimelineMetrics.getMetrics().size());
+        Assert.assertEquals(1, rawTimelineMetrics.getMetrics().size());
+
+        Assert.assertEquals("aggr", aggregationTimelineMetrics.getMetrics().get(0).getAppId());
+        Assert.assertEquals("raw", rawTimelineMetrics.getMetrics().get(0).getAppId());
+
+        aggregationMap =  timelineMetricsHolderInstance.extractMetricsForAggregationPublishing();
+        rawMap =  timelineMetricsHolderInstance.extractMetricsForRawPublishing();
+
+        //Cache should be empty after extraction
+        Assert.assertEquals(0, aggregationMap.size());
+        Assert.assertEquals(0, rawMap.size());
+    }
+
+    public static TimelineMetrics getTimelineMetricsWithAppID(String appId) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setAppId(appId);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}
\ No newline at end of file
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
new file mode 100644
index 0000000..a8ddbee
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisherTest.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.createMockBuilder;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+import static org.easymock.EasyMock.verify;
+
+public class AbstractMetricPublisherTest {
+    @Test
+    public void testProcessAndPublishMetrics() throws Exception {
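+        // Strict mock: the publish pipeline must resolve the collector host,
+        // build the URI, serialize the metrics and emit the JSON, in that order.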
+        AbstractMetricPublisher publisherMock =
+                createMockBuilder(RawMetricsPublisher.class)
+                        .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 60)
+                        .addMockedMethod("processMetrics")
+                        .addMockedMethod("getCollectorUri")
+                        .addMockedMethod("emitMetricsJson")
+                        .addMockedMethod("getCurrentCollectorHost").createStrictMock();
+
+        TimelineMetricsHolder.getInstance().putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+        expect(publisherMock.getCurrentCollectorHost()).andReturn("collectorhost").once();
+        expect(publisherMock.getCollectorUri(anyString())).andReturn("https://collectorhost:11/metrics").once();
+        expect(publisherMock.processMetrics(anyObject(Map.class))).andReturn("{metrics}").once();
+        expect(publisherMock.emitMetricsJson("https://collectorhost:11/metrics", "{metrics}")).andReturn(true).once();
+
+        replay(publisherMock);
+
+        publisherMock.processAndPublishMetrics(TimelineMetricsHolder.getInstance().extractMetricsForRawPublishing());
+
+        verify(publisherMock);
+    }
+
+    @Test
+    public void testRunAndStop() throws Exception {
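+        // isInterrupted() is mocked to return false once and then true, so the
+        // publisher's run loop is expected to publish exactly once before stopping.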
+        AbstractMetricPublisher publisherMock = createMockBuilder(RawMetricsPublisher.class)
+                .withConstructor(TimelineMetricsHolder.getInstance(), new Configuration(), 1)
+                .addMockedMethod("processAndPublishMetrics").createStrictMock();
+        publisherMock.processAndPublishMetrics(anyObject(Map.class));
+        expectLastCall().times(1);
+
+
+        Thread t = createMockBuilder(Thread.class)
+                .withConstructor(publisherMock)
+                .addMockedMethod("isInterrupted").createStrictMock();
+        expect(t.isInterrupted()).andReturn(false).once();
+        expect(t.isInterrupted()).andReturn(true).once();
+
+        replay(publisherMock, t);
+
+        t.start();
+
+        Thread.sleep(2222);
+
+        verify(publisherMock, t);
+    }
+}
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
new file mode 100644
index 0000000..3413052
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisherTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class AggregatedMetricsPublisherTest {
+
+    @Test
+    public void testProcessMetrics() throws Exception {
+        Configuration configuration = new Configuration();
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+        metric1App1Metrics.put(1L, 1d);
+        metric1App1Metrics.put(2L, 2d);
+        metric1App1Metrics.put(3L, 3d);
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+        metric2App2Metrics.put(1L, 4d);
+        metric2App2Metrics.put(2L, 5d);
+        metric2App2Metrics.put(3L, 6d);
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+        metric3App3Metrics.put(1L, 7d);
+        metric3App3Metrics.put(2L, 8d);
+        metric3App3Metrics.put(3L, 9d);
+
+        timelineMetricsHolder.putMetricsForAggregationPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+        String aggregatedJson = aggregatedMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForAggregationPublishing());
+        String expectedMetric1App1 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":6.0,\"deviation\":0.0,\"max\":3.0,\"min\":1.0,\"numberOfSamples\":3}}";
+        String expectedMetric2App2 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":15.0,\"deviation\":0.0,\"max\":6.0,\"min\":4.0,\"numberOfSamples\":3}}";
+        String expectedMetric3App3 = "{\"timelineMetric\":{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{}},\"metricAggregate\":{\"sum\":24.0,\"deviation\":0.0,\"max\":9.0,\"min\":7.0,\"numberOfSamples\":3}}";
+        Assert.assertNotNull(aggregatedJson);
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric1App1));
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric3App3));
+        Assert.assertTrue(aggregatedJson.contains(expectedMetric2App2));
+    }
+
+    @Test
+    public void testGetPostUrl() {
+        Configuration configuration = new Configuration();
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = aggregatedMetricsPublisher.getPostUrl();
+        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetCollectorUri() {
+        //default configuration
+        Configuration configuration = new Configuration();
+        AbstractMetricPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = aggregatedMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //https configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+        aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //custom port configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+        aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = aggregatedMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics/aggregated";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetMetricsFromCache() throws InterruptedException {
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr1"));
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw"));
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr2"));
+
+        Configuration configuration = new Configuration();
+        AggregatedMetricsPublisher aggregatedMetricsPublisher =
+                new AggregatedMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+        Map<String, TimelineMetrics> metricsFromCache = aggregatedMetricsPublisher.getMetricsFromCache();
+        Assert.assertNotNull(metricsFromCache);
+        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+        Assert.assertNotNull(actualTimelineMetrics);
+        Assert.assertEquals(2, actualTimelineMetrics.size());
+
+        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+            Assert.assertEquals(1, metrics.size());
+            Assert.assertTrue(metrics.get(0).getAppId().contains("aggr"));
+        }
+
+    }
+
+    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setMetricValues(metricValues);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}
\ No newline at end of file
diff --git a/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
new file mode 100644
index 0000000..60510d2
--- /dev/null
+++ b/ambari-metrics-host-aggregator/src/test/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisherTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.metrics2.sink.timeline;
+
+import junit.framework.Assert;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolder;
+import org.apache.hadoop.metrics2.host.aggregator.TimelineMetricsHolderTest;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+
+public class RawMetricsPublisherTest {
+    @Test
+    public void testProcessMetrics() throws Exception {
+        Configuration configuration = new Configuration();
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance();
+
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        TreeMap<Long, Double> metric1App1Metrics = new TreeMap<>();
+        metric1App1Metrics.put(1L, 1d);
+        metric1App1Metrics.put(2L, 2d);
+        metric1App1Metrics.put(3L, 3d);
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName1", "app1", metric1App1Metrics));
+
+        TreeMap<Long, Double> metric2App2Metrics = new TreeMap<>();
+        metric2App2Metrics.put(1L, 4d);
+        metric2App2Metrics.put(2L, 5d);
+        metric2App2Metrics.put(3L, 6d);
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName2", "app2", metric2App2Metrics));
+
+        TreeMap<Long, Double> metric3App3Metrics = new TreeMap<>();
+        metric3App3Metrics.put(1L, 7d);
+        metric3App3Metrics.put(2L, 8d);
+        metric3App3Metrics.put(3L, 9d);
+
+        timelineMetricsHolder.putMetricsForRawPublishing(getTimelineMetricsForAppId("metricName3", "app3", metric3App3Metrics));
+
+
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 60);
+
+        String rawJson = rawMetricsPublisher.processMetrics(timelineMetricsHolder.extractMetricsForRawPublishing());
+        String expectedResult = "{\"metrics\":[{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName1\",\"appid\":\"app1\",\"starttime\":0,\"metrics\":{\"1\":1.0,\"2\":2.0,\"3\":3.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName2\",\"appid\":\"app2\",\"starttime\":0,\"metrics\":{\"1\":4.0,\"2\":5.0,\"3\":6.0}},{\"timestamp\":0,\"metadata\":{},\"metricname\":\"metricName3\",\"appid\":\"app3\",\"starttime\":0,\"metrics\":{\"1\":7.0,\"2\":8.0,\"3\":9.0}}]}";
+        Assert.assertNotNull(rawJson);
+        Assert.assertEquals(expectedResult, rawJson);
+    }
+
+    @Test
+    public void testGetPostUrl() {
+        Configuration configuration = new Configuration();
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = rawMetricsPublisher.getPostUrl();
+        String expectedURL = "%s://%s:%s/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetCollectorUri() {
+        //default configuration
+        Configuration configuration = new Configuration();
+        AbstractMetricPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        String actualURL = rawMetricsPublisher.getCollectorUri("c6401.ambari.apache.org");
+        String expectedURL = "http://c6401.ambari.apache.org:6188/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //https configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.http.policy", "HTTPS_ONLY");
+        rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = rawMetricsPublisher.getCollectorUri("c6402.ambari.apache.org");
+        expectedURL = "https://c6402.ambari.apache.org:6188/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+
+        //custom port configuration
+        configuration = new Configuration();
+        configuration.set("timeline.metrics.service.webapp.address", "0.0.0.0:8888");
+        rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+        actualURL = rawMetricsPublisher.getCollectorUri("c6403.ambari.apache.org");
+        expectedURL = "http://c6403.ambari.apache.org:8888/ws/v1/timeline/metrics";
+        Assert.assertNotNull(actualURL);
+        Assert.assertEquals(expectedURL, actualURL);
+    }
+
+    @Test
+    public void testGetMetricsFromCache() throws InterruptedException {
+
+        TimelineMetricsHolder timelineMetricsHolder = TimelineMetricsHolder.getInstance(4,4);
+        timelineMetricsHolder.extractMetricsForAggregationPublishing();
+        timelineMetricsHolder.extractMetricsForRawPublishing();
+
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw1"));
+        timelineMetricsHolder.putMetricsForAggregationPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("aggr"));
+        timelineMetricsHolder.putMetricsForRawPublishing(TimelineMetricsHolderTest.getTimelineMetricsWithAppID("raw2"));
+
+        Configuration configuration = new Configuration();
+        RawMetricsPublisher rawMetricsPublisher =
+                new RawMetricsPublisher(TimelineMetricsHolder.getInstance(), configuration, 1);
+
+        Map<String, TimelineMetrics> metricsFromCache = rawMetricsPublisher.getMetricsFromCache();
+        Assert.assertNotNull(metricsFromCache);
+        Collection<TimelineMetrics> actualTimelineMetrics = metricsFromCache.values();
+        Assert.assertNotNull(actualTimelineMetrics);
+        Assert.assertEquals(2, actualTimelineMetrics.size());
+
+        for (TimelineMetrics timelineMetrics : actualTimelineMetrics) {
+            List<TimelineMetric> metrics = timelineMetrics.getMetrics();
+            Assert.assertEquals(1, metrics.size());
+            Assert.assertTrue(metrics.get(0).getAppId().contains("raw"));
+        }
+
+    }
+
+    TimelineMetrics getTimelineMetricsForAppId(String metricName, String appId, TreeMap<Long, Double> metricValues) {
+        TimelineMetric timelineMetric = new TimelineMetric();
+        timelineMetric.setMetricName(metricName);
+        timelineMetric.setAppId(appId);
+        timelineMetric.setMetricValues(metricValues);
+        TimelineMetrics timelineMetrics = new TimelineMetrics();
+        timelineMetrics.addOrMergeTimelineMetric(timelineMetric);
+        return timelineMetrics;
+    }
+}
diff --git a/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor b/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
index 967e133..4980b8e 100644
--- a/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
+++ b/ambari-metrics-host-monitoring/conf/unix/ambari-metrics-monitor
@@ -24,7 +24,7 @@
 PIDFILE=/var/run/ambari-metrics-monitor/ambari-metrics-monitor.pid
 OUTFILE=/var/log/ambari-metrics-monitor/ambari-metrics-monitor.out
 
-STOP_TIMEOUT=5
+STOP_TIMEOUT=10
 
 OK=0
 NOTOK=1
@@ -98,23 +98,26 @@
   exit 1
 fi
 
+# Set log directory path
+if [[ -n "${AMS_MONITOR_LOG_DIR}" ]]; then
+  OUTFILE=${AMS_MONITOR_LOG_DIR}/ambari-metrics-monitor.out
+fi
+
 #TODO decide if rebuild on each start (pretty quickly) to tolerate major node changes (like kernel update)
 #build psutil
 if [ ! "$(ls -A ${RESOURCE_MONITORING_DIR}/psutil/build)" ]; then
   echo "Building psutil..."
   dir=$(pwd)
   cd "${RESOURCE_MONITORING_DIR}/psutil"
-  ${PYTHON} "setup.py" "build"
+  # build psutil and redirect output to log file
+  echo "--------------------------Building psutil--------------------------" >> ${OUTFILE}
+  ${PYTHON} "setup.py" "build" >> ${OUTFILE}
+  echo "----------------------Finished building psutil---------------------" >> ${OUTFILE}
   cd "${dir}"
 else
   echo "psutil build directory is not empty, continuing..."
 fi
 
-# Set log directory path
-if [[ -n "${AMS_MONITOR_LOG_DIR}" ]]; then
-  OUTFILE=${AMS_MONITOR_LOG_DIR}/ambari-metrics-monitor.out
-fi
-
 # Set pid directory path
 if [[ -n "${AMS_MONITOR_PID_DIR}" ]]; then
   PIDFILE=${AMS_MONITOR_PID_DIR}/ambari-metrics-monitor.pid
diff --git a/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini b/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
index 7fe7397..38fff1e 100644
--- a/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
+++ b/ambari-metrics-host-monitoring/conf/unix/metric_monitor.ini
@@ -27,6 +27,8 @@
 
 [emitter]
 send_interval = 60
+kinit_cmd = /usr/bin/kinit -kt /etc/security/keytabs/ams.monitor.keytab amsmon/localhost
+klist_cmd = /usr/bin/klist
 
 [collector]
 collector_sleep_interval = 5
diff --git a/ambari-metrics-host-monitoring/pom.xml b/ambari-metrics-host-monitoring/pom.xml
index d6c1fab..e8f8556 100644
--- a/ambari-metrics-host-monitoring/pom.xml
+++ b/ambari-metrics-host-monitoring/pom.xml
@@ -86,6 +86,7 @@
       <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
         <executions>
           <execution>
             <configuration>
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
new file mode 100644
index 0000000..ba05e9b
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -0,0 +1,112 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import threading
+import subprocess
+import logging
+import urllib2
+
+logger = logging.getLogger()
+class Aggregator(threading.Thread):
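+  """Runs the Java in-memory aggregator (AggregatorApplication) as a child
+  process and keeps this thread alive until the monitor's stop event fires."""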
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self._aggregator_process = None
+    self._sleep_interval = config.get_collector_sleep_interval()
+    self.stopped = False
+
+  def run(self):
+    java_home = self._config.get_java_home()
+    collector_hosts = self._config.get_metrics_collector_hosts_as_string()
+    jvm_args = self._config.get_aggregator_jvm_args()
+    config_dir = self._config.get_config_dir()
+    class_name = "org.apache.hadoop.metrics2.host.aggregator.AggregatorApplication"
+    ams_log_file = "ambari-metrics-aggregator.log"
+    additional_classpath = ':{0}'.format(config_dir)
+    ams_log_dir = self._config.ams_monitor_log_dir()
+    hostname = self._config.get_hostname_config()
+    logger.info('Starting Aggregator thread.')
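+    # -cp combines the wildcard under /var/lib/ambari-metrics-monitor/lib with
+    # the monitor conf dir appended via additional_classpath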
+    cmd = "{0}/bin/java {1} -Dams.log.dir={2} -Dams.log.file={3} -cp /var/lib/ambari-metrics-monitor/lib/*{4} {5} {6} {7}"\
+      .format(java_home, jvm_agrs, ams_log_dir, ams_log_file, additional_classpath, class_name, hostname, collector_hosts)
+
+    logger.info("Executing : {0}".format(cmd))
+
+    self._aggregator_process = subprocess.Popen([cmd], stdout = None, stderr = None, shell = True)
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self._sleep_interval):
+        break
+    pass
+    self.stop()
+
+  def stop(self):
+    self.stopped = True
+    if self._aggregator_process :
+      logger.info('Stopping Aggregator thread.')
+      self._aggregator_process.terminate()
+      self._aggregator_process = None
+
+class AggregatorWatchdog(threading.Thread):
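+  """Polls the local aggregator's metrics endpoint every SLEEP_TIME seconds
+  and exposes an is_ok flag the controller uses to restart the aggregator."""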
+  SLEEP_TIME = 30
+  CONNECTION_TIMEOUT = 5
+  AMS_AGGREGATOR_METRICS_CHECK_URL = "/ws/v1/timeline/metrics/"
+  def __init__(self, config, stop_handler):
+    threading.Thread.__init__(self)
+    self._config = config
+    self._stop_handler = stop_handler
+    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+    self._is_ok = threading.Event()
+    self.set_is_ok(True)
+    self.stopped = False
+
+  def run(self):
+    logger.info('Starting Aggregator Watchdog thread.')
+    while not self.stopped:
+      if 0 == self._stop_handler.wait(self.SLEEP_TIME):
+        break
+      try:
+        conn = urllib2.urlopen(self.URL, timeout=self.CONNECTION_TIMEOUT)
+        self.set_is_ok(True)
+      except (KeyboardInterrupt, SystemExit):
+        raise
+      except Exception, e:
+        self.set_is_ok(False)
+        continue
+      if conn.code != 200:
+        self.set_is_ok(False)
+        conn.close()
+        continue
+      conn.close()
+
+  def is_ok(self):
+    return self._is_ok.is_set()
+
+  def set_is_ok(self, value):
+    if value:
+      self._is_ok.set()
+    else:
+      if self.is_ok():
+        # warn only on the transition from ok to not-ok
+        logger.warning("Watcher couldn't connect to aggregator.")
+      self._is_ok.clear()
+
+
+  def stop(self):
+    logger.info('Stopping watcher thread.')
+    self.stopped = True
+
+
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
index 0052808..34a6787 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/application_metric_map.py
@@ -66,7 +66,7 @@
     del self.app_metric_map[ app_id ]
   pass
 
-  def flatten(self, application_id = None, clear_once_flattened = False):
+  def flatten(self, application_id = None, clear_once_flattened = False, set_instanceid = False, instanceid = None):
     """
     Return flatten dict to caller in json format.
     Json format:
@@ -89,11 +89,14 @@
       for appId, metrics in local_metric_map.iteritems():
         for metricId, metricData in dict(metrics).iteritems():
           # Create a timeline metric object
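+          # instanceid is attached only when set.instanceId is enabled in the
+          # monitor configuration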
+          result_instanceid = ""
+          if set_instanceid:
+            result_instanceid = instanceid
           timeline_metric = {
             "hostname" : self.hostname,
             "metricname" : metricId,
             "appid" : "HOST",
-            "instanceid" : "",
+            "instanceid" : result_instanceid,
             "starttime" : self.get_start_time(appId, metricId),
             "metrics" : metricData
           }
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 5686c50..017ad24 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -30,6 +30,8 @@
 # Abstraction for OS-dependent configuration defaults
 #
 class ConfigDefaults(object):
+  def get_config_dir(self):
+    pass
   def get_config_file_path(self):
     pass
   def get_metric_file_path(self):
@@ -40,11 +42,14 @@
 @OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
 class ConfigDefaultsWindows(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "conf"
     self._CONFIG_FILE_PATH = "conf\\metric_monitor.ini"
     self._METRIC_FILE_PATH = "conf\\metric_groups.conf"
     self._METRIC_FILE_PATH = "conf\\ca.pem"
     pass
 
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -55,11 +60,13 @@
 @OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
 class ConfigDefaultsLinux(ConfigDefaults):
   def __init__(self):
+    self._CONFIG_DIR = "/etc/ambari-metrics-monitor/conf/"
     self._CONFIG_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_monitor.ini"
     self._METRIC_FILE_PATH = "/etc/ambari-metrics-monitor/conf/metric_groups.conf"
     self._CA_CERTS_FILE_PATH = "/etc/ambari-metrics-monitor/conf/ca.pem"
     pass
-
+  def get_config_dir(self):
+    return self._CONFIG_DIR
   def get_config_file_path(self):
     return self._CONFIG_FILE_PATH
   def get_metric_file_path(self):
@@ -71,6 +78,7 @@
 
 config = ConfigParser.RawConfigParser()
 
+CONFIG_DIR = configDefaults.get_config_dir()
 CONFIG_FILE_PATH = configDefaults.get_config_file_path()
 METRIC_FILE_PATH = configDefaults.get_metric_file_path()
 CA_CERTS_FILE_PATH = configDefaults.get_ca_certs_file_path()
@@ -107,6 +115,8 @@
 
 [emitter]
 send_interval = 60
+kinit_cmd = /usr/bin/kinit -kt /etc/security/keytabs/ams.monitor.keytab amsmon/localhost
+klist_cmd = /usr/bin/klist
 
 [collector]
 collector_sleep_interval = 5
@@ -191,6 +201,8 @@
         # No hostname script identified in the ambari agent conf
         pass
     pass
+  def get_config_dir(self):
+    return CONFIG_DIR
 
   def getConfig(self):
     return self.config
@@ -208,16 +220,26 @@
   def get_send_interval(self):
     return int(self.get("emitter", "send_interval", 60))
 
+  def get_kinit_cmd(self):
+    return self.get("emitter", "kinit_cmd")
+
+  def get_klist_cmd(self):
+    return self.get("emitter", "klist_cmd")
+
   def get_collector_sleep_interval(self):
     return int(self.get("collector", "collector_sleep_interval", 10))
 
   def get_hostname_config(self):
     return self.get("default", "hostname", None)
 
-  def get_metrics_collector_hosts(self):
+  def get_metrics_collector_hosts_as_list(self):
     hosts = self.get("default", "metrics_servers", "localhost")
     return hosts.split(",")
 
+  def get_metrics_collector_hosts_as_string(self):
+    hosts = self.get("default", "metrics_servers", "localhost")
+    return hosts
+
   def get_failover_strategy(self):
     return self.get("collector", "failover_strategy", ROUND_ROBIN_FAILOVER_STRATEGY)
 
@@ -239,9 +261,32 @@
   def is_server_https_enabled(self):
     return "true" == str(self.get("collector", "https_enabled")).lower()
 
+  def get_java_home(self):
+    return self.get("aggregation", "java_home")
+
+  def is_inmemory_aggregation_enabled(self):
+    return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+
+  def get_inmemory_aggregation_port(self):
+    return self.get("aggregation", "host_in_memory_aggregation_port")
+
+  def get_aggregator_jvm_args(self):
+    return self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
+
+  def ams_monitor_log_dir(self):
+    return self.get("aggregation", "ams_monitor_log_dir", "/var/log/ambari-metrics-monitor")
+
+  def is_set_instanceid(self):
+    return "true" == str(self.get("default", "set.instanceId", 'false')).lower()
+
   def get_server_host(self):
     return self.get("collector", "host")
 
+  def get_instanceid(self):
+    return self.get("default", "instanceid")
+
   def get_server_port(self):
     try:
       return int(self.get("collector", "port"))
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/controller.py b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
index c0feed5..d161269 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/controller.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/controller.py
@@ -27,6 +27,9 @@
 from metric_collector import MetricsCollector
 from emitter import Emitter
 from host_info import HostInfo
+from aggregator import Aggregator
+from aggregator import AggregatorWatchdog
+
 
 logger = logging.getLogger()
 
@@ -50,11 +53,15 @@
     self.initialize_events_cache()
     self.emitter = Emitter(self.config, self.application_metric_map, stop_handler)
     self._t = None
+    self.aggregator = None
+    self.aggregator_watchdog = None
 
   def run(self):
     logger.info('Running Controller thread: %s' % threading.currentThread().getName())
 
     self.start_emitter()
+    if self.config.is_inmemory_aggregation_enabled():
+      self.start_aggregator_with_watchdog()
 
     # Wake every 5 seconds to push events to the queue
     while True:
@@ -62,6 +69,10 @@
         logger.warn('Event Queue full!! Suspending further collections.')
       else:
         self.enqueque_events()
+      # restart aggregator if needed
+      if self.config.is_inmemory_aggregation_enabled() and not self.aggregator_watchdog.is_ok():
+        logger.warning("Aggregator is not available. Restarting aggregator.")
+        self.start_aggregator_with_watchdog()
       pass
       # Wait for the service stop event instead of sleeping blindly
       if 0 == self._stop_handler.wait(self.sleep_interval):
@@ -75,6 +86,12 @@
     # The emitter thread should have stopped by now, just ensure it has shut
     # down properly
     self.emitter.join(5)
+
+    if self.config.is_inmemory_aggregation_enabled():
+      self.aggregator.stop()
+      self.aggregator_watchdog.stop()
+      self.aggregator.join(5)
+      self.aggregator_watchdog.join(5)
     pass
 
   # TODO: Optimize to not use Timer class and use the Queue instead
@@ -115,3 +132,14 @@
 
   def start_emitter(self):
     self.emitter.start()
+
+  # Start aggregator and watcher threads
+  def start_aggregator_with_watchdog(self):
+    if self.aggregator:
+      self.aggregator.stop()
+    if self.aggregator_watchdog:
+      self.aggregator_watchdog.stop()
+    self.aggregator = Aggregator(self.config, self._stop_handler)
+    self.aggregator_watchdog = AggregatorWatchdog(self.config, self._stop_handler)
+    self.aggregator.start()
+    self.aggregator_watchdog.start()
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index ba3f18e..f19434d 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -24,6 +24,7 @@
 from security import CachedHTTPSConnection, CachedHTTPConnection
 from blacklisted_set import BlacklistedSet
 from config_reader import ROUND_ROBIN_FAILOVER_STRATEGY
+from spnego_kerberos_auth import SPNEGOKerberosAuth
 
 logger = logging.getLogger()
 
@@ -31,6 +32,10 @@
   AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/"
   RETRY_SLEEP_INTERVAL = 5
   MAX_RETRY_COUNT = 3
+  cookie_cached = {}
+  kinit_cmd = None
+  klist_cmd = None
+  spnego_krb_auth = None
   """
   Wake up every send interval seconds and empty the application metric map.
   """
@@ -39,13 +44,25 @@
     logger.debug('Initializing Emitter thread.')
     self.lock = threading.Lock()
     self.send_interval = config.get_send_interval()
+    self.kinit_cmd = config.get_kinit_cmd()
+    if self.kinit_cmd:
+      logger.debug(self.kinit_cmd)
+    self.klist_cmd = config.get_klist_cmd()
     self.hostname = config.get_hostname_config()
     self.hostname_hash = self.compute_hash(self.hostname)
     self._stop_handler = stop_handler
     self.application_metric_map = application_metric_map
     self.collector_port = config.get_server_port()
-    self.all_metrics_collector_hosts = config.get_metrics_collector_hosts()
+    self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
     self.is_server_https_enabled = config.is_server_https_enabled()
+    self.set_instanceid = config.is_set_instanceid()
+    self.instanceid = config.get_instanceid()
+    self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
+
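+    # With in-memory aggregation enabled, this emitter posts to the local
+    # aggregator over plain HTTP; the aggregator then publishes to the collectors.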
+    if self.is_inmemory_aggregation_enabled:
+      self.collector_port = config.get_inmemory_aggregation_port()
+      self.all_metrics_collector_hosts = ['localhost']
+      self.is_server_https_enabled = False
 
     if self.is_server_https_enabled:
       self.ca_certs = config.get_ca_certs()
@@ -63,6 +80,7 @@
         self.submit_metrics()
       except Exception, e:
         logger.warn('Unable to emit events. %s' % str(e))
+        self.cookie_cached = {}
       pass
       #Wait for the service stop event instead of sleeping blindly
       if 0 == self._stop_handler.wait(self.send_interval):
@@ -74,7 +92,7 @@
     # This call will acquire lock on the map and clear contents before returning
     # After configured number of retries the data will not be sent to the
     # collector
-    json_data = self.application_metric_map.flatten(None, True)
+    json_data = self.application_metric_map.flatten(None, True, set_instanceid=self.set_instanceid, instanceid=self.instanceid)
     if json_data is None:
       logger.info("Nothing to emit, resume waiting.")
       return
@@ -100,17 +118,50 @@
     headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
     connection = self.get_connection(collector_host)
     logger.debug("message to send: %s" % data)
+
+    # Reuse a previously negotiated hadoop.auth cookie for this host, if any
+    if self.cookie_cached.get(connection.host):
+      headers["Cookie"] = self.cookie_cached[connection.host]
+      logger.debug("Cookie: %s" % self.cookie_cached[connection.host])
+
     retry_count = 0
     while retry_count < self.MAX_RETRY_COUNT:
       response = self.get_response_from_submission(connection, data, headers)
-      if response and response.status == 200:
-        return True
-      else:
-        logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
-        retry_count += 1
-        #Wait for the service stop event instead of sleeping blindly
-        if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+      if response:
+        if response.status == 200:
           return True
+        if response.status == 401:
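+          # 401: the collector requires SPNEGO auth; drop any stale cookie, run
+          # the Kerberos handshake, and cache the new hadoop.auth cookie on success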
+          self.cookie_cached = {}
+          auth_header = response.getheader('www-authenticate', None)
+          if auth_header is None:
+            logger.warn('www-authenticate header not found')
+          else:
+            self.spnego_krb_auth = SPNEGOKerberosAuth()
+            if self.spnego_krb_auth.get_negotiate_value(auth_header) == '':
+              response = self.spnego_krb_auth.authenticate_handshake(connection, "POST", self.AMS_METRICS_POST_URL, data, headers, self.kinit_cmd, self.klist_cmd)
+              if response:
+                logger.debug("response from authenticate_client: retcode = {0}, reason = {1}"
+                              .format(response.status, response.reason))
+                logger.debug(str(response.read()))
+                if response.status == 200:
+                  logger.debug("response headers: {0}".format(response.getheaders()))
+                  logger.debug("cookie_cached: %s" % self.cookie_cached)
+                  set_cookie_header = response.getheader('set-cookie', None)
+                  if set_cookie_header and self.spnego_krb_auth:
+                    set_cookie_val = self.spnego_krb_auth.get_hadoop_auth_cookie(set_cookie_header)
+                    logger.debug("set_cookie: %s" % set_cookie_val)
+                    if set_cookie_val:
+                      self.cookie_cached[connection.host] = set_cookie_val
+                  return True
+      #No response or failed
+      logger.warn("Retrying after {0} ...".format(self.RETRY_SLEEP_INTERVAL))
+      retry_count += 1
+      #Wait for the service stop event instead of sleeping blindly
+      if 0 == self._stop_handler.wait(self.RETRY_SLEEP_INTERVAL):
+        return True
     pass
 
     if retry_count >= self.MAX_RETRY_COUNT:
@@ -142,6 +193,7 @@
       return response
     except Exception, e:
       logger.warn('Error sending metrics to server. %s' % str(e))
+      self.cookie_cached = {}
       return None
 
   def get_collector_host_shard(self):
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/krberr.py b/ambari-metrics-host-monitoring/src/main/python/core/krberr.py
new file mode 100644
index 0000000..c5e0163
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/krberr.py
@@ -0,0 +1,42 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+"""
+Python Kerberos GSS APIs used by spnego_kerberos_auth.py.
+It is used as a place holder for kerberos.py which is not available.
+"""
+
+class KrbError(Exception):
+  pass
+
+class GSSError(KrbError):
+  pass
+
+def authGSSClientInit(service):
+  pass
+
+def authGSSClientClean(context):
+  pass
+
+def authGSSClientStep(context, challenge):
+  pass
+
+def authGSSClientResponse(context):
+  pass
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/spnego_kerberos_auth.py b/ambari-metrics-host-monitoring/src/main/python/core/spnego_kerberos_auth.py
new file mode 100644
index 0000000..e49712f
--- /dev/null
+++ b/ambari-metrics-host-monitoring/src/main/python/core/spnego_kerberos_auth.py
@@ -0,0 +1,164 @@
+#!/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import logging
+import httplib
+import os
+
+logger = logging.getLogger()
+try:
+  import kerberos
+except ImportError, e:
+  import krberr as kerberos
+  logger.warn('import kerberos failed, using local stubs: %s' % str(e))
+pass
+
+class SPNEGOKerberosAuth:
+  def __init__(self):
+    self.krb_context = None
+
+  def authenticate_handshake(self, connection, method, service_url, body, headers, kinit_cmd, klist_cmd):
+    # kinit to ensure ticket valid
+    self.execute_kinit(kinit_cmd, klist_cmd)
+
+    try:
+      # Authenticate the client request
+      response = self.authenticate_client(connection, method, service_url, body, headers)
+
+      # Authenticate the response from the server
+      if response:
+        self.authenticate_server(response)
+      return response
+    finally:
+      # Clean the client context after the handshake
+      self.clean_client_context()
+    pass
+
+  def execute_kinit(self, kinit_cmd, klist_cmd):
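+    # os.system returns the shell exit status (0 on success); klist prints
+    # to stdout, so only its exit status is logged below.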
+    exit_status = os.system(kinit_cmd)
+    logger.debug("kinit exit_status: {0}".format(exit_status))
+    logger.debug(os.system(klist_cmd))
+    return exit_status
+
+  def authenticate_client(self, connection, method, service_url, body, headers):
+    service = "HTTP@%s" % connection.host.lower()
+    logger.debug("connection: %s", connection)
+    logger.debug("service: %s", service)
+
+    auth_header = self.get_authorization_header(service)
+    logger.debug("Authorization: %s" % auth_header)
+
+    # Send 2nd HTTP request with authorization header
+    headers['Authorization'] = auth_header
+    try:
+      connection.request(method, service_url, body, headers)
+      response = connection.getresponse()
+    except Exception, e:
+      logger.warn('2nd HTTP request exception from server: %s' % str(e))
+      return None
+    pass
+    if response:
+      logger.debug("2nd HTTP response from server: retcode = {0}, reason = {1}"
+                    .format(response.status, response.reason))
+      logger.debug(str(response.read()))
+      logger.debug("response headers: {0}".format(response.getheaders()))
+    return response
+
+  def get_authorization_header(self, service):
+    # Initialize the context object for client-side authentication with a service principal
+    try:
+      result, self.krb_context = kerberos.authGSSClientInit(service)
+      if result == -1:
+        logger.warn('authGSSClientInit result: {0}'.format(result))
+        return None
+    except kerberos.GSSError, e:
+      logger.warn('authGSSClientInit exception: %s' % str(e))
+      return None
+    pass
+
+    # Process the first client-side step with the context
+    try:
+      result = kerberos.authGSSClientStep(self.krb_context, "")
+      if result == -1:
+        logger.warn('authGSSClientStep result for authenticate client: {0}'.format(result))
+        return None
+    except kerberos.GSSError, e:
+      logger.warn('authGSSClientStep exception for authenticate client: %s' % str(e))
+      return None
+    pass
+
+    # Get the client response from the first client-side step
+    try:
+      negotiate_value = kerberos.authGSSClientResponse(self.krb_context)
+      logger.debug("authGSSClientResponse response:{0}".format(negotiate_value))
+    except kerberos.GSSError, e:
+      logger.warn('authGSSClientResponse exception: %s' % str(e))
+      return None
+    pass
+
+    # Build the authorization header
+    return "Negotiate %s" % negotiate_value
+
+  def authenticate_server(self, response):
+    auth_header = response.getheader('www-authenticate', None)
+    negotiate_value = self.get_negotiate_value(auth_header)
+    if negotiate_value is None:
+      logger.warn('www-authenticate header not found')
+
+    # Process the client-side step with the context and the negotiate value from 2nd HTTP response
+    try:
+      result = kerberos.authGSSClientStep(self.krb_context, negotiate_value)
+      if result == -1:
+        logger.warn('authGSSClientStep result for authenticate server: {0}'.format(result))
+    except kerberos.GSSError, e:
+      logger.warn('authGSSClientStep exception for authenticate server: %s' % str(e))
+      result = -1
+    pass
+    return result
+
+  def clean_client_context(self):
+    # Destroy the context for client-side authentication
+    try:
+      result = kerberos.authGSSClientClean(self.krb_context)
+      logger.debug("authGSSClientClean result:{0}".format(result))
+    except kerberos.GSSError, e:
+      logger.warn('authGSSClientClean exception: %s' % str(e))
+      result = -1
+    pass
+    return result
+
+  def get_hadoop_auth_cookie(self, set_cookie_header):
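+    # Pick the "hadoop.auth=..." field out of a Set-Cookie header such as
+    # "hadoop.auth=<token>; Path=/; HttpOnly" (illustrative value only).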
+    if set_cookie_header:
+      for field in set_cookie_header.split(";"):
+        if field.startswith('hadoop.auth='):
+          return field
+      else:
+        return None
+    return None
+
+  def get_negotiate_value(self, auth_header):
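+    # Extract the token from a "WWW-Authenticate: Negotiate <token>"
+    # header; returns None when no Negotiate scheme is present.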
+    if auth_header:
+      for field in auth_header.split(","):
+        key, __, value = field.strip().partition(" ")
+        if key.lower() == "negotiate":
+          return value.strip()
+      else:
+        return None
+    return None
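+
+# A minimal usage sketch (hypothetical values), mirroring how emitter.py
+# drives the handshake after an initial 401 challenge:
+#
+#   auth = SPNEGOKerberosAuth()
+#   response = auth.authenticate_handshake(connection, "POST", url, data,
+#                                          headers, kinit_cmd, klist_cmd)
+#   if response and response.status == 200:
+#     cookie = auth.get_hadoop_auth_cookie(response.getheader('set-cookie', None))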
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index bfb6957..7a9fbec 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -117,7 +117,8 @@
 
   def wait(self, timeout=None):
     # Stop process when stop event received
-    if self.stop_event.wait(timeout):
+    self.stop_event.wait(timeout)
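+    # Event.wait() always returned None before Python 2.7, so the flag is
+    # re-checked via isSet() instead of relying on the return value.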
+    if self.stop_event.isSet():
       logger.info("Stop event received")
       return 0
     # Timeout
diff --git a/ambari-metrics-host-monitoring/src/main/python/main.py b/ambari-metrics-host-monitoring/src/main/python/main.py
index d218015..53d27f8 100644
--- a/ambari-metrics-host-monitoring/src/main/python/main.py
+++ b/ambari-metrics-host-monitoring/src/main/python/main.py
@@ -21,7 +21,7 @@
 import logging
 import os
 import sys
-
+import signal
 from ambari_commons.os_utils import remove_file
 
 from core.controller import Controller
@@ -73,6 +73,10 @@
   if scmStatus is not None:
     scmStatus.reportStarted()
 
+  # Suspend the main thread until a signal arrives; without this, signals
+  # such as SIGTERM appear not to reach the registered handlers while the
+  # main thread is blocked in controller.join() below.
+  # TODO: find a cleaner mechanism if possible
+  signal.pause()
+
   #The controller thread finishes when the stop event is signaled
   controller.join()
 
diff --git a/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py b/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
index 4056ae3..8b3eb66 100644
--- a/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
+++ b/ambari-metrics-host-monitoring/src/test/python/core/TestEmitter.py
@@ -27,6 +27,7 @@
 from mock.mock import patch, MagicMock
 from security import CachedHTTPConnection
 from blacklisted_set import BlacklistedSet
+from spnego_kerberos_auth import SPNEGOKerberosAuth
 
 if get_platform() != PLATFORM_WINDOWS:
   os_distro_value = ('Suse','11','Final')
@@ -86,6 +87,29 @@
     self.assertEqual(request_mock.call_count, 3)
     self.assertUrlData(request_mock)
 
+  @patch.object(OSCheck, "os_distribution", new = MagicMock(return_value = os_distro_value))
+  @patch.object(CachedHTTPConnection, "create_connection", new = MagicMock())
+  @patch.object(SPNEGOKerberosAuth, "authenticate_handshake")
+  @patch.object(CachedHTTPConnection, "getresponse")
+  @patch.object(CachedHTTPConnection, "request")
+  def test_spnego_negotiation(self, request_mock, getresponse_mock, auth_mock):
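+    # The first POST is answered with 401 and a "Negotiate" challenge,
+    # steering the emitter into the SPNEGO handshake; the mocked handshake
+    # reports 200, so only the initial request should be issued.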
+    request_mock.return_value = MagicMock()
+    getresponse_mock.return_value.status = 401
+    getresponse_mock.return_value.getheader.return_value = "Negotiate   "
+
+    auth_mock.return_value.status = 200
+
+    stop_handler = bind_signal_handlers()
+
+    config = Configuration()
+    application_metric_map = ApplicationMetricMap("host","10.10.10.10")
+    application_metric_map.clear()
+    application_metric_map.put_metric("APP1", {"metric1":1}, 1)
+    emitter = Emitter(config, application_metric_map, stop_handler)
+    emitter.submit_metrics()
+
+    self.assertEqual(request_mock.call_count, 1)
+
   def assertUrlData(self, request_mock):
     self.assertEqual(len(request_mock.call_args), 2)
     data = request_mock.call_args[0][2]
diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index 9d492cb..e126016 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -70,6 +70,10 @@
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PATH_PROPERTY;
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_TYPE_PROPERTY;
   private static final String TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SSL_KEYSTORE_PASSWORD_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + INSTANCE_ID_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -87,11 +91,15 @@
   private TimelineMetricsCache metricsCache;
   private int timeoutSeconds = 10;
   private String zookeeperQuorum = null;
+  private boolean setInstanceId;
+  private String instanceId;
 
   private String[] excludedMetricsPrefixes;
   private String[] includedMetricsPrefixes;
   // Local cache to avoid prefix matching everytime
   private Set<String> excludedMetrics = new HashSet<>();
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -128,6 +136,17 @@
     return hostname;
   }
 
+
+  @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
@@ -162,6 +181,11 @@
         collectorHosts = parseHostsStringIntoCollection(props.getString(TIMELINE_HOSTS_PROPERTY, TIMELINE_DEFAULT_HOST));
         metricCollectorProtocol = props.getString(TIMELINE_PROTOCOL_PROPERTY, TIMELINE_DEFAULT_PROTOCOL);
 
+        instanceId = props.getString(TIMELINE_METRICS_KAFKA_INSTANCE_ID_PROPERTY, null);
+        setInstanceId = props.getBoolean(TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY, false);
+
+        hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+        hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
         if (metricCollectorProtocol.contains("https")) {
@@ -315,6 +339,9 @@
       TimelineMetric timelineMetric = new TimelineMetric();
       timelineMetric.setMetricName(attributeName);
       timelineMetric.setHostName(hostname);
+      if (setInstanceId) {
+        timelineMetric.setInstanceId(instanceId);
+      }
       timelineMetric.setAppId(component);
       timelineMetric.setStartTime(currentTimeMillis);
       timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
@@ -379,8 +406,10 @@
       final String sanitizedName = sanitizeName(name);
 
       try {
-        cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
-        populateMetricsList(context, MetricType.GAUGE, sanitizedName);
+        if (!isExcludedMetric(sanitizedName)) {
+          cacheSanitizedTimelineMetric(currentTimeMillis, sanitizedName, "", Double.parseDouble(String.valueOf(gauge.value())));
+          populateMetricsList(context, MetricType.GAUGE, sanitizedName);
+        }
       } catch (NumberFormatException ex) {
         LOG.debug(ex.getMessage());
       }
diff --git a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
index e1ac48c..b05190c 100644
--- a/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
+++ b/ambari-metrics-kafka-sink/src/test/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporterTest.java
@@ -84,6 +84,8 @@
     properties.setProperty("kafka.timeline.metrics.reporter.enabled", "true");
     properties.setProperty("external.kafka.metrics.exclude.prefix", "a.b.c");
     properties.setProperty("external.kafka.metrics.include.prefix", "a.b.c.d");
+    properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
+    properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
     props = new VerifiableProperties(properties);
   }
 
@@ -120,6 +122,8 @@
     properties.setProperty("kafka.timeline.metrics.truststore.path", "");
     properties.setProperty("kafka.timeline.metrics.truststore.type", "");
     properties.setProperty("kafka.timeline.metrics.truststore.password", "");
+    properties.setProperty("kafka.timeline.metrics.instanceId", "cluster");
+    properties.setProperty("kafka.timeline.metrics.set.instanceId", "false");
     kafkaTimelineMetricsReporter.init(new VerifiableProperties(properties));
     kafkaTimelineMetricsReporter.stopReporter();
     verifyAll();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 9a55f10..d408e1a 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -50,9 +50,13 @@
   private Collection<String> collectorHosts;
   private String zkQuorum;
   private String protocol;
+  private boolean setInstanceId;
+  private String instanceId;
   private NimbusClient nimbusClient;
   private String applicationId;
   private int timeoutSeconds;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   public StormTimelineMetricsReporter() {
 
@@ -94,6 +98,16 @@
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -126,6 +140,12 @@
           Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
           DEFAULT_POST_TIMEOUT_SECONDS;
       applicationId = cf.get(APP_ID).toString();
+      if (cf.containsKey(SET_INSTANCE_ID_PROPERTY)) {
+        // Boolean.getBoolean reads a JVM system property; parseBoolean parses the configured value.
+        setInstanceId = Boolean.parseBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
+        instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
+      }
+      // String.valueOf guards a missing key (-> false / 61888, matching the Kafka reporter default).
+      hostInMemoryAggregationEnabled = Boolean.valueOf(String.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY)));
+      hostInMemoryAggregationPort = cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY) == null ? 61888 :
+        Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
 
       collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
       if (protocol.contains("https")) {
@@ -196,6 +216,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostname);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(component);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.getMetricValues().put(currentTimeMillis, Double.parseDouble(attributeValue));
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 60c1427..ff72f24 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -59,6 +59,10 @@
   private String port;
   private String topologyName;
   private String applicationId;
+  private boolean setInstanceId;
+  private String instanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -96,6 +100,16 @@
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -122,6 +136,10 @@
     protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
 
+    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
+    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    // Default the port so a missing property cannot raise NumberFormatException; 61888 matches the Kafka reporter default.
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
     // Initialize the collector write strategy
     super.init();
 
@@ -243,6 +261,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(applicationId);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
diff --git a/ambari-metrics-storm-sink/pom.xml b/ambari-metrics-storm-sink/pom.xml
index 612ad63..9e11539 100644
--- a/ambari-metrics-storm-sink/pom.xml
+++ b/ambari-metrics-storm-sink/pom.xml
@@ -31,7 +31,7 @@
   <packaging>jar</packaging>
 
   <properties>
-    <storm.version>1.1.0-SNAPSHOT</storm.version>
+    <storm.version>1.1.0</storm.version>
   </properties>
 
   <build>
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 535fae0..5b75065 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -46,8 +46,12 @@
   private Collection<String> collectorHosts;
   private String zkQuorum;
   private String protocol;
+  private boolean setInstanceId;
+  private String instanceId;
   private String applicationId;
   private int timeoutSeconds;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   public StormTimelineMetricsReporter() {
 
@@ -89,6 +93,16 @@
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -115,6 +129,11 @@
           Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS)) :
           DEFAULT_POST_TIMEOUT_SECONDS;
       applicationId = configuration.getProperty(CLUSTER_REPORTER_APP_ID, DEFAULT_CLUSTER_REPORTER_APP_ID);
+      setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
+      instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
+
+      hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+      hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
 
       if (protocol.contains("https")) {
         String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
@@ -226,6 +245,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostname);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(component);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index f58f549..4d5a229 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -68,6 +68,10 @@
   private String port;
   private String topologyName;
   private String applicationId;
+  private String instanceId;
+  private boolean setInstanceId;
+  private boolean hostInMemoryAggregationEnabled;
+  private int hostInMemoryAggregationPort;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -105,6 +109,16 @@
   }
 
   @Override
+  protected boolean isHostInMemoryAggregationEnabled() {
+    return hostInMemoryAggregationEnabled;
+  }
+
+  @Override
+  protected int getHostInMemoryAggregationPort() {
+    return hostInMemoryAggregationPort;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -133,6 +147,11 @@
 
     protocol = configuration.getProperty(COLLECTOR_PROTOCOL, "http");
     port = configuration.getProperty(COLLECTOR_PORT, "6188");
+    instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
+    setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
+
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
 
     // Initialize the collector write strategy
     super.init();
@@ -332,6 +351,9 @@
     TimelineMetric timelineMetric = new TimelineMetric();
     timelineMetric.setMetricName(attributeName);
     timelineMetric.setHostName(hostName);
+    if (setInstanceId) {
+      timelineMetric.setInstanceId(instanceId);
+    }
     timelineMetric.setAppId(applicationId);
     timelineMetric.setStartTime(currentTimeMillis);
     timelineMetric.setType(ClassUtils.getShortCanonicalName(
diff --git a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
index bd36429..2edac39 100644
--- a/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
+++ b/ambari-metrics-timelineservice/conf/unix/metrics_whitelist
@@ -4,6 +4,15 @@
 BytesReceivedLast5Minutes
 BytesSentLast5Minutes
 ChannelSize
+Counter.%.CacheMisses
+Counter.CacheHits
+Counter.CacheMisses
+Counter.ReadAllQuery
+Counter.ReadAllQuery.%
+Counter.ReadAllQuery.HostRoleCommandEntity
+DataModifyQuery
+DirectReadQuery
+DoesExistQuery
 EventPutSuccessCount
 EventTakeSuccessCount
 FSDatasetState.org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.Capacity
@@ -13,8 +22,21 @@
 FlowFilesReceivedLast5Minutes
 FlowFilesSentLast5Minutes
 Free Slots
+InsertObjectQuery
+ReadAllQuery
+ReadAllQuery.HostRoleCommandEntity
+ReadObjectQuery
 Supervisors
 TimelineMetricStoreWatcher.FakeMetric
+Timer.ObjectBuilding
+Timer.QueryPreparation
+Timer.ReadAllQuery
+Timer.ReadAllQuery.%
+Timer.ReadAllQuery.HostRoleCommandEntity
+Timer.RowFetch
+Timer.SqlGeneration
+Timer.SqlPrepare
+Timer.StatementExecute
 Topologies
 Total Executors
 Total Slots
@@ -79,6 +101,18 @@
 dfs.FSNamesystem.TransactionsSinceLastCheckpoint
 dfs.FSNamesystem.TransactionsSinceLastLogRoll
 dfs.FSNamesystem.UnderReplicatedBlocks
+dfs.NNTopUserOpCounts.windowMs=1500000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=300000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=60000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=60000.op=__%.user=%
 dfs.datanode.BlocksRead
 dfs.datanode.BlocksWritten
 dfs.datanode.DatanodeNetworkErrors
@@ -94,6 +128,37 @@
 disk_free
 disk_total
 disk_used
+druid/broker.*.%.query/time
+druid/broker.heap.jvm/mem/max
+druid/broker.heap.jvm/mem/used
+druid/broker.jvm/gc/time
+druid/coordinator.heap.jvm/mem/max
+druid/coordinator.heap.jvm/mem/used
+druid/coordinator.jvm/gc/time
+druid/historical.*.%.query/segment/time
+druid/historical.*.%.query/time
+druid/historical.*.%.query/wait/time
+druid/historical.heap.jvm/mem/max
+druid/historical.heap.jvm/mem/used
+druid/historical.jvm/gc/time
+druid/historical.segment/scan/pending
+druid/middlemanager.*.%.query/segment/time
+druid/middlemanager.*.%.query/time
+druid/middlemanager.*.%.query/wait/time
+druid/middlemanager.*.ingest/events/processed
+druid/middlemanager.*.ingest/events/thrownAway
+druid/middlemanager.*.ingest/events/unparseable
+druid/middlemanager.*.ingest/persists/count
+druid/middlemanager.*.ingest/persists/time
+druid/middlemanager.*.ingest/rows/output
+druid/middlemanager.heap.jvm/mem/max
+druid/middlemanager.heap.jvm/mem/used
+druid/middlemanager.jvm/gc/time
+druid/middlemanager.segment/scan/pending
+druid/overlord.*.segment/added/bytes
+druid/overlord.heap.jvm/mem/max
+druid/overlord.heap.jvm/mem/used
+druid/overlord.jvm/gc/time
 executors.ExecutorMetrics.ExecutorAvailableFreeSlots
 executors.ExecutorMetrics.ExecutorAvailableFreeSlotsPercent
 executors.ExecutorMetrics.ExecutorCacheMemoryPerInstance
@@ -118,14 +183,14 @@
 executors.ExecutorMetrics.ExecutorTotalRejectedRequests
 executors.ExecutorMetrics.ExecutorTotalRequestsHandled
 executors.ExecutorMetrics.ExecutorTotalSuccess
-gc.ConcurrentMarkSweep.count
-gc.ConcurrentMarkSweep.time
-gc.ParNew.count
-gc.ParNew.time
+filter.error.grok
+filter.error.keyvalue
+input.files.count
+input.files.read_bytes
+input.files.read_lines
 io.IOMetrics.MaxDecodingTime
 io.IOMetrics.PercentileDecodingTime_30s50thPercentileLatency
 io.IOMetrics.PercentileDecodingTime_30s90thPercentileLatency
-io.IOMetrics.PercentileDecodingTime_30s95thPercentileLatency
 io.IOMetrics.PercentileDecodingTime_30s99thPercentileLatency
 ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Priority
 ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Volume
@@ -151,6 +216,10 @@
 jvm.JvmMetrics.ThreadsTerminated
 jvm.JvmMetrics.ThreadsTimedWaiting
 jvm.JvmMetrics.ThreadsWaiting
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferTotalCapacity
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferTotalCapacity
 jvm.Master.JvmMetrics.ThreadsBlocked
 jvm.Master.JvmMetrics.ThreadsNew
 jvm.Master.JvmMetrics.ThreadsRunnable
@@ -177,8 +246,23 @@
 jvm.RegionServer.JvmMetrics.ThreadsWaiting
 jvm.daemon_thread_count
 jvm.file_descriptor_usage
+jvm.gc.ConcurrentMarkSweep.count
+jvm.gc.ConcurrentMarkSweep.time
+jvm.gc.ParNew.count
+jvm.gc.ParNew.time
 jvm.heap_usage
+jvm.memory.heap.committed
+jvm.memory.heap.max
+jvm.memory.heap.used
+jvm.memory.non-heap.committed
+jvm.memory.non-heap.max
+jvm.memory.non-heap.used
 jvm.thread_count
+jvm.threads.blocked.count
+jvm.threads.count
+jvm.threads.daemon.count
+jvm.threads.deadlock.count
+jvm.threads.runnable.count
 jvm.uptime
 kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate
 kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count
@@ -236,12 +320,8 @@
 mem_shared
 mem_total
 mem_used
-memory.heap.committed
-memory.heap.max
-memory.heap.used
-memory.non-heap.committed
-memory.non-heap.max
-memory.non-heap.used
+output.solr.write_bytes
+output.solr.write_logs
 pkts_in
 pkts_out
 proc_run
@@ -250,6 +330,24 @@
 read_bytes
 read_count
 read_time
+regionserver.IO.FsPReadTime_75th_percentile
+regionserver.IO.FsPReadTime_95th_percentile
+regionserver.IO.FsPReadTime_99th_percentile
+regionserver.IO.FsPReadTime_max
+regionserver.IO.FsPReadTime_mean
+regionserver.IO.FsPReadTime_median
+regionserver.IO.FsReadTime_75th_percentile
+regionserver.IO.FsReadTime_95th_percentile
+regionserver.IO.FsReadTime_99th_percentile
+regionserver.IO.FsReadTime_max
+regionserver.IO.FsReadTime_mean
+regionserver.IO.FsWriteTime_75th_percentile
+regionserver.IO.FsWriteTime_95th_percentile
+regionserver.IO.FsWriteTime_99th_percentile
+regionserver.IO.FsWriteTime_max
+regionserver.IO.FsWriteTime_mean
+regionserver.IO.FsWriteTime_median
+regionserver.IO.fsChecksumFailureCount
 regionserver.RegionServer.ProcessCallTime_75th_percentile
 regionserver.RegionServer.ProcessCallTime_95th_percentile
 regionserver.RegionServer.ProcessCallTime_99th_percentile
@@ -441,13 +539,42 @@
 rpc.rpc.datanode.RpcSlowCalls
 rpcdetailed.rpcdetailed.client.AddBlockAvgTime
 rpcdetailed.rpcdetailed.client.AddBlockNumOps
+solr.admin.info.jvm.memory.used
+solr.admin.info.system.processCpuLoad
+solr.admin.mbeans.cache.documentCache.hitratio
+solr.admin.mbeans.cache.documentCache.size
+solr.admin.mbeans.cache.documentCache.warmupTime
+solr.admin.mbeans.cache.filterCache.hitratio
+solr.admin.mbeans.cache.filterCache.size
+solr.admin.mbeans.cache.filterCache.warmupTime
+solr.admin.mbeans.cache.queryResultCache.hitratio
+solr.admin.mbeans.cache.queryResultCache.size
+solr.admin.mbeans.cache.queryResultCache.warmupTime
+solr.admin.mbeans.queryHandler.browse.avgTimePerRequest
+solr.admin.mbeans.queryHandler.browse.requests
+solr.admin.mbeans.queryHandler.export.avgTimePerRequest
+solr.admin.mbeans.queryHandler.export.requests
+solr.admin.mbeans.queryHandler.get.avgTimePerRequest
+solr.admin.mbeans.queryHandler.get.requests
+solr.admin.mbeans.queryHandler.query.avgTimePerRequest
+solr.admin.mbeans.queryHandler.query.requests
+solr.admin.mbeans.queryHandler.select.15minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.5minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.75thPcRequestTime
+solr.admin.mbeans.queryHandler.select.95thPcRequestTime
+solr.admin.mbeans.queryHandler.select.999thPcRequestTime
+solr.admin.mbeans.queryHandler.select.99thPcRequestTime
+solr.admin.mbeans.queryHandler.select.avgRequestsPerSecond
+solr.admin.mbeans.queryHandler.select.avgTimePerRequest
+solr.admin.mbeans.queryHandler.select.medianRequestTime
+solr.admin.mbeans.queryHandler.select.requests
+solr.admin.mbeans.updateHandler.adds
+solr.admin.mbeans.updateHandler.deletesById
+solr.admin.mbeans.updateHandler.deletesByQuery
+solr.admin.mbeans.updateHandler.docsPending
+solr.admin.mbeans.updateHandler.errors
 swap_free
 swap_total
-threads.blocked.count
-threads.count
-threads.daemon.count
-threads.deadlock.count
-threads.runnable.count
 topology.*.%.--ack-count.%
 topology.*.%.--complete-latency.%
 topology.*.%.--emit-count.%
diff --git a/ambari-metrics-timelineservice/conf/windows/metrics_whitelist b/ambari-metrics-timelineservice/conf/windows/metrics_whitelist
index bd36429..2edac39 100644
--- a/ambari-metrics-timelineservice/conf/windows/metrics_whitelist
+++ b/ambari-metrics-timelineservice/conf/windows/metrics_whitelist
@@ -4,6 +4,15 @@
 BytesReceivedLast5Minutes
 BytesSentLast5Minutes
 ChannelSize
+Counter.%.CacheMisses
+Counter.CacheHits
+Counter.CacheMisses
+Counter.ReadAllQuery
+Counter.ReadAllQuery.%
+Counter.ReadAllQuery.HostRoleCommandEntity
+DataModifyQuery
+DirectReadQuery
+DoesExistQuery
 EventPutSuccessCount
 EventTakeSuccessCount
 FSDatasetState.org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl.Capacity
@@ -13,8 +22,21 @@
 FlowFilesReceivedLast5Minutes
 FlowFilesSentLast5Minutes
 Free Slots
+InsertObjectQuery
+ReadAllQuery
+ReadAllQuery.HostRoleCommandEntity
+ReadObjectQuery
 Supervisors
 TimelineMetricStoreWatcher.FakeMetric
+Timer.ObjectBuilding
+Timer.QueryPreparation
+Timer.ReadAllQuery
+Timer.ReadAllQuery.%
+Timer.ReadAllQuery.HostRoleCommandEntity
+Timer.RowFetch
+Timer.SqlGeneration
+Timer.SqlPrepare
+Timer.StatementExecute
 Topologies
 Total Executors
 Total Slots
@@ -79,6 +101,18 @@
 dfs.FSNamesystem.TransactionsSinceLastCheckpoint
 dfs.FSNamesystem.TransactionsSinceLastLogRoll
 dfs.FSNamesystem.UnderReplicatedBlocks
+dfs.NNTopUserOpCounts.windowMs=1500000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=1500000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=1500000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=300000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=300000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=300000.op=__%.user=%
+dfs.NNTopUserOpCounts.windowMs=60000.op=%.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.TotalCount
+dfs.NNTopUserOpCounts.windowMs=60000.op=*.user=%.count
+dfs.NNTopUserOpCounts.windowMs=60000.op=__%.user=%
 dfs.datanode.BlocksRead
 dfs.datanode.BlocksWritten
 dfs.datanode.DatanodeNetworkErrors
@@ -94,6 +128,37 @@
 disk_free
 disk_total
 disk_used
+druid/broker.*.%.query/time
+druid/broker.heap.jvm/mem/max
+druid/broker.heap.jvm/mem/used
+druid/broker.jvm/gc/time
+druid/coordinator.heap.jvm/mem/max
+druid/coordinator.heap.jvm/mem/used
+druid/coordinator.jvm/gc/time
+druid/historical.*.%.query/segment/time
+druid/historical.*.%.query/time
+druid/historical.*.%.query/wait/time
+druid/historical.heap.jvm/mem/max
+druid/historical.heap.jvm/mem/used
+druid/historical.jvm/gc/time
+druid/historical.segment/scan/pending
+druid/middlemanager.*.%.query/segment/time
+druid/middlemanager.*.%.query/time
+druid/middlemanager.*.%.query/wait/time
+druid/middlemanager.*.ingest/events/processed
+druid/middlemanager.*.ingest/events/thrownAway
+druid/middlemanager.*.ingest/events/unparseable
+druid/middlemanager.*.ingest/persists/count
+druid/middlemanager.*.ingest/persists/time
+druid/middlemanager.*.ingest/rows/output
+druid/middlemanager.heap.jvm/mem/max
+druid/middlemanager.heap.jvm/mem/used
+druid/middlemanager.jvm/gc/time
+druid/middlemanager.segment/scan/pending
+druid/overlord.*.segment/added/bytes
+druid/overlord.heap.jvm/mem/max
+druid/overlord.heap.jvm/mem/used
+druid/overlord.jvm/gc/time
 executors.ExecutorMetrics.ExecutorAvailableFreeSlots
 executors.ExecutorMetrics.ExecutorAvailableFreeSlotsPercent
 executors.ExecutorMetrics.ExecutorCacheMemoryPerInstance
@@ -118,14 +183,14 @@
 executors.ExecutorMetrics.ExecutorTotalRejectedRequests
 executors.ExecutorMetrics.ExecutorTotalRequestsHandled
 executors.ExecutorMetrics.ExecutorTotalSuccess
-gc.ConcurrentMarkSweep.count
-gc.ConcurrentMarkSweep.time
-gc.ParNew.count
-gc.ParNew.time
+filter.error.grok
+filter.error.keyvalue
+input.files.count
+input.files.read_bytes
+input.files.read_lines
 io.IOMetrics.MaxDecodingTime
 io.IOMetrics.PercentileDecodingTime_30s50thPercentileLatency
 io.IOMetrics.PercentileDecodingTime_30s90thPercentileLatency
-io.IOMetrics.PercentileDecodingTime_30s95thPercentileLatency
 io.IOMetrics.PercentileDecodingTime_30s99thPercentileLatency
 ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Priority
 ipc.client.org.apache.hadoop.ipc.DecayRpcScheduler.Caller(*).Volume
@@ -151,6 +216,10 @@
 jvm.JvmMetrics.ThreadsTerminated
 jvm.JvmMetrics.ThreadsTimedWaiting
 jvm.JvmMetrics.ThreadsWaiting
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonDirectBufferTotalCapacity
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferMemoryUsed
+jvm.LlapDaemonJVMMetrics.LlapDaemonMappedBufferTotalCapacity
 jvm.Master.JvmMetrics.ThreadsBlocked
 jvm.Master.JvmMetrics.ThreadsNew
 jvm.Master.JvmMetrics.ThreadsRunnable
@@ -177,8 +246,23 @@
 jvm.RegionServer.JvmMetrics.ThreadsWaiting
 jvm.daemon_thread_count
 jvm.file_descriptor_usage
+jvm.gc.ConcurrentMarkSweep.count
+jvm.gc.ConcurrentMarkSweep.time
+jvm.gc.ParNew.count
+jvm.gc.ParNew.time
 jvm.heap_usage
+jvm.memory.heap.committed
+jvm.memory.heap.max
+jvm.memory.heap.used
+jvm.memory.non-heap.committed
+jvm.memory.non-heap.max
+jvm.memory.non-heap.used
 jvm.thread_count
+jvm.threads.blocked.count
+jvm.threads.count
+jvm.threads.daemon.count
+jvm.threads.deadlock.count
+jvm.threads.runnable.count
 jvm.uptime
 kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.1MinuteRate
 kafka.controller.ControllerStats.LeaderElectionRateAndTimeMs.count
@@ -236,12 +320,8 @@
 mem_shared
 mem_total
 mem_used
-memory.heap.committed
-memory.heap.max
-memory.heap.used
-memory.non-heap.committed
-memory.non-heap.max
-memory.non-heap.used
+output.solr.write_bytes
+output.solr.write_logs
 pkts_in
 pkts_out
 proc_run
@@ -250,6 +330,24 @@
 read_bytes
 read_count
 read_time
+regionserver.IO.FsPReadTime_75th_percentile
+regionserver.IO.FsPReadTime_95th_percentile
+regionserver.IO.FsPReadTime_99th_percentile
+regionserver.IO.FsPReadTime_max
+regionserver.IO.FsPReadTime_mean
+regionserver.IO.FsPReadTime_median
+regionserver.IO.FsReadTime_75th_percentile
+regionserver.IO.FsReadTime_95th_percentile
+regionserver.IO.FsReadTime_99th_percentile
+regionserver.IO.FsReadTime_max
+regionserver.IO.FsReadTime_mean
+regionserver.IO.FsWriteTime_75th_percentile
+regionserver.IO.FsWriteTime_95th_percentile
+regionserver.IO.FsWriteTime_99th_percentile
+regionserver.IO.FsWriteTime_max
+regionserver.IO.FsWriteTime_mean
+regionserver.IO.FsWriteTime_median
+regionserver.IO.fsChecksumFailureCount
 regionserver.RegionServer.ProcessCallTime_75th_percentile
 regionserver.RegionServer.ProcessCallTime_95th_percentile
 regionserver.RegionServer.ProcessCallTime_99th_percentile
@@ -441,13 +539,42 @@
 rpc.rpc.datanode.RpcSlowCalls
 rpcdetailed.rpcdetailed.client.AddBlockAvgTime
 rpcdetailed.rpcdetailed.client.AddBlockNumOps
+solr.admin.info.jvm.memory.used
+solr.admin.info.system.processCpuLoad
+solr.admin.mbeans.cache.documentCache.hitratio
+solr.admin.mbeans.cache.documentCache.size
+solr.admin.mbeans.cache.documentCache.warmupTime
+solr.admin.mbeans.cache.filterCache.hitratio
+solr.admin.mbeans.cache.filterCache.size
+solr.admin.mbeans.cache.filterCache.warmupTime
+solr.admin.mbeans.cache.queryResultCache.hitratio
+solr.admin.mbeans.cache.queryResultCache.size
+solr.admin.mbeans.cache.queryResultCache.warmupTime
+solr.admin.mbeans.queryHandler.browse.avgTimePerRequest
+solr.admin.mbeans.queryHandler.browse.requests
+solr.admin.mbeans.queryHandler.export.avgTimePerRequest
+solr.admin.mbeans.queryHandler.export.requests
+solr.admin.mbeans.queryHandler.get.avgTimePerRequest
+solr.admin.mbeans.queryHandler.get.requests
+solr.admin.mbeans.queryHandler.query.avgTimePerRequest
+solr.admin.mbeans.queryHandler.query.requests
+solr.admin.mbeans.queryHandler.select.15minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.5minRateReqsPerSecond
+solr.admin.mbeans.queryHandler.select.75thPcRequestTime
+solr.admin.mbeans.queryHandler.select.95thPcRequestTime
+solr.admin.mbeans.queryHandler.select.999thPcRequestTime
+solr.admin.mbeans.queryHandler.select.99thPcRequestTime
+solr.admin.mbeans.queryHandler.select.avgRequestsPerSecond
+solr.admin.mbeans.queryHandler.select.avgTimePerRequest
+solr.admin.mbeans.queryHandler.select.medianRequestTime
+solr.admin.mbeans.queryHandler.select.requests
+solr.admin.mbeans.updateHandler.adds
+solr.admin.mbeans.updateHandler.deletesById
+solr.admin.mbeans.updateHandler.deletesByQuery
+solr.admin.mbeans.updateHandler.docsPending
+solr.admin.mbeans.updateHandler.errors
 swap_free
 swap_total
-threads.blocked.count
-threads.count
-threads.daemon.count
-threads.deadlock.count
-threads.runnable.count
 topology.*.%.--ack-count.%
 topology.*.%.--complete-latency.%
 topology.*.%.--emit-count.%
diff --git a/ambari-metrics-timelineservice/pom.xml b/ambari-metrics-timelineservice/pom.xml
index f9d7e19..a5eb572 100644
--- a/ambari-metrics-timelineservice/pom.xml
+++ b/ambari-metrics-timelineservice/pom.xml
@@ -568,13 +568,13 @@
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
-      <version>1.7.2</version>
+      <version>1.7.20</version>
     </dependency>
 
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
-      <version>1.7.2</version>
+      <version>1.7.20</version>
     </dependency>
 
     <dependency>
@@ -658,8 +658,8 @@
 
     <!-- Dependency in order to annotate unit tests with a category. -->
     <dependency>
-      <groupId>utility</groupId>
-      <artifactId>utility</artifactId>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-utility</artifactId>
       <version>1.0.0.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
index fa095a0..2342bd8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStore.java
@@ -19,14 +19,18 @@
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricWithAggregatedValues;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.TopNConfig;
 import org.apache.hadoop.service.AbstractService;
@@ -40,6 +44,7 @@
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.ConditionBuilder;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.PhoenixTransactSQL;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.TopNCondition;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.SeriesAggregateFunction;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.function.TimelineMetricsSeriesAggregateFunction;
@@ -51,6 +56,7 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -60,6 +66,7 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.USE_GROUPBY_AGGREGATOR_QUERIES;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.DEFAULT_TOPN_HOSTS_LIMIT;
 import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.ACTUAL_AGGREGATOR_NAMES;
@@ -75,6 +82,7 @@
   private TimelineMetricMetadataManager metricMetadataManager;
   private Integer defaultTopNHostsLimit;
   private MetricCollectorHAController haController;
+  private boolean containerMetricsDisabled = false;
 
   /**
    * Construct the service.
@@ -150,10 +158,14 @@
       scheduleAggregatorThread(dailyClusterAggregator);
 
       // Start the minute host aggregator
-      TimelineMetricAggregator minuteHostAggregator =
-        TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
-          hBaseAccessor, metricsConf, haController);
-      scheduleAggregatorThread(minuteHostAggregator);
+      if (Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION, "true"))) {
+        LOG.info("timeline.metrics.host.inmemory.aggregation is set to True, disabling host minute aggregation on collector");
+      } else {
+        TimelineMetricAggregator minuteHostAggregator =
+          TimelineMetricAggregatorFactory.createTimelineMetricAggregatorMinute(
+            hBaseAccessor, metricsConf, haController);
+        scheduleAggregatorThread(minuteHostAggregator);
+      }
 
       // Start the hourly host aggregator
       TimelineMetricAggregator hourlyHostAggregator =
@@ -177,7 +189,7 @@
         LOG.info("Started watchdog for timeline metrics store with initial " +
           "delay = " + initDelay + ", delay = " + delay);
       }
-
+      containerMetricsDisabled = configuration.isContainerMetricsDisabled();
       isInitialized = true;
     }
 
@@ -300,8 +312,12 @@
       if (prevTime != null) {
         step = currTime - prevTime;
         diff = currVal - prevVal;
-        Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
-        timeValueEntry.setValue(rate);
+        if (diff < 0) {
+          it.remove(); // The counter was reset (e.g. on restart); drop the point rather than emit a negative rate.
+        } else {
+          Double rate = isDiff ? diff : (diff / TimeUnit.MILLISECONDS.toSeconds(step));
+          timeValueEntry.setValue(rate);
+        }
       } else {
         it.remove();
       }
@@ -352,6 +368,12 @@
   @Override
   public TimelinePutResponse putContainerMetrics(List<ContainerMetric> metrics)
       throws SQLException, IOException {
+
+    if (containerMetricsDisabled) {
+      LOG.debug("Ignoring submitted container metrics according to configuration. Values will not be stored.");
+      return new TimelinePutResponse();
+    }
+
     hBaseAccessor.insertContainerMetrics(metrics);
     return new TimelinePutResponse();
   }
@@ -388,8 +410,66 @@
   }
 
   @Override
-  public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
-    return metricMetadataManager.getHostedInstanceCache();
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
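+    // Flatten the aggregation result into metric -> aggregate pairs and write
+    // them straight to the minute aggregate table; this stands in for the
+    // collector-side minute aggregator, which is skipped when in-memory host
+    // aggregation is enabled (see the scheduling logic above).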
+    Map<TimelineMetric, MetricHostAggregate> aggregateMap = new HashMap<>();
+    for (TimelineMetricWithAggregatedValues entry : aggregationResult.getResult()) {
+      aggregateMap.put(entry.getTimelineMetric(), entry.getMetricAggregate());
+    }
+    hBaseAccessor.saveHostAggregateRecords(aggregateMap, PhoenixTransactSQL.METRICS_AGGREGATE_MINUTE_TABLE_NAME);
+
+    return new TimelinePutResponse();
+  }
+
+  @Override
+  public Map<String, Map<String,Set<String>>> getInstanceHostsMetadata(String instanceId, String appId)
+          throws SQLException, IOException {
+
+    Map<String, Set<String>> hostedApps = metricMetadataManager.getHostedAppsCache();
+    Map<String, Set<String>> instanceHosts = new HashMap<>();
+    if (configuration.getTimelineMetricsMultipleClusterSupport()) {
+      instanceHosts = metricMetadataManager.getHostedInstanceCache();
+    }
+
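+    // Resulting shape: { instanceId : { appId : [hosts] } }; a single ""
+    // instance key is used when no hosted-instance data is available.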
+    Map<String, Map<String, Set<String>>> instanceAppHosts = new HashMap<>();
+
+    if (MapUtils.isEmpty(instanceHosts)) {
+      Map<String, Set<String>> appHostMap = new HashMap<String, Set<String>>();
+      for (String host : hostedApps.keySet()) {
+        for (String app : hostedApps.get(host)) {
+          if (!appHostMap.containsKey(app)) {
+            appHostMap.put(app, new HashSet<String>());
+          }
+          appHostMap.get(app).add(host);
+        }
+      }
+      instanceAppHosts.put("", appHostMap);
+    } else {
+      for (String instance : instanceHosts.keySet()) {
+
+        if (StringUtils.isNotEmpty(instanceId) && !instance.equals(instanceId)) {
+          continue;
+        }
+        Map<String, Set<String>> appHostMap = new  HashMap<String, Set<String>>();
+        instanceAppHosts.put(instance, appHostMap);
+
+        Set<String> hostsWithInstance = instanceHosts.get(instance);
+        for (String host : hostsWithInstance) {
+          for (String app : hostedApps.get(host)) {
+            if (StringUtils.isNotEmpty(appId) && !app.equals(appId)) {
+              continue;
+            }
+
+            if (!appHostMap.containsKey(app)) {
+              appHostMap.put(app, new HashSet<String>());
+            }
+            appHostMap.get(app).add(host);
+          }
+        }
+      }
+    }
+
+    return instanceAppHosts;
   }
 
   @Override
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
index 65bbc4c..3b2a119 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessor.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -40,8 +42,6 @@
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.AggregatorUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricReadHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
@@ -787,7 +787,9 @@
         metadataManager.putIfModifiedHostedAppsMetadata(
                 tm.getHostName(), tm.getAppId());
 
-        metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
+        if (!tm.getAppId().equals("FLUME_HANDLER")) {
+          metadataManager.putIfModifiedHostedInstanceMetadata(tm.getInstanceId(), tm.getHostName());
+        }
       }
       if (!acceptMetric) {
         iterator.remove();
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
index 0d5042f..899928a 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricConfiguration.java
@@ -191,6 +191,9 @@
   public static final String TIMELINE_SERVICE_RPC_ADDRESS =
     "timeline.metrics.service.rpc.address";
 
+  public static final String TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS =
+    "timeline.metrics.service.container.metrics.disabled";
+
   public static final String CLUSTER_AGGREGATOR_APP_IDS =
     "timeline.metrics.service.cluster.aggregator.appIds";
 
@@ -251,9 +254,15 @@
   public static final String TIMELINE_METRICS_AGGREGATE_TABLES_DURABILITY =
       "timeline.metrics.aggregate.tables.durability";
 
+  public static final String TIMELINE_METRICS_WHITELIST_ENABLED =
+    "timeline.metrics.whitelisting.enabled";
+
   public static final String TIMELINE_METRICS_WHITELIST_FILE =
     "timeline.metrics.whitelist.file";
 
+  public static final String TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT =
+    "/etc/ambari-metrics-collector/conf/metrics_whitelist";
+
   public static final String TIMELINE_METRIC_METADATA_FILTERS =
     "timeline.metrics.service.metadata.filters";
 
@@ -290,12 +299,17 @@
   public static final String TIMELINE_METRICS_PRECISION_TABLE_HBASE_BLOCKING_STORE_FILES =
     "timeline.metrics.precision.table.hbase.hstore.blockingStoreFiles";
 
+  public static final String TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS =
+    "timeline.metrics.support.multiple.clusters";
+
   public static final String HOST_APP_ID = "HOST";
 
   public static final String DEFAULT_INSTANCE_PORT = "12001";
 
   public static final String AMSHBASE_METRICS_WHITESLIST_FILE = "amshbase_metrics_whitelist";
 
+  public static final String TIMELINE_METRICS_HOST_INMEMORY_AGGREGATION = "timeline.metrics.host.inmemory.aggregation";
+
   private Configuration hbaseConf;
   private Configuration metricsConf;
   private Configuration amsEnvConf;
@@ -438,6 +452,13 @@
     return 3;
   }
 
+  public boolean getTimelineMetricsMultipleClusterSupport() {
+    if (metricsConf != null) {
+      return Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_SUPPORT_MULTIPLE_CLUSTERS, "false"));
+    }
+    return false;
+  }
+
   public String getTimelineServiceRpcAddress() {
     String defaultRpcAddress = "0.0.0.0:60200";
     if (metricsConf != null) {
@@ -495,4 +516,19 @@
 
     return whitelist;
   }
+
+  public boolean isContainerMetricsDisabled() {
+    try {
+      return metricsConf != null && Boolean.parseBoolean(metricsConf.get(TIMELINE_SERVICE_DISABLE_CONTAINER_METRICS, "false"));
+    } catch (Exception e) {
+      return false;
+    }
+  }
+
+  public boolean isWhitelistingEnabled() {
+    if (metricsConf != null) {
+      return Boolean.parseBoolean(metricsConf.get(TIMELINE_METRICS_WHITELIST_ENABLED, "false"));
+    }
+    return false;
+  }
 }
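
The three getters added above share one convention: a missing metricsConf or an unset key means the feature is off. A minimal caller sketch (conf stands for an already-initialized TimelineMetricConfiguration):

    // Each flag defaults to false when unset, per the getters above.
    boolean whitelisting = conf.isWhitelistingEnabled();
    boolean containerMetricsOff = conf.isContainerMetricsDisabled();
    boolean multiCluster = conf.getTimelineMetricsMultipleClusterSupport();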
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
index 121a8ae..d052d54 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -80,6 +81,11 @@
    */
   Map<String, List<TimelineMetricMetadata>> getTimelineMetricMetadata(String query) throws SQLException, IOException;
 
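+  /**
+   * Store host-level aggregated metrics posted by an external aggregator, and
+   * return errors that happened during storing.
+   */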
+  TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException;
+
   /**
    * Returns all hosts that have written metrics with the apps on the host
    * @return { hostname : [ appIds ] }
@@ -94,7 +100,7 @@
    * @throws SQLException
    * @throws IOException
    */
-  Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException;
+  Map<String, Map<String, Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException;
 
   /**
    * Return a list of known live collector nodes
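
getInstanceHostsMetadata now takes optional instanceId/appId filters and returns a two-level map. A consumption sketch (hedged: the nesting is assumed to be instanceId -> hostname -> appIds, by analogy with the { hostname : [ appIds ] } shape documented for getHostAppsMetadata; store is a hypothetical TimelineMetricStore, and null filters are assumed to mean no filtering):

    Map<String, Map<String, Set<String>>> byInstance =
        store.getInstanceHostsMetadata(null, null);
    for (Map.Entry<String, Map<String, Set<String>>> instance : byInstance.entrySet()) {
      for (Map.Entry<String, Set<String>> host : instance.getValue().entrySet()) {
        System.out.println(instance.getKey() + "/" + host.getKey() + " -> " + host.getValue());
      }
    }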
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
index 65d54c0..7b03b30 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorSink.java
@@ -19,10 +19,10 @@
 
 import java.util.Map;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 /**
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
index 1446ec2..63cc510 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilter.java
@@ -18,17 +18,14 @@
 
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_BLACKLIST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_WHITELIST;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT;
 
 import java.io.BufferedReader;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.Arrays;
 import java.util.HashSet;
@@ -36,9 +33,12 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_WHITELIST_FILE;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_BLACKLIST;
-import static org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration.TIMELINE_METRICS_APPS_WHITELIST;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 
 public class TimelineMetricsFilter {
 
@@ -67,8 +67,8 @@
     whitelistedApps = new HashSet<>();
     amshbaseWhitelist = new HashSet<>();
 
-    String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, "");
-    if (!StringUtils.isEmpty(whitelistFile)) {
+    if (configuration.isWhitelistingEnabled()) {
+      String whitelistFile = metricsConf.get(TIMELINE_METRICS_WHITELIST_FILE, TIMELINE_METRICS_WHITELIST_FILE_LOCATION_DEFAULT);
       readMetricWhitelistFromFile(whitelistFile);
     }
 
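
Whitelisting is now gated on an explicit flag rather than on the mere presence of a file path, and the file location falls back to the packaged default. A self-contained sketch of the same gating (keys and default path copied from TimelineMetricConfiguration above):

    Configuration metricsConf = new Configuration(false);
    metricsConf.setBoolean("timeline.metrics.whitelisting.enabled", true);
    // File key left unset, so the shipped default location applies.
    String whitelistFile = metricsConf.get(
        "timeline.metrics.whitelist.file",
        "/etc/ambari-metrics-collector/conf/metrics_whitelist");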
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
index 44aca03..9eaf456 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricAppAggregator.java
@@ -21,6 +21,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricsFilter;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
index 02677b9..ba16b43 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregator.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.MetricCollectorHAController;
@@ -91,6 +93,7 @@
     MetricHostAggregate hostAggregate = null;
     Map<TimelineClusterMetric, MetricHostAggregate> hostAggregateMap =
       new HashMap<TimelineClusterMetric, MetricHostAggregate>();
+    int perMetricCount = 0;
 
     while (rs.next()) {
       TimelineClusterMetric currentMetric = readHelper.fromResultSet(rs);
@@ -106,14 +109,20 @@
         currentMetric.setTimestamp(endTime);
         hostAggregate = new MetricHostAggregate();
         hostAggregateMap.put(currentMetric, hostAggregate);
+        perMetricCount++;
       }
 
       if (existingMetric.equalsExceptTime(currentMetric)) {
         // Recalculate totals with current metric
         updateAggregatesFromHost(hostAggregate, currentHostAggregate);
-
+        perMetricCount++;
       } else {
-        // Switched over to a new metric - save existing
+        // Switched over to a new metric - normalize the existing aggregate, then save the new one
+
+        hostAggregate.setSum(hostAggregate.getSum() / perMetricCount);
+        hostAggregate.setNumberOfSamples(Math.round((float)hostAggregate.getNumberOfSamples() / (float)perMetricCount));
+        perMetricCount = 1;
+
         hostAggregate = new MetricHostAggregate();
         currentMetric.setTimestamp(endTime);
         updateAggregatesFromHost(hostAggregate, currentHostAggregate);
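
The perMetricCount bookkeeping turns per-window totals into per-row averages at each metric boundary: SUM is divided by the number of rows that fed it and the sample count is rounded the same way. Worked numbers (standalone, hypothetical values):

    // Three host-level rows contributed to one metric in this window.
    double sum = 9.0;             // running total across the 3 rows
    long samples = 6;             // running sample count across the 3 rows
    int perMetricCount = 3;

    double storedSum = sum / perMetricCount;                                   // 3.0
    int storedSamples = Math.round((float) samples / (float) perMetricCount);  // 2

Note the hunk normalizes only when switching to the next metric; the final metric in the result set is presumably finalized after the loop, outside this hunk.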
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
index a5a3499..34b1f9b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecond.java
@@ -38,6 +38,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.PostProcessingUtil;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
index 0ea9c08..a17433b 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricHostAggregator.java
@@ -20,6 +20,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.availability.AggregationTaskRunner.AGGREGATOR_NAME;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
index b5f49fb..672f85f 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricReadHelper.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.SingleValuedTimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
index 53e6304..a06f4e8 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/availability/MetricCollectorHAController.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricsSystemInitializationException;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.TimelineMetricConfiguration;
 import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
@@ -123,20 +124,41 @@
     admin = new ZKHelixAdmin(zkConnectUrl);
     // create cluster
     LOG.info("Creating zookeeper cluster node: " + clusterName);
-    admin.addCluster(clusterName, false);
+    boolean clusterAdded = admin.addCluster(clusterName, false);
+    LOG.info("Was cluster added successfully? " + clusterAdded);
 
     // Adding host to the cluster
-    List<String> nodes = Collections.EMPTY_LIST;
-    try {
-      nodes =  admin.getInstancesInCluster(clusterName);
-    } catch (ZkNoNodeException ex) {
-      LOG.warn("Child znode under /" + CLUSTER_NAME + " not found.Recreating the cluster.");
-        admin.addCluster(clusterName, true);
+    boolean success = false;
+    int tries = 5;
+    int sleepTimeInSeconds = 5;
+
+    for (int i = 0; i < tries && !success; i++) {
+      try {
+        List<String> nodes = admin.getInstancesInCluster(clusterName);
+        if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+          LOG.info("Adding participant instance " + instanceConfig);
+          admin.addInstance(clusterName, instanceConfig);
+        }
+        success = true;
+      } catch (HelixException | ZkNoNodeException ex) {
+        LOG.warn("Helix Cluster not yet setup fully.");
+        if (i < tries - 1) {
+          LOG.info("Waiting for " + sleepTimeInSeconds + " seconds and retrying.");
+          TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
+        } else {
+          LOG.error(ex);
+        }
+      }
     }
 
-    if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
-      LOG.info("Adding participant instance " + instanceConfig);
-      admin.addInstance(clusterName, instanceConfig);
+    if (!success) {
+      LOG.info("Trying to create " + clusterName + " again since waiting for the creation did not help.");
+      admin.addCluster(clusterName, true);
+      List<String> nodes = admin.getInstancesInCluster(clusterName);
+      if (CollectionUtils.isEmpty(nodes) || !nodes.contains(instanceConfig.getInstanceName())) {
+        LOG.info("Adding participant instance " + instanceConfig);
+        admin.addInstance(clusterName, instanceConfig);
+      }
     }
 
     // Add a state model
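
The join sequence above is a bounded retry: up to 5 attempts, 5 seconds apart, and only if every attempt fails is the cluster force-recreated and joined. The same shape reduced to a generic helper (a sketch; joinCluster and recreateAndJoin are hypothetical stand-ins for the Helix calls above):

    static void joinWithRetry(Runnable joinCluster, Runnable recreateAndJoin)
        throws InterruptedException {
      final int tries = 5, sleepTimeInSeconds = 5;
      boolean success = false;
      for (int i = 0; i < tries && !success; i++) {
        try {
          joinCluster.run();       // getInstancesInCluster + addInstance in the real code
          success = true;
        } catch (RuntimeException ex) {  // the real code catches HelixException | ZkNoNodeException
          if (i < tries - 1) {
            java.util.concurrent.TimeUnit.SECONDS.sleep(sleepTimeInSeconds);
          }
        }
      }
      if (!success) {
        recreateAndJoin.run();     // addCluster(clusterName, true) + addInstance in the real code
      }
    }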
diff --git a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
index 6278c59..50cfb08 100644
--- a/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
+++ b/ambari-metrics-timelineservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
@@ -25,6 +25,7 @@
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.PrecisionLimitExceededException;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetricMetadata;
@@ -285,6 +286,36 @@
     }
   }
 
+  /**
+   * Store the given host-aggregated metrics into the timeline store, and
+   * return errors that happened during storing.
+   */
+  @Path("/metrics/aggregated")
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postAggregatedMetrics(
+    @Context HttpServletRequest req,
+    @Context HttpServletResponse res,
+    AggregationResult metrics) {
+
+    init(res);
+    if (metrics == null) {
+      return new TimelinePutResponse();
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing aggregated metrics: " +
+                TimelineUtils.dumpTimelineRecordtoJSON(metrics, true));
+      }
+
+      return timelineMetricStore.putHostAggregatedMetrics(metrics);
+    } catch (Exception e) {
+      LOG.error("Error saving metrics.", e);
+      throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
   @Path("/containermetrics")
   @POST
   @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
@@ -415,14 +446,16 @@
   @GET
   @Path("/metrics/instances")
   @Produces({ MediaType.APPLICATION_JSON })
-  public Map<String, Set<String>> getClusterHostsMetadata(
+  public Map<String, Map<String, Set<String>>> getClusterHostsMetadata(
     @Context HttpServletRequest req,
-    @Context HttpServletResponse res
+    @Context HttpServletResponse res,
+    @QueryParam("appId") String appId,
+    @QueryParam("instanceId") String instanceId
   ) {
     init(res);
 
     try {
-      return timelineMetricStore.getInstanceHostsMetadata();
+      return timelineMetricStore.getInstanceHostsMetadata(instanceId, appId);
     } catch (Exception e) {
       throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
     }
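
The new endpoint consumes a JSON-serialized AggregationResult. A minimal JDK-only client sketch (hedged: the collector address, the /ws/v1/timeline prefix, and the placeholder payload are assumptions for illustration):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class PostAggregatedMetrics {
      public static void main(String[] args) throws Exception {
        URL url = new URL("http://localhost:6188/ws/v1/timeline/metrics/aggregated");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        String payload = "{}";  // placeholder; a real body carries the aggregated metric values
        try (OutputStream os = conn.getOutputStream()) {
          os.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
      }
    }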
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
index aae1d4b..70dd583 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/HBaseTimelineMetricStoreTest.java
@@ -95,22 +95,26 @@
   @Test
-  public void testRateCalculationOnMetricsWithEqualValues() throws Exception {
+  public void testRateCalculationOnMetrics() throws Exception {
     Map<Long, Double> metricValues = new TreeMap<>();
-    metricValues.put(1454016368371L, 1011.25);
-    metricValues.put(1454016428371L, 1011.25);
-    metricValues.put(1454016488371L, 1011.25);
-    metricValues.put(1454016548371L, 1011.25);
-    metricValues.put(1454016608371L, 1011.25);
-    metricValues.put(1454016668371L, 1011.25);
-    metricValues.put(1454016728371L, 1011.25);
+    metricValues.put(1454000000000L, 1.0);
+    metricValues.put(1454000001000L, 6.0);
+    metricValues.put(1454000002000L, 0.0);
+    metricValues.put(1454000003000L, 3.0);
+    metricValues.put(1454000004000L, 4.0);
+    metricValues.put(1454000005000L, 7.0);
 
     // Calculate rate
     Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), false);
 
-    // Make sure rate is zero
+    // Verify the computed rates; the first point and the counter reset yield none
-    for (Map.Entry<Long, Double> rateEntry : rates.entrySet()) {
-      Assert.assertEquals("Rate should be zero, key = " + rateEntry.getKey()
-          + ", value = " + rateEntry.getValue(), 0.0, rateEntry.getValue());
-    }
+    Assert.assertEquals(4, rates.size());
+
+    Assert.assertFalse(rates.containsKey(1454000000000L));
+    Assert.assertFalse(rates.containsKey(1454000002000L));
+
+    Assert.assertEquals(5.0, rates.get(1454000001000L));
+    Assert.assertEquals(3.0, rates.get(1454000003000L));
+    Assert.assertEquals(1.0, rates.get(1454000004000L));
+    Assert.assertEquals(3.0, rates.get(1454000005000L));
   }
 
   @Test
@@ -119,14 +123,14 @@
     metricValues.put(1454016368371L, 1011.25);
     metricValues.put(1454016428371L, 1010.25);
     metricValues.put(1454016488371L, 1012.25);
-    metricValues.put(1454016548371L, 1010.25);
-    metricValues.put(1454016608371L, 1010.25);
+    metricValues.put(1454016548371L, 1015.25);
+    metricValues.put(1454016608371L, 1020.25);
 
     Map<Long, Double> rates = HBaseTimelineMetricStore.updateValuesAsRate(new TreeMap<>(metricValues), true);
 
-    Assert.assertTrue(rates.size()==4);
-    Assert.assertTrue(rates.containsValue(-1.0));
+    Assert.assertEquals(3, rates.size());
     Assert.assertTrue(rates.containsValue(2.0));
-    Assert.assertTrue(rates.containsValue(0.0));
+    Assert.assertTrue(rates.containsValue(3.0));
+    Assert.assertTrue(rates.containsValue(5.0));
   }
 }
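
The rewritten assertions pin down the rate semantics: with one-second spacing, each point's rate is (current - previous) / elapsedSeconds, the first point has no predecessor, and a negative delta (the 6.0 -> 0.0 counter reset) yields no rate, hence four rates from six samples. A standalone sketch of that rule (an assumption about the non-diff branch of updateValuesAsRate, not a copy of it; assumes java.util.Map and java.util.TreeMap):

    // 1->6 gives 5.0; 6->0 is a reset and is skipped; 0->3, 3->4, 4->7 give 3.0, 1.0, 3.0.
    static Map<Long, Double> toRates(TreeMap<Long, Double> values) {
      Map<Long, Double> rates = new TreeMap<>();
      Map.Entry<Long, Double> prev = null;
      for (Map.Entry<Long, Double> e : values.entrySet()) {
        if (prev != null) {
          double delta = e.getValue() - prev.getValue();
          double seconds = (e.getKey() - prev.getKey()) / 1000.0;
          if (delta >= 0 && seconds > 0) {
            rates.put(e.getKey(), delta / seconds);
          }
        }
        prev = e;
      }
      return rates;
    }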
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
index 0087fd9..d5baaef 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/ITPhoenixHBaseAccessor.java
@@ -26,12 +26,12 @@
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
index 37ec134..7eeb9c4 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/MetricTestHelper.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Arrays;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
index a910cc2..d668178 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/PhoenixHBaseAccessorTest.java
@@ -22,11 +22,11 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.Function;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.query.Condition;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
index 44f48e8..3009163 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestMetricHostAggregate.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics
   .timeline;
 
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.junit.Test;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,7 +34,7 @@
     assertThat(aggregate.getSum()).isEqualTo(3.0);
     assertThat(aggregate.getMin()).isEqualTo(1.0);
     assertThat(aggregate.getMax()).isEqualTo(2.0);
-    assertThat(aggregate.getAvg()).isEqualTo(3.0 / 2);
+    assertThat(aggregate.calculateAverage()).isEqualTo(3.0 / 2);
   }
 
   @Test
@@ -50,7 +50,7 @@
     assertThat(aggregate.getSum()).isEqualTo(12.0);
     assertThat(aggregate.getMin()).isEqualTo(0.5);
     assertThat(aggregate.getMax()).isEqualTo(7.5);
-    assertThat(aggregate.getAvg()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
+    assertThat(aggregate.calculateAverage()).isEqualTo((3.0 + 8.0 + 1.0) / 5);
   }
 
   static MetricHostAggregate createAggregate (Double sum, Double min,
@@ -63,4 +63,4 @@
     aggregate.setNumberOfSamples(samplesCount);
     return aggregate;
   }
-}
\ No newline at end of file
+}
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
index b40481d..8abcd83 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TestTimelineMetricStore.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.AggregationResult;
 import org.apache.hadoop.metrics2.sink.timeline.ContainerMetric;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
@@ -92,12 +93,17 @@
   }
 
   @Override
+  public TimelinePutResponse putHostAggregatedMetrics(AggregationResult aggregationResult) throws SQLException, IOException {
+    return null;
+  }
+
+  @Override
   public Map<String, Set<String>> getHostAppsMetadata() throws SQLException, IOException {
     return Collections.emptyMap();
   }
 
   @Override
-  public Map<String, Set<String>> getInstanceHostsMetadata() throws SQLException, IOException {
+  public Map<String, Map<String, Set<String>>> getInstanceHostsMetadata(String instanceId, String appId) throws SQLException, IOException {
     return Collections.emptyMap();
   }
 
@@ -105,4 +111,4 @@
   public List<String> getLiveInstances() {
     return Collections.emptyList();
   }
 }
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
index fa0cfe9..53f6f6c 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsAggregatorMemorySink.java
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline;
 
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.Precision;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 
 import java.util.Collections;
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java
index 81da5c8..a308248 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/TimelineMetricsFilterTest.java
@@ -61,6 +61,7 @@
     Configuration metricsConf = new Configuration();
     TimelineMetricConfiguration configuration = EasyMock.createNiceMock(TimelineMetricConfiguration.class);
     expect(configuration.getMetricsConf()).andReturn(metricsConf).once();
+    expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes();
     replay(configuration);
 
     URL fileUrl = ClassLoader.getSystemResource("test_data/metric_whitelist.dat");
@@ -170,6 +171,8 @@
     whitelist.add("regionserver.Server.Delete_mean");
     expect(configuration.getAmshbaseWhitelist()).andReturn(whitelist).once();
 
+    expect(configuration.isWhitelistingEnabled()).andReturn(true).anyTimes();
+
     replay(configuration);
 
     TimelineMetricsFilter.initializeMetricFilter(configuration);
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
index 590f82a..07fd85d 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITClusterAggregator.java
@@ -20,13 +20,13 @@
 
 import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.MetricTestHelper;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.PhoenixHBaseAccessor;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricClusterAggregate;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.MetricHostAggregate;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineClusterMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregator;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators.TimelineMetricAggregatorFactory;
@@ -659,14 +659,14 @@
     while (rs.next()) {
       if ("disk_used".equals(rs.getString("METRIC_NAME"))) {
         assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-        assertEquals("METRIC_SUM", 16.0, rs.getDouble("METRIC_SUM"));
-        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
         assertEquals("METRIC_MAX", 4.0, rs.getDouble("METRIC_MAX"));
         assertEquals("METRIC_MIN", 0.0, rs.getDouble("METRIC_MIN"));
       } else if ("disk_free".equals(rs.getString("METRIC_NAME"))) {
         assertEquals("APP_ID", "test_app", rs.getString("APP_ID"));
-        assertEquals("METRIC_SUM", 4.0, rs.getDouble("METRIC_SUM"));
-        assertEquals("METRIC_COUNT", 8, rs.getLong("METRIC_COUNT"));
+        assertEquals("METRIC_SUM", 1.0, rs.getDouble("METRIC_SUM"));
+        assertEquals("METRIC_COUNT", 2, rs.getLong("METRIC_COUNT"));
         assertEquals("METRIC_MAX", 1.0, rs.getDouble("METRIC_MAX"));
         assertEquals("METRIC_MIN", 1.0, rs.getDouble("METRIC_MIN"));
       }
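
These expectations are the downstream effect of the perMetricCount normalization in TimelineMetricClusterAggregator: inferring four aggregated rows per metric from the old and new numbers, disk_used's stored SUM/COUNT drop from 16.0/8 to 4.0/2 and disk_free's from 4.0/8 to 1.0/2. The arithmetic, standalone:

    // Inferred divisor: 4 aggregated rows per metric in this window.
    assert 16.0 / 4 == 4.0 && Math.round(8f / 4f) == 2;  // disk_used
    assert 4.0 / 4 == 1.0;                               // disk_free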
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
index 9873643..75b3f91 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/ITMetricAggregator.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.aggregators;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricHostAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.AbstractMiniHBaseClusterTest;
@@ -124,14 +125,14 @@
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else if ("mem_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else {
         fail("Unexpected entry");
@@ -198,7 +199,7 @@
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
         assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
       }
     }
   }
@@ -260,7 +261,7 @@
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(12 * 20, currentHostAggregate.getNumberOfSamples());
         assertEquals(12 * 15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
       }
     }
   }
@@ -309,14 +310,14 @@
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else if ("mem_free".equals(currentMetric.getMetricName())) {
         assertEquals(2.0, currentHostAggregate.getMax());
         assertEquals(0.0, currentHostAggregate.getMin());
         assertEquals(20, currentHostAggregate.getNumberOfSamples());
         assertEquals(15.0, currentHostAggregate.getSum());
-        assertEquals(15.0 / 20, currentHostAggregate.getAvg());
+        assertEquals(15.0 / 20, currentHostAggregate.calculateAverage());
         count++;
       } else {
         fail("Unexpected entry");
diff --git a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
index 78db11d..6541b2c 100644
--- a/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
+++ b/ambari-metrics-timelineservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/metrics/timeline/aggregators/TimelineMetricClusterAggregatorSecondTest.java
@@ -31,6 +31,7 @@
 import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.sink.timeline.MetricClusterAggregate;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataKey;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.metrics.timeline.discovery.TimelineMetricMetadataManager;
diff --git a/pom.xml b/pom.xml
index 2d88912..a14b8fd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,7 +22,7 @@
   <version>2.0.0.0-SNAPSHOT</version>
   <packaging>pom</packaging>
   <modules>
-    <module>../utility</module>
+    <module>../ambari-utility</module>
     <module>ambari-metrics-common</module>
     <module>ambari-metrics-hadoop-sink</module>
     <module>ambari-metrics-flume-sink</module>
@@ -33,6 +33,7 @@
     <module>ambari-metrics-host-monitoring</module>
     <module>ambari-metrics-grafana</module>
     <module>ambari-metrics-assembly</module>
+    <module>ambari-metrics-host-aggregator</module>
   </modules>
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -117,11 +118,6 @@
   <build>
     <plugins>
       <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <version>2.19</version>
-      </plugin>
-      <plugin>
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>build-helper-maven-plugin</artifactId>
         <version>1.8</version>
@@ -146,63 +142,6 @@
               <failIfNoMatch>false</failIfNoMatch>
             </configuration>
           </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <descriptors>
-            <descriptor>../ambari-project/src/main/assemblies/empty.xml</descriptor>
-          </descriptors>
-        </configuration>
-        <executions>
-          <execution>
-            <id>build-tarball</id>
-            <phase>none</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <skip>${skipSurefireTests}</skip>
-
-          <!-- Each profile in the top-level pom.xml defines which test group categories to run. -->
-          <groups>${testcase.groups}</groups>
-        </configuration>
-      </plugin>
-      <plugin>
-        <artifactId>maven-compiler-plugin</artifactId>
-        <version>3.2</version>
-        <configuration>
-          <source>1.7</source>
-          <target>1.7</target>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-clean-plugin</artifactId>
-        <configuration>
-          <filesets>
-            <fileset>
-              <directory>${basedir}</directory>
-              <followSymlinks>false</followSymlinks>
-              <includes>
-                <include>**/*.pyc</include>
-              </includes>
-            </fileset>
-          </filesets>
-        </configuration>
-      </plugin>
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <version>1.8</version>
-        <executions>
           <execution>
             <id>parse-package-version</id>
             <goals>
@@ -234,6 +173,58 @@
         </executions>
       </plugin>
       <plugin>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <configuration>
+          <descriptors>
+            <descriptor>../ambari-project/src/main/assemblies/empty.xml</descriptor>
+          </descriptors>
+        </configuration>
+        <executions>
+          <execution>
+            <id>build-tarball</id>
+            <phase>none</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.19</version>
+        <configuration>
+          <skip>${skipSurefireTests}</skip>
+
+          <!-- Each profile in the top-level pom.xml defines which test group categories to run. -->
+          <groups>${testcase.groups}</groups>
+        </configuration>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>3.2</version>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-clean-plugin</artifactId>
+        <version>2.5</version>
+        <configuration>
+          <filesets>
+            <fileset>
+              <directory>${basedir}</directory>
+              <followSymlinks>false</followSymlinks>
+              <includes>
+                <include>**/*.pyc</include>
+              </includes>
+            </fileset>
+          </filesets>
+        </configuration>
+      </plugin>
+      <plugin>
         <!--Stub execution on direct plugin call - workaround for ambari rpm build process-->
         <groupId>org.codehaus.mojo</groupId>
         <artifactId>rpm-maven-plugin</artifactId>
@@ -281,6 +272,7 @@
       <plugin>
         <groupId>org.apache.rat</groupId>
         <artifactId>apache-rat-plugin</artifactId>
+        <version>0.11</version>
         <configuration>
           <excludes>
             <exclude>pass.txt</exclude>
@@ -304,8 +296,8 @@
   <dependencies>
     <!-- Dependency in order to annotate unit tests with a category. -->
     <dependency>
-      <groupId>utility</groupId>
-      <artifactId>utility</artifactId>
+      <groupId>org.apache.ambari</groupId>
+      <artifactId>ambari-utility</artifactId>
       <version>1.0.0.0-SNAPSHOT</version>
       <scope>test</scope>
     </dependency>
@@ -319,11 +311,38 @@
       </properties>
       <dependencies>
         <dependency>
-          <groupId>utility</groupId>
-          <artifactId>utility</artifactId>
+          <groupId>org.apache.ambari</groupId>
+          <artifactId>ambari-utility</artifactId>
           <version>1.0.0.0-SNAPSHOT</version>
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>sign-artifacts</id>
+      <activation>
+        <property>
+          <name>performRelease</name>
+          <value>true</value>
+        </property>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-gpg-plugin</artifactId>
+            <version>1.6</version>
+            <executions>
+              <execution>
+                <id>sign-artifacts</id>
+                <phase>verify</phase>
+                <goals>
+                  <goal>sign</goal>
+                </goals>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 </project>