[Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 66792f4..06b5d24 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,11 +18,9 @@
*/
package org.apache.pulsar.client.impl;
-import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
-
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
import com.google.common.annotations.VisibleForTesting;
-
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelException;
@@ -31,9 +29,6 @@
import io.netty.resolver.dns.DnsNameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
import io.netty.util.concurrent.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -45,9 +40,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -58,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ConnectionPool implements Closeable {
+public class ConnectionPool implements AutoCloseable {
protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
private final Bootstrap bootstrap;
@@ -227,7 +220,7 @@
}
/**
- * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
+ * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
*/
private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
int port;
@@ -252,27 +245,32 @@
}
/**
- * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
- * working
+ * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
+ * address is working.
*/
- private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
+ private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
+ int port,
+ InetSocketAddress sniHost) {
CompletableFuture<Channel> future = new CompletableFuture<>();
// Successfully connected to server
- connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
- if (unresolvedAddresses.hasNext()) {
- // Try next IP address
- connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
- // This is already unwinding the recursive call
- future.completeExceptionally(ex);
+ connectToAddress(unresolvedAddresses.next(), port, sniHost)
+ .thenAccept(future::complete)
+ .exceptionally(exception -> {
+ if (unresolvedAddresses.hasNext()) {
+ // Try next IP address
+ connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
+ .exceptionally(ex -> {
+ // This is already unwinding the recursive call
+ future.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ // Failed to connect to any IP address
+ future.completeExceptionally(exception);
+ }
return null;
});
- } else {
- // Failed to connect to any IP address
- future.completeExceptionally(exception);
- }
- return null;
- });
return future;
}
@@ -290,7 +288,7 @@
}
/**
- * Attempt to establish a TCP connection to an already resolved single IP address
+ * Attempt to establish a TCP connection to an already resolved single IP address.
*/
private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
@@ -298,12 +296,11 @@
return toCompletableFuture(bootstrap.register())
.thenCompose(channel -> channelInitializerHandler
.initTls(channel, sniHost != null ? sniHost : remoteAddress))
- .thenCompose(channel -> channelInitializerHandler
- .initSocks5IfConfig(channel))
+ .thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
} else {
return toCompletableFuture(bootstrap.register())
- .thenCompose(channel -> channelInitializerHandler.initSocks5IfConfig(channel))
+ .thenCompose(channelInitializerHandler::initSocks5IfConfig)
.thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
}
}
@@ -312,7 +309,7 @@
if (maxConnectionsPerHosts == 0) {
//Disable pooling
if (cnx.channel().isActive()) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("close connection due to pooling disabled.");
}
cnx.close();
@@ -321,14 +318,8 @@
}
@Override
- public void close() throws IOException {
- try {
- if (!eventLoopGroup.isShutdown()) {
- eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
- }
- } catch (InterruptedException e) {
- log.warn("EventLoopGroup shutdown was interrupted", e);
- }
+ public void close() throws Exception {
+ closeAllConnections();
dnsResolver.close();
}
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index a6dba1c..491a227 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,7 +27,9 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import java.net.SocketAddress;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
@@ -37,11 +39,9 @@
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarChannelInitializer;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.CommandAuthResponse;
@@ -63,8 +63,9 @@
*/
public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
// ConnectionPool is used by the proxy to issue lookup requests
- private PulsarClientImpl client;
private ConnectionPool connectionPool;
+ private final AtomicLong requestIdGenerator =
+ new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
private ProxyService service;
AuthenticationDataSource authenticationData;
private State state;
@@ -108,7 +109,7 @@
}
ConnectionPool getConnectionPool() {
- return client.getCnxPool();
+ return connectionPool;
}
public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
@@ -125,7 +126,6 @@
if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
ctx.close();
ProxyService.REJECTED_CONNECTIONS.inc();
- return;
}
}
@@ -144,26 +144,27 @@
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
directProxyHandler.outboundChannel.close();
+ directProxyHandler = null;
}
- if (client != null) {
- client.close();
- }
service.getClientCnxs().remove(this);
LOG.info("[{}] Connection closed", remoteAddress);
if (connectionPool != null) {
try {
connectionPool.close();
+ connectionPool = null;
} catch (Exception e) {
LOG.error("Failed to close connection pool {}", e.getMessage(), e);
}
}
+
+ state = State.Closed;
}
@Override
@@ -217,7 +218,30 @@
}
}
- private void completeConnect() {
+ private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+ if (service.getConfiguration().isAuthenticationEnabled()) {
+ if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+ this.clientAuthData = clientData;
+ this.clientAuthMethod = authMethod;
+ }
+ if (this.connectionPool == null) {
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+ clientAuthMethod, protocolVersionToAdvertise));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+ remoteAddress, state, clientAuthRole);
+ }
+ } else {
+ if (this.connectionPool == null) {
+ this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+ () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
+ } else {
+ LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
+ remoteAddress, state);
+ }
+ }
+
LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
if (hasProxyToBrokerUrl) {
@@ -237,17 +261,6 @@
}
}
- private void createClientAndCompleteConnect(AuthData clientData)
- throws PulsarClientException {
- if (service.getConfiguration().isForwardAuthorizationCredentials()) {
- this.clientAuthData = clientData;
- this.clientAuthMethod = authMethod;
- }
- this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
-
- completeConnect();
- }
-
// According to auth result, send newConnected or newAuthChallenge command.
private void doAuthentication(AuthData clientData) throws Exception {
AuthData brokerData = authState.authenticate(clientData);
@@ -258,7 +271,7 @@
LOG.debug("[{}] Client successfully authenticated with {} role {}",
remoteAddress, authMethod, clientAuthRole);
}
- createClientAndCompleteConnect(clientData);
+ completeConnect(clientData);
return;
}
@@ -269,7 +282,6 @@
remoteAddress, authMethod);
}
state = State.Connecting;
- return;
}
@Override
@@ -297,16 +309,10 @@
try {
// init authn
this.clientConf = createClientConfiguration();
- int protocolVersion = getProtocolVersionToAdvertise(connect);
// authn not enabled, complete
if (!service.getConfiguration().isAuthenticationEnabled()) {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
- this.client =
- new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-
- completeConnect();
+ completeConnect(null);
return;
}
@@ -331,7 +337,7 @@
.orElseThrow(() ->
new AuthenticationException("No anonymous role, and no authentication provider configured"));
- createClientAndCompleteConnect(clientData);
+ completeConnect(clientData);
return;
}
@@ -349,7 +355,6 @@
LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
close();
- return;
}
}
@@ -404,19 +409,26 @@
lookupProxyHandler.handleLookup(lookup);
}
- private void close() {
- state = State.Closed;
- ctx.close();
- try {
- if (client != null) {
- client.close();
+ private synchronized void close() {
+ if (state != State.Closed) {
+ state = State.Closed;
+ if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+ directProxyHandler.outboundChannel.close();
+ directProxyHandler = null;
}
- } catch (PulsarClientException e) {
- LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
+ if (connectionPool != null) {
+ try {
+ connectionPool.close();
+ connectionPool = null;
+ } catch (Exception e) {
+ LOG.error("Error closing connection pool", e);
+ }
+ }
+ ctx.close();
}
}
- ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+ ClientConfigurationData createClientConfiguration() {
ClientConfigurationData clientConf = new ClientConfigurationData();
clientConf.setServiceUrl(service.getServiceUrl());
ProxyConfiguration proxyConfig = service.getConfiguration();
@@ -436,20 +448,12 @@
return clientConf;
}
- private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
- final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
- this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
- () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
- clientAuthMethod, protocolVersion));
- return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
- }
-
private static int getProtocolVersionToAdvertise(CommandConnect connect) {
return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
}
long newRequestId() {
- return client.newRequestId();
+ return requestIdGenerator.getAndIncrement();
}
public Authentication getClientAuthentication() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 2af7ebf..271c85f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,8 +28,6 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
@@ -75,7 +73,6 @@
private final ProxyConfiguration proxyConfig;
private final Authentication proxyClientAuthentication;
- private final Timer timer;
private String serviceUrl;
private String serviceUrlTls;
private final AuthenticationService authenticationService;
@@ -137,8 +134,6 @@
AuthenticationService authenticationService) throws Exception {
requireNonNull(proxyConfig);
this.proxyConfig = proxyConfig;
- this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer",
- Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
this.clientCnxs = Sets.newConcurrentHashSet();
this.topicStats = new ConcurrentHashMap<>();
@@ -342,9 +337,6 @@
for (EventLoopGroup group : extensionsWorkerGroups) {
group.shutdownGracefully();
}
- if (timer != null) {
- timer.stop();
- }
}
public String getServiceUrl() {
@@ -359,10 +351,6 @@
return proxyConfig;
}
- public Timer getTimer() {
- return timer;
- }
-
public AuthenticationService getAuthenticationService() {
return authenticationService;
}