blob: 838a2f1bf717d8225c0ebfadf615853ab9427018 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.websocket;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.Closeable;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException;
import javax.websocket.DeploymentException;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationManager;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.websocket.service.WebSocketProxyConfiguration;
import org.apache.pulsar.websocket.stats.ProxyStats;
import org.apache.pulsar.zookeeper.GlobalZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperCache;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Socket proxy server which initializes other dependent services and starts server by opening web-socket end-point url.
*
*/
public class WebSocketService implements Closeable {
public static final int MaxTextFrameSize = 1024 * 1024;
AuthenticationService authenticationService;
AuthorizationManager authorizationManager;
PulsarClient pulsarClient;
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(
WebSocketProxyConfiguration.WEBSOCKET_SERVICE_THREADS, new DefaultThreadFactory("pulsar-websocket"));
private final OrderedSafeExecutor orderedExecutor = new OrderedSafeExecutor(
WebSocketProxyConfiguration.GLOBAL_ZK_THREADS, "pulsar-websocket-ordered");
private GlobalZooKeeperCache globalZkCache;
private ZooKeeperClientFactory zkClientFactory;
private ServiceConfiguration config;
private ConfigurationCacheService configurationCacheService;
private ClusterData localCluster;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> topicProducerMap;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> topicConsumerMap;
private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> topicReaderMap;
private final ProxyStats proxyStats;
public WebSocketService(WebSocketProxyConfiguration config) {
this(createClusterData(config), createServiceConfiguration(config));
}
public WebSocketService(ClusterData localCluster, ServiceConfiguration config) {
this.config = config;
this.localCluster = localCluster;
this.topicProducerMap = new ConcurrentOpenHashMap<>();
this.topicConsumerMap = new ConcurrentOpenHashMap<>();
this.topicReaderMap = new ConcurrentOpenHashMap<>();
this.proxyStats = new ProxyStats(this);
}
public void start() throws PulsarServerException, PulsarClientException, MalformedURLException, ServletException,
DeploymentException {
if (isNotBlank(config.getGlobalZookeeperServers())) {
this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(),
(int) config.getZooKeeperSessionTimeoutMillis(), config.getGlobalZookeeperServers(),
this.orderedExecutor, this.executor);
try {
this.globalZkCache.start();
} catch (IOException e) {
throw new PulsarServerException(e);
}
this.configurationCacheService = new ConfigurationCacheService(getGlobalZkCache());
log.info("Global Zookeeper cache started");
}
// start authorizationManager
if (config.isAuthorizationEnabled()) {
if (configurationCacheService == null) {
throw new PulsarServerException(
"Failed to initialize authorization manager due to empty GlobalZookeeperServers");
}
authorizationManager = new AuthorizationManager(this.config, configurationCacheService);
}
// start authentication service
authenticationService = new AuthenticationService(this.config);
log.info("Pulsar WebSocket Service started");
}
@Override
public void close() throws IOException {
if (pulsarClient != null) {
pulsarClient.close();
}
if (authenticationService != null) {
authenticationService.close();
}
if (globalZkCache != null) {
globalZkCache.close();
}
executor.shutdown();
orderedExecutor.shutdown();
}
public AuthenticationService getAuthenticationService() {
return authenticationService;
}
public AuthorizationManager getAuthorizationManager() {
return authorizationManager;
}
public ZooKeeperCache getGlobalZkCache() {
return globalZkCache;
}
public ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
}
public synchronized PulsarClient getPulsarClient() throws IOException {
// Do lazy initialization of client
if (pulsarClient == null) {
if (localCluster == null) {
// If not explicitly set, read clusters data from ZK
localCluster = retrieveClusterData();
}
pulsarClient = createClientInstance(localCluster);
}
return pulsarClient;
}
private PulsarClient createClientInstance(ClusterData clusterData) throws IOException {
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setUseTls(config.isTlsEnabled());
clientConf.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
clientConf.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
clientConf.setIoThreads(config.getWebSocketNumIoThreads());
clientConf.setConnectionsPerBroker(config.getWebSocketConnectionsPerBroker());
if (config.isAuthenticationEnabled()) {
clientConf.setAuthentication(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
}
if (config.isTlsEnabled()) {
if (isNotBlank(clusterData.getBrokerServiceUrlTls())) {
return PulsarClient.create(clusterData.getBrokerServiceUrlTls(), clientConf);
} else if (isNotBlank(clusterData.getServiceUrlTls())) {
return PulsarClient.create(clusterData.getServiceUrlTls(), clientConf);
}
} else if (isNotBlank(clusterData.getBrokerServiceUrl())) {
return PulsarClient.create(clusterData.getBrokerServiceUrl(), clientConf);
}
return PulsarClient.create(clusterData.getServiceUrl(), clientConf);
}
private static ClusterData createClusterData(WebSocketProxyConfiguration config) {
if (isNotBlank(config.getBrokerServiceUrl()) || isNotBlank(config.getBrokerServiceUrlTls())) {
return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls(), config.getBrokerServiceUrl(),
config.getBrokerServiceUrlTls());
} else if (isNotBlank(config.getServiceUrl()) || isNotBlank(config.getServiceUrlTls())) {
return new ClusterData(config.getServiceUrl(), config.getServiceUrlTls());
} else {
return null;
}
}
private static ServiceConfiguration createServiceConfiguration(WebSocketProxyConfiguration config) {
ServiceConfiguration serviceConfig = new ServiceConfiguration();
serviceConfig.setProperties(config.getProperties());
serviceConfig.setClusterName(config.getClusterName());
serviceConfig.setWebServicePort(config.getWebServicePort());
serviceConfig.setWebServicePortTls(config.getWebServicePortTls());
serviceConfig.setAuthenticationEnabled(config.isAuthenticationEnabled());
serviceConfig.setAuthenticationProviders(config.getAuthenticationProviders());
serviceConfig.setBrokerClientAuthenticationPlugin(config.getBrokerClientAuthenticationPlugin());
serviceConfig.setBrokerClientAuthenticationParameters(config.getBrokerClientAuthenticationParameters());
serviceConfig.setAuthorizationEnabled(config.isAuthorizationEnabled());
serviceConfig.setAuthorizationAllowWildcardsMatching(config.getAuthorizationAllowWildcardsMatching());
serviceConfig.setSuperUserRoles(config.getSuperUserRoles());
serviceConfig.setGlobalZookeeperServers(config.getGlobalZookeeperServers());
serviceConfig.setZooKeeperSessionTimeoutMillis(config.getZooKeeperSessionTimeoutMillis());
serviceConfig.setTlsEnabled(config.isTlsEnabled());
serviceConfig.setTlsTrustCertsFilePath(config.getTlsTrustCertsFilePath());
serviceConfig.setTlsCertificateFilePath(config.getTlsCertificateFilePath());
serviceConfig.setTlsKeyFilePath(config.getTlsKeyFilePath());
serviceConfig.setTlsAllowInsecureConnection(config.isTlsAllowInsecureConnection());
serviceConfig.setWebSocketNumIoThreads(config.getNumIoThreads());
serviceConfig.setWebSocketConnectionsPerBroker(config.getConnectionsPerBroker());
return serviceConfig;
}
private ClusterData retrieveClusterData() throws PulsarServerException {
if (configurationCacheService == null) {
throw new PulsarServerException("Failed to retrieve Cluster data due to empty GlobalZookeeperServers");
}
try {
String path = "/admin/clusters/" + config.getClusterName();
return localCluster = configurationCacheService.clustersCache().get(path)
.orElseThrow(() -> new KeeperException.NoNodeException(path));
} catch (Exception e) {
throw new PulsarServerException(e);
}
}
public ProxyStats getProxyStats() {
return proxyStats;
}
public ConfigurationCacheService getConfigurationCache() {
return configurationCacheService;
}
public ScheduledExecutorService getExecutor() {
return executor;
}
public boolean isAuthenticationEnabled() {
if (this.config == null)
return false;
return this.config.isAuthenticationEnabled();
}
public boolean isAuthorizationEnabled() {
if (this.config == null)
return false;
return this.config.isAuthorizationEnabled();
}
public boolean addProducer(ProducerHandler producer) {
return topicProducerMap
.computeIfAbsent(producer.getProducer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
.add(producer);
}
public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ProducerHandler>> getProducers() {
return topicProducerMap;
}
public boolean removeProducer(ProducerHandler producer) {
final String topicName = producer.getProducer().getTopic();
if (topicProducerMap.containsKey(topicName)) {
return topicProducerMap.get(topicName).remove(producer);
}
return false;
}
public boolean addConsumer(ConsumerHandler consumer) {
return topicConsumerMap
.computeIfAbsent(consumer.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
.add(consumer);
}
public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ConsumerHandler>> getConsumers() {
return topicConsumerMap;
}
public boolean removeConsumer(ConsumerHandler consumer) {
final String topicName = consumer.getConsumer().getTopic();
if (topicConsumerMap.containsKey(topicName)) {
return topicConsumerMap.get(topicName).remove(consumer);
}
return false;
}
public boolean addReader(ReaderHandler reader) {
return topicReaderMap.computeIfAbsent(reader.getConsumer().getTopic(), topic -> new ConcurrentOpenHashSet<>())
.add(reader);
}
public ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<ReaderHandler>> getReaders() {
return topicReaderMap;
}
public boolean removeReader(ReaderHandler reader) {
final String topicName = reader.getConsumer().getTopic();
if (topicReaderMap.containsKey(topicName)) {
return topicReaderMap.get(topicName).remove(reader);
}
return false;
}
public ServiceConfiguration getConfig() {
return config;
}
private static final Logger log = LoggerFactory.getLogger(WebSocketService.class);
}