// blob: 7f9c5808ddbc770f87a856a1d3a2ed705b74372a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.web;
import com.google.common.collect.Lists;
import io.prometheus.client.jetty.JettyStatisticsCollector;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TimeZone;
import javax.servlet.DispatcherType;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.common.util.SecurityUtility;
import org.apache.pulsar.common.util.keystoretls.KeyStoreSSLContext;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.Slf4jRequestLog;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.ContextHandlerCollection;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.server.handler.RequestLogHandler;
import org.eclipse.jetty.server.handler.ResourceHandler;
import org.eclipse.jetty.server.handler.StatisticsHandler;
import org.eclipse.jetty.servlet.FilterHolder;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.resource.Resource;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
import org.glassfish.jersey.server.ResourceConfig;
import org.glassfish.jersey.servlet.ServletContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Web Service embedded into Pulsar.
 *
 * <p>Wraps a Jetty {@link Server} running on a dedicated {@link WebExecutorThreadPool}. An HTTP and/or an
 * HTTPS connector is created depending on which web service ports are present in the broker configuration.
 * Servlets, REST resources and static resources are registered through the {@code add*} methods and are
 * wired into the server when {@link #start()} is called.
 */
public class WebService implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(WebService.class);

    /** Path spec used for every servlet and filter mapping registered by this class. */
    private static final String MATCH_ALL = "/*";

    public static final String ATTRIBUTE_PULSAR_NAME = "pulsar";
    /** Value passed to {@link ResourceHandler#setCacheControl(String)} for static resources. */
    public static final String HANDLER_CACHE_CONTROL = "max-age=3600";

    private final PulsarService pulsar;
    private final Server server;
    /** Handlers registered through the {@code add*} methods, in registration order. */
    private final List<Handler> handlers;
    private final WebExecutorThreadPool webServiceExecutor;
    /** Value of {@code getMaxConcurrentHttpRequests()} read from the configuration at construction time. */
    public final int maxConcurrentRequests;

    /** Plain HTTP connector, or {@code null} when no web service port is configured. */
    private final ServerConnector httpConnector;
    /** TLS connector, or {@code null} when no TLS web service port is configured. */
    private final ServerConnector httpsConnector;

    /**
     * Creates the Jetty server and its connectors. The server is not started here; see {@link #start()}.
     *
     * @param pulsar the owning service, used to read the configuration and the bind address
     * @throws PulsarServerException if setting up the TLS connector fails
     */
    public WebService(PulsarService pulsar) throws PulsarServerException {
        this.handlers = Lists.newArrayList();
        this.pulsar = pulsar;
        this.webServiceExecutor = new WebExecutorThreadPool(
                pulsar.getConfiguration().getNumHttpServerThreads(),
                "pulsar-web");
        this.server = new Server(webServiceExecutor);
        this.maxConcurrentRequests = pulsar.getConfiguration().getMaxConcurrentHttpRequests();

        this.httpConnector = createHttpConnector(pulsar, server);
        this.httpsConnector = createHttpsConnector(pulsar, server);

        List<ServerConnector> connectors = new ArrayList<>();
        if (httpConnector != null) {
            connectors.add(httpConnector);
        }
        if (httpsConnector != null) {
            connectors.add(httpsConnector);
        }

        // Limit number of concurrent HTTP connections to avoid getting out of file descriptors.
        // The budget is split evenly between the connectors; the guard avoids dividing by zero
        // when neither port is configured.
        if (!connectors.isEmpty()) {
            int acceptQueueSize = maxConcurrentRequests / connectors.size();
            for (ServerConnector connector : connectors) {
                connector.setAcceptQueueSize(acceptQueueSize);
            }
        }
        server.setConnectors(connectors.toArray(new ServerConnector[0]));
    }

    /**
     * Builds the plain HTTP connector bound to the configured web service port.
     *
     * @return the connector, or {@code null} when no web service port is configured
     */
    private static ServerConnector createHttpConnector(PulsarService pulsar, Server server) {
        Optional<Integer> port = pulsar.getConfiguration().getWebServicePort();
        if (!port.isPresent()) {
            return null;
        }
        ServerConnector connector = new PulsarServerConnector(server, 1, 1);
        connector.setPort(port.get());
        connector.setHost(pulsar.getBindAddress());
        return connector;
    }

    /**
     * Builds the TLS connector bound to the configured TLS web service port.
     *
     * @return the connector, or {@code null} when no TLS web service port is configured
     * @throws PulsarServerException wrapping any exception raised while creating the SSL context factory
     *                               or the connector
     */
    private static ServerConnector createHttpsConnector(PulsarService pulsar, Server server)
            throws PulsarServerException {
        Optional<Integer> tlsPort = pulsar.getConfiguration().getWebServicePortTls();
        if (!tlsPort.isPresent()) {
            return null;
        }
        try {
            SslContextFactory sslCtxFactory = createSslContextFactory(pulsar.getConfiguration());
            ServerConnector connector = new PulsarServerConnector(server, 1, 1, sslCtxFactory);
            connector.setPort(tlsPort.get());
            connector.setHost(pulsar.getBindAddress());
            return connector;
        } catch (Exception e) {
            throw new PulsarServerException(e);
        }
    }

    /**
     * Creates the SSL context factory from either the key store settings or the certificate/key file
     * settings, depending on {@code isTlsEnabledWithKeyStore()}.
     */
    private static SslContextFactory createSslContextFactory(ServiceConfiguration config) throws Exception {
        if (config.isTlsEnabledWithKeyStore()) {
            return KeyStoreSSLContext.createSslContextFactory(
                    config.getTlsProvider(),
                    config.getTlsKeyStoreType(),
                    config.getTlsKeyStore(),
                    config.getTlsKeyStorePassword(),
                    config.isTlsAllowInsecureConnection(),
                    config.getTlsTrustStoreType(),
                    config.getTlsTrustStore(),
                    config.getTlsTrustStorePassword(),
                    config.isTlsRequireTrustedClientCertOnConnect(),
                    config.getTlsCertRefreshCheckDurationSec()
            );
        }
        return SecurityUtility.createSslContextFactory(
                config.isTlsAllowInsecureConnection(),
                config.getTlsTrustCertsFilePath(),
                config.getTlsCertificateFilePath(),
                config.getTlsKeyFilePath(),
                config.isTlsRequireTrustedClientCertOnConnect(), true,
                config.getTlsCertRefreshCheckDurationSec());
    }

    /**
     * Registers a Jersey servlet under {@code basePath}, with async support enabled and with
     * {@link JsonMapperProvider} and {@link MultiPartFeature} registered.
     *
     * @param basePath               context path the servlet is mounted on
     * @param javaPackages           value handed to {@link ResourceConfig#packages(String...)}
     * @param requiresAuthentication whether the authentication filter should be installed (it is only
     *                               installed when authentication is also enabled in the configuration)
     * @param attributeMap           attributes to set on the servlet context; may be {@code null}
     */
    public void addRestResources(String basePath, String javaPackages, boolean requiresAuthentication,
                                 Map<String, Object> attributeMap) {
        ResourceConfig config = new ResourceConfig();
        // NOTE(review): the first argument looks like a property key rather than a package name;
        // kept as-is to preserve behavior - confirm it is intended.
        config.packages("jersey.config.server.provider.packages", javaPackages);
        config.register(JsonMapperProvider.class);
        config.register(MultiPartFeature.class);

        ServletHolder servletHolder = new ServletHolder(new ServletContainer(config));
        servletHolder.setAsyncSupported(true);
        addServlet(basePath, servletHolder, requiresAuthentication, attributeMap);
    }

    /**
     * Registers a servlet under its own context and installs the filters enabled by the configuration.
     *
     * <p>Filters are added in this order: pre-intercept, authentication, rate limiting, max request size,
     * and finally the response handler filter, which is always installed.
     *
     * @param path                   context path the servlet is mounted on
     * @param servletHolder          the servlet to register
     * @param requiresAuthentication whether the authentication filter should be installed (it is only
     *                               installed when authentication is also enabled in the configuration)
     * @param attributeMap           attributes to set on the servlet context; may be {@code null}
     */
    public void addServlet(String path, ServletHolder servletHolder, boolean requiresAuthentication,
                           Map<String, Object> attributeMap) {
        ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
        context.setContextPath(path);
        context.addServlet(servletHolder, MATCH_ALL);
        if (attributeMap != null) {
            attributeMap.forEach(context::setAttribute);
        }

        if (!pulsar.getConfig().getBrokerInterceptors().isEmpty()
                || !pulsar.getConfig().isDisableBrokerInterceptors()) {
            // Enable PreInterceptFilter only when interceptors are enabled
            addFilterForAllDispatches(context,
                    new FilterHolder(new PreInterceptFilter(pulsar.getBrokerInterceptor())));
        }

        if (requiresAuthentication && pulsar.getConfiguration().isAuthenticationEnabled()) {
            addFilterForAllDispatches(context, new FilterHolder(new AuthenticationFilter(
                    pulsar.getBrokerService().getAuthenticationService())));
        }

        if (pulsar.getConfiguration().isHttpRequestsLimitEnabled()) {
            addFilterForAllDispatches(context, new FilterHolder(
                    new RateLimitingFilter(pulsar.getConfiguration().getHttpRequestsMaxPerSecond())));
        }

        if (pulsar.getConfig().getHttpMaxRequestSize() > 0) {
            addFilterForAllDispatches(context, new FilterHolder(
                    new MaxRequestSizeFilter(pulsar.getConfig().getHttpMaxRequestSize())));
        }

        addFilterForAllDispatches(context, new FilterHolder(new ResponseHandlerFilter(pulsar)));
        handlers.add(context);
    }

    /** Maps {@code holder} to every path of {@code context} for all dispatcher types. */
    private static void addFilterForAllDispatches(ServletContextHandler context, FilterHolder holder) {
        context.addFilter(holder, MATCH_ALL, EnumSet.allOf(DispatcherType.class));
    }

    /**
     * Serves a class path resource under {@code basePath}, with ETags enabled and the
     * {@link #HANDLER_CACHE_CONTROL} cache-control value.
     *
     * @param basePath     context path the resources are mounted on
     * @param resourcePath class path location passed to {@link Resource#newClassPathResource(String)}
     */
    public void addStaticResources(String basePath, String resourcePath) {
        ResourceHandler resHandler = new ResourceHandler();
        resHandler.setBaseResource(Resource.newClassPathResource(resourcePath));
        resHandler.setEtags(true);
        resHandler.setCacheControl(WebService.HANDLER_CACHE_CONTROL);

        ContextHandler capHandler = new ContextHandler();
        capHandler.setContextPath(basePath);
        capHandler.setHandler(resHandler);
        handlers.add(capHandler);
    }

    /**
     * Assembles the handler chain from the registered handlers, starts the Jetty server and writes the
     * actual listening ports back into the configuration.
     *
     * <p>The chain is built from a copy of the registered handlers, so this method does not modify the
     * list populated by the {@code add*} methods.
     *
     * @throws PulsarServerException wrapping any exception raised while assembling or starting the server
     */
    public void start() throws PulsarServerException {
        try {
            RequestLogHandler requestLogHandler = new RequestLogHandler();
            Slf4jRequestLog requestLog = new Slf4jRequestLog();
            requestLog.setExtended(true);
            requestLog.setLogTimeZone(TimeZone.getDefault().getID());
            requestLog.setLogLatency(true);
            requestLogHandler.setRequestLog(requestLog);

            // Same ordering as before: an empty context collection first, then every registered
            // handler, then the request log handler.
            List<Handler> chain = new ArrayList<>(handlers.size() + 2);
            chain.add(new ContextHandlerCollection());
            chain.addAll(handlers);
            chain.add(requestLogHandler);

            ContextHandlerCollection contexts = new ContextHandlerCollection();
            contexts.setHandlers(chain.toArray(new Handler[0]));

            HandlerCollection handlerCollection = new HandlerCollection();
            handlerCollection.setHandlers(new Handler[] { contexts, new DefaultHandler(), requestLogHandler });

            // Metrics handler
            StatisticsHandler stats = new StatisticsHandler();
            stats.setHandler(handlerCollection);
            try {
                new JettyStatisticsCollector(stats).register();
            } catch (IllegalArgumentException e) {
                // Already registered. Eg: in unit tests. Registration stays best-effort.
                log.debug("Jetty statistics collector was not registered: {}", e.getMessage());
            }

            server.setHandler(stats);
            server.start();

            if (httpConnector != null) {
                log.info("HTTP Service started at http://{}:{}", httpConnector.getHost(), httpConnector.getLocalPort());
                pulsar.getConfiguration().setWebServicePort(Optional.of(httpConnector.getLocalPort()));
            } else {
                log.info("HTTP Service disabled");
            }

            if (httpsConnector != null) {
                log.info("HTTPS Service started at https://{}:{}", httpsConnector.getHost(),
                        httpsConnector.getLocalPort());
                pulsar.getConfiguration().setWebServicePortTls(Optional.of(httpsConnector.getLocalPort()));
            } else {
                log.info("HTTPS Service disabled");
            }
        } catch (Exception e) {
            throw new PulsarServerException(e);
        }
    }

    /**
     * Stops the Jetty server and then joins the web executor thread pool.
     *
     * @throws PulsarServerException wrapping any exception raised while stopping
     */
    @Override
    public void close() throws PulsarServerException {
        try {
            server.stop();
            webServiceExecutor.join();
            log.info("Web service closed");
        } catch (Exception e) {
            throw new PulsarServerException(e);
        }
    }

    /**
     * @return the local port reported by the HTTP connector, or empty when HTTP is disabled
     */
    public Optional<Integer> getListenPortHTTP() {
        return Optional.ofNullable(httpConnector).map(ServerConnector::getLocalPort);
    }

    /**
     * @return the local port reported by the HTTPS connector, or empty when HTTPS is disabled
     */
    public Optional<Integer> getListenPortHTTPS() {
        return Optional.ofNullable(httpsConnector).map(ServerConnector::getLocalPort);
    }
}