blob: 32b6fa133144166565055183a81df95c841fbf64 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.common.stats.JvmMetrics.getJvmDirectMemoryUsed;
import static org.slf4j.bridge.SLF4JBridgeHandler.install;
import static org.slf4j.bridge.SLF4JBridgeHandler.removeHandlersForRootLogger;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.exporter.MetricsServlet;
import io.prometheus.client.hotspot.DefaultExports;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import lombok.Getter;
import org.apache.logging.log4j.core.util.datetime.FixedDateFormat;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.CmdGenerateDocs;
import org.apache.pulsar.proxy.stats.ProxyStats;
import org.apache.pulsar.websocket.WebSocketConsumerServlet;
import org.apache.pulsar.websocket.WebSocketPingPongServlet;
import org.apache.pulsar.websocket.WebSocketProducerServlet;
import org.apache.pulsar.websocket.WebSocketReaderServlet;
import org.apache.pulsar.websocket.WebSocketService;
import org.eclipse.jetty.proxy.ProxyServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Starts an instance of the Pulsar ProxyService.
*
*/
public class ProxyServiceStarter {
@Parameter(names = { "-c", "--config" }, description = "Configuration file path", required = true)
private String configFile;
@Deprecated
@Parameter(names = { "-zk", "--zookeeper-servers" },
description = "Local zookeeper connection string, please use --metadata-store instead")
private String zookeeperServers = "";
@Parameter(names = { "-md", "--metadata-store" }, description = "Metadata Store service url. eg: zk:my-zk:2181")
private String metadataStoreUrl = "";
@Deprecated
@Parameter(names = { "-gzk", "--global-zookeeper-servers" },
description = "Global zookeeper connection string, please use --configuration-metadata-store instead")
private String globalZookeeperServers = "";
@Deprecated
@Parameter(names = { "-cs", "--configuration-store-servers" },
description = "Configuration store connection string, "
+ "please use --configuration-metadata-store instead")
private String configurationStoreServers = "";
@Parameter(names = { "-cms", "--configuration-metadata-store" },
description = "The metadata store URL for the configuration data")
private String configurationMetadataStoreUrl = "";
@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
@Parameter(names = {"-g", "--generate-docs"}, description = "Generate docs")
private boolean generateDocs = false;
private ProxyConfiguration config;
@Getter
private ProxyService proxyService;
private WebServer server;
public ProxyServiceStarter(String[] args) throws Exception {
try {
// setup handlers
removeHandlersForRootLogger();
install();
DateFormat dateFormat = new SimpleDateFormat(
FixedDateFormat.FixedFormat.ISO8601_OFFSET_DATE_TIME_HHMM.getPattern());
Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> {
System.out.printf("%s [%s] error Uncaught exception in thread %s: %s%n", dateFormat.format(new Date()),
thread.getContextClassLoader(), thread.getName(), exception.getMessage());
exception.printStackTrace(System.out);
});
JCommander jcommander = new JCommander();
try {
jcommander.addObject(this);
jcommander.parse(args);
if (help || isBlank(configFile)) {
jcommander.usage();
return;
}
if (this.generateDocs) {
CmdGenerateDocs cmd = new CmdGenerateDocs("pulsar");
cmd.addCommand("proxy", this);
cmd.run(null);
System.exit(0);
}
} catch (Exception e) {
jcommander.usage();
System.exit(-1);
}
// load config file
config = PulsarConfigurationLoader.create(configFile, ProxyConfiguration.class);
if (isBlank(metadataStoreUrl)) {
// Use zookeeperServers from command line if metadataStoreUrl is empty;
config.setMetadataStoreUrl(zookeeperServers);
} else {
// Use metadataStoreUrl from command line
config.setMetadataStoreUrl(metadataStoreUrl);
}
if (!isBlank(globalZookeeperServers)) {
config.setConfigurationMetadataStoreUrl(globalZookeeperServers);
}
if (!isBlank(configurationStoreServers)) {
// Use configurationMetadataStoreUrl from command line
config.setConfigurationMetadataStoreUrl(configurationStoreServers);
}
if (!isBlank(configurationMetadataStoreUrl)) {
config.setConfigurationMetadataStoreUrl(configurationMetadataStoreUrl);
}
if ((isBlank(config.getBrokerServiceURL()) && isBlank(config.getBrokerServiceURLTLS()))
|| config.isAuthorizationEnabled()) {
checkArgument(!isEmpty(config.getMetadataStoreUrl()), "metadataStoreUrl must be provided");
checkArgument(!isEmpty(config.getConfigurationMetadataStoreUrl()),
"configurationMetadataStoreUrl must be provided");
}
if ((!config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURL()))
|| (config.isTlsEnabledWithBroker() && isBlank(config.getBrokerWebServiceURLTLS()))) {
checkArgument(!isEmpty(config.getMetadataStoreUrl()), "metadataStoreUrl must be provided");
}
} catch (Exception e) {
log.error("Failed to start pulsar proxy service. error msg " + e.getMessage(), e);
throw new PulsarServerException(e);
}
}
public static void main(String[] args) throws Exception {
ProxyServiceStarter serviceStarter = new ProxyServiceStarter(args);
serviceStarter.start();
}
public void start() throws Exception {
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));
// create proxy service
proxyService = new ProxyService(config, authenticationService);
// create a web-service
server = new WebServer(config, authenticationService);
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
close();
}));
proxyService.start();
// Setup metrics
DefaultExports.initialize();
// Report direct memory from Netty counters
Gauge.build("jvm_memory_direct_bytes_used", "-").create().setChild(new Child() {
@Override
public double get() {
return getJvmDirectMemoryUsed();
}
}).register(CollectorRegistry.defaultRegistry);
Gauge.build("jvm_memory_direct_bytes_max", "-").create().setChild(new Child() {
@Override
public double get() {
return PlatformDependent.maxDirectMemory();
}
}).register(CollectorRegistry.defaultRegistry);
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider());
// start web-service
server.start();
}
public void close() {
try {
if (proxyService != null) {
proxyService.close();
}
if (server != null) {
server.stop();
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
}
}
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
server.addServlet("/metrics", new ServletHolder(MetricsServlet.class),
Collections.emptyList(), config.isAuthenticateMetricsEndpoint());
server.addRestResources("/", VipStatus.class.getPackage().getName(),
VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath());
server.addRestResources("/proxy-stats", ProxyStats.class.getPackage().getName(),
ProxyStats.ATTRIBUTE_PULSAR_PROXY_NAME, service);
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
servletHolder.setInitParameter("preserveHost", "true");
server.addServlet("/admin", servletHolder);
server.addServlet("/lookup", servletHolder);
for (ProxyConfiguration.HttpReverseProxyConfig revProxy : config.getHttpReverseProxyConfigs()) {
log.debug("Adding reverse proxy with config {}", revProxy);
ServletHolder proxyHolder = new ServletHolder(ProxyServlet.Transparent.class);
proxyHolder.setInitParameter("proxyTo", revProxy.getProxyTo());
proxyHolder.setInitParameter("prefix", "/");
server.addServlet(revProxy.getPath(), proxyHolder);
}
// add proxy additional servlets
if (service != null && service.getProxyAdditionalServlets() != null) {
Collection<AdditionalServletWithClassLoader> additionalServletCollection =
service.getProxyAdditionalServlets().getServlets().values();
for (AdditionalServletWithClassLoader servletWithClassLoader : additionalServletCollection) {
servletWithClassLoader.loadConfig(config);
server.addServlet(servletWithClassLoader.getBasePath(), servletWithClassLoader.getServletHolder(),
Collections.emptyList(), config.isAuthenticationEnabled());
log.info("proxy add additional servlet basePath {} ", servletWithClassLoader.getBasePath());
}
}
if (config.isWebSocketServiceEnabled()) {
// add WebSocket servlet
// Use local broker address to avoid different IP address when using a VIP for service discovery
ServiceConfiguration serviceConfiguration = PulsarConfigurationLoader.convertFrom(config);
serviceConfiguration.setBrokerClientTlsEnabled(config.isTlsEnabledWithBroker());
WebSocketService webSocketService = new WebSocketService(createClusterData(config), serviceConfiguration);
webSocketService.start();
final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService);
server.addServlet(WebSocketProducerServlet.SERVLET_PATH,
new ServletHolder(producerWebSocketServlet));
server.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2,
new ServletHolder(producerWebSocketServlet));
final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService);
server.addServlet(WebSocketConsumerServlet.SERVLET_PATH,
new ServletHolder(consumerWebSocketServlet));
server.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2,
new ServletHolder(consumerWebSocketServlet));
final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService);
server.addServlet(WebSocketReaderServlet.SERVLET_PATH,
new ServletHolder(readerWebSocketServlet));
server.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2,
new ServletHolder(readerWebSocketServlet));
final WebSocketServlet pingPongWebSocketServlet = new WebSocketPingPongServlet(webSocketService);
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH,
new ServletHolder(pingPongWebSocketServlet));
server.addServlet(WebSocketPingPongServlet.SERVLET_PATH_V2,
new ServletHolder(pingPongWebSocketServlet));
}
}
private static ClusterData createClusterData(ProxyConfiguration config) {
if (isNotBlank(config.getBrokerServiceURL()) || isNotBlank(config.getBrokerServiceURLTLS())) {
return ClusterData.builder()
.serviceUrl(config.getBrokerWebServiceURL())
.serviceUrlTls(config.getBrokerWebServiceURLTLS())
.brokerServiceUrl(config.getBrokerServiceURL())
.brokerServiceUrlTls(config.getBrokerServiceURLTLS())
.build();
} else if (isNotBlank(config.getBrokerWebServiceURL()) || isNotBlank(config.getBrokerWebServiceURLTLS())) {
return ClusterData.builder()
.serviceUrl(config.getBrokerWebServiceURL())
.serviceUrlTls(config.getBrokerWebServiceURLTLS())
.build();
} else {
return null;
}
}
@VisibleForTesting
public ProxyConfiguration getConfig() {
return config;
}
@VisibleForTesting
public WebServer getServer() {
return server;
}
private static final Logger log = LoggerFactory.getLogger(ProxyServiceStarter.class);
}