AMBARI-21244 Add https support to local metrics aggregator application (dsen)
diff --git a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
index 337f640..3c06032 100644
--- a/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
+++ b/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java
@@ -81,6 +81,7 @@
public static final String SSL_KEYSTORE_PASSWORD_PROPERTY = "truststore.password";
public static final String HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = "host_in_memory_aggregation";
public static final String HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = "host_in_memory_aggregation_port";
+ public static final String HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY = "host_in_memory_aggregation_protocol";
public static final String COLLECTOR_LIVE_NODES_PATH = "/ws/v1/timeline/metrics/livenodes";
public static final String INSTANCE_ID_PROPERTY = "instanceId";
public static final String SET_INSTANCE_ID_PROPERTY = "set.instanceId";
@@ -293,7 +294,11 @@
boolean validCollectorHost = true;
if (isHostInMemoryAggregationEnabled()) {
- connectUrl = constructTimelineMetricUri("http", "localhost", String.valueOf(getHostInMemoryAggregationPort()));
+ String hostname = "localhost";
+ if (getHostInMemoryAggregationProtocol().equalsIgnoreCase("https")) {
+ hostname = getHostname();
+ }
+ connectUrl = constructTimelineMetricUri(getHostInMemoryAggregationProtocol(), hostname, String.valueOf(getHostInMemoryAggregationPort()));
} else {
String collectorHost = getCurrentCollectorHost();
if (collectorHost == null) {
@@ -647,4 +652,10 @@
* @return
*/
abstract protected int getHostInMemoryAggregationPort();
+
+ /**
+ * In memory aggregation protocol
+ * @return
+ */
+ abstract protected String getHostInMemoryAggregationProtocol();
}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
index ce2cf79..396d08d 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/AbstractTimelineMetricSinkTest.java
@@ -100,6 +100,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return "http";
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
index f0174d5..0abc5fc 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/availability/MetricCollectorHATest.java
@@ -202,5 +202,10 @@
protected int getHostInMemoryAggregationPort() {
return 61888;
}
+
+ @Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return "http";
+ }
}
}
diff --git a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
index 3be2162..77aba6b 100644
--- a/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
+++ b/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java
@@ -146,6 +146,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return "http";
+ }
+
+ @Override
public boolean emitMetrics(TimelineMetrics metrics) {
super.init();
return super.emitMetrics(metrics);
diff --git a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2 b/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
index f9b303e..58c5f09 100644
--- a/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
+++ b/ambari-metrics-flume-sink/src/main/conf/flume-metrics2.properties.j2
@@ -19,7 +19,13 @@
collector=http://localhost:6188
collectionFrequency=60000
maxRowCacheSize=10000
-sendInterval=59000
+sendInterval={{metrics_report_interval}}000
+clusterReporterAppId=nimbus
+host_in_memory_aggregation = {{host_in_memory_aggregation}}
+host_in_memory_aggregation_port = {{host_in_memory_aggregation_port}}
+{% if is_aggregation_https_enabled %}
+host_in_memory_aggregation_protocol = {{host_in_memory_aggregation_protocol}}
+{% endif %}
# Metric names having type COUNTER
counters=EventTakeSuccessCount,EventPutSuccessCount,EventTakeAttemptCount,EventPutAttemptCount
diff --git a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
index 6277907..720c371 100644
--- a/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
+++ b/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java
@@ -65,6 +65,7 @@
private String instanceId;
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
+ private String hostInMemoryAggregationProtocol;
@Override
@@ -114,12 +115,13 @@
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, "");
- hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
- hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+ hostInMemoryAggregationEnabled = Boolean.getBoolean(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+ hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
// Initialize the collector write strategy
super.init();
- if (protocol.contains("https")) {
+ if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -178,6 +180,11 @@
return hostInMemoryAggregationPort;
}
+ @Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
public void setPollFrequency(long pollFrequency) {
this.pollFrequency = pollFrequency;
}
diff --git a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
index bbc9617..f0eefc2 100644
--- a/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
+++ b/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java
@@ -77,6 +77,7 @@
});
private int hostInMemoryAggregationPort;
private boolean hostInMemoryAggregationEnabled;
+ private String hostInMemoryAggregationProtocol;
@Override
public void init(SubsetConfiguration conf) {
@@ -109,12 +110,13 @@
protocol = conf.getString(COLLECTOR_PROTOCOL, "http");
collectorHosts = parseHostsStringArrayIntoCollection(conf.getStringArray(COLLECTOR_HOSTS_PROPERTY));
port = conf.getString(COLLECTOR_PORT, "6188");
- hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY);
- hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY);
+ hostInMemoryAggregationEnabled = conf.getBoolean(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
+ hostInMemoryAggregationPort = conf.getInt(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
+ hostInMemoryAggregationProtocol = conf.getString(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
if (collectorHosts.isEmpty()) {
LOG.error("No Metric collector configured.");
} else {
- if (protocol.contains("https")) {
+ if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = conf.getString(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = conf.getString(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = conf.getString(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
@@ -262,6 +264,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
+ @Override
public void putMetrics(MetricsRecord record) {
try {
String recordName = record.name();
diff --git a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
index 6bb6454..a92b436 100644
--- a/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
+++ b/ambari-metrics-hadoop-sink/src/test/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSinkTest.java
@@ -60,6 +60,7 @@
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.MAX_METRIC_ROW_CACHE_SIZE;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.METRICS_SEND_INTERVAL;
import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.SET_INSTANCE_ID_PROPERTY;
+import static org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink.HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY;
import static org.easymock.EasyMock.anyInt;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.anyString;
@@ -116,6 +117,7 @@
expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(1000).anyTimes();
expect(conf.getBoolean(eq(SET_INSTANCE_ID_PROPERTY), eq(false))).andReturn(true).anyTimes();
expect(conf.getString(eq(INSTANCE_ID_PROPERTY), anyString())).andReturn("instanceId").anyTimes();
+ expect(conf.getString(eq(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY), anyString())).andReturn("http").anyTimes();
conf.setListDelimiterHandler(new DefaultListDelimiterHandler(eq(',')));
expectLastCall().anyTimes();
@@ -188,6 +190,7 @@
expect(conf.getString(eq("serviceName-prefix"), eq(""))).andReturn("").anyTimes();
expect(conf.getString(eq(COLLECTOR_PROTOCOL), eq("http"))).andReturn("http").anyTimes();
expect(conf.getString(eq(COLLECTOR_PORT), eq("6188"))).andReturn("6188").anyTimes();
+ expect(conf.getString(eq(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY), anyString())).andReturn("http").anyTimes();
expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
// Return eviction time smaller than time diff for first 3 entries
@@ -326,6 +329,7 @@
expect(conf.getInt(eq(MAX_METRIC_ROW_CACHE_SIZE), anyInt())).andReturn(10).anyTimes();
expect(conf.getInt(eq(METRICS_SEND_INTERVAL), anyInt())).andReturn(10).anyTimes();
+ expect(conf.getString(eq(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY), anyString())).andReturn("http").anyTimes();
conf.setListDelimiterHandler(new DefaultListDelimiterHandler(eq(',')));
expectLastCall().anyTimes();
diff --git a/ambari-metrics-host-aggregator/pom.xml b/ambari-metrics-host-aggregator/pom.xml
index 24432dd..d126be5 100644
--- a/ambari-metrics-host-aggregator/pom.xml
+++ b/ambari-metrics-host-aggregator/pom.xml
@@ -101,6 +101,16 @@
<version>4.2</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-server</artifactId>
+ <version>9.2.11.v20150529</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-webapp</artifactId>
+ <version>9.2.11.v20150529</version>
+ </dependency>
</dependencies>
<build>
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
index 1e5cc82..f8ed95f 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/host/aggregator/AggregatorApplication.java
@@ -22,20 +22,23 @@
import com.sun.jersey.api.core.ResourceConfig;
import com.sun.net.httpserver.HttpServer;
+import javax.net.ssl.SSLContext;
import javax.ws.rs.core.UriBuilder;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.HashMap;
+import com.sun.net.httpserver.HttpsConfigurator;
+import com.sun.net.httpserver.HttpsServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
/**
* WEB application with 2 publisher threads that processes received metrics and submits results to the collector
@@ -45,10 +48,12 @@
private static final int STOP_SECONDS_DELAY = 0;
private static final int JOIN_SECONDS_TIMEOUT = 5;
private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
+ private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
private Log LOG;
private final int webApplicationPort;
private final int rawPublishingInterval;
private final int aggregationInterval;
+ private final String webServerProtocol;
private Configuration configuration;
private Thread aggregatePublisherThread;
private Thread rawPublisherThread;
@@ -65,10 +70,11 @@
this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
+ this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
try {
this.httpServer = createHttpServer();
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.error("Exception while starting HTTP server. Exiting", e);
System.exit(1);
}
@@ -88,13 +94,20 @@
URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
LOG.info("Found metric service configuration: " + amsResUrl);
+ URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
+ LOG.info("Found metric service configuration: " + sslConfUrl);
if (amsResUrl == null) {
- throw new IllegalStateException("Unable to initialize the metrics " +
- "subsystem. No ams-site present in the classpath.");
+ throw new IllegalStateException(String.format("Unable to initialize the metrics " +
+ "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE));
+ }
+ if (sslConfUrl == null) {
+ throw new IllegalStateException(String.format("Unable to initialize the metrics " +
+ "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE));
}
try {
configuration.addResource(amsResUrl.toURI().toURL());
+ configuration.addResource(sslConfUrl.toURI().toURL());
} catch (Exception e) {
LOG.error("Couldn't init configuration. ", e);
System.exit(1);
@@ -112,17 +125,41 @@
}
protected URI getURI() {
- URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
+ URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build();
LOG.info(String.format("Web server at %s", uri));
return uri;
}
- protected HttpServer createHttpServer() throws IOException {
+ protected HttpServer createHttpServer() throws Exception {
ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
HashMap<String, Object> params = new HashMap();
params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
resourceConfig.setPropertiesAndFeatures(params);
- return HttpServerFactory.create(getURI(), resourceConfig);
+ HttpServer server = HttpServerFactory.create(getURI(), resourceConfig);
+
+ if (webServerProtocol.equalsIgnoreCase("https")) {
+ HttpsServer httpsServer = (HttpsServer) server;
+ SslContextFactory sslContextFactory = new SslContextFactory();
+ String keyStorePath = configuration.get("ssl.server.keystore.location");
+ String keyStorePassword = configuration.get("ssl.server.keystore.password");
+ String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword");
+ String trustStorePath = configuration.get("ssl.server.truststore.location");
+ String trustStorePassword = configuration.get("ssl.server.truststore.password");
+
+ sslContextFactory.setKeyStorePath(keyStorePath);
+ sslContextFactory.setKeyStorePassword(keyStorePassword);
+ sslContextFactory.setKeyManagerPassword(keyManagerPassword);
+ sslContextFactory.setTrustStorePath(trustStorePath);
+ sslContextFactory.setTrustStorePassword(trustStorePassword);
+
+ sslContextFactory.start();
+ SSLContext sslContext = sslContextFactory.getSslContext();
+ sslContextFactory.stop();
+ HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext);
+ httpsServer.setHttpsConfigurator(httpsConfigurator);
+ server = httpsServer;
+ }
+ return server;
}
private void startWebServer() {
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
index 5af115f..7ce0815 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractMetricPublisher.java
@@ -30,9 +30,9 @@
*/
public abstract class AbstractMetricPublisher extends AbstractTimelineMetricsSink implements Runnable {
- private static final String AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
- private static final String AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY = "ssl.server.truststore.password";
- private static final String AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.type";
+ private static final String AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY = "ssl.server.truststore.location";
+ private static final String AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY = "ssl.server.truststore.type";
+ private static final String AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY = "ssl.server.truststore.password";
private static final String AMS_SITE_HTTP_POLICY_PROPERTY = "timeline.metrics.service.http.policy";
private static final String AMS_SITE_COLLECTOR_WEBAPP_ADDRESS_PROPERTY = "timeline.metrics.service.webapp.address";
private static final String PUBLISHER_COLLECTOR_HOSTS_PROPERTY = "timeline.metrics.collector.hosts";
@@ -68,9 +68,9 @@
LOG.error("No Metric collector configured.");
} else {
if (collectorProtocol.contains("https")) {
- String trustStorePath = configuration.get(AMS_SITE_SSL_KEYSTORE_PATH_PROPERTY).trim();
- String trustStoreType = configuration.get(AMS_SITE_SSL_KEYSTORE_TYPE_PROPERTY).trim();
- String trustStorePwd = configuration.get(AMS_SITE_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
+ String trustStorePath = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PATH_PROPERTY).trim();
+ String trustStoreType = configuration.get(AMS_SITE_SSL_TRUSTSTORE_TYPE_PROPERTY).trim();
+ String trustStorePwd = configuration.get(AMS_SITE_SSL_TRUSTSTORE_PASSWORD_PROPERTY).trim();
loadTruststore(trustStorePath, trustStoreType, trustStorePwd);
}
}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
index c8dffab..fa0c8fb 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AggregatedMetricsPublisher.java
@@ -100,4 +100,9 @@
protected String getPostUrl() {
return BASE_POST_URL + AGGREGATED_POST_PREFIX;
}
+
+ @Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return "http";
+ }
}
diff --git a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
index 89addb7..2469449 100644
--- a/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
+++ b/ambari-metrics-host-aggregator/src/main/java/org/apache/hadoop/metrics2/sink/timeline/RawMetricsPublisher.java
@@ -62,4 +62,9 @@
protected String getPostUrl() {
return BASE_POST_URL;
}
+
+ @Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return "http";
+ }
}
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
index ba05e9b..59cdd27 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/aggregator.py
@@ -59,7 +59,7 @@
def stop(self):
self.stopped = True
if self._aggregator_process :
- logger.info('Stopping Aggregator thread.')
+ logger.info('Shutting down Aggregator thread.')
self._aggregator_process.terminate()
self._aggregator_process = None
@@ -71,7 +71,7 @@
threading.Thread.__init__(self)
self._config = config
self._stop_handler = stop_handler
- self.URL = 'http://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
+ self.URL = self._config.get_inmemory_aggregation_protocol() + '://localhost:' + self._config.get_inmemory_aggregation_port() + self.AMS_AGGREGATOR_METRICS_CHECK_URL
self._is_ok = threading.Event()
self.set_is_ok(True)
self.stopped = False
@@ -106,7 +106,7 @@
def stop(self):
- logger.info('Stopping watcher thread.')
+ logger.info('Shutting down watcher thread.')
self.stopped = True
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
index 017ad24..7cc9fb8 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/config_reader.py
@@ -258,17 +258,20 @@
def get_max_queue_size(self):
return int(self.get("collector", "max_queue_size", 5000))
- def is_server_https_enabled(self):
+ def is_collector_https_enabled(self):
return "true" == str(self.get("collector", "https_enabled")).lower()
def get_java_home(self):
return self.get("aggregation", "java_home")
def is_inmemory_aggregation_enabled(self):
- return "true" == str(self.get("aggregation", "host_in_memory_aggregation")).lower()
+ return "true" == str(self.get("aggregation", "host_in_memory_aggregation", "false")).lower()
def get_inmemory_aggregation_port(self):
- return self.get("aggregation", "host_in_memory_aggregation_port")
+ return self.get("aggregation", "host_in_memory_aggregation_port", "61888")
+
+ def get_inmemory_aggregation_protocol(self):
+ return self.get("aggregation", "host_in_memory_aggregation_protocol", "http")
def get_aggregator_jvm_agrs(self):
hosts = self.get("aggregation", "jvm_arguments", "-Xmx256m -Xms128m -XX:PermSize=68m")
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
index f19434d..df79d69 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/emitter.py
@@ -54,17 +54,19 @@
self.application_metric_map = application_metric_map
self.collector_port = config.get_server_port()
self.all_metrics_collector_hosts = config.get_metrics_collector_hosts_as_list()
- self.is_server_https_enabled = config.is_server_https_enabled()
+ self.is_collector_https_enabled = config.is_collector_https_enabled()
+ self.collector_protocol = "https" if self.is_collector_https_enabled else "http"
self.set_instanceid = config.is_set_instanceid()
self.instanceid = config.get_instanceid()
self.is_inmemory_aggregation_enabled = config.is_inmemory_aggregation_enabled()
if self.is_inmemory_aggregation_enabled:
- self.collector_port = config.get_inmemory_aggregation_port()
- self.all_metrics_collector_hosts = ['localhost']
- self.is_server_https_enabled = False
+ self.inmemory_aggregation_port = config.get_inmemory_aggregation_port()
+ self.inmemory_aggregation_protocol = config.get_inmemory_aggregation_protocol()
+ if self.inmemory_aggregation_protocol == "https":
+ self.ca_certs = config.get_ca_certs()
- if self.is_server_https_enabled:
+ if self.is_collector_https_enabled:
self.ca_certs = config.get_ca_certs()
# TimedRoundRobinSet
@@ -101,22 +103,26 @@
def push_metrics(self, data):
success = False
- while self.active_collector_hosts.get_actual_size() > 0:
+ if self.is_inmemory_aggregation_enabled:
+ success = self.try_with_collector(self.inmemory_aggregation_protocol, "localhost", self.inmemory_aggregation_port, data)
+ if not success:
+ logger.warning("Failed to submit metrics to local aggregator. Trying to post them to collector...")
+ while not success and self.active_collector_hosts.get_actual_size() > 0:
collector_host = self.get_collector_host_shard()
- success = self.try_with_collector_host(collector_host, data)
- if success:
- break
+ success = self.try_with_collector(self.collector_protocol, collector_host, self.collector_port, data)
pass
if not success:
logger.info('No valid collectors found...')
for collector_host in self.active_collector_hosts:
- success = self.try_with_collector_host(collector_host, data)
+ success = self.try_with_collector(self.collector_protocol, collector_host, self.ollector_port, data)
+ if success:
+ break
pass
- def try_with_collector_host(self, collector_host, data):
+ def try_with_collector(self, collector_protocol, collector_host, collector_port, data):
headers = {"Content-Type" : "application/json", "Accept" : "*/*"}
- connection = self.get_connection(collector_host)
+ connection = self.get_connection(collector_protocol, collector_host, collector_port)
logger.debug("message to send: %s" % data)
try:
@@ -169,16 +175,16 @@
logger.warn("Metric collector host {0} was blacklisted.".format(collector_host))
return False
- def get_connection(self, collector_host):
+ def get_connection(self, protocol, host, port):
timeout = int(self.send_interval - 10)
- if self.is_server_https_enabled:
- connection = CachedHTTPSConnection(collector_host,
- self.collector_port,
+ if protocol == "https":
+ connection = CachedHTTPSConnection(host,
+ port,
timeout=timeout,
ca_certs=self.ca_certs)
else:
- connection = CachedHTTPConnection(collector_host,
- self.collector_port,
+ connection = CachedHTTPConnection(host,
+ port,
timeout=timeout)
return connection
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/host_info.py b/ambari-metrics-host-monitoring/src/main/python/core/host_info.py
index 035c833..6198c53 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/host_info.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/host_info.py
@@ -265,7 +265,6 @@
skip_disk_patterns = self.__config.get_disk_metrics_skip_pattern()
logger.debug('skip_disk_patterns: %s' % skip_disk_patterns)
- print skip_disk_patterns
if not skip_disk_patterns or skip_disk_patterns == 'None':
io_counters = psutil.disk_io_counters()
print io_counters
diff --git a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
index 7a9fbec..330e018 100644
--- a/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
+++ b/ambari-metrics-host-monitoring/src/main/python/core/stop_handler.py
@@ -78,7 +78,7 @@
raise FatalException(-1, "Error waiting for stop event: " + str(result))
if (win32event.WAIT_TIMEOUT == result):
return -1
- logger.info("Stop event received")
+ logger.debug("Stop event received")
return result # 0 -> stop
@@ -119,7 +119,7 @@
# Stop process when stop event received
self.stop_event.wait(timeout)
if self.stop_event.isSet():
- logger.info("Stop event received")
+ logger.debug("Stop event received")
return 0
# Timeout
return -1
diff --git a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
index e126016..f07d508 100644
--- a/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
+++ b/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java
@@ -74,6 +74,7 @@
private static final String TIMELINE_METRICS_KAFKA_SET_INSTANCE_ID_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + SET_INSTANCE_ID_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY;
private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY;
+ private static final String TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY = TIMELINE_METRICS_KAFKA_PREFIX + HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY;
private static final String TIMELINE_DEFAULT_HOST = "localhost";
private static final String TIMELINE_DEFAULT_PORT = "6188";
private static final String TIMELINE_DEFAULT_PROTOCOL = "http";
@@ -100,6 +101,7 @@
private Set<String> excludedMetrics = new HashSet<>();
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
+ private String hostInMemoryAggregationProtocol;
@Override
protected String getCollectorUri(String host) {
@@ -147,6 +149,11 @@
return hostInMemoryAggregationPort;
}
+ @Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
public void setMetricsCache(TimelineMetricsCache metricsCache) {
this.metricsCache = metricsCache;
}
@@ -186,9 +193,10 @@
hostInMemoryAggregationEnabled = props.getBoolean(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, false);
hostInMemoryAggregationPort = props.getInt(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, 61888);
+ hostInMemoryAggregationProtocol = props.getString(TIMELINE_METRICS_KAFKA_HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
- if (metricCollectorProtocol.contains("https")) {
+ if (metricCollectorProtocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = props.getString(TIMELINE_METRICS_SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index d408e1a..842fad8 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -57,6 +57,7 @@
private int timeoutSeconds;
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
+ private String hostInMemoryAggregationProtocol;
public StormTimelineMetricsReporter() {
@@ -108,6 +109,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
+ @Override
public void prepare(Map conf) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -144,11 +150,15 @@
setInstanceId = Boolean.getBoolean(cf.get(SET_INSTANCE_ID_PROPERTY).toString());
instanceId = cf.get(INSTANCE_ID_PROPERTY).toString();
}
- hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString());
- hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString());
+ hostInMemoryAggregationEnabled = Boolean.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY) != null ?
+ cf.get(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY).toString() : "false");
+ hostInMemoryAggregationPort = Integer.valueOf(cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY) != null ?
+ cf.get(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY).toString() : "61888");
+ hostInMemoryAggregationProtocol = cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY) != null ?
+ cf.get(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY).toString() : "http";
collectorUri = constructTimelineMetricUri(protocol, findPreferredCollectHost(), port);
- if (protocol.contains("https")) {
+ if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = cf.get(SSL_KEYSTORE_PATH_PROPERTY).toString().trim();
String trustStoreType = cf.get(SSL_KEYSTORE_TYPE_PROPERTY).toString().trim();
String trustStorePwd = cf.get(SSL_KEYSTORE_PASSWORD_PROPERTY).toString().trim();
diff --git a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index ff72f24..e3494fd 100644
--- a/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink-legacy/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -63,6 +63,7 @@
private String instanceId;
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
+ private String hostInMemoryAggregationProtocol;
@Override
protected String getCollectorUri(String host) {
@@ -110,6 +111,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -138,12 +144,13 @@
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
- hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
- hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+ hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
// Initialize the collector write strategy
super.init();
- if (protocol.contains("https")) {
+ if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
index 5b75065..4fcf2fb 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java
@@ -52,6 +52,7 @@
private int timeoutSeconds;
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
+ private String hostInMemoryAggregationProtocol;
public StormTimelineMetricsReporter() {
@@ -103,6 +104,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
+ @Override
public void prepare(Object registrationArgument) {
LOG.info("Preparing Storm Metrics Reporter");
try {
@@ -132,10 +138,11 @@
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY));
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY);
- hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
- hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+ hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
- if (protocol.contains("https")) {
+ if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();
diff --git a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
index 4d5a229..dc92f80 100644
--- a/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
+++ b/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java
@@ -72,6 +72,7 @@
private boolean setInstanceId;
private boolean hostInMemoryAggregationEnabled;
private int hostInMemoryAggregationPort;
+ private String hostInMemoryAggregationProtocol;
@Override
protected String getCollectorUri(String host) {
@@ -119,6 +120,11 @@
}
@Override
+ protected String getHostInMemoryAggregationProtocol() {
+ return hostInMemoryAggregationProtocol;
+ }
+
+ @Override
public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
LOG.info("Preparing Storm Metrics Sink");
try {
@@ -150,13 +156,14 @@
instanceId = configuration.getProperty(INSTANCE_ID_PROPERTY, null);
setInstanceId = Boolean.valueOf(configuration.getProperty(SET_INSTANCE_ID_PROPERTY, "false"));
- hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY));
- hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY));
+ hostInMemoryAggregationEnabled = Boolean.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_ENABLED_PROPERTY, "false"));
+ hostInMemoryAggregationPort = Integer.valueOf(configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PORT_PROPERTY, "61888"));
+ hostInMemoryAggregationProtocol = configuration.getProperty(HOST_IN_MEMORY_AGGREGATION_PROTOCOL_PROPERTY, "http");
// Initialize the collector write strategy
super.init();
- if (protocol.contains("https")) {
+ if (protocol.contains("https") || hostInMemoryAggregationProtocol.contains("https")) {
String trustStorePath = configuration.getProperty(SSL_KEYSTORE_PATH_PROPERTY).trim();
String trustStoreType = configuration.getProperty(SSL_KEYSTORE_TYPE_PROPERTY).trim();
String trustStorePwd = configuration.getProperty(SSL_KEYSTORE_PASSWORD_PROPERTY).trim();