| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.management.plugin; |
| |
| import java.io.IOException; |
| import java.io.StringWriter; |
| import java.io.Writer; |
| import java.net.BindException; |
| import java.net.InetSocketAddress; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.EnumSet; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ScheduledThreadPoolExecutor; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.ThreadPoolExecutor; |
| |
| import javax.net.ssl.SSLContext; |
| import javax.net.ssl.SSLEngine; |
| import javax.net.ssl.SSLParameters; |
| import javax.net.ssl.SSLSessionContext; |
| import javax.servlet.DispatcherType; |
| import javax.servlet.MultipartConfigElement; |
| import javax.servlet.http.HttpServletRequest; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.base.Joiner; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.SettableFuture; |
| import org.eclipse.jetty.io.Connection; |
| import org.eclipse.jetty.io.ssl.SslHandshakeListener; |
| import org.eclipse.jetty.server.ConnectionFactory; |
| import org.eclipse.jetty.server.HttpConfiguration; |
| import org.eclipse.jetty.server.HttpConnectionFactory; |
| import org.eclipse.jetty.server.NetworkConnector; |
| import org.eclipse.jetty.server.SecureRequestCustomizer; |
| import org.eclipse.jetty.server.Server; |
| import org.eclipse.jetty.server.ServerConnector; |
| import org.eclipse.jetty.server.SslConnectionFactory; |
| import org.eclipse.jetty.server.handler.ErrorHandler; |
| import org.eclipse.jetty.servlet.FilterHolder; |
| import org.eclipse.jetty.servlet.ServletContextHandler; |
| import org.eclipse.jetty.servlet.ServletHolder; |
| import org.eclipse.jetty.servlets.CrossOriginFilter; |
| import org.eclipse.jetty.util.annotation.Name; |
| import org.eclipse.jetty.util.ssl.SslContextFactory; |
| import org.eclipse.jetty.util.thread.ExecutorThreadPool; |
| import org.eclipse.jetty.util.thread.QueuedThreadPool; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.server.bytebuffer.QpidByteBuffer; |
| import org.apache.qpid.server.configuration.IllegalConfigurationException; |
| import org.apache.qpid.server.configuration.updater.TaskExecutor; |
| import org.apache.qpid.server.logging.messages.ManagementConsoleMessages; |
| import org.apache.qpid.server.logging.messages.PortMessages; |
| import org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter; |
| import org.apache.qpid.server.management.plugin.filter.ExceptionHandlingFilter; |
| import org.apache.qpid.server.management.plugin.filter.ForbiddingTraceFilter; |
| import org.apache.qpid.server.management.plugin.filter.InteractiveAuthenticationFilter; |
| import org.apache.qpid.server.management.plugin.filter.LoggingFilter; |
| import org.apache.qpid.server.management.plugin.filter.RedirectFilter; |
| import org.apache.qpid.server.management.plugin.filter.RewriteRequestForUncompressedJavascript; |
| import org.apache.qpid.server.management.plugin.portunification.TlsOrPlainConnectionFactory; |
| import org.apache.qpid.server.management.plugin.servlet.FileServlet; |
| import org.apache.qpid.server.management.plugin.servlet.RootServlet; |
| import org.apache.qpid.server.management.plugin.servlet.ContentServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.ApiDocsServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.BrokerQueryServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.JsonValueServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.LogoutServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.MetaDataServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.QueueReportServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.RestServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.SaslServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.StructureServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.TimeZoneServlet; |
| import org.apache.qpid.server.management.plugin.servlet.rest.VirtualHostQueryServlet; |
| import org.apache.qpid.server.model.AbstractConfigurationChangeListener; |
| import org.apache.qpid.server.model.AuthenticationProvider; |
| import org.apache.qpid.server.model.Broker; |
| import org.apache.qpid.server.model.BrokerModel; |
| import org.apache.qpid.server.model.ConfiguredObject; |
| import org.apache.qpid.server.model.KeyStore; |
| import org.apache.qpid.server.model.ManagedAttributeField; |
| import org.apache.qpid.server.model.ManagedObject; |
| import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; |
| import org.apache.qpid.server.model.Port; |
| import org.apache.qpid.server.model.Protocol; |
| import org.apache.qpid.server.model.State; |
| import org.apache.qpid.server.model.StateTransition; |
| import org.apache.qpid.server.model.Transport; |
| import org.apache.qpid.server.model.TrustStore; |
| import org.apache.qpid.server.model.adapter.AbstractPluginAdapter; |
| import org.apache.qpid.server.model.port.HttpPort; |
| import org.apache.qpid.server.model.port.PortManager; |
| import org.apache.qpid.server.plugin.ContentFactory; |
| import org.apache.qpid.server.plugin.QpidServiceLoader; |
| import org.apache.qpid.server.transport.PortBindFailureException; |
| import org.apache.qpid.server.transport.network.security.ssl.SSLUtil; |
| import org.apache.qpid.server.util.DaemonThreadFactory; |
| import org.apache.qpid.server.util.ServerScopedRuntimeException; |
| |
| @ManagedObject( category = false, type = HttpManagement.PLUGIN_TYPE ) |
| public class HttpManagement extends AbstractPluginAdapter<HttpManagement> implements HttpManagementConfiguration<HttpManagement>, PortManager |
| { |
// Class-wide SLF4J logger.
private static final Logger LOGGER = LoggerFactory.getLogger(HttpManagement.class);

// Default session timeout expressed in seconds (60 * 10 = 10 minutes).
public static final int DEFAULT_TIMEOUT_IN_SECONDS = 60 * 10;
// String keys below appear to mirror the names of the @ManagedAttributeField members
// declared further down - NOTE(review): confirm against HttpManagementConfiguration.
public static final String TIME_OUT = "sessionTimeout";
public static final String HTTP_BASIC_AUTHENTICATION_ENABLED = "httpBasicAuthenticationEnabled";
public static final String HTTPS_BASIC_AUTHENTICATION_ENABLED = "httpsBasicAuthenticationEnabled";
public static final String HTTP_SASL_AUTHENTICATION_ENABLED = "httpSaslAuthenticationEnabled";
public static final String HTTPS_SASL_AUTHENTICATION_ENABLED = "httpsSaslAuthenticationEnabled";

// Plugin type name; also referenced by the @ManagedObject annotation on this class.
public static final String PLUGIN_TYPE = "MANAGEMENT-HTTP";

public static final String DEFAULT_LOGOUT_URL = "/logout.html";
public static final String DEFAULT_LOGIN_URL = "/index.html";

// Name passed to the ManagementConsoleMessages STARTUP / READY / STOPPED operational log messages.
private static final String OPERATIONAL_LOGGING_NAME = "Web";

// The session cookie name is this prefix followed by a port number (see createServer).
private static final String JSESSIONID_COOKIE_PREFIX = "JSESSIONID_";

// Servlet path patterns that createServer maps to a FileServlet each.
private static final String[] STATIC_FILE_TYPES = { "*.js", "*.css", "*.html", "*.png", "*.gif", "*.jpg",
"*.jpeg", "*.json", "*.txt", "*.xsl", "*.svg" };

// Embedded Jetty server; assigned in doStart only when at least one eligible HTTP port exists.
private Server _server;

@ManagedAttributeField
private boolean _httpsSaslAuthenticationEnabled;

@ManagedAttributeField
private boolean _httpSaslAuthenticationEnabled;

@ManagedAttributeField
private boolean _httpsBasicAuthenticationEnabled;

@ManagedAttributeField
private boolean _httpBasicAuthenticationEnabled;

@ManagedAttributeField
private int _sessionTimeout;

// The four CORS settings below are fed into Jetty's CrossOriginFilter in createServer.
@ManagedAttributeField
public String _corsAllowOrigins;

@ManagedAttributeField
public Set<String> _corsAllowMethods;

@ManagedAttributeField
public String _corsAllowHeaders;

@ManagedAttributeField
public boolean _corsAllowCredentials;

@ManagedAttributeField
private boolean _compressResponses;

// Connector created for each HTTP port; filled in createServer.
private final Map<HttpPort<?>, ServerConnector> _portConnectorMap = new ConcurrentHashMap<>();
// SSL context factory for each port that uses the SSL transport; filled in createConnector
// and removed again when binding the port fails.
private final Map<HttpPort<?>, SslContextFactory> _sslContextFactoryMap = new ConcurrentHashMap<>();
// Registered on the broker in onOpen and removed in onClose.
private final BrokerChangeListener _brokerChangeListener = new BrokerChangeListener();

// Both values are read from the context in onOpen.
private volatile boolean _serveUncompressedDojo;
private volatile Long _saslExchangeExpiry;
// Executor backing the Jetty server thread pool; created in createServer, shut down in onClose.
private volatile ThreadPoolExecutor _jettyServerExecutor;
| |
/**
 * Creates the HTTP management plugin; all work is delegated to the plugin adapter superclass.
 *
 * @param attributes the attribute values the object is created with
 * @param broker     the broker this plugin belongs to
 */
@ManagedObjectFactoryConstructor
public HttpManagement(final Map<String, Object> attributes, final Broker broker)
{
    super(attributes, broker);
}
| |
| @Override |
| protected void onOpen() |
| { |
| super.onOpen(); |
| _serveUncompressedDojo = Boolean.TRUE.equals(getContextValue(Boolean.class, "qpid.httpManagement.serveUncompressedDojo")); |
| _saslExchangeExpiry = getContextValue(Long.class, SASL_EXCHANGE_EXPIRY_CONTEXT_NAME); |
| getBroker().addChangeListener(_brokerChangeListener); |
| } |
| |
| @StateTransition(currentState = {State.UNINITIALIZED,State.ERRORED}, desiredState = State.ACTIVE) |
| @SuppressWarnings("unused") |
| private ListenableFuture<Void> doStart() |
| { |
| |
| Collection<HttpPort<?>> httpPorts = getEligibleHttpPorts(getBroker().getPorts()); |
| if (httpPorts.isEmpty()) |
| { |
| LOGGER.warn("HttpManagement plugin is configured but no suitable HTTP ports are available."); |
| } |
| else |
| { |
| getBroker().getEventLogger().message(ManagementConsoleMessages.STARTUP(OPERATIONAL_LOGGING_NAME)); |
| |
| _server = createServer(httpPorts); |
| try |
| { |
| _server.start(); |
| logOperationalListenMessages(); |
| } |
| catch (PortBindFailureException e) |
| { |
| getBroker().getEventLogger().message(PortMessages.BIND_FAILED("HTTP", e.getAddress().getPort())); |
| throw e; |
| } |
| catch (Exception e) |
| { |
| throw new ServerScopedRuntimeException("Failed to start HTTP management on ports : " + httpPorts, e); |
| } |
| |
| getBroker().getEventLogger().message(ManagementConsoleMessages.READY(OPERATIONAL_LOGGING_NAME)); |
| } |
| setState(State.ACTIVE); |
| return Futures.immediateFuture(null); |
| } |
| |
| @Override |
| protected ListenableFuture<Void> onClose() |
| { |
| getBroker().removeChangeListener(_brokerChangeListener); |
| if (_server != null) |
| { |
| try |
| { |
| logOperationalShutdownMessage(); |
| _server.stop(); |
| } |
| catch (Exception e) |
| { |
| throw new ServerScopedRuntimeException("Failed to stop HTTP management", e); |
| } |
| } |
| |
| if (_jettyServerExecutor != null) |
| { |
| _jettyServerExecutor.shutdown(); |
| } |
| getBroker().getEventLogger().message(ManagementConsoleMessages.STOPPED(OPERATIONAL_LOGGING_NAME)); |
| return Futures.immediateFuture(null); |
| } |
| |
| @Override |
| public int getSessionTimeout() |
| { |
| return _sessionTimeout; |
| } |
| |
| @Override |
| public String getCorsAllowOrigins() |
| { |
| return _corsAllowOrigins; |
| } |
| |
| @Override |
| public Set<String> getCorsAllowMethods() |
| { |
| return _corsAllowMethods; |
| } |
| |
| @Override |
| public String getCorsAllowHeaders() |
| { |
| return _corsAllowHeaders; |
| } |
| |
| @Override |
| public boolean getCorsAllowCredentials() |
| { |
| return _corsAllowCredentials; |
| } |
| |
| private Server createServer(Collection<HttpPort<?>> ports) |
| { |
| LOGGER.debug("Starting up web server on {}", ports); |
| |
| _jettyServerExecutor = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory("Jetty-Server-Thread")); |
| Server server = new Server(new ExecutorThreadPool(_jettyServerExecutor)); |
| int lastPort = -1; |
| for (HttpPort<?> port : ports) |
| { |
| ServerConnector connector = createConnector(port, server); |
| connector.addBean(new ConnectionTrackingListener()); |
| server.addConnector(connector); |
| _portConnectorMap.put(port, connector); |
| lastPort = port.getPort(); |
| } |
| |
| ServletContextHandler root = new ServletContextHandler(ServletContextHandler.SESSIONS); |
| root.setContextPath("/"); |
| root.setCompactPath(true); |
| server.setHandler(root); |
| |
| final ErrorHandler errorHandler = new ErrorHandler() |
| { |
| @Override |
| protected void writeErrorPageBody(HttpServletRequest request, Writer writer, int code, String message, boolean showStacks) |
| throws IOException |
| { |
| String uri= request.getRequestURI(); |
| |
| writeErrorPageMessage(request,writer,code,message,uri); |
| |
| for (int i= 0; i < 20; i++) |
| writer.write("<br/> \n"); |
| } |
| }; |
| root.setErrorHandler(errorHandler); |
| |
| // set servlet context attributes for broker and configuration |
| root.getServletContext().setAttribute(HttpManagementUtil.ATTR_BROKER, getBroker()); |
| root.getServletContext().setAttribute(HttpManagementUtil.ATTR_MANAGEMENT_CONFIGURATION, this); |
| |
| root.addFilter(new FilterHolder(new ExceptionHandlingFilter()), "/*", EnumSet.allOf(DispatcherType.class)); |
| |
| FilterHolder corsFilter = new FilterHolder(new CrossOriginFilter()); |
| corsFilter.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, getCorsAllowOrigins()); |
| corsFilter.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, Joiner.on(",").join(getCorsAllowMethods())); |
| corsFilter.setInitParameter(CrossOriginFilter.ALLOWED_HEADERS_PARAM, getCorsAllowHeaders()); |
| corsFilter.setInitParameter(CrossOriginFilter.ALLOW_CREDENTIALS_PARAM, String.valueOf(getCorsAllowCredentials())); |
| root.addFilter(corsFilter, "/*", EnumSet.of(DispatcherType.REQUEST)); |
| |
| root.addFilter(new FilterHolder(new ForbiddingTraceFilter()), "/*", EnumSet.of(DispatcherType.REQUEST)); |
| |
| FilterHolder loggingFilter = new FilterHolder(new LoggingFilter()); |
| root.addFilter(loggingFilter, "/api/*", EnumSet.of(DispatcherType.REQUEST)); |
| root.addFilter(loggingFilter, "/service/*", EnumSet.of(DispatcherType.REQUEST)); |
| |
| FilterHolder restAuthorizationFilter = new FilterHolder(new AuthenticationCheckFilter()); |
| restAuthorizationFilter.setInitParameter(AuthenticationCheckFilter.INIT_PARAM_ALLOWED, "/service/sasl"); |
| root.addFilter(restAuthorizationFilter, "/api/*", EnumSet.of(DispatcherType.REQUEST)); |
| root.addFilter(restAuthorizationFilter, "/apidocs/*", EnumSet.of(DispatcherType.REQUEST)); |
| root.addFilter(restAuthorizationFilter, "/service/*", EnumSet.of(DispatcherType.REQUEST)); |
| |
| root.addFilter(new FilterHolder(new InteractiveAuthenticationFilter()), "/index.html", EnumSet.of(DispatcherType.REQUEST)); |
| root.addFilter(new FilterHolder(new InteractiveAuthenticationFilter()), "/", EnumSet.of(DispatcherType.REQUEST)); |
| |
| FilterHolder redirectFilter = new FilterHolder(new RedirectFilter()); |
| redirectFilter.setInitParameter(RedirectFilter.INIT_PARAM_REDIRECT_URI, "/index.html"); |
| root.addFilter(redirectFilter, "/login.html", EnumSet.of(DispatcherType.REQUEST)); |
| |
| if (_serveUncompressedDojo) |
| { |
| root.addFilter(RewriteRequestForUncompressedJavascript.class, "/dojo/dojo/*", EnumSet.of(DispatcherType.REQUEST)); |
| root.addFilter(RewriteRequestForUncompressedJavascript.class, "/dojo/dojox/*", EnumSet.of(DispatcherType.REQUEST)); |
| } |
| |
| addRestServlet(root); |
| |
| ServletHolder queryServlet = new ServletHolder(new BrokerQueryServlet()); |
| root.addServlet(queryServlet, "/api/latest/querybroker/*"); |
| root.addServlet(queryServlet, "/api/v" + BrokerModel.MODEL_VERSION + "/querybroker/*"); |
| |
| ServletHolder vhQueryServlet = new ServletHolder(new VirtualHostQueryServlet()); |
| root.addServlet(vhQueryServlet, "/api/latest/queryvhost/*"); |
| root.addServlet(vhQueryServlet, "/api/v" + BrokerModel.MODEL_VERSION + "/queryvhost/*"); |
| |
| |
| ServletHolder apiDocsServlet = new ServletHolder(new ApiDocsServlet()); |
| final String version = "v" + BrokerModel.MODEL_VERSION; |
| for (final String path : new String[]{"/apidocs", "/apidocs/latest", "/apidocs/" + version}) |
| { |
| root.addServlet(apiDocsServlet, path); |
| root.addServlet(apiDocsServlet, path + "/"); |
| } |
| |
| root.addServlet(new ServletHolder(new StructureServlet()), "/service/structure"); |
| root.addServlet(new ServletHolder(new QueueReportServlet()), "/service/queuereport/*"); |
| |
| root.addServlet(new ServletHolder(new MetaDataServlet()), "/service/metadata"); |
| |
| root.addServlet(new ServletHolder(new SaslServlet()), "/service/sasl"); |
| |
| root.addServlet(new ServletHolder(new RootServlet("/","/apidocs/","index.html")), "/"); |
| root.addServlet(new ServletHolder(new LogoutServlet()), "/logout"); |
| |
| root.addServlet(new ServletHolder(new FileServlet(DojoHelper.getDojoPath(), true)), "/dojo/dojo/*"); |
| root.addServlet(new ServletHolder(new FileServlet(DojoHelper.getDijitPath(), true)), "/dojo/dijit/*"); |
| root.addServlet(new ServletHolder(new FileServlet(DojoHelper.getDojoxPath(), true)), "/dojo/dojox/*"); |
| root.addServlet(new ServletHolder(new FileServlet(DojoHelper.getDgridPath(), true)), "/dojo/dgrid/*"); |
| root.addServlet(new ServletHolder(new FileServlet(DojoHelper.getDstorePath(), true)), "/dojo/dstore/*"); |
| |
| for(String pattern : STATIC_FILE_TYPES) |
| { |
| root.addServlet(new ServletHolder(new FileServlet()), pattern); |
| } |
| |
| root.addServlet(new ServletHolder(new TimeZoneServlet()), "/service/timezones"); |
| |
| final Iterable<ContentFactory> contentFactories = new QpidServiceLoader().instancesOf(ContentFactory.class); |
| contentFactories.forEach(f->{ |
| ServletHolder metricsServlet = new ServletHolder(new ContentServlet(f)); |
| String path = f.getType().toLowerCase(); |
| root.addServlet(metricsServlet, "/" + path); |
| root.addServlet(metricsServlet, "/" + path + "/*"); |
| |
| if (getContextValue(Boolean.class, HTTP_MANAGEMENT_ENABLE_CONTENT_AUTHENTICATION)) |
| { |
| root.addFilter(restAuthorizationFilter, "/" + path, EnumSet.of(DispatcherType.REQUEST)); |
| root.addFilter(restAuthorizationFilter, "/" + path + "/*", EnumSet.of(DispatcherType.REQUEST)); |
| } |
| |
| }); |
| |
| root.getSessionHandler().getSessionCookieConfig().setName(JSESSIONID_COOKIE_PREFIX + lastPort); |
| root.getSessionHandler().getSessionCookieConfig().setHttpOnly(true); |
| root.getSessionHandler().setMaxInactiveInterval(getSessionTimeout()); |
| |
| return server; |
| } |
| |
| @Override |
| public int getBoundPort(final HttpPort httpPort) |
| { |
| NetworkConnector c = _portConnectorMap.get(httpPort); |
| if (c != null) |
| { |
| return c.getLocalPort(); |
| } |
| else |
| { |
| return -1; |
| } |
| } |
| |
| @Override |
| public int getNumberOfAcceptors(HttpPort httpPort) |
| { |
| ServerConnector c = _portConnectorMap.get(httpPort); |
| if (c != null) |
| { |
| return c.getAcceptors(); |
| } |
| else |
| { |
| return -1; |
| } |
| } |
| |
| @Override |
| public int getNumberOfSelectors(HttpPort httpPort) |
| { |
| ServerConnector c = _portConnectorMap.get(httpPort); |
| if (c != null) |
| { |
| return c.getSelectorManager().getSelectorCount(); |
| } |
| else |
| { |
| return -1; |
| } |
| } |
| |
| @Override |
| public SSLContext getSSLContext(final HttpPort httpPort) |
| { |
| final SslContextFactory sslContextFactory = getSslContextFactory(httpPort); |
| if ( sslContextFactory != null) |
| { |
| return sslContextFactory.getSslContext(); |
| } |
| return null; |
| } |
| |
| @Override |
| public boolean updateSSLContext(final HttpPort httpPort) |
| { |
| final SslContextFactory sslContextFactory = getSslContextFactory(httpPort); |
| if ( sslContextFactory != null) |
| { |
| try |
| { |
| final SSLContext sslContext = createSslContext(httpPort); |
| sslContextFactory.reload(f -> { |
| f.setSslContext(sslContext); |
| f.setNeedClientAuth(httpPort.getNeedClientAuth()); |
| f.setWantClientAuth(httpPort.getWantClientAuth()); |
| }); |
| return true; |
| } |
| catch (Exception e) |
| { |
| throw new IllegalConfigurationException("Unexpected exception on reload of ssl context factory", e); |
| } |
| } |
| return false; |
| } |
| |
| private SslContextFactory getSslContextFactory(final HttpPort httpPort) |
| { |
| return _sslContextFactoryMap.get(httpPort); |
| } |
| |
| private ServerConnector createConnector(final HttpPort<?> port, final Server server) |
| { |
| port.setPortManager(this); |
| |
| if (port.getState() != State.ACTIVE) |
| { |
| // TODO - RG - probably does nothing |
| port.startAsync(); |
| } |
| |
| HttpConnectionFactory httpConnectionFactory = new HttpConnectionFactory(); |
| httpConnectionFactory.getHttpConfiguration().setSendServerVersion(false); |
| httpConnectionFactory.getHttpConfiguration().setSendXPoweredBy(false); |
| HttpConfiguration.Customizer requestAttributeCustomizer = |
| (connector, httpConfiguration, request) -> HttpManagementUtil.getPortAttributeAction(port) |
| .performAction(request); |
| |
| httpConnectionFactory.getHttpConfiguration().addCustomizer(requestAttributeCustomizer); |
| httpConnectionFactory.getHttpConfiguration().addCustomizer(new SecureRequestCustomizer()); |
| |
| ConnectionFactory[] connectionFactories; |
| Collection<Transport> transports = port.getTransports(); |
| SslContextFactory sslContextFactory = null; |
| if (!transports.contains(Transport.SSL)) |
| { |
| connectionFactories = new ConnectionFactory[]{httpConnectionFactory}; |
| } |
| else if (transports.contains(Transport.SSL)) |
| { |
| sslContextFactory = createSslContextFactory(port); |
| ConnectionFactory sslConnectionFactory; |
| if (port.getTransports().contains(Transport.TCP)) |
| { |
| sslConnectionFactory = |
| new TlsOrPlainConnectionFactory(sslContextFactory, httpConnectionFactory.getProtocol()); |
| } |
| else |
| { |
| sslConnectionFactory = new SslConnectionFactory(sslContextFactory, httpConnectionFactory.getProtocol()); |
| } |
| connectionFactories = new ConnectionFactory[]{sslConnectionFactory, httpConnectionFactory}; |
| } |
| else |
| { |
| throw new IllegalArgumentException("Unexpected transport on port " + port.getName() + ":" + transports); |
| } |
| |
| ServerConnector connector = new ServerConnector(server, |
| new QBBTrackingThreadPool(port.getThreadPoolMaximum(), |
| port.getThreadPoolMinimum()), |
| null, |
| null, |
| port.getDesiredNumberOfAcceptors(), |
| port.getDesiredNumberOfSelectors(), |
| connectionFactories) |
| { |
| @Override |
| public void open() throws IOException |
| { |
| try |
| { |
| super.open(); |
| } |
| catch (BindException e) |
| { |
| _sslContextFactoryMap.remove(port); |
| InetSocketAddress addr = getHost() == null ? new InetSocketAddress(getPort()) |
| : new InetSocketAddress(getHost(), getPort()); |
| throw new PortBindFailureException(addr); |
| } |
| } |
| }; |
| |
| connector.setAcceptQueueSize(port.getAcceptBacklogSize()); |
| String bindingAddress = port.getBindingAddress(); |
| if (bindingAddress != null && !bindingAddress.trim().equals("") && !bindingAddress.trim().equals("*")) |
| { |
| connector.setHost(bindingAddress.trim()); |
| } |
| connector.setPort(port.getPort()); |
| |
| if (transports.contains(Transport.SSL)) |
| { |
| connector.addBean(new SslHandshakeListener() |
| { |
| @Override |
| public void handshakeFailed(final Event event, final Throwable failure) |
| { |
| SSLEngine sslEngine = event.getSSLEngine(); |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.info("TLS handshake failed: host='{}', port={}", |
| sslEngine.getPeerHost(), |
| sslEngine.getPeerPort(), |
| failure); |
| } |
| else |
| { |
| LOGGER.info("TLS handshake failed: host='{}', port={}: {}", |
| sslEngine.getPeerHost(), |
| sslEngine.getPeerPort(), |
| String.valueOf(failure)); |
| } |
| } |
| }); |
| } |
| |
| |
| int acceptors = connector.getAcceptors(); |
| int selectors = connector.getSelectorManager().getSelectorCount(); |
| if (LOGGER.isDebugEnabled()) |
| { |
| LOGGER.debug( |
| "Created connector for http port {} with maxThreads={}, minThreads={}, acceptors={}, selectors={}, acceptBacklog={}", |
| port.getName(), |
| port.getThreadPoolMaximum(), |
| port.getThreadPoolMinimum(), |
| acceptors, |
| selectors, |
| port.getAcceptBacklogSize()); |
| } |
| |
| int requiredNumberOfConnections = acceptors + 2 * selectors + 1; |
| if (port.getThreadPoolMaximum() < requiredNumberOfConnections) |
| { |
| throw new IllegalConfigurationException(String.format( |
| "Insufficient number of threads is configured on http port '%s': max=%d < needed(acceptors=%d + selectors=2*%d + request=1)", |
| port.getName(), |
| port.getThreadPoolMaximum(), |
| acceptors, |
| selectors)); |
| } |
| if (sslContextFactory != null) |
| { |
| _sslContextFactoryMap.put(port, sslContextFactory); |
| } |
| |
| return connector; |
| } |
| |
| private SslContextFactory createSslContextFactory(final HttpPort<?> port) |
| { |
| SslContextFactory.Server factory = new SslContextFactory.Server() |
| { |
| @Override |
| public void customize(final SSLEngine sslEngine) |
| { |
| super.customize(sslEngine); |
| if (port.getTlsCipherSuiteWhiteList() != null |
| && !port.getTlsCipherSuiteWhiteList().isEmpty()) |
| { |
| SSLParameters sslParameters = sslEngine.getSSLParameters(); |
| sslParameters.setUseCipherSuitesOrder(true); |
| sslEngine.setSSLParameters(sslParameters); |
| } |
| SSLUtil.updateEnabledCipherSuites(sslEngine, |
| port.getTlsCipherSuiteWhiteList(), |
| port.getTlsCipherSuiteBlackList()); |
| SSLUtil.updateEnabledTlsProtocols(sslEngine, |
| port.getTlsProtocolWhiteList(), |
| port.getTlsProtocolBlackList()); |
| } |
| }; |
| factory.setSslContext(createSslContext(port)); |
| if (port.getNeedClientAuth()) |
| { |
| factory.setNeedClientAuth(true); |
| } |
| else if (port.getWantClientAuth()) |
| { |
| factory.setWantClientAuth(true); |
| } |
| return factory; |
| } |
| |
| private SSLContext createSslContext(final HttpPort<?> port) |
| { |
| KeyStore keyStore = port.getKeyStore(); |
| if (keyStore == null) |
| { |
| throw new IllegalConfigurationException( |
| "Key store is not configured. Cannot start management on HTTPS port without keystore"); |
| } |
| |
| final boolean needClientCert = port.getNeedClientAuth() || port.getWantClientAuth(); |
| final Collection<TrustStore> trustStores = port.getTrustStores(); |
| |
| if (needClientCert && trustStores.isEmpty()) |
| { |
| throw new IllegalConfigurationException(String.format( |
| "Client certificate authentication is enabled on HTTPS port '%s' but no trust store defined", |
| this.getName())); |
| } |
| |
| final SSLContext sslContext = SSLUtil.createSslContext(port.getKeyStore(), trustStores, port.getName()); |
| final SSLSessionContext serverSessionContext = sslContext.getServerSessionContext(); |
| if (port.getTLSSessionCacheSize() > 0) |
| { |
| serverSessionContext.setSessionCacheSize(port.getTLSSessionCacheSize()); |
| } |
| if (port.getTLSSessionTimeout() > 0) |
| { |
| serverSessionContext.setSessionTimeout(port.getTLSSessionTimeout()); |
| } |
| return sslContext; |
| } |
| |
| private void addRestServlet(final ServletContextHandler root) |
| { |
| final Map<String, ManagementControllerFactory> factories = ManagementControllerFactory.loadFactories(); |
| final ApiDocsServlet apiDocsServlet = new ApiDocsServlet(); |
| final List<String> supportedVersions = new ArrayList<>(); |
| String currentVersion = BrokerModel.MODEL_VERSION; |
| ManagementController managementController = null; |
| ManagementControllerFactory factory; |
| do |
| { |
| factory = factories.get(currentVersion); |
| if (factory != null) |
| { |
| managementController = factory.createManagementController(this, managementController); |
| final RestServlet managementServlet = new RestServlet(); |
| final Collection<String> categories = managementController.getCategories(); |
| for (String category : categories) |
| { |
| final String name = category.toLowerCase(); |
| final String path = managementController.getCategoryMapping(name); |
| final ServletHolder servletHolder = new ServletHolder(path, managementServlet); |
| servletHolder.setInitParameter("qpid.controller.version", managementController.getVersion()); |
| |
| servletHolder.getRegistration().setMultipartConfig( |
| new MultipartConfigElement("", |
| getContextValue(Long.class, |
| MAX_HTTP_FILE_UPLOAD_SIZE_CONTEXT_NAME), |
| -1L, |
| getContextValue(Integer.class, |
| MAX_HTTP_FILE_UPLOAD_SIZE_CONTEXT_NAME))); |
| |
| root.addServlet(servletHolder, path + (path.endsWith("/") ? "*" : "/*")); |
| |
| if (BrokerModel.MODEL_VERSION.equals(managementController.getVersion())) |
| { |
| root.addServlet(servletHolder, "/api/latest/" + name + "/*"); |
| ServletHolder docServletHolder = new ServletHolder(name + "docs", apiDocsServlet); |
| root.addServlet(docServletHolder, "/apidocs/latest/" + name + "/"); |
| root.addServlet(docServletHolder, "/apidocs/v" + BrokerModel.MODEL_VERSION + "/" + name + "/"); |
| root.addServlet(docServletHolder, "/apidocs/latest/" + name); |
| root.addServlet(docServletHolder, "/apidocs/v" + BrokerModel.MODEL_VERSION + "/" + name); |
| } |
| } |
| supportedVersions.add("v" + currentVersion); |
| currentVersion = factory.getPreviousVersion(); |
| } |
| } |
| while (factory != null); |
| root.getServletContext().setAttribute("qpid.controller.chain", managementController); |
| final Map<String, List<String>> supported = Collections.singletonMap("supportedVersions", supportedVersions); |
| final ServletHolder versionsServletHolder = new ServletHolder(new JsonValueServlet(supported)); |
| root.addServlet(versionsServletHolder, "/api"); |
| root.addServlet(versionsServletHolder, "/api/"); |
| } |
| |
| private void logOperationalListenMessages() |
| { |
| for (Map.Entry<HttpPort<?>, ServerConnector> portConnector : _portConnectorMap.entrySet()) |
| { |
| HttpPort<?> port = portConnector.getKey(); |
| NetworkConnector connector = portConnector.getValue(); |
| logOperationalListenMessages(port, connector.getLocalPort()); |
| } |
| } |
| |
| private void logOperationalListenMessages(final HttpPort<?> port, final int localPort) |
| { |
| Set<Transport> transports = port.getTransports(); |
| for (Transport transport: transports) |
| { |
| getBroker().getEventLogger().message(ManagementConsoleMessages.LISTENING(Protocol.HTTP.name(), |
| transport.name(), |
| localPort)); |
| } |
| } |
| |
| private void logOperationalShutdownMessage() |
| { |
| for (NetworkConnector connector : _portConnectorMap.values()) |
| { |
| logOperationalShutdownMessage(connector.getLocalPort()); |
| } |
| } |
| |
| private void logOperationalShutdownMessage(final int localPort) |
| { |
| getBroker().getEventLogger().message(ManagementConsoleMessages.SHUTTING_DOWN(Protocol.HTTP.name(), |
| localPort)); |
| } |
| |
| private Collection<HttpPort<?>> getEligibleHttpPorts(Collection<Port<?>> ports) |
| { |
| Collection<HttpPort<?>> httpPorts = new HashSet<>(); |
| for (Port<?> port : ports) |
| { |
| if (State.ACTIVE == port.getDesiredState() && |
| State.ERRORED != port.getState() && |
| port.getProtocols().contains(Protocol.HTTP)) |
| { |
| httpPorts.add((HttpPort<?>) port); |
| } |
| } |
| return Collections.unmodifiableCollection(httpPorts); |
| } |
| |
| @Override |
| public boolean isHttpsSaslAuthenticationEnabled() |
| { |
| return _httpsSaslAuthenticationEnabled; |
| } |
| |
| @Override |
| public boolean isHttpSaslAuthenticationEnabled() |
| { |
| return _httpSaslAuthenticationEnabled; |
| } |
| |
| @Override |
| public boolean isHttpsBasicAuthenticationEnabled() |
| { |
| return _httpsBasicAuthenticationEnabled; |
| } |
| |
| @Override |
| public boolean isHttpBasicAuthenticationEnabled() |
| { |
| return _httpBasicAuthenticationEnabled; |
| } |
| |
| @Override |
| public boolean isCompressResponses() |
| { |
| return _compressResponses; |
| } |
| |
| @Override |
| public AuthenticationProvider getAuthenticationProvider(HttpServletRequest request) |
| { |
| HttpPort<?> port = getPort(request); |
| return port == null ? null : port.getAuthenticationProvider(); |
| } |
| |
| @SuppressWarnings("unused") |
| public static Set<String> getAllAvailableCorsMethodCombinations() |
| { |
| List<String> methods = Arrays.asList("OPTION", "HEAD", "GET", "POST", "PUT", "DELETE"); |
| Set<Set<String>> combinations = new HashSet<>(); |
| int n = methods.size(); |
| assert n < 31 : "Too many combination to calculate"; |
| // enumerate all 2**n combinations |
| for (int i = 0; i < (1 << n); ++i) { |
| Set<String> currentCombination = new HashSet<>(); |
| // each bit in the variable i represents an item of the sequence |
| // if the bit is set the item should appear in this particular combination |
| for (int index = 0; index < n; ++index) { |
| if ((i & (1 << index)) != 0) { |
| currentCombination.add(methods.get(index)); |
| } |
| } |
| combinations.add(currentCombination); |
| } |
| |
| Set<String> combinationsAsString = new HashSet<>(combinations.size()); |
| ObjectMapper mapper = new ObjectMapper(); |
| for(Set<String> combination : combinations) |
| { |
| try(StringWriter writer = new StringWriter()) |
| { |
| mapper.writeValue(writer, combination); |
| combinationsAsString.add(writer.toString()); |
| } |
| catch (IOException e) |
| { |
| throw new IllegalArgumentException("Unexpected IO Exception generating JSON string", e); |
| } |
| } |
| return Collections.unmodifiableSet(combinationsAsString); |
| } |
| |
| @Override |
| public HttpPort<?> getPort(final HttpServletRequest request) |
| { |
| return HttpManagementUtil.getPort(request); |
| } |
| |
| @Override |
| protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes) |
| { |
| super.validateChange(proxyForValidation, changedAttributes); |
| |
| HttpManagementConfiguration<?> updated = (HttpManagementConfiguration<?>)proxyForValidation; |
| if(changedAttributes.contains(HttpManagement.NAME)) |
| { |
| if(!getName().equals(updated.getName())) |
| { |
| throw new IllegalConfigurationException("Changing the name of http management plugin is not allowed"); |
| } |
| } |
| if (changedAttributes.contains(TIME_OUT)) |
| { |
| int value = updated.getSessionTimeout(); |
| if (value < 0) |
| { |
| throw new IllegalConfigurationException("Only positive integer value can be specified for the session time out attribute"); |
| } |
| } |
| } |
| |
| @Override |
| public long getSaslExchangeExpiry() |
| { |
| return _saslExchangeExpiry; |
| } |
| |
| private static class QBBTrackingThreadPool extends QueuedThreadPool |
| { |
| private final ThreadFactory _threadFactory; |
| |
| public QBBTrackingThreadPool(@Name("maxThreads") final int maxThreads, @Name("minThreads") final int minThreads) |
| { |
| super(maxThreads, minThreads); |
| _threadFactory = QpidByteBuffer.createQpidByteBufferTrackingThreadFactory(r -> QBBTrackingThreadPool.super.newThread(r)); |
| } |
| |
| @Override |
| public Thread newThread(final Runnable runnable) |
| { |
| return _threadFactory.newThread(runnable); |
| } |
| } |
| |
| private class BrokerChangeListener extends AbstractConfigurationChangeListener |
| { |
| @Override |
| public void childAdded(final ConfiguredObject<?> object, final ConfiguredObject<?> child) |
| { |
| if (child instanceof HttpPort) |
| { |
| final HttpPort<?> port = (HttpPort<?>) child; |
| Server server = _server; |
| if (server != null) |
| { |
| ServerConnector connector = null; |
| try |
| { |
| connector = createConnector(port, server); |
| connector.addBean(new ConnectionTrackingListener()); |
| server.addConnector(connector); |
| connector.start(); |
| _portConnectorMap.put(port, connector); |
| logOperationalListenMessages(port, connector.getLocalPort()); |
| } |
| catch (Exception e) |
| { |
| if (connector != null) |
| { |
| server.removeConnector(connector); |
| } |
| LOGGER.warn("HTTP management connector creation failed for http port {}", port, e); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void childRemoved(final ConfiguredObject<?> object, final ConfiguredObject<?> child) |
| { |
| if (child instanceof HttpPort) |
| { |
| final HttpPort<?> port = (HttpPort<?>) child; |
| Server server = _server; |
| if (server != null) |
| { |
| _sslContextFactoryMap.remove(port); |
| final ServerConnector connector = _portConnectorMap.remove(port); |
| if (connector != null) |
| { |
| final int localPort = connector.getLocalPort(); |
| |
| final ConnectionTrackingListener tracker = connector.getBean(ConnectionTrackingListener.class); |
| // closes the server socket - we will see no new connections arriving |
| connector.close(); |
| // minimise the timeout of endpoints so they close in a timely fashion |
| connector.setIdleTimeout(1); |
| connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1)); |
| LOGGER.debug("Connector has {} connection(s)", tracker.getConnectionCount()); |
| |
| final TaskExecutor taskExecutor = getBroker().getTaskExecutor(); |
| tracker.getAllClosedFuture().addListener(new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| final int connectionCount = tracker.getConnectionCount(); |
| if (connectionCount == 0) |
| { |
| LOGGER.debug("Stopping connector for http port {}", localPort); |
| try |
| { |
| connector.stop(); |
| } |
| catch (Exception e) |
| { |
| LOGGER.warn("Failed to stop connector for http port {}", localPort, e); |
| } |
| finally |
| { |
| logOperationalShutdownMessage(localPort); |
| _server.removeConnector(connector); |
| } |
| } |
| else |
| { |
| LOGGER.debug("Connector still has {} connection(s)", tracker.getConnectionCount()); |
| connector.getConnectedEndPoints().forEach(endPoint -> endPoint.setIdleTimeout(1)); |
| tracker.getAllClosedFuture() |
| .addListener(this, taskExecutor); |
| } |
| } |
| }, taskExecutor); |
| } |
| } |
| } |
| } |
| |
| } |
| |
| private static class ConnectionTrackingListener implements Connection.Listener |
| { |
| private final Map<Connection, SettableFuture<Void>> _closeFutures = new HashMap<>(); |
| |
| @Override |
| public void onOpened(final Connection connection) |
| { |
| _closeFutures.put(connection, SettableFuture.create()); |
| } |
| |
| @Override |
| public void onClosed(final Connection connection) |
| { |
| SettableFuture<Void> closeFuture = _closeFutures.remove(connection); |
| if (closeFuture != null) |
| { |
| closeFuture.set(null); |
| } |
| } |
| |
| public ListenableFuture<List<Void>> getAllClosedFuture() |
| { |
| return Futures.allAsList(_closeFutures.values()); |
| } |
| |
| public int getConnectionCount() |
| { |
| return _closeFutures.size(); |
| } |
| } |
| } |