/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.metrics2.host.aggregator;
import com.sun.jersey.api.container.httpserver.HttpServerFactory;
import com.sun.jersey.api.core.PackagesResourceConfig;
import com.sun.jersey.api.core.ResourceConfig;
import com.sun.net.httpserver.HttpServer;
import javax.net.ssl.SSLContext;
import javax.ws.rs.core.UriBuilder;
import java.net.InetAddress;
import java.net.URI;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import com.sun.net.httpserver.HttpsConfigurator;
import com.sun.net.httpserver.HttpsServer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import static org.apache.hadoop.metrics2.sink.timeline.TimelineMetricUtils.getJavaMetricPatterns;
/**
* Web application with two publisher threads that process received metrics and submit the results to the collector
*/
public class AggregatorApplication {
private static final int STOP_SECONDS_DELAY = 0;
private static final int JOIN_SECONDS_TIMEOUT = 5;
private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
private Log LOG;
private final int webApplicationPort;
private final int rawPublishingInterval;
private final int aggregationInterval;
private final String webServerProtocol;
private Configuration configuration;
private Thread aggregatePublisherThread;
private Thread rawPublisherThread;
private TimelineMetricsHolder timelineMetricsHolder;
private HttpServer httpServer;
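/**
 * Creates the aggregator application: loads the metrics configuration,
 * initializes the in-memory metrics holder and builds the embedded HTTP(S) server.
 * Exits the process if the server cannot be created.
 *
 * @param hostname       name of the host this aggregator runs on
 * @param collectorHosts comma separated list of metrics collector hosts
 */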
public AggregatorApplication(String hostname, String collectorHosts) {
LOG = LogFactory.getLog(this.getClass());
configuration = new Configuration(true);
initConfiguration();
configuration.set("timeline.metrics.collector.hosts", collectorHosts);
configuration.set("timeline.metrics.hostname", hostname);
configuration.set("timeline.metrics.zk.quorum", getZkQuorumFromConfiguration());
this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy",
"HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
// Skip aggregating transient metrics.
String skipAggregationMetricPatternsString = configuration.get("timeline.metrics.transient.metric.patterns", StringUtils.EMPTY);
List<String> skipAggregationPatterns = new ArrayList<>();
if (StringUtils.isNotEmpty(skipAggregationMetricPatternsString)) {
skipAggregationPatterns.addAll(getJavaMetricPatterns(skipAggregationMetricPatternsString));
}
this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval, skipAggregationPatterns);
try {
this.httpServer = createHttpServer();
} catch (Exception e) {
LOG.error("Exception while starting HTTP server. Exiting", e);
System.exit(1);
}
}
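/**
 * Builds the ZooKeeper quorum connection string from the cluster ZooKeeper
 * hosts and client port defined in the configuration.
 */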
private String getZkQuorumFromConfiguration() {
String zkClientPort = configuration.getTrimmed("cluster.zookeeper.property.clientPort", "2181");
String zkServerHosts = configuration.getTrimmed("cluster.zookeeper.quorum", "");
return getZkConnectionUrl(zkClientPort, zkServerHosts);
}
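/**
 * Loads ams-site.xml and ssl-server.xml from the classpath into the
 * configuration. Fails if either resource is missing and exits the process
 * if the resources cannot be added.
 */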
protected void initConfiguration() {
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
if (classLoader == null) {
classLoader = getClass().getClassLoader();
}
URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
LOG.info("Found metric service configuration: " + amsResUrl);
URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
LOG.info("Found metric service configuration: " + sslConfUrl);
if (amsResUrl == null) {
throw new IllegalStateException(String.format("Unable to initialize the metrics " +
"subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE));
}
if (sslConfUrl == null) {
throw new IllegalStateException(String.format("Unable to initialize the metrics " +
"subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE));
}
try {
configuration.addResource(amsResUrl.toURI().toURL());
configuration.addResource(sslConfUrl.toURI().toURL());
} catch (Exception e) {
LOG.error("Couldn't init configuration. ", e);
System.exit(1);
}
}
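/**
 * Returns the canonical host name of the local host, falling back to
 * "localhost" if it cannot be resolved.
 */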
protected String getHostName() {
String hostName = "localhost";
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
} catch (UnknownHostException e) {
LOG.error(e);
}
return hostName;
}
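/**
 * Builds the base URI of the web server from the configured protocol,
 * the local host name and the web application port.
 */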
protected URI getURI() {
URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build();
LOG.info(String.format("Web server at %s", uri));
return uri;
}
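/**
 * Creates the embedded Jersey HTTP server serving the resources of this package.
 * When the HTTPS policy is configured, an SSL context built from the
 * ssl-server.xml keystore and truststore settings is attached to the server.
 */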
protected HttpServer createHttpServer() throws Exception {
ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
HashMap<String, Object> params = new HashMap<>();
params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
resourceConfig.setPropertiesAndFeatures(params);
HttpServer server = HttpServerFactory.create(getURI(), resourceConfig);
if (webServerProtocol.equalsIgnoreCase("https")) {
HttpsServer httpsServer = (HttpsServer) server;
SslContextFactory sslContextFactory = new SslContextFactory();
String keyStorePath = configuration.get("ssl.server.keystore.location");
String keyStorePassword = configuration.get("ssl.server.keystore.password");
String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword");
String trustStorePath = configuration.get("ssl.server.truststore.location");
String trustStorePassword = configuration.get("ssl.server.truststore.password");
sslContextFactory.setKeyStorePath(keyStorePath);
sslContextFactory.setKeyStorePassword(keyStorePassword);
sslContextFactory.setKeyManagerPassword(keyManagerPassword);
sslContextFactory.setTrustStorePath(trustStorePath);
sslContextFactory.setTrustStorePassword(trustStorePassword);
sslContextFactory.start();
SSLContext sslContext = sslContextFactory.getSslContext();
sslContextFactory.stop();
HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext);
httpsServer.setHttpsConfigurator(httpsConfigurator);
server = httpsServer;
}
return server;
}
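/**
 * Starts the embedded web server.
 */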
private void startWebServer() {
LOG.info("Starting web server.");
this.httpServer.start();
}
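/**
 * Starts the thread that publishes aggregated metrics to the collector.
 */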
private void startAggregatePublisherThread() {
LOG.info("Starting aggregated metrics publisher.");
AbstractMetricPublisher metricPublisher = new AggregatedMetricsPublisher(timelineMetricsHolder, configuration, aggregationInterval);
aggregatePublisherThread = new Thread(metricPublisher);
aggregatePublisherThread.start();
}
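/**
 * Starts the thread that publishes raw metrics to the collector.
 */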
private void startRawPublisherThread() {
LOG.info("Starting raw metrics publisher.");
AbstractMetricPublisher metricPublisher = new RawMetricsPublisher(timelineMetricsHolder, configuration, rawPublishingInterval);
rawPublisherThread = new Thread(metricPublisher);
rawPublisherThread.start();
}
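/**
 * Stops the application: interrupts both publisher threads, stops the web
 * server and waits up to JOIN_SECONDS_TIMEOUT seconds for the publisher
 * threads to finish.
 */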
private void stop() {
LOG.info("Stopping aggregator application");
aggregatePublisherThread.interrupt();
rawPublisherThread.interrupt();
httpServer.stop(STOP_SECONDS_DELAY);
LOG.info("Stopped web server.");
try {
LOG.info("Waiting for threads to join.");
aggregatePublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
rawPublisherThread.join(JOIN_SECONDS_TIMEOUT * 1000);
LOG.info("Gracefully stopped Aggregator Application.");
} catch (InterruptedException e) {
LOG.error("Received exception during stop : ", e);
}
}
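/**
 * Builds a ZooKeeper connection string of the form host1:port,host2:port,
 * appending the client port to quorum entries that do not already specify one.
 */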
private String getZkConnectionUrl(String zkClientPort, String zkQuorum) {
StringBuilder sb = new StringBuilder();
String[] quorumParts = zkQuorum.split(",");
String prefix = "";
for (String part : quorumParts) {
sb.append(prefix);
sb.append(part.trim());
if (!part.contains(":")) {
sb.append(":");
sb.append(zkClientPort);
}
prefix = ",";
}
return sb.toString();
}
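/**
 * Entry point. Expects the current host name and the comma separated collector
 * hosts as arguments, starts the publisher threads and the web server, and
 * registers a shutdown hook that stops them.
 */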
public static void main(String[] args) throws Exception {
if (args.length != 2) {
throw new Exception("This jar should be executed with 2 arguments: 1st - current host name, " +
"2nd - collector hosts separated with a comma");
}
final AggregatorApplication app = new AggregatorApplication(args[0], args[1]);
app.startWebServerAndPublishersThreads();
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
app.stop();
}
});
}
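/**
 * Starts both publisher threads and the web server.
 */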
private void startWebServerAndPublishersThreads() {
LOG.info("Starting aggregator application");
startAggregatePublisherThread();
startRawPublisherThread();
startWebServer();
}
}