[improve][client/broker] Add DnsResolverGroup to share DNS cache across multiple PulsarClient instances (#24784)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 5fd6350..aa5c0b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -143,6 +143,7 @@
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
+import org.apache.pulsar.client.impl.DnsResolverGroupImpl;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
@@ -266,6 +267,7 @@
     private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider;
     private final Timer brokerClientSharedTimer;
     private final ExecutorProvider brokerClientSharedLookupExecutorProvider;
+    private final DnsResolverGroupImpl brokerClientSharedDnsResolverGroup;
 
     private MetricsGenerator metricsGenerator;
     private final PulsarBrokerOpenTelemetry openTelemetry;
@@ -398,6 +400,9 @@
                 new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS);
         this.brokerClientSharedLookupExecutorProvider =
                 new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor");
+        this.brokerClientSharedDnsResolverGroup =
+                new DnsResolverGroupImpl(this.ioEventLoopGroup,
+                        loadBrokerClientProperties(new ClientConfigurationData()));
 
         // here in the constructor we don't have the offloader scheduler yet
         this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0);
@@ -697,6 +702,7 @@
             brokerClientSharedInternalExecutorProvider.shutdownNow();
             brokerClientSharedScheduledExecutorProvider.shutdownNow();
             brokerClientSharedLookupExecutorProvider.shutdownNow();
+            brokerClientSharedDnsResolverGroup.close();
             brokerClientSharedTimer.stop();
             if (monotonicClock instanceof AutoCloseable c) {
                 c.close();
@@ -1711,7 +1717,8 @@
                 .internalExecutorProvider(brokerClientSharedInternalExecutorProvider)
                 .externalExecutorProvider(brokerClientSharedExternalExecutorProvider)
                 .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider)
-                .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider);
+                .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider)
+                .dnsResolverGroup(brokerClientSharedDnsResolverGroup);
         if (customizer != null) {
             customizer.accept(pulsarClientImplBuilder);
         }
@@ -1740,10 +1747,7 @@
         // Apply all arbitrary configuration. This must be called before setting any fields annotated as
         // @Secret on the ClientConfigurationData object because of the way they are serialized.
         // See https://github.com/apache/pulsar/issues/8509 for more information.
-        Map<String, Object> overrides = PropertiesUtils
-                .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
-        ClientConfigurationData conf =
-                ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
+        ClientConfigurationData conf = loadBrokerClientProperties(initialConf);
 
         // Disabled auto release useless connections
         // The automatic release connection feature is not yet perfect for transaction scenarios, so turn it
@@ -1789,6 +1793,15 @@
         return conf;
     }
 
+    // load plain brokerClient_ properties without complete initialization
+    private ClientConfigurationData loadBrokerClientProperties(ClientConfigurationData initialConf) {
+        Map<String, Object> overrides = PropertiesUtils
+                .filterAndMapProperties(this.getConfiguration().getProperties(), "brokerClient_");
+        ClientConfigurationData conf =
+                ConfigurationDataUtils.loadData(overrides, initialConf, ClientConfigurationData.class);
+        return conf;
+    }
+
     public synchronized PulsarAdmin getAdminClient() throws PulsarServerException {
         if (this.adminClient == null) {
             try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
index 5cc04d6..d739d93 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConnectionPoolTest.java
@@ -21,6 +21,7 @@
 import static org.apache.pulsar.broker.BrokerTestUtil.spyWithClassAndConstructorArgs;
 import io.netty.channel.EventLoopGroup;
 import io.netty.resolver.AbstractAddressResolver;
+import io.netty.resolver.AddressResolver;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import io.netty.util.concurrent.Promise;
 import java.net.InetSocketAddress;
@@ -260,7 +261,8 @@
         ConnectionPool pool =
                 spyWithClassAndConstructorArgs(ConnectionPool.class, InstrumentProvider.NOOP, conf, eventLoop,
                         (Supplier<ClientCnx>) () -> new ClientCnx(InstrumentProvider.NOOP, conf, eventLoop),
-                        Optional.of(resolver), scheduledExecutorService);
+                        Optional.<Supplier<AddressResolver<InetSocketAddress>>>of(() -> resolver),
+                        scheduledExecutorService);
 
 
         ClientCnx cnx = pool.getConnection(
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index 59baccf..f8412b5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -27,9 +27,6 @@
 import io.netty.channel.ChannelOption;
 import io.netty.channel.EventLoopGroup;
 import io.netty.resolver.AddressResolver;
-import io.netty.resolver.dns.DnsAddressResolverGroup;
-import io.netty.resolver.dns.DnsNameResolverBuilder;
-import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.ScheduledFuture;
 import io.opentelemetry.api.common.Attributes;
@@ -51,6 +48,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import lombok.Builder;
+import lombok.NonNull;
 import lombok.Value;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -61,7 +60,6 @@
 import org.apache.pulsar.client.impl.metrics.Unit;
 import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
 import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.netty.DnsResolverUtil;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +78,7 @@
     private final boolean isSniProxy;
 
     protected final AddressResolver<InetSocketAddress> addressResolver;
+    private DnsResolverGroupImpl dnsResolverGroup;
     private final boolean shouldCloseDnsResolver;
 
 
@@ -106,8 +105,7 @@
     public ConnectionPool(InstrumentProvider instrumentProvider,
                           ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
                           ScheduledExecutorService scheduledExecutorService) throws PulsarClientException {
-        this(instrumentProvider, conf, eventLoopGroup, () -> new ClientCnx(instrumentProvider, conf, eventLoopGroup),
-                scheduledExecutorService);
+        this(instrumentProvider, conf, eventLoopGroup, null, scheduledExecutorService);
     }
 
     public ConnectionPool(InstrumentProvider instrumentProvider,
@@ -118,12 +116,16 @@
                 scheduledExecutorService);
     }
 
-    public ConnectionPool(InstrumentProvider instrumentProvider,
-                          ClientConfigurationData conf, EventLoopGroup eventLoopGroup,
+    @Builder(builderClassName = "ConnectionPoolBuilder")
+    public ConnectionPool(@NonNull InstrumentProvider instrumentProvider,
+                          @NonNull ClientConfigurationData conf, @NonNull EventLoopGroup eventLoopGroup,
                           Supplier<ClientCnx> clientCnxSupplier,
-                          Optional<AddressResolver<InetSocketAddress>> addressResolver,
+                          @NonNull Optional<Supplier<AddressResolver<InetSocketAddress>>> addressResolverSupplier,
                           ScheduledExecutorService scheduledExecutorService)
             throws PulsarClientException {
+        if (clientCnxSupplier == null) {
+            clientCnxSupplier = () -> new ClientCnx(instrumentProvider, conf, eventLoopGroup);
+        }
         this.eventLoopGroup = eventLoopGroup;
         this.clientConfig = conf;
         this.maxConnectionsPerHosts = conf.getConnectionsPerBroker();
@@ -152,8 +154,9 @@
             throw new PulsarClientException(e);
         }
 
-        this.shouldCloseDnsResolver = !addressResolver.isPresent();
-        this.addressResolver = addressResolver.orElseGet(() -> createAddressResolver(conf, eventLoopGroup));
+        this.shouldCloseDnsResolver = !addressResolverSupplier.isPresent();
+        this.addressResolver =
+                addressResolverSupplier.orElseGet(() -> createAddressResolver(conf, eventLoopGroup)).get();
         // Auto release useless connections. see: https://github.com/apache/pulsar/issues/15516.
         this.connectionMaxIdleSeconds = conf.getConnectionMaxIdleSeconds();
         this.autoReleaseIdleConnectionsEnabled = connectionMaxIdleSeconds > 0;
@@ -185,26 +188,12 @@
                 Attributes.builder().put("pulsar.failure.type", "handshake").build());
     }
 
-    private static AddressResolver<InetSocketAddress> createAddressResolver(ClientConfigurationData conf,
-                                                                            EventLoopGroup eventLoopGroup) {
-        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder()
-                .traceEnabled(true)
-                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
-                .socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup), true);
-        if (conf.getDnsLookupBindAddress() != null) {
-            InetSocketAddress addr = new InetSocketAddress(conf.getDnsLookupBindAddress(),
-                    conf.getDnsLookupBindPort());
-            dnsNameResolverBuilder.localAddress(addr);
+    private Supplier<AddressResolver<InetSocketAddress>> createAddressResolver(ClientConfigurationData conf,
+                                                                               EventLoopGroup eventLoopGroup) {
+        if (dnsResolverGroup == null) {
+            dnsResolverGroup = new DnsResolverGroupImpl(eventLoopGroup, conf);
         }
-        List<InetSocketAddress> serverAddresses = conf.getDnsServerAddresses();
-        if (serverAddresses != null && !serverAddresses.isEmpty()) {
-            dnsNameResolverBuilder.nameServerProvider(new SequentialDnsServerAddressStreamProvider(serverAddresses));
-        }
-        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
-        // use DnsAddressResolverGroup to create the AddressResolver since it contains a solution
-        // to prevent cache stampede / thundering herds problem when a DNS entry expires while the system
-        // is under high load
-        return new DnsAddressResolverGroup(dnsNameResolverBuilder).getResolver(eventLoopGroup.next());
+        return () -> dnsResolverGroup.createAddressResolver(eventLoopGroup);
     }
 
     private static final Random random = new Random();
@@ -479,6 +468,9 @@
         if (shouldCloseDnsResolver) {
             addressResolver.close();
         }
+        if (dnsResolverGroup != null) {
+            dnsResolverGroup.close();
+        }
         if (asyncReleaseUselessConnectionsTask != null && !asyncReleaseUselessConnectionsTask.isCancelled()) {
             asyncReleaseUselessConnectionsTask.cancel(false);
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
new file mode 100644
index 0000000..61af796
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.dns.DnsAddressResolverGroup;
+import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.resolver.dns.DnsServerAddressStreamProvider;
+import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Predicate;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+/**
+ * An abstraction to manage a group of Netty {@link AddressResolver} instances.
+ * Uses {@link io.netty.resolver.dns.DnsAddressResolverGroup} to create the {@link AddressResolver} instance
+ * since it contains a shared DNS cache and a solution to prevent cache stampede / thundering herds problem
+ * when a DNS entry expires while the system is under high load.
+ */
+public class DnsResolverGroupImpl implements AutoCloseable {
+    private final DnsAddressResolverGroup dnsAddressResolverGroup;
+
+    public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup, ClientConfigurationData conf) {
+        Optional<InetSocketAddress> bindAddress = Optional.ofNullable(conf.getDnsLookupBindAddress())
+                .map(addr -> new InetSocketAddress(addr, conf.getDnsLookupBindPort()));
+        Optional<DnsServerAddressStreamProvider> dnsServerAddresses = Optional.ofNullable(conf.getDnsServerAddresses())
+                .filter(Predicate.not(List::isEmpty))
+                .map(SequentialDnsServerAddressStreamProvider::new);
+        this.dnsAddressResolverGroup = createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+    }
+
+    public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup, Optional<InetSocketAddress> bindAddress,
+                                Optional<DnsServerAddressStreamProvider> dnsServerAddresses) {
+        this.dnsAddressResolverGroup = createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+    }
+
+    private static DnsAddressResolverGroup createAddressResolverGroup(EventLoopGroup eventLoopGroup,
+                                                                      Optional<InetSocketAddress> bindAddress,
+                                                                      Optional<DnsServerAddressStreamProvider>
+                                                                              dnsServerAddresses) {
+        DnsNameResolverBuilder dnsNameResolverBuilder = createDnsNameResolverBuilder(eventLoopGroup);
+        bindAddress.ifPresent(dnsNameResolverBuilder::localAddress);
+        dnsServerAddresses.ifPresent(dnsNameResolverBuilder::nameServerProvider);
+
+        return new DnsAddressResolverGroup(dnsNameResolverBuilder);
+    }
+
+    private static DnsNameResolverBuilder createDnsNameResolverBuilder(EventLoopGroup eventLoopGroup) {
+        DnsNameResolverBuilder dnsNameResolverBuilder = new DnsNameResolverBuilder()
+                .traceEnabled(true)
+                .channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
+                .socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup), true);
+        DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
+        return dnsNameResolverBuilder;
+    }
+
+    @Override
+    public void close() {
+        this.dnsAddressResolverGroup.close();
+    }
+
+    public AddressResolver<InetSocketAddress> createAddressResolver(EventLoopGroup eventLoopGroup) {
+        return dnsAddressResolverGroup.getResolver(eventLoopGroup.next());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 01b27f4..d38d359 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -24,6 +24,7 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.AddressResolver;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timer;
 import java.io.IOException;
@@ -129,6 +130,8 @@
     private final ScheduledExecutorProvider scheduledExecutorProvider;
     private final boolean createdEventLoopGroup;
     private final boolean createdCnxPool;
+    private final DnsResolverGroupImpl dnsResolverGroupLocalInstance;
+    private final AddressResolver<InetSocketAddress> addressResolver;
 
     public enum State {
         Open, Closing, Closed
@@ -168,22 +171,22 @@
     private TransactionCoordinatorClientImpl tcClient;
 
     public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException {
-        this(conf, null, null, null, null, null, null, null);
+        this(conf, null, null, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException {
-        this(conf, eventLoopGroup, null, null, null, null, null, null);
+        this(conf, eventLoopGroup, null, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, null, null, null, null, null);
+        this(conf, eventLoopGroup, cnxPool, null, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool,
                             Timer timer)
             throws PulsarClientException {
-        this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null);
+        this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null, null);
     }
 
     public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool,
@@ -192,7 +195,7 @@
                             ScheduledExecutorProvider scheduledExecutorProvider)
             throws PulsarClientException {
         this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider,
-                scheduledExecutorProvider, null);
+                scheduledExecutorProvider, null, null);
     }
 
     @Builder(builderClassName = "PulsarClientImplBuilder")
@@ -200,7 +203,8 @@
                              Timer timer, ExecutorProvider externalExecutorProvider,
                              ExecutorProvider internalExecutorProvider,
                              ScheduledExecutorProvider scheduledExecutorProvider,
-                             ExecutorProvider lookupExecutorProvider) throws PulsarClientException {
+                             ExecutorProvider lookupExecutorProvider,
+                             DnsResolverGroupImpl dnsResolverGroup) throws PulsarClientException {
 
         EventLoopGroup eventLoopGroupReference = null;
         ConnectionPool connectionPoolReference = null;
@@ -225,10 +229,29 @@
             conf.getAuthentication().start();
             this.scheduledExecutorProvider = scheduledExecutorProvider != null ? scheduledExecutorProvider :
                     new ScheduledExecutorProvider(conf.getNumIoThreads(), "pulsar-client-scheduled");
-            connectionPoolReference =
-                    connectionPool != null ? connectionPool :
-                            new ConnectionPool(instrumentProvider, conf, this.eventLoopGroup,
-                                    (ScheduledExecutorService) this.scheduledExecutorProvider.getExecutor());
+            if (connectionPool != null) {
+                connectionPoolReference = connectionPool;
+                dnsResolverGroupLocalInstance = null;
+                addressResolver = null;
+            } else {
+                DnsResolverGroupImpl dnsResolverGroupReference;
+                if (dnsResolverGroup == null) {
+                    dnsResolverGroupReference =
+                            dnsResolverGroupLocalInstance = new DnsResolverGroupImpl(eventLoopGroupReference, conf);
+                } else {
+                    dnsResolverGroupReference = dnsResolverGroup;
+                    dnsResolverGroupLocalInstance = null;
+                }
+                addressResolver = dnsResolverGroupReference.createAddressResolver(eventLoopGroupReference);
+                connectionPoolReference = ConnectionPool.builder()
+                        .instrumentProvider(instrumentProvider)
+                        .conf(conf)
+                        .eventLoopGroup(eventLoopGroupReference)
+                        .addressResolverSupplier(Optional.of(() -> addressResolver))
+                        .scheduledExecutorService(
+                                (ScheduledExecutorService) this.scheduledExecutorProvider.getExecutor())
+                        .build();
+            }
             this.cnxPool = connectionPoolReference;
             this.externalExecutorProvider = externalExecutorProvider != null ? externalExecutorProvider :
                     new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
@@ -919,6 +942,14 @@
                 conf.getServiceUrlProvider().close();
             }
 
+            if (addressResolver != null) {
+                addressResolver.close();
+            }
+
+            if (dnsResolverGroupLocalInstance != null) {
+                dnsResolverGroupLocalInstance.close();
+            }
+
             try {
                 // Shutting down eventLoopGroup separately because in some cases, cnxPool might be using different
                 // eventLoopGroup.
diff --git a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
index 372d45f..e479b8e 100644
--- a/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
+++ b/pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ProxyConnection.java
@@ -404,7 +404,7 @@
             if (this.connectionPool == null) {
                 this.connectionPool = new ConnectionPool(InstrumentProvider.NOOP, clientConf, service.getWorkerGroup(),
                         clientCnxSupplier,
-                        Optional.of(dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null);
+                        Optional.of(() -> dnsAddressResolverGroup.getResolver(service.getWorkerGroup().next())), null);
             } else {
                 LOG.error("BUG! Connection Pool has already been created for proxy connection to {} state {} role {}",
                         remoteAddress, state, maybeAnonymizedClientAuthRole);