AMBARI-21244 Add https support to local metrics aggregator application (dsen)
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 337f640..3c06032 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -81,6 +81,7 @@
   public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
   public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
   public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
+  public static final String HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY = "host_in_memory_aggregation_protocol";
   public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
   public static final String INSTANCE_ID_PROPERTY = "instanceId";
   public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -293,7 +294,11 @@
     boolean validCollectorHost = true;
 
     if (isHostInMemoryAggregationEnabled()) {
-      connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+      String hostname = "localhost";
+      if (getHostInMemoryAggregationProtocol().equalsIgnoreCase("https")) {
+        hostname = getHostname();
+      }
+      connectUrl = constructTimelineMetricUri(getHostInMemoryAggregationProtocol(), hostname, String.valueOf(getHostInMemoryAggregationPort()));
     } else {
       String collectorHost  = getCurrentCollectorHost();
       if (collectorHost == null) {
@@ -647,4 +652,10 @@
    * @return
    */
   abstract protected int getHostInMemoryAggregationPort();
+
+  /**
+   * In memory aggregation protocol
+   * @return
+   */
+  abstract protected String getHostInMemoryAggregationProtocol();
 }
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index ce2cf79..396d08d 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -100,6 +100,11 @@
     }
 
     @Override
+    protected String getHostInMemoryAggregationProtocol() {
+      return "http";
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index f0174d5..0abc5fc 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -202,5 +202,10 @@
     protected int getHostInMemoryAggregationPort() {
       return 61888;
     }
+
+    @Override
+    protected String getHostInMemoryAggregationProtocol() {
+      return "http";
+    }
   }
 }
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 3be2162..77aba6b 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -146,6 +146,11 @@
     }
 
     @Override
+    protected String getHostInMemoryAggregationProtocol() {
+      return "http";
+    }
+
+    @Override
     public boolean emitMetrics(TimelineMetrics metrics) {
       super.init();
       return super.emitMetrics(metrics);
diff --git a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 b/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
index f9b303e..58c5f09 100644
--- a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
+++ b/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
@@ -19,7 +19,13 @@
 collector=http://localhost:6188
 collectionFrequency=60000
 maxRowCacheSize=10000
-sendInterval=59000
+sendInterval={{metrics_report_interval}}000
+clusterReporterAppId=nimbus
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+{% if is_aggregation_https_enabled %}
+host_in_memory_aggregation_protocol = {{host_in_memory_aggregation_protocol}}
+{% endif %}
 
 # Metric names having type COUNTER
 counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount
diff --git a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 6277907..720c371 100644
--- a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -65,6 +65,7 @@
   private String instanceId;
   private boolean hostInMemoryAggregationEnabled;
   private int hostInMemoryAggregationPort;
+  private String hostInMemoryAggregationProtocol;
 
 
   @Override
@@ -114,12 +115,13 @@
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
 
-    hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
-    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+    hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+    hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
     // Initialize the collector write strategy
     super.init();
 
-    if (protocol.contains("https")) {
+    if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
       String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -178,6 +180,11 @@
     return hostInMemoryAggregationPort;
   }
 
+  @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }
diff --git a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index bbc9617..f0eefc2 100644
--- a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -77,6 +77,7 @@
   });
   private int hostInMemoryAggregationPort;
   private boolean hostInMemoryAggregationEnabled;
+  private String hostInMemoryAggregationProtocol;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -109,12 +110,13 @@
     protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
     collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
     port = conf.getString(COLLECTOR_PORT, "6188");
-    hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY);
-    hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY);
+    hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+    hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
+    hostInMemoryAggregationProtocol = conf.getString(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
     if (collectorHosts.isEmpty()) {
       LOG.error("No Metric collector configured.");
     } else {
-      if (protocol.contains("https")) {
+      if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
         String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
         String trustStorePwd = conf.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -262,6 +264,11 @@
   }
 
   @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
+  @Override
   public void putMetrics(MetricsRecord record) {
     try {
       String recordName = record.name();
diff --git a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 6bb6454..a92b436 100644
--- a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -60,6 +60,7 @@
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
 import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.SET_INSTANCE_ID_PROPERTY;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY;
 import static org.easymock.EasyMock.anyInt;
 import static org.easymock.EasyMock.anyObject;
 import static org.easymock.EasyMock.anyString;
@@ -116,6 +117,7 @@
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes();
     expect(conf.getBoolean(eq(SET_INSTANCE_ID_PROPERTY), eq(false))).andReturn(true).anyTimes();
     expect(conf.getString(eq(INSTANCE_ID_PROPERTY), anyString())).andReturn("instanceId").anyTimes();
+    expect(conf.getString(eq(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY), anyString())).andReturn("http").anyTimes();
 
     conf.setListDelimiterHandler(new DefaultListDelimiterHandler(eq(',')));
     expectLastCall().anyTimes();
@@ -188,6 +190,7 @@
     expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
     expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes();
     expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes();
+    expect(conf.getString(eq(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY), anyString())).andReturn("http").anyTimes();
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     // Return eviction time smaller than time diff for first 3 entries
@@ -326,6 +329,7 @@
 
     expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
     expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+    expect(conf.getString(eq(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY), anyString())).andReturn("http").anyTimes();
 
     conf.setListDelimiterHandler(new DefaultListDelimiterHandler(eq(',')));
     expectLastCall().anyTimes();
diff --git a/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics-host-aggregator/pom.xml
index 24432dd..d126be5 100644
--- a/ambari-metrics-host-aggregator/pom.xml
+++ b/ambari-metrics-host-aggregator/pom.xml
@@ -101,6 +101,16 @@
             <version>4.2</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-server</artifactId>
+            <version>9.2.11.v20150529</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.jetty</groupId>
+            <artifactId>jetty-webapp</artifactId>
+            <version>9.2.11.v20150529</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
index 1e5cc82..f8ed95f 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -22,20 +22,23 @@
 import com.sun.jersey.api.core.ResourceConfig;
 import com.sun.net.httpserver.HttpServer;
 
+import javax.net.ssl.SSLContext;
 import javax.ws.rs.core.UriBuilder;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.URI;
 import java.net.URL;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
 import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
 import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
 
 /**
  * WEB application with 2 publisher threads that processes received metrics and submits results to the collector
@@ -45,10 +48,12 @@
     private static final int STOP_SECONDS_DELAY = 0;
     private static final int JOIN_SECONDS_TIMEOUT = 5;
     private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+    private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
     private Log LOG;
     private final int webApplicationPort;
     private final int rawPublishingInterval;
     private final int aggregationInterval;
+    private final String webServerProtocol;
     private Configuration configuration;
     private Thread aggregatePublisherThread;
     private Thread rawPublisherThread;
@@ -65,10 +70,11 @@
         this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
         this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
         this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+        this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
         this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
         try {
             this.httpServer = createHttpServer();
-        } catch (IOException e) {
+        } catch (Exception e) {
             LOG.error("Exception while starting HTTP server. Exiting", e);
             System.exit(1);
         }
@@ -88,13 +94,20 @@
 
         URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
         LOG.info("Found metric service configuration: " + amsResUrl);
+        URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
+        LOG.info("Found metric service configuration: " + sslConfUrl);
         if (amsResUrl == null) {
-            throw new IllegalStateException("Unable to initialize the metrics " +
-                    "subsystem. No ams-site present in the classpath.");
+            throw new IllegalStateException(String.format("Unable to initialize the metrics " +
+                    "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE));
+        }
+        if (sslConfUrl == null) {
+            throw new IllegalStateException(String.format("Unable to initialize the metrics " +
+                    "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE));
         }
 
         try {
             configuration.addResource(amsResUrl.toURI().toURL());
+            configuration.addResource(sslConfUrl.toURI().toURL());
         } catch (Exception e) {
             LOG.error("Couldn't init configuration. ", e);
             System.exit(1);
@@ -112,17 +125,41 @@
     }
 
     protected URI getURI() {
-        URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+        URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build();
         LOG.info(String.format("Web server at %s", uri));
         return uri;
     }
 
-    protected HttpServer createHttpServer() throws IOException {
+    protected HttpServer createHttpServer() throws Exception {
         ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
         HashMap<String, Object> params = new HashMap();
         params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
         resourceConfig.setPropertiesAndFeatures(params);
-        return HttpServerFactory.create(getURI(), resourceConfig);
+        HttpServer server = HttpServerFactory.create(getURI(), resourceConfig);
+
+        if (webServerProtocol.equalsIgnoreCase("https")) {
+            HttpsServer httpsServer = (HttpsServer) server;
+            SslContextFactory sslContextFactory = new SslContextFactory();
+            String keyStorePath = configuration.get("ssl.server.keystore.location");
+            String keyStorePassword = configuration.get("ssl.server.keystore.password");
+            String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword");
+            String trustStorePath = configuration.get("ssl.server.truststore.location");
+            String trustStorePassword = configuration.get("ssl.server.truststore.password");
+
+            sslContextFactory.setKeyStorePath(keyStorePath);
+            sslContextFactory.setKeyStorePassword(keyStorePassword);
+            sslContextFactory.setKeyManagerPassword(keyManagerPassword);
+            sslContextFactory.setTrustStorePath(trustStorePath);
+            sslContextFactory.setTrustStorePassword(trustStorePassword);
+
+            sslContextFactory.start();
+            SSLContext sslContext = sslContextFactory.getSslContext();
+            sslContextFactory.stop();
+            HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext);
+            httpsServer.setHttpsConfigurator(httpsConfigurator);
+            server = httpsServer;
+        }
+        return server;
     }
 
     private void startWebServer() {
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
index 5af115f..7ce0815 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -30,9 +30,9 @@
  */
 public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
 
-    private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
-    private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.password";
-    private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.type";
+    private static final String AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+    private static final String AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+    private static final String AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
     private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
     private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
     private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
@@ -68,9 +68,9 @@
             LOG.error("No Metric collector configured.");
         } else {
             if (collectorProtocol.contains("https")) {
-                String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
-                String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
-                String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+                String trustStorePath = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY).trim();
+                String trustStoreType = configuration.get(AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY).trim();
+                String trustStorePwd = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY).trim();
                 loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
             }
         }
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
index c8dffab..fa0c8fb 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -100,4 +100,9 @@
     protected String getPostUrl() {
         return BASE_POST_URL + AGGREGATED_POST_PREFIX;
     }
+
+    @Override
+    protected String getHostInMemoryAggregationProtocol() {
+        return "http";
+    }
 }
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
index 89addb7..2469449 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -62,4 +62,9 @@
     protected String getPostUrl() {
         return BASE_POST_URL;
     }
+
+    @Override
+    protected String getHostInMemoryAggregationProtocol() {
+        return "http";
+    }
 }
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
index ba05e9b..59cdd27 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -59,7 +59,7 @@
   def stop(self):
     self.stopped = True
     if self._aggregator_process :
-      logger.info('Stopping Aggregator thread.')
+      logger.info('Shutting down Aggregator thread.')
       self._aggregator_process.terminate()
       self._aggregator_process = None
 
@@ -71,7 +71,7 @@
     threading.Thread.__init__(self)
     self._config = config
     self._stop_handler = stop_handler
-    self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+    self.URL = self._config.get_inmemory_aggregation_protocol() + '://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
     self._is_ok = threading.Event()
     self.set_is_ok(True)
     self.stopped = False
@@ -106,7 +106,7 @@
 
 
   def stop(self):
-    logger.info('Stopping watcher thread.')
+    logger.info('Shutting down watcher thread.')
     self.stopped = True
 
 
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 017ad24..7cc9fb8 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -258,17 +258,20 @@
   def get_max_queue_size(self):
     return int(self.get("collector", "max_queue_size", 5000))
 
-  def is_server_https_enabled(self):
+  def is_collector_https_enabled(self):
     return "true" == str(self.get("collector", "https_enabled")).lower()
 
   def get_java_home(self):
     return self.get("aggregation", "java_home")
 
   def is_inmemory_aggregation_enabled(self):
-    return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+    return "true" == str(self.get("aggregation", "host_in_memory_aggregation", "false")).lower()
 
   def get_inmemory_aggregation_port(self):
-    return self.get("aggregation", "host_in_memory_aggregation_port")
+    return self.get("aggregation", "host_in_memory_aggregation_port", "61888")
+
+  def get_inmemory_aggregation_protocol(self):
+    return self.get("aggregation", "host_in_memory_aggregation_protocol", "http")
 
   def get_aggregator_jvm_agrs(self):
     hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index f19434d..df79d69 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -54,17 +54,19 @@
     self.application_metric_map = application_metric_map
     self.collector_port = config.get_server_port()
     self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
-    self.is_server_https_enabled = config.is_server_https_enabled()
+    self.is_collector_https_enabled = config.is_collector_https_enabled()
+    self.collector_protocol = "https" if self.is_collector_https_enabled else "http"
     self.set_instanceid = config.is_set_instanceid()
     self.instanceid = config.get_instanceid()
     self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
 
     if self.is_inmemory_aggregation_enabled:
-      self.collector_port = config.get_inmemory_aggregation_port()
-      self.all_metrics_collector_hosts = ['localhost']
-      self.is_server_https_enabled = False
+      self.inmemory_aggregation_port = config.get_inmemory_aggregation_port()
+      self.inmemory_aggregation_protocol = config.get_inmemory_aggregation_protocol()
+      if self.inmemory_aggregation_protocol == "https":
+        self.ca_certs = config.get_ca_certs()
 
-    if self.is_server_https_enabled:
+    if self.is_collector_https_enabled:
       self.ca_certs = config.get_ca_certs()
 
     # TimedRoundRobinSet
@@ -101,22 +103,26 @@
 
   def push_metrics(self, data):
     success = False
-    while self.active_collector_hosts.get_actual_size() > 0:
+    if self.is_inmemory_aggregation_enabled:
+      success = self.try_with_collector(self.inmemory_aggregation_protocol, "localhost", self.inmemory_aggregation_port, data)
+      if not success:
+        logger.warning("Failed to submit metrics to local aggregator. Trying to post them to collector...")
+    while not success and self.active_collector_hosts.get_actual_size() > 0:
       collector_host = self.get_collector_host_shard()
-      success = self.try_with_collector_host(collector_host, data)
-      if success:
-        break
+      success = self.try_with_collector(self.collector_protocol, collector_host, self.collector_port, data)
     pass
 
     if not success:
       logger.info('No valid collectors found...')
       for collector_host in self.active_collector_hosts:
-        success = self.try_with_collector_host(collector_host, data)
+        success = self.try_with_collector(self.collector_protocol, collector_host, self.ollector_port, data)
+        if success:
+          break
       pass
 
-  def try_with_collector_host(self, collector_host, data):
+  def try_with_collector(self, collector_protocol, collector_host, collector_port, data):
     headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
-    connection = self.get_connection(collector_host)
+    connection = self.get_connection(collector_protocol, collector_host, collector_port)
     logger.debug("message to send: %s" % data)
 
     try:
@@ -169,16 +175,16 @@
       logger.warn("Metric collector host {0} was blacklisted.".format(collector_host))
       return False
 
-  def get_connection(self, collector_host):
+  def get_connection(self, protocol, host, port):
     timeout = int(self.send_interval - 10)
-    if self.is_server_https_enabled:
-      connection = CachedHTTPSConnection(collector_host,
-                                         self.collector_port,
+    if protocol == "https":
+      connection = CachedHTTPSConnection(host,
+                                         port,
                                          timeout=timeout,
                                          ca_certs=self.ca_certs)
     else:
-      connection = CachedHTTPConnection(collector_host,
-                                        self.collector_port,
+      connection = CachedHTTPConnection(host,
+                                        port,
                                         timeout=timeout)
     return connection
 
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/host_info.py b/ambari-metrics-host-monitoring/src/main/python/core/host_info.py
index 035c833..6198c53 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/host_info.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/host_info.py
@@ -265,7 +265,6 @@
 
     skip_disk_patterns = self.__config.get_disk_metrics_skip_pattern()
     logger.debug('skip_disk_patterns: %s' % skip_disk_patterns)
-    print skip_disk_patterns
     if not skip_disk_patterns or skip_disk_patterns == 'None':
       io_counters = psutil.disk_io_counters()
       print io_counters
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index 7a9fbec..330e018 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -78,7 +78,7 @@
       raise FatalException(-1, "Error waiting for stop event: " + str(result))
     if (win32event.WAIT_TIMEOUT == result):
       return -1
-      logger.info("Stop event received")
+      logger.debug("Stop event received")
     return result # 0 -> stop
 
 
@@ -119,7 +119,7 @@
     # Stop process when stop event received
     self.stop_event.wait(timeout)
     if self.stop_event.isSet():
-      logger.info("Stop event received")
+      logger.debug("Stop event received")
       return 0
     # Timeout
     return -1
diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index e126016..f07d508 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -74,6 +74,7 @@
   private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
   private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
   private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
+  private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY;
   private static final String TIMELINE_DEFAULT_HOST = "localhost";
   private static final String TIMELINE_DEFAULT_PORT = "6188";
   private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -100,6 +101,7 @@
   private Set<String> excludedMetrics = new HashSet<>();
   private boolean hostInMemoryAggregationEnabled;
   private int hostInMemoryAggregationPort;
+  private String hostInMemoryAggregationProtocol;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -147,6 +149,11 @@
     return hostInMemoryAggregationPort;
   }
 
+  @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
@@ -186,9 +193,10 @@
 
         hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
         hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
+        hostInMemoryAggregationProtocol = props.getString(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
 
-        if (metricCollectorProtocol.contains("https")) {
+        if (metricCollectorProtocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
           String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
           String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
           String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index d408e1a..842fad8 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -57,6 +57,7 @@
   private int timeoutSeconds;
   private boolean hostInMemoryAggregationEnabled;
   private int hostInMemoryAggregationPort;
+  private String hostInMemoryAggregationProtocol;
 
   public StormTimelineMetricsReporter() {
 
@@ -108,6 +109,11 @@
   }
 
   @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
+  @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -144,11 +150,15 @@
         setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
         instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
       }
-      hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
-      hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
+      hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY) != null ?
+        cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString() : "false");
+      hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY) != null ?
+        cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString() : "61888");
+      hostInMemoryAggregationProtocol = cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY) != null ?
+        cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY).toString() : "http";
 
       collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
-      if (protocol.contains("https")) {
+      if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
         String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
         String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
         String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index ff72f24..e3494fd 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -63,6 +63,7 @@
   private String instanceId;
   private boolean hostInMemoryAggregationEnabled;
   private int hostInMemoryAggregationPort;
+  private String hostInMemoryAggregationProtocol;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -110,6 +111,11 @@
   }
 
   @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -138,12 +144,13 @@
 
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
-    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
-    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+    hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
     // Initialize the collector write strategy
     super.init();
 
-    if (protocol.contains("https")) {
+    if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
       String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 5b75065..4fcf2fb 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -52,6 +52,7 @@
   private int timeoutSeconds;
   private boolean hostInMemoryAggregationEnabled;
   private int hostInMemoryAggregationPort;
+  private String hostInMemoryAggregationProtocol;
 
   public StormTimelineMetricsReporter() {
 
@@ -103,6 +104,11 @@
   }
 
   @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
+  @Override
   public void prepare(Object registrationArgument) {
     LOG.info("Preparing Storm Metrics Reporter");
     try {
@@ -132,10 +138,11 @@
       setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
       instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
 
-      hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
-      hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+      hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+      hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+      hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
 
-      if (protocol.contains("https")) {
+      if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
         String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
         String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
         String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 4d5a229..dc92f80 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -72,6 +72,7 @@
   private boolean setInstanceId;
   private boolean hostInMemoryAggregationEnabled;
   private int hostInMemoryAggregationPort;
+  private String hostInMemoryAggregationProtocol;
 
   @Override
   protected String getCollectorUri(String host) {
@@ -119,6 +120,11 @@
   }
 
   @Override
+  protected String getHostInMemoryAggregationProtocol() {
+    return hostInMemoryAggregationProtocol;
+  }
+
+  @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
     try {
@@ -150,13 +156,14 @@
     instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
     setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
 
-    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
-    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+    hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+    hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+    hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
 
     // Initialize the collector write strategy
     super.init();
 
-    if (protocol.contains("https")) {
+    if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
       String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
       String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
       String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();