[Proxy] Remove unnecessary Pulsar Client usage from Pulsar Proxy (#13836)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 66792f4..06b5d24 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
 import static org.apache.pulsar.client.util.MathUtils.signSafeMod;
-
+import static org.apache.pulsar.common.util.netty.ChannelFutures.toCompletableFuture;
 import com.google.common.annotations.VisibleForTesting;
-
 import io.netty.bootstrap.Bootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelException;
@@ -31,9 +29,6 @@
 import io.netty.resolver.dns.DnsNameResolver;
 import io.netty.resolver.dns.DnsNameResolverBuilder;
 import io.netty.util.concurrent.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -45,9 +40,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.PulsarClientException.InvalidServiceURL;
@@ -58,7 +51,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ConnectionPool implements Closeable {
+public class ConnectionPool implements AutoCloseable {
     protected final ConcurrentHashMap<InetSocketAddress, ConcurrentMap<Integer, CompletableFuture<ClientCnx>>> pool;
 
     private final Bootstrap bootstrap;
@@ -227,7 +220,7 @@
     }
 
     /**
-     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server
+     * Resolve DNS asynchronously and attempt to connect to any IP address returned by DNS server.
      */
     private CompletableFuture<Channel> createConnection(InetSocketAddress unresolvedAddress) {
         int port;
@@ -252,27 +245,32 @@
     }
 
     /**
-     * Try to connect to a sequence of IP addresses until a successfull connection can be made, or fail if no address is
-     * working
+     * Try to connect to a sequence of IP addresses until a successful connection can be made, or fail if no
+     * address is working.
      */
-    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses, int port, InetSocketAddress sniHost) {
+    private CompletableFuture<Channel> connectToResolvedAddresses(Iterator<InetAddress> unresolvedAddresses,
+                                                                  int port,
+                                                                  InetSocketAddress sniHost) {
         CompletableFuture<Channel> future = new CompletableFuture<>();
 
         // Successfully connected to server
-        connectToAddress(unresolvedAddresses.next(), port, sniHost).thenAccept(future::complete).exceptionally(exception -> {
-            if (unresolvedAddresses.hasNext()) {
-                // Try next IP address
-                connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete).exceptionally(ex -> {
-                    // This is already unwinding the recursive call
-                    future.completeExceptionally(ex);
+        connectToAddress(unresolvedAddresses.next(), port, sniHost)
+                .thenAccept(future::complete)
+                .exceptionally(exception -> {
+                    if (unresolvedAddresses.hasNext()) {
+                        // Try next IP address
+                        connectToResolvedAddresses(unresolvedAddresses, port, sniHost).thenAccept(future::complete)
+                                .exceptionally(ex -> {
+                                    // This is already unwinding the recursive call
+                                    future.completeExceptionally(ex);
+                                    return null;
+                                });
+                    } else {
+                        // Failed to connect to any IP address
+                        future.completeExceptionally(exception);
+                    }
                     return null;
                 });
-            } else {
-                // Failed to connect to any IP address
-                future.completeExceptionally(exception);
-            }
-            return null;
-        });
 
         return future;
     }
@@ -290,7 +288,7 @@
     }
 
     /**
-     * Attempt to establish a TCP connection to an already resolved single IP address
+     * Attempt to establish a TCP connection to an already resolved single IP address.
      */
     private CompletableFuture<Channel> connectToAddress(InetAddress ipAddress, int port, InetSocketAddress sniHost) {
         InetSocketAddress remoteAddress = new InetSocketAddress(ipAddress, port);
@@ -298,12 +296,11 @@
             return toCompletableFuture(bootstrap.register())
                     .thenCompose(channel -> channelInitializerHandler
                             .initTls(channel, sniHost != null ? sniHost : remoteAddress))
-                    .thenCompose(channel -> channelInitializerHandler
-                            .initSocks5IfConfig(channel))
+                    .thenCompose(channelInitializerHandler::initSocks5IfConfig)
                     .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
         } else {
             return toCompletableFuture(bootstrap.register())
-                    .thenCompose(channel -> channelInitializerHandler.initSocks5IfConfig(channel))
+                    .thenCompose(channelInitializerHandler::initSocks5IfConfig)
                     .thenCompose(channel -> toCompletableFuture(channel.connect(remoteAddress)));
         }
     }
@@ -312,7 +309,7 @@
         if (maxConnectionsPerHosts == 0) {
             //Disable pooling
             if (cnx.channel().isActive()) {
-                if(log.isDebugEnabled()) {
+                if (log.isDebugEnabled()) {
                     log.debug("close connection due to pooling disabled.");
                 }
                 cnx.close();
@@ -321,14 +318,8 @@
     }
 
     @Override
-    public void close() throws IOException {
-        try {
-            if (!eventLoopGroup.isShutdown()) {
-                eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).await();
-            }
-        } catch (InterruptedException e) {
-            log.warn("EventLoopGroup shutdown was interrupted", e);
-        }
+    public void close() throws Exception {
+        closeAllConnections();
         dnsResolver.close();
     }
 
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index a6dba1c..491a227 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -27,7 +27,9 @@
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import java.net.SocketAddress;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
@@ -37,11 +39,9 @@
 import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.client.impl.ClientCnx;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarChannelInitializer;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.api.AuthData;
 import org.apache.pulsar.common.api.proto.CommandAuthResponse;
@@ -63,8 +63,9 @@
  */
 public class ProxyConnection extends PulsarHandler implements FutureListener<Void> {
     // ConnectionPool is used by the proxy to issue lookup requests
-    private PulsarClientImpl client;
     private ConnectionPool connectionPool;
+    private final AtomicLong requestIdGenerator =
+            new AtomicLong(ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2));
     private ProxyService service;
     AuthenticationDataSource authenticationData;
     private State state;
@@ -108,7 +109,7 @@
     }
 
     ConnectionPool getConnectionPool() {
-        return client.getCnxPool();
+        return connectionPool;
     }
 
     public ProxyConnection(ProxyService proxyService, Supplier<SslHandler> sslHandlerSupplier) {
@@ -125,7 +126,6 @@
         if (ProxyService.ACTIVE_CONNECTIONS.get() > service.getConfiguration().getMaxConcurrentInboundConnections()) {
             ctx.close();
             ProxyService.REJECTED_CONNECTIONS.inc();
-            return;
         }
     }
 
@@ -144,26 +144,27 @@
     }
 
     @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
         super.channelInactive(ctx);
 
         if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
             directProxyHandler.outboundChannel.close();
+            directProxyHandler = null;
         }
 
-        if (client != null) {
-            client.close();
-        }
         service.getClientCnxs().remove(this);
         LOG.info("[{}] Connection closed", remoteAddress);
 
         if (connectionPool != null) {
             try {
                 connectionPool.close();
+                connectionPool = null;
             } catch (Exception e) {
                 LOG.error("Failed to close connection pool {}", e.getMessage(), e);
             }
         }
+
+        state = State.Closed;
     }
 
     @Override
@@ -217,7 +218,30 @@
         }
     }
 
-    private void completeConnect() {
+    private synchronized void completeConnect(AuthData clientData) throws PulsarClientException {
+        if (service.getConfiguration().isAuthenticationEnabled()) {
+            if (service.getConfiguration().isForwardAuthorizationCredentials()) {
+                this.clientAuthData = clientData;
+                this.clientAuthMethod = authMethod;
+            }
+            if (this.connectionPool == null) {
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
+                                clientAuthMethod, protocolVersionToAdvertise));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
+                        remoteAddress, state, clientAuthRole);
+            }
+        } else {
+            if (this.connectionPool == null) {
+                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
+                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersionToAdvertise));
+            } else {
+                LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {}",
+                        remoteAddress, state);
+            }
+        }
+
         LOG.info("[{}] complete connection, init proxy handler. authenticated with {} role {}, hasProxyToBrokerUrl: {}",
             remoteAddress, authMethod, clientAuthRole, hasProxyToBrokerUrl);
         if (hasProxyToBrokerUrl) {
@@ -237,17 +261,6 @@
         }
     }
 
-    private void createClientAndCompleteConnect(AuthData clientData)
-        throws PulsarClientException {
-        if (service.getConfiguration().isForwardAuthorizationCredentials()) {
-            this.clientAuthData = clientData;
-            this.clientAuthMethod = authMethod;
-        }
-        this.client = createClient(clientConf, this.clientAuthData, this.clientAuthMethod, protocolVersionToAdvertise);
-
-        completeConnect();
-    }
-
     // According to auth result, send newConnected or newAuthChallenge command.
     private void doAuthentication(AuthData clientData) throws Exception {
         AuthData brokerData = authState.authenticate(clientData);
@@ -258,7 +271,7 @@
                 LOG.debug("[{}] Client successfully authenticated with {} role {}",
                     remoteAddress, authMethod, clientAuthRole);
             }
-            createClientAndCompleteConnect(clientData);
+            completeConnect(clientData);
             return;
         }
 
@@ -269,7 +282,6 @@
                 remoteAddress, authMethod);
         }
         state = State.Connecting;
-        return;
     }
 
     @Override
@@ -297,16 +309,10 @@
         try {
             // init authn
             this.clientConf = createClientConfiguration();
-            int protocolVersion = getProtocolVersionToAdvertise(connect);
 
             // authn not enabled, complete
             if (!service.getConfiguration().isAuthenticationEnabled()) {
-                this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                        () -> new ClientCnx(clientConf, service.getWorkerGroup(), protocolVersion));
-                this.client =
-                        new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-
-                completeConnect();
+                completeConnect(null);
                 return;
             }
 
@@ -331,7 +337,7 @@
                     .orElseThrow(() ->
                         new AuthenticationException("No anonymous role, and no authentication provider configured"));
 
-                createClientAndCompleteConnect(clientData);
+                completeConnect(clientData);
                 return;
             }
 
@@ -349,7 +355,6 @@
             LOG.warn("[{}] Unable to authenticate: ", remoteAddress, e);
             ctx.writeAndFlush(Commands.newError(-1, ServerError.AuthenticationError, "Failed to authenticate"));
             close();
-            return;
         }
     }
 
@@ -404,19 +409,26 @@
         lookupProxyHandler.handleLookup(lookup);
     }
 
-    private void close() {
-        state = State.Closed;
-        ctx.close();
-        try {
-            if (client != null) {
-                client.close();
+    private synchronized void close() {
+        if (state != State.Closed) {
+            state = State.Closed;
+            if (directProxyHandler != null && directProxyHandler.outboundChannel != null) {
+                directProxyHandler.outboundChannel.close();
+                directProxyHandler = null;
             }
-        } catch (PulsarClientException e) {
-            LOG.error("Unable to close pulsar client - {}. Error - {}", client, e.getMessage());
+            if (connectionPool != null) {
+                try {
+                    connectionPool.close();
+                    connectionPool = null;
+                } catch (Exception e) {
+                    LOG.error("Error closing connection pool", e);
+                }
+            }
+            ctx.close();
         }
     }
 
-    ClientConfigurationData createClientConfiguration() throws UnsupportedAuthenticationException {
+    ClientConfigurationData createClientConfiguration() {
         ClientConfigurationData clientConf = new ClientConfigurationData();
         clientConf.setServiceUrl(service.getServiceUrl());
         ProxyConfiguration proxyConfig = service.getConfiguration();
@@ -436,20 +448,12 @@
         return clientConf;
     }
 
-    private PulsarClientImpl createClient(final ClientConfigurationData clientConf, final AuthData clientAuthData,
-            final String clientAuthMethod, final int protocolVersion) throws PulsarClientException {
-        this.connectionPool = new ProxyConnectionPool(clientConf, service.getWorkerGroup(),
-                () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole, clientAuthData,
-                        clientAuthMethod, protocolVersion));
-        return new PulsarClientImpl(clientConf, service.getWorkerGroup(), connectionPool, service.getTimer());
-    }
-
     private static int getProtocolVersionToAdvertise(CommandConnect connect) {
         return Math.min(connect.getProtocolVersion(), Commands.getCurrentProtocolVersion());
     }
 
     long newRequestId() {
-        return client.newRequestId();
+        return requestIdGenerator.getAndIncrement();
     }
 
     public Authentication getClientAuthentication() {
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
index 2af7ebf..271c85f 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyService.java
@@ -28,8 +28,6 @@
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
@@ -75,7 +73,6 @@
 
     private final ProxyConfiguration proxyConfig;
     private final Authentication proxyClientAuthentication;
-    private final Timer timer;
     private String serviceUrl;
     private String serviceUrlTls;
     private final AuthenticationService authenticationService;
@@ -137,8 +134,6 @@
                         AuthenticationService authenticationService) throws Exception {
         requireNonNull(proxyConfig);
         this.proxyConfig = proxyConfig;
-        this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer",
-                Thread.currentThread().isDaemon()), 1, TimeUnit.MILLISECONDS);
         this.clientCnxs = Sets.newConcurrentHashSet();
         this.topicStats = new ConcurrentHashMap<>();
 
@@ -342,9 +337,6 @@
         for (EventLoopGroup group : extensionsWorkerGroups) {
             group.shutdownGracefully();
         }
-        if (timer != null) {
-            timer.stop();
-        }
     }
 
     public String getServiceUrl() {
@@ -359,10 +351,6 @@
         return proxyConfig;
     }
 
-    public Timer getTimer() {
-        return timer;
-    }
-
     public AuthenticationService getAuthenticationService() {
         return authenticationService;
     }