blob: 61e61a6b377a1effd4d39bc84cad7f6f2f89157b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.proxy.server;
import static com.google.common.base.Preconditions.checkNotNull;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.authorization.AuthorizationManager;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService;
import org.apache.pulsar.zookeeper.ZookeeperClientFactoryImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
/**
* Pulsar proxy service
*/
public class ProxyService implements Closeable {
private final ProxyConfiguration proxyConfig;
private final String serviceUrl;
private final String serviceUrlTls;
private ConfigurationCacheService configurationCacheService;
private AuthenticationService authenticationService;
private AuthorizationManager authorizationManager;
private ZooKeeperClientFactory zkClientFactory = null;
private final EventLoopGroup acceptorGroup;
private final EventLoopGroup workerGroup;
private final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-discovery-acceptor");
private final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-discovery-io");
// ConnectionPool is used by the proxy to issue lookup requests
private final PulsarClientImpl client;
private final Authentication clientAuthentication;
private BrokerDiscoveryProvider discoveryProvider;
private LocalZooKeeperConnectionService localZooKeeperConnectionService;
private static final int numThreads = Runtime.getRuntime().availableProcessors();
public ProxyService(ProxyConfiguration proxyConfig) throws IOException {
checkNotNull(proxyConfig);
this.proxyConfig = proxyConfig;
String hostname;
try {
hostname = InetAddress.getLocalHost().getHostName();
} catch (UnknownHostException e) {
throw new RuntimeException(e);
}
this.serviceUrl = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePort());
this.serviceUrlTls = String.format("pulsar://%s:%d/", hostname, proxyConfig.getServicePortTls());
this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory);
this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory);
ClientConfiguration clientConfiguration = new ClientConfiguration();
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
clientConfiguration.setAuthentication(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
}
this.client = new PulsarClientImpl(serviceUrl, clientConfiguration, workerGroup);
this.clientAuthentication = clientConfiguration.getAuthentication();
}
public void start() throws Exception {
localZooKeeperConnectionService = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(),
proxyConfig.getZookeeperServers(), proxyConfig.getZookeeperSessionTimeoutMs());
localZooKeeperConnectionService.start(new ShutdownService() {
@Override
public void shutdown(int exitCode) {
LOG.error("Lost local ZK session. Shutting down the proxy");
Runtime.getRuntime().halt(-1);
}
});
discoveryProvider = new BrokerDiscoveryProvider(this.proxyConfig, getZooKeeperClientFactory());
this.configurationCacheService = new ConfigurationCacheService(discoveryProvider.globalZkCache);
ServiceConfiguration serviceConfiguration = createServiceConfiguration(proxyConfig);
authenticationService = new AuthenticationService(serviceConfiguration);
authorizationManager = new AuthorizationManager(serviceConfiguration, configurationCacheService);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.group(acceptorGroup, workerGroup);
bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR,
new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024));
bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup));
EventLoopUtil.enableTriggeredMode(bootstrap);
bootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, false));
// Bind and start to accept incoming connections.
bootstrap.bind(proxyConfig.getServicePort()).sync();
LOG.info("Started Pulsar Proxy at {}", serviceUrl);
if (proxyConfig.isTlsEnabledInProxy()) {
ServerBootstrap tlsBootstrap = bootstrap.clone();
tlsBootstrap.childHandler(new ServiceChannelInitializer(this, proxyConfig, true));
tlsBootstrap.bind(proxyConfig.getServicePortTls()).sync();
LOG.info("Started Pulsar TLS Proxy on port {}", proxyConfig.getWebServicePortTls());
}
}
long newRequestId() {
return client.newRequestId();
}
ConnectionPool getConnectionPool() {
return client.getCnxPool();
}
public ZooKeeperClientFactory getZooKeeperClientFactory() {
if (zkClientFactory == null) {
zkClientFactory = new ZookeeperClientFactoryImpl();
}
// Return default factory
return zkClientFactory;
}
public BrokerDiscoveryProvider getDiscoveryProvider() {
return discoveryProvider;
}
public void close() throws IOException {
if (localZooKeeperConnectionService != null) {
localZooKeeperConnectionService.close();
}
if (discoveryProvider != null) {
discoveryProvider.close();
}
acceptorGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
client.close();
}
private ServiceConfiguration createServiceConfiguration(ProxyConfiguration config) {
ServiceConfiguration serviceConfiguration = new ServiceConfiguration();
serviceConfiguration.setAuthenticationEnabled(config.isAuthenticationEnabled());
serviceConfiguration.setAuthorizationEnabled(config.isAuthorizationEnabled());
serviceConfiguration.setAuthenticationProviders(config.getAuthenticationProviders());
serviceConfiguration.setProperties(config.getProperties());
return serviceConfiguration;
}
public String getServiceUrl() {
return serviceUrl;
}
public String getServiceUrlTls() {
return serviceUrlTls;
}
public ProxyConfiguration getConfiguration() {
return proxyConfig;
}
public AuthenticationService getAuthenticationService() {
return authenticationService;
}
public AuthorizationManager getAuthorizationManager() {
return authorizationManager;
}
public Authentication getClientAuthentication() {
return clientAuthentication;
}
public ConfigurationCacheService getConfigurationCacheService() {
return configurationCacheService;
}
public void setConfigurationCacheService(ConfigurationCacheService configurationCacheService) {
this.configurationCacheService = configurationCacheService;
}
private static final Logger LOG = LoggerFactory.getLogger(ProxyService.class);
}