CASSANDRASC-79: Sidecar should be able to load metadata even if the local instance is unavailable

- Add SidecarLoadBalancingPolicy to connect to additional nodes beyond local
- Enable Gossip by default (the client really needs it on)
- Retry keyspace creation if it fails
- Use shared executor/hashedwheeltimer netty options for all tests.
  This restores the previous behavior from when the NettyOptions parameter
  was first added, although we're not sure exactly why it's helpful, or if
  it is at all.
- Also store test results as artifacts
- Fix multi-threaded access issues with synchronized for now. Also fix an issue where the host couldn't be found if it wasn't using 9042 for the port.
- Prevent spurious reconnects of ignored nodes on onUp events
- Use `QueryOptions#setReprepareOnUp(false)` to prevent an onUp event from reconnecting to a node we don't want a connection with.
- Integration test for the load balancing polilcy + Adds fixes for issues found while testing.
- Use `ClusterUtils.awaitRingStatus` for retries when waiting for a node to shut down

Patch by Doug Rohrer; Reviewed by Francisco Guerrero, Yifan Cai for CASSANDRASC-79
diff --git a/.circleci/config.yml b/.circleci/config.yml
index ee8fe82..f027cae 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -72,6 +72,10 @@
          path: build/reports
          destination: test-reports
 
+     - store_artifacts:
+         path: build/test-results/
+         destination: test-results
+
      - store_test_results:
          path: build/test-results/
 
@@ -92,6 +96,10 @@
          path: build/reports
          destination: test-reports
 
+     - store_artifacts:
+         path: build/test-results/
+         destination: test-results
+
      - store_test_results:
          path: build/test-results/
 
@@ -111,6 +119,10 @@
          path: build/reports
          destination: test-reports
 
+     - store_artifacts:
+         path: build/test-results/
+         destination: test-results
+
      - store_test_results:
          path: build/test-results/
 
@@ -148,6 +160,10 @@
           path: build/reports
           destination: test-reports
 
+      - store_artifacts:
+          path: build/test-results/
+          destination: test-results
+
       - store_test_results:
           path: build/test-results/
 
@@ -167,6 +183,10 @@
           path: build/reports
           destination: test-reports
 
+      - store_artifacts:
+          path: build/test-results/
+          destination: test-results
+
       - store_test_results:
           path: build/test-results/
 
@@ -186,6 +206,10 @@
           path: build/reports
           destination: test-reports
 
+      - store_artifacts:
+          path: build/test-results/
+          destination: test-results
+
       - store_test_results:
           path: build/test-results/
 
diff --git a/CHANGES.txt b/CHANGES.txt
index a457c5d..f94b7b9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Sidecar should be able to load metadata even if the local instance is unavailable (CASSANDRASC-79)
  * Expose additional SSL configuration options for the Sidecar Service (CASSANDRASC-82)
  * Expose additional node settings (CASSANDRASC-84)
  * Sidecar does not handle keyspaces and table names with mixed case (CASSANDRASC-76)
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
index a453740..0e12a0e 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -18,12 +18,19 @@
 
 package org.apache.cassandra.sidecar.adapters.base;
 
+import java.net.InetSocketAddress;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DriverUtils;
+import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.ClusterMembershipOperations;
 import org.apache.cassandra.sidecar.common.ICassandraAdapter;
@@ -49,16 +56,19 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAdapter.class);
     protected final DnsResolver dnsResolver;
     protected final JmxClient jmxClient;
-    private final CQLSessionProvider session;
+    private final CQLSessionProvider cqlSessionProvider;
     private final String sidecarVersion;
+    private final InetSocketAddress localNativeTransportAddress;
+    private volatile Host host;
 
-    public CassandraAdapter(DnsResolver dnsResolver, JmxClient jmxClient, CQLSessionProvider session,
-                            String sidecarVersion)
+    public CassandraAdapter(DnsResolver dnsResolver, JmxClient jmxClient, CQLSessionProvider cqlSessionProvider,
+                            String sidecarVersion, InetSocketAddress localNativeTransportAddress)
     {
         this.dnsResolver = dnsResolver;
         this.jmxClient = jmxClient;
-        this.session = session;
+        this.cqlSessionProvider = cqlSessionProvider;
         this.sidecarVersion = sidecarVersion;
+        this.localNativeTransportAddress = localNativeTransportAddress;
     }
 
     /**
@@ -68,7 +78,7 @@
     @Nullable
     public Metadata metadata()
     {
-        Session activeSession = session.localCql();
+        Session activeSession = cqlSessionProvider.get();
         if (activeSession == null)
         {
             LOGGER.warn("There is no active session to Cassandra");
@@ -97,21 +107,20 @@
     @Nullable
     public NodeSettings nodeSettings()
     {
-        Session activeSession = session.localCql();
-        if (activeSession == null)
-        {
-            return null;
-        }
-
-        Row oneResult = activeSession.execute("SELECT "
+        ResultSet rs = executeLocal("SELECT "
                                               + RELEASE_VERSION_COLUMN_NAME + ", "
                                               + PARTITIONER_COLUMN_NAME + ", "
                                               + DATA_CENTER_COLUMN_NAME + ", "
                                               + RPC_ADDRESS_COLUMN_NAME + ", "
                                               + RPC_PORT_COLUMN_NAME + ", "
                                               + TOKENS_COLUMN_NAME
-                                              + " FROM system.local")
-                                     .one();
+                                              + " FROM system.local");
+        if (rs == null)
+        {
+            return null;
+        }
+
+        Row oneResult = rs.one();
 
         return NodeSettings.builder()
                            .releaseVersion(oneResult.getString(RELEASE_VERSION_COLUMN_NAME))
@@ -124,6 +133,49 @@
                            .build();
     }
 
+    @Override
+    public ResultSet executeLocal(Statement statement)
+    {
+        Session activeSession = cqlSessionProvider.get();
+        Metadata metadata = metadata();
+        // Both of the above log about lack of session/metadata, so no need to log again
+        if (activeSession == null || metadata == null)
+        {
+            return null;
+        }
+
+        Host host = getHost(metadata);
+        if (host == null)
+        {
+            LOGGER.debug("Could not find host in metadata for address {}", localNativeTransportAddress);
+            return null;
+        }
+        statement.setConsistencyLevel(ConsistencyLevel.ONE);
+        statement.setHost(host);
+        return activeSession.execute(statement);
+    }
+
+    private Host getHost(Metadata metadata)
+    {
+        if (host == null)
+        {
+            synchronized (this)
+            {
+                if (host == null)
+                {
+                    host = DriverUtils.getHost(metadata, localNativeTransportAddress);
+                }
+            }
+        }
+        return host;
+    }
+
+    @Override
+    public InetSocketAddress localNativeTransportPort()
+    {
+        return localNativeTransportAddress;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
index 57abe35..5e505d7 100644
--- a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.adapters.base;
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.ICassandraFactory;
@@ -43,13 +45,16 @@
     /**
      * Returns a new adapter for Cassandra 4.0 clusters.
      *
-     * @param session   the session to the Cassandra database
-     * @param jmxClient the JMX client to connect to the Cassandra database
+     * @param session                     the session to the Cassandra database
+     * @param jmxClient                   the JMX client to connect to the Cassandra database
+     * @param localNativeTransportAddress the address and port on which this instance is configured to listen
      * @return a new adapter for the 4.0 clusters
      */
     @Override
-    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
+    public ICassandraAdapter create(CQLSessionProvider session,
+                                    JmxClient jmxClient,
+                                    InetSocketAddress localNativeTransportAddress)
     {
-        return new CassandraAdapter(dnsResolver, jmxClient, session, sidecarVersion);
+        return new CassandraAdapter(dnsResolver, jmxClient, session, sidecarVersion, localNativeTransportAddress);
     }
 }
diff --git a/client/src/main/java/org/apache/cassandra/sidecar/client/request/TokenRangeReplicasRequest.java b/client/src/main/java/org/apache/cassandra/sidecar/client/request/TokenRangeReplicasRequest.java
index 91f11f7..cc7f803 100644
--- a/client/src/main/java/org/apache/cassandra/sidecar/client/request/TokenRangeReplicasRequest.java
+++ b/client/src/main/java/org/apache/cassandra/sidecar/client/request/TokenRangeReplicasRequest.java
@@ -29,8 +29,7 @@
 {
     /**
      * Constructs a new request to retrieve information by keyspace from token-range replicas endpoint
-     *
-     * @param keyspace the keyspace in Cassandra
+     * @param keyspace the keyspace for which the token range replicas will be retrieved.
      */
     public TokenRangeReplicasRequest(String keyspace)
     {
diff --git a/common/src/main/java/com/datastax/driver/core/DriverUtils.java b/common/src/main/java/com/datastax/driver/core/DriverUtils.java
new file mode 100644
index 0000000..c28da45
--- /dev/null
+++ b/common/src/main/java/com/datastax/driver/core/DriverUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datastax.driver.core;
+
+import java.net.InetSocketAddress;
+
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * A collection of methods that require access to package-private members in the datastax driver.
+ */
+public class DriverUtils
+{
+    /**
+     * Check if a host has active connections
+     *
+     * @param host the host to check
+     * @return true if the host has active connections, false otherwise
+     */
+    @VisibleForTesting
+    public static boolean hasActiveConnections(Host host)
+    {
+        return host.convictionPolicy.hasActiveConnections();
+    }
+
+    /**
+     * Start attempting to reconnect to the given host, as hosts with `IGNORED` distance aren't attempted
+     * and the SidecarLoadBalancingPolicy marks non-selected nodes as IGNORED until they need to rotate in.
+     *
+     * @param cluster The cluster object
+     * @param host    the host to which reconnect attempts will be made
+     */
+    public static void startPeriodicReconnectionAttempt(Cluster cluster, Host host)
+    {
+        cluster.manager.startPeriodicReconnectionAttempt(host, false);
+    }
+
+    /**
+     * Gets a Host instance from metadata based on the native transport address
+     *
+     * @param metadata                    the {@link Metadata} instance to search for the host
+     * @param localNativeTransportAddress the native transport ip address and port for the host to find
+     * @return the {@link Host}           instance if found, else null
+     */
+    public static Host getHost(Metadata metadata, InetSocketAddress localNativeTransportAddress)
+    {
+        // Because the driver can sometimes mess up the broadcast address, we need to search by endpoint
+        // which is what it actually uses to connect to the cluster. Therefore, create a TranslatedAddressEndpoint
+        // to use for searching. It has to be one of these because that's what the driver is using internally,
+        // and the `.equals` method used when searching checks the type explicitly.
+        TranslatedAddressEndPoint endPoint = new TranslatedAddressEndPoint(localNativeTransportAddress);
+        return metadata.getHost(endPoint);
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java
index 586651d..6144779 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSessionProvider.java
@@ -18,159 +18,36 @@
 
 package org.apache.cassandra.sidecar.common;
 
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.ConsistencyLevel;
-import com.datastax.driver.core.NettyOptions;
-import com.datastax.driver.core.QueryOptions;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.DriverException;
-import com.datastax.driver.core.exceptions.DriverInternalError;
-import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
-import com.datastax.driver.core.policies.ReconnectionPolicy;
-import com.datastax.driver.core.policies.RoundRobinPolicy;
-import com.datastax.driver.core.policies.WhiteListPolicy;
 import org.jetbrains.annotations.Nullable;
 
 /**
- * Provides connections to the local Cassandra cluster as defined in the Configuration. Currently, it only supports
- * returning the local connection.
+ * A provider of a CQL Session. The session should be connected to:
+ * <ul>
+ *     <li>All locally-managed instances.</li>
+ *     <li>At least one non-local instance. Preferably, at least two non-replica instances.</li>
+ * </ul>
  */
-public class CQLSessionProvider
+public interface CQLSessionProvider
 {
-    private static final Logger logger = LoggerFactory.getLogger(CQLSessionProvider.class);
-
-    @Nullable
-    private Session localSession;
-    private final InetSocketAddress inet;
-    private final WhiteListPolicy wlp;
-    private final NettyOptions nettyOptions;
-    private final QueryOptions queryOptions;
-    private final ReconnectionPolicy reconnectionPolicy;
-
-    public CQLSessionProvider(String host, int port, int healthCheckInterval)
-    {
-        // this was originally using unresolved Inet addresses, but it would fail when trying to
-        // connect to a docker container
-        logger.info("Connecting to {} on port {}", host, port);
-        inet = new InetSocketAddress(host, port);
-
-        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
-        this.nettyOptions = new NettyOptions();
-        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
-        this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000, healthCheckInterval);
-    }
-
-    public CQLSessionProvider(InetSocketAddress target, NettyOptions options)
-    {
-        inet = target;
-        wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
-        this.nettyOptions = options;
-        this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
-        reconnectionPolicy = new ExponentialReconnectionPolicy(100, 1000);
-    }
-
     /**
-     * Provides a Session connected only to the local node from configuration. If null it means the connection was
-     * not able to be established. The session still might throw a NoHostAvailableException if the local host goes
-     * offline or otherwise unavailable.
+     * Provides a Session connected to the cluster. If null it means the connection was
+     * could not be established. The session still might throw a NoHostAvailableException if the
+     * cluster is otherwise unreachable.
      *
      * @return Session
      */
-    @Nullable
-    public synchronized Session localCql()
-    {
-        Cluster cluster = null;
-        try
-        {
-            if (localSession == null)
-            {
-                logger.info("Connecting to {}", inet);
-                cluster = Cluster.builder()
-                                 .addContactPointsWithPorts(inet)
-                                 .withLoadBalancingPolicy(wlp)
-                                 .withQueryOptions(queryOptions)
-                                 .withReconnectionPolicy(reconnectionPolicy)
-                                 .withoutMetrics()
-                                 // tests can create a lot of these Cluster objects, to avoid creating HWTs and
-                                 // event thread pools for each we have the override
-                                 .withNettyOptions(nettyOptions)
-                                 .build();
-                localSession = cluster.connect();
-                logger.info("Successfully connected to Cassandra instance!");
-            }
-        }
-        catch (Exception e)
-        {
-            logger.error("Failed to reach Cassandra", e);
-            if (cluster != null)
-            {
-                try
-                {
-                    cluster.close();
-                }
-                catch (Exception ex)
-                {
-                    logger.error("Failed to close cluster in cleanup", ex);
-                }
-            }
-        }
-        return localSession;
-    }
+    @Nullable Session get();
 
-    public Session close()
-    {
-        Session localSession;
-        synchronized (this)
-        {
-            localSession = this.localSession;
-            this.localSession = null;
-        }
+    /**
+     * Closes the CQLSessionProvider
+     */
+    void close();
 
-        if (localSession != null)
-        {
-            try
-            {
-                localSession.getCluster().closeAsync().get(1, TimeUnit.MINUTES);
-                localSession.closeAsync().get(1, TimeUnit.MINUTES);
-            }
-            catch (InterruptedException e)
-            {
-                Thread.currentThread().interrupt();
-            }
-            catch (TimeoutException e)
-            {
-                logger.warn("Unable to close session after 1 minute for provider {}", this, e);
-            }
-            catch (ExecutionException e)
-            {
-                throw propagateCause(e);
-            }
-        }
-        return localSession;
-    }
-
-    static RuntimeException propagateCause(ExecutionException e)
-    {
-        Throwable cause = e.getCause();
-
-        if (cause instanceof Error) throw ((Error) cause);
-
-        // We could just rethrow e.getCause(). However, the cause of the ExecutionException has likely
-        // been
-        // created on the I/O thread receiving the response. Which means that the stacktrace associated
-        // with said cause will make no mention of the current thread. This is painful for say, finding
-        // out which execute() statement actually raised the exception. So instead, we re-create the
-        // exception.
-        if (cause instanceof DriverException) throw ((DriverException) cause).copy();
-        else throw new DriverInternalError("Unexpected exception thrown", cause);
-    }
+    /**
+     * Gets the current Session object if it already exists.
+     * Otherwise, returns null.
+     * @return the connected {@link Session} object if available. Null otherwise.
+     */
+    @Nullable Session getIfConnected();
 }
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java
index 4c9acc4..edb6752 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java
@@ -18,7 +18,12 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.net.InetSocketAddress;
+
 import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
 
 /**
  * Core Cassandra Adapter interface
@@ -33,9 +38,36 @@
      */
     Metadata metadata();
 
+    /**
+     * The {@link NodeSettings} for this instance.
+     * @return the {@link NodeSettings} instance for this instance.
+     */
     NodeSettings nodeSettings();
 
     /**
+     * Execute the provided query on the locally-managed Cassandra instance
+     * @param query the query to execute
+     * @return the {@link ResultSet}
+     */
+    default ResultSet executeLocal(String query)
+    {
+        return executeLocal(new SimpleStatement(query));
+    }
+
+    /**
+     * Execute the provided statement on the locally-managed Cassandra instance
+     * @param statement the statement to execute
+     * @return the {@link ResultSet}
+     */
+    ResultSet executeLocal(Statement statement);
+
+    /**
+     * The address on which the local instance is listening for CQL connections
+     * @return the {@link InetSocketAddress} representing the local address and port.
+     */
+    InetSocketAddress localNativeTransportPort();
+
+    /**
      * @return the {@link StorageOperations} implementation for the Cassandra cluster
      */
     StorageOperations storageOperations();
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
index 70e70ff..e8581d6 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
@@ -18,17 +18,22 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.net.InetSocketAddress;
+
 /**
- * A factory is used here to create instances of an Adapter.  We
+ * A factory that will create an appropriate CassandraAdapter.
  */
 public interface ICassandraFactory
 {
     /**
      * Creates a new {@link ICassandraAdapter} with the provided {@link CQLSessionProvider} and {@link JmxClient}
      *
-     * @param session the session to the Cassandra database
-     * @param client  the JMX client to connect to the Cassandra database
-     * @return a {@link ICassandraAdapter}
+     * @param session                     the session to the Cassandra database
+     * @param client                      the JMX client to connect to the Cassandra database
+     * @param localNativeTransportAddress the native transport address and port of the instance
+     * @return an {@link ICassandraAdapter} implementation for the instance provdied
      */
-    ICassandraAdapter create(CQLSessionProvider session, JmxClient client);
+    ICassandraAdapter create(CQLSessionProvider session,
+                             JmxClient client,
+                             InetSocketAddress localNativeTransportAddress);
 }
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java b/common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java
index 13f2e2a..7aa9c19 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java
@@ -18,13 +18,17 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.net.InetSocketAddress;
+
 /**
  *
  */
 public class MockCassandraFactory implements ICassandraFactory
 {
     @Override
-    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
+    public ICassandraAdapter create(CQLSessionProvider session,
+                                    JmxClient jmxClient,
+                                    InetSocketAddress localNativeTransportAddress)
     {
         return null;
     }
diff --git a/src/main/dist/conf/sidecar.yaml b/src/main/dist/conf/sidecar.yaml
index 6104f69..99ad9e3 100644
--- a/src/main/dist/conf/sidecar.yaml
+++ b/src/main/dist/conf/sidecar.yaml
@@ -126,6 +126,12 @@
 #      path: "path/to/truststore.p12"
 #      password: password
 
+driver_parameters:
+  contact_points:
+    - "127.0.0.1:9042"
+    - "127.0.0.2:9042"
+  num_connections: 6
+#  local_dc: datacenter1
 
 healthcheck:
   initial_delay_millis: 0
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
new file mode 100644
index 0000000..05a0452
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CQLSessionProviderImpl.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.DriverInternalError;
+import com.datastax.driver.core.policies.ExponentialReconnectionPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.ReconnectionPolicy;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
+import org.apache.cassandra.sidecar.config.SidecarConfiguration;
+import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.VisibleForTesting;
+
+/**
+ * Provides connections to the local Cassandra cluster as defined in the Configuration. Currently, it only supports
+ * returning the local connection.
+ */
+public class CQLSessionProviderImpl implements CQLSessionProvider
+{
+    private static final Logger logger = LoggerFactory.getLogger(CQLSessionProviderImpl.class);
+    private final List<InetSocketAddress> contactPoints;
+    private final int numConnections;
+    private final String localDc;
+    private final NettyOptions nettyOptions;
+    private final ReconnectionPolicy reconnectionPolicy;
+    private final List<InetSocketAddress> localInstances;
+    @Nullable
+    private volatile Session session;
+
+    @VisibleForTesting
+    public CQLSessionProviderImpl(List<InetSocketAddress> contactPoints,
+                                  List<InetSocketAddress> localInstances,
+                                  int healthCheckFrequencyMillis,
+                                  String localDc,
+                                  int numConnections,
+                                  NettyOptions options)
+    {
+        this.contactPoints = contactPoints;
+        this.localInstances = localInstances;
+        this.localDc = localDc;
+        this.numConnections = numConnections;
+        this.nettyOptions = options;
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(500, healthCheckFrequencyMillis);
+    }
+
+    public CQLSessionProviderImpl(SidecarConfiguration configuration,
+                                  NettyOptions options)
+    {
+        DriverConfiguration driverConfiguration = configuration.driverConfiguration();
+        this.contactPoints = driverConfiguration.contactPoints();
+        this.localInstances = configuration.cassandraInstances()
+                                           .stream()
+                                           .map(i -> new InetSocketAddress(i.host(), i.port()))
+                                           .collect(Collectors.toList());
+        this.localDc = driverConfiguration.localDc();
+        this.numConnections = driverConfiguration.numConnections();
+        this.nettyOptions = options;
+        int maxDelayMs = configuration.healthCheckConfiguration().checkIntervalMillis();
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(500, maxDelayMs);
+    }
+
+    static RuntimeException propagateCause(ExecutionException e)
+    {
+        Throwable cause = e.getCause();
+
+        if (cause instanceof Error) throw ((Error) cause);
+
+        // We could just rethrow e.getCause(). However, the cause of the ExecutionException has likely
+        // been
+        // created on the I/O thread receiving the response. Which means that the stacktrace associated
+        // with said cause will make no mention of the current thread. This is painful for say, finding
+        // out which execute() statement actually raised the exception. So instead, we re-create the
+        // exception.
+        if (cause instanceof DriverException) throw ((DriverException) cause).copy();
+        else throw new DriverInternalError("Unexpected exception thrown", cause);
+    }
+
+    /**
+     * Provides a Session connected to the cluster. If null it means the connection was
+     * could not be established. The session still might throw a NoHostAvailableException if the
+     * cluster is otherwise unreachable.
+     *
+     * @return Session
+     */
+    @Nullable
+    public synchronized Session get()
+    {
+        if (session != null)
+        {
+            return session;
+        }
+        Cluster cluster = null;
+        try
+        {
+            logger.info("Connecting to cluster using contact points {}", contactPoints);
+
+            LoadBalancingPolicy lbp = new SidecarLoadBalancingPolicy(localInstances, localDc, numConnections);
+            // Prevent spurious reconnects of ignored down nodes on `onUp` events
+            QueryOptions queryOptions = new QueryOptions().setReprepareOnUp(false);
+            cluster = Cluster.builder()
+                             .addContactPointsWithPorts(contactPoints)
+                             .withReconnectionPolicy(reconnectionPolicy)
+                             .withoutMetrics()
+                             .withLoadBalancingPolicy(lbp)
+                             .withQueryOptions(queryOptions)
+                             // tests can create a lot of these Cluster objects, to avoid creating HWTs and
+                             // event thread pools for each we have the override
+                             .withNettyOptions(nettyOptions)
+                             .build();
+            session = cluster.connect();
+            logger.info("Successfully connected to Cassandra!");
+        }
+        catch (Exception e)
+        {
+            logger.error("Failed to reach Cassandra", e);
+            if (cluster != null)
+            {
+                try
+                {
+                    cluster.close();
+                }
+                catch (Exception ex)
+                {
+                    logger.error("Failed to close cluster in cleanup", ex);
+                }
+            }
+        }
+        return session;
+    }
+
+    @Override
+    public Session getIfConnected()
+    {
+        return session;
+    }
+
+    @Override
+    public void close()
+    {
+        Session localSession;
+        synchronized (this)
+        {
+            localSession = this.session;
+            this.session = null;
+        }
+        if (localSession != null)
+        {
+            try
+            {
+                localSession.getCluster().closeAsync().get(1, TimeUnit.MINUTES);
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+            }
+            catch (TimeoutException e)
+            {
+                logger.warn("Unable to close session after 1 minute for provider {}", this, e);
+            }
+            catch (ExecutionException e)
+            {
+                throw propagateCause(e);
+            }
+        }
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
index 115f188..4523ed4 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/CassandraAdapterDelegate.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.sidecar.cluster;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
@@ -27,10 +28,15 @@
 import org.slf4j.LoggerFactory;
 
 import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.DriverUtils;
 import com.datastax.driver.core.Host;
 import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import io.vertx.core.Vertx;
 import io.vertx.core.json.JsonObject;
@@ -81,6 +87,8 @@
     private volatile NodeSettings nodeSettings = null;
     private final AtomicBoolean registered = new AtomicBoolean(false);
     private final AtomicBoolean isHealthCheckActive = new AtomicBoolean(false);
+    private final InetSocketAddress localNativeTransportAddress;
+    private volatile Host host;
 
     /**
      * Constructs a new {@link CassandraAdapterDelegate} for the given {@code cassandraInstance}
@@ -91,16 +99,21 @@
      * @param session             the session to the Cassandra database
      * @param jmxClient           the JMX client used to communicate with the Cassandra instance
      * @param sidecarVersion      the version of the Sidecar from the current binary
+     * @param host                the Cassandra instance's hostname or ip address as a string
+     * @param port                the Cassandra instance's port number
      */
     public CassandraAdapterDelegate(Vertx vertx,
                                     int cassandraInstanceId,
                                     CassandraVersionProvider versionProvider,
                                     CQLSessionProvider session,
                                     JmxClient jmxClient,
-                                    String sidecarVersion)
+                                    String sidecarVersion,
+                                    String host,
+                                    int port)
     {
         this.vertx = Objects.requireNonNull(vertx);
         this.cassandraInstanceId = cassandraInstanceId;
+        this.localNativeTransportAddress = new InetSocketAddress(host, port);
         this.sidecarVersion = sidecarVersion;
         this.versionProvider = versionProvider;
         this.cqlSessionProvider = session;
@@ -159,7 +172,7 @@
 
     private void healthCheckInternal()
     {
-        Session activeSession = cqlSessionProvider.localCql();
+        Session activeSession = cqlSessionProvider.get();
         if (activeSession == null)
         {
             LOGGER.info("No local CQL session is available for cassandraInstanceId={}. " +
@@ -172,15 +185,27 @@
 
         try
         {
-            Row oneResult = activeSession.execute("SELECT "
-                                                  + RELEASE_VERSION_COLUMN_NAME + ", "
-                                                  + PARTITIONER_COLUMN_NAME + ", "
-                                                  + DATA_CENTER_COLUMN_NAME + ", "
-                                                  + RPC_ADDRESS_COLUMN_NAME + ", "
-                                                  + RPC_PORT_COLUMN_NAME + ", "
-                                                  + TOKENS_COLUMN_NAME
-                                                  + " FROM system.local")
-                                         .one();
+            // NOTE: We cannot use `executeLocal` here as there may be no adapter yet.
+            SimpleStatement healthCheckStatement =
+            new SimpleStatement("SELECT "
+                                + RELEASE_VERSION_COLUMN_NAME + ", "
+                                + PARTITIONER_COLUMN_NAME + ", "
+                                + DATA_CENTER_COLUMN_NAME + ", "
+                                + RPC_ADDRESS_COLUMN_NAME + ", "
+                                + RPC_PORT_COLUMN_NAME + ", "
+                                + TOKENS_COLUMN_NAME
+                                + " FROM system.local");
+            Metadata metadata = activeSession.getCluster().getMetadata();
+            host = getHost(metadata);
+            if (host == null)
+            {
+                LOGGER.warn("Could not find host in cluster metadata by address and port {}",
+                            localNativeTransportAddress);
+                return;
+            }
+            healthCheckStatement.setHost(host);
+            healthCheckStatement.setConsistencyLevel(ConsistencyLevel.ONE);
+            Row oneResult = activeSession.execute(healthCheckStatement).one();
 
             // Note that within the scope of this method, we should keep on using the local releaseVersion
             String releaseVersion = oneResult.getString(RELEASE_VERSION_COLUMN_NAME);
@@ -200,7 +225,7 @@
                 SimpleCassandraVersion previousVersion = currentVersion;
                 currentVersion = SimpleCassandraVersion.create(releaseVersion);
                 adapter = versionProvider.cassandra(releaseVersion)
-                                         .create(cqlSessionProvider, jmxClient);
+                                         .create(cqlSessionProvider, jmxClient, localNativeTransportAddress);
                 nodeSettings = newNodeSettings;
                 LOGGER.info("Cassandra version change detected (from={} to={}) for cassandraInstanceId={}. " +
                             "New adapter loaded={}", previousVersion, currentVersion, cassandraInstanceId, adapter);
@@ -213,13 +238,27 @@
         {
             LOGGER.error("Unexpected error connecting to Cassandra instance {}", cassandraInstanceId, e);
             // The cassandra node is down.
-            // Unregister the host listener and nullify the session in order to get a new object.
+            // Unregister the host listener.
             markAsDownAndMaybeNotify();
             maybeUnregisterHostListener(activeSession);
-            cqlSessionProvider.close();
         }
     }
 
+    private Host getHost(Metadata metadata)
+    {
+        if (host == null)
+        {
+            synchronized (this)
+            {
+                if (host == null)
+                {
+                    host = DriverUtils.getHost(metadata, localNativeTransportAddress);
+                }
+            }
+        }
+        return host;
+    }
+
     /**
      * @return metadata on the connected cluster, including known nodes and schema definitions obtained from the
      * {@link ICassandraAdapter}
@@ -241,6 +280,16 @@
         return nodeSettings;
     }
 
+    public ResultSet executeLocal(Statement statement)
+    {
+        return fromAdapter(adapter -> adapter.executeLocal(statement));
+    }
+
+    public InetSocketAddress localNativeTransportPort()
+    {
+        return fromAdapter(ICassandraAdapter::localNativeTransportPort);
+    }
+
     @Nullable
     @Override
     public StorageOperations storageOperations()
@@ -265,25 +314,25 @@
     @Override
     public void onAdd(Host host)
     {
-        healthCheck();
+        runIfThisHost(host, this::healthCheck);
     }
 
     @Override
     public void onUp(Host host)
     {
-        healthCheck();
+        runIfThisHost(host, this::healthCheck);
     }
 
     @Override
     public void onDown(Host host)
     {
-        markAsDownAndMaybeNotify();
+        runIfThisHost(host, this::markAsDownAndMaybeNotify);
     }
 
     @Override
     public void onRemove(Host host)
     {
-        healthCheck();
+        runIfThisHost(host, this::healthCheck);
     }
 
     @Override
@@ -304,7 +353,7 @@
     public void close()
     {
         markAsDownAndMaybeNotify();
-        Session activeSession = cqlSessionProvider.close();
+        Session activeSession = cqlSessionProvider.getIfConnected();
         if (activeSession != null)
         {
             maybeUnregisterHostListener(activeSession);
@@ -355,4 +404,12 @@
         ICassandraAdapter localAdapter = this.adapter;
         return localAdapter == null ? null : getter.apply(localAdapter);
     }
+
+    private void runIfThisHost(Host host, Runnable runnable)
+    {
+        if (this.localNativeTransportAddress.equals(host.getEndPoint().resolve()))
+        {
+            runnable.run();
+        }
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
new file mode 100644
index 0000000..74e1b0e
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicy.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cluster;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.DriverUtils;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.LoadBalancingPolicy;
+import com.datastax.driver.core.policies.RoundRobinPolicy;
+
+/**
+ * The SidecarLoadBalancingPolicy is designed to ensure that the Cassandra Metadata objects associated with the
+ * CqlSessionProvider have enough non-local hosts in their allowed connections to be kept up-to-date
+ * even if the local Cassandra instances are down/have their native transport disabled.
+ * NOTE: This policy won't work with a child policy that is token-aware
+ */
+class SidecarLoadBalancingPolicy implements LoadBalancingPolicy
+{
+    public static final int MIN_NON_LOCAL_CONNECTIONS = 2;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SidecarLoadBalancingPolicy.class);
+    private final Set<Host> selectedHosts = new HashSet<>();
+    private final Set<InetSocketAddress> localHostAddresses;
+    private final LoadBalancingPolicy childPolicy;
+    private final int totalRequestedConnections;
+    private final Random random = new Random();
+    private final HashSet<Host> allHosts = new HashSet<>();
+    private Cluster cluster;
+
+    public SidecarLoadBalancingPolicy(List<InetSocketAddress> localHostAddresses,
+                                      String localDc,
+                                      int numAdditionalConnections)
+    {
+        this.childPolicy = createChildPolicy(localDc);
+        this.localHostAddresses = new HashSet<>(localHostAddresses);
+        if (numAdditionalConnections < MIN_NON_LOCAL_CONNECTIONS)
+        {
+            LOGGER.warn("Additional instances requested was {}, which is less than the minimum of {}. Using {}.",
+                        numAdditionalConnections, MIN_NON_LOCAL_CONNECTIONS, MIN_NON_LOCAL_CONNECTIONS);
+            numAdditionalConnections = MIN_NON_LOCAL_CONNECTIONS;
+        }
+        this.totalRequestedConnections = this.localHostAddresses.size() + numAdditionalConnections;
+    }
+
+    @Override
+    public void init(Cluster cluster, Collection<Host> hosts)
+    {
+        this.cluster = cluster;
+        this.allHosts.addAll(hosts);
+        recalculateSelectedHosts();
+        childPolicy.init(cluster, hosts);
+    }
+
+    @Override
+    public HostDistance distance(Host host)
+    {
+        if (!selectedHosts.contains(host))
+        {
+            return HostDistance.IGNORED;
+        }
+        return childPolicy.distance(host);
+    }
+
+    @Override
+    public Iterator<Host> newQueryPlan(String loggedKeyspace, Statement statement)
+    {
+        Iterator<Host> child = childPolicy.newQueryPlan(loggedKeyspace, statement);
+        // Filter the child policy to only selected hosts
+        return Iterators.filter(child, selectedHosts::contains);
+    }
+
+    @Override
+    public synchronized void onAdd(Host host)
+    {
+        onUp(host);
+        childPolicy.onAdd(host);
+    }
+
+    @Override
+    public synchronized void onUp(Host host)
+    {
+        this.allHosts.add(host); // replace existing reference if there is one
+        if (selectedHosts.size() < totalRequestedConnections)
+        {
+            recalculateSelectedHosts();
+        }
+        childPolicy.onUp(host);
+    }
+
+    @Override
+    public synchronized void onDown(Host host)
+    {
+        // Don't remove local addresses from the selected host list
+        if (localHostAddresses.contains(host.getBroadcastRpcAddress()))
+        {
+            LOGGER.debug("Local Node {} has been marked down.", host);
+            return;
+        }
+
+        boolean wasSelected = selectedHosts.remove(host);
+        if (!wasSelected)
+        {
+            // Non-selected nodes have been marked with HostDistance.IGNORED
+            // even if they may otherwise be useful. This has a side effect
+            // of preventing the driver from trying to reconnect to them
+            // if we miss the `onUp` event, so we need to schedule reconnects
+            // for these hosts explicitly unless we have active connections.
+            DriverUtils.startPeriodicReconnectionAttempt(cluster, host);
+        }
+        recalculateSelectedHosts();
+        childPolicy.onDown(host);
+    }
+
+    @Override
+    public synchronized void onRemove(Host host)
+    {
+        this.allHosts.remove(host);
+        onDown(host);
+        childPolicy.onRemove(host);
+    }
+
+    @Override
+    public void close()
+    {
+        childPolicy.close();
+    }
+
+    /**
+     * Creates the child policy based on the presence of a local datacenter
+     *
+     * @param localDc the local datacenter to use, or null
+     * @return a {@link LoadBalancingPolicy}
+     */
+    private LoadBalancingPolicy createChildPolicy(String localDc)
+    {
+        if (localDc != null)
+        {
+            return DCAwareRoundRobinPolicy.builder().withLocalDc(localDc).build();
+        }
+        return new RoundRobinPolicy();
+    }
+
+    private synchronized void recalculateSelectedHosts()
+    {
+        Map<Boolean, List<Host>> partitionedHosts = allHosts.stream()
+                                                            .collect(Collectors.partitioningBy(this::isLocalHost));
+        List<Host> localHosts = partitionedHosts.get(true);
+        int numLocalHostsConfigured = localHostAddresses.size();
+        if (localHosts == null || localHosts.isEmpty())
+        {
+            LOGGER.warn("Did not find any local hosts in allHosts.");
+        }
+        else
+        {
+            if (localHosts.size() < numLocalHostsConfigured)
+            {
+                LOGGER.warn("Could not find all configured local hosts in host list.");
+            }
+            selectedHosts.addAll(localHosts);
+        }
+        int requiredNonLocalHosts = this.totalRequestedConnections - selectedHosts.size();
+        if (requiredNonLocalHosts > 0)
+        {
+            List<Host> nonLocalHosts = partitionedHosts.get(false);
+            if (nonLocalHosts == null || nonLocalHosts.isEmpty())
+            {
+                LOGGER.warn("Did not find any non-local hosts in allHosts");
+                return;
+            }
+
+            // Remove down and already selected hosts from consideration
+            nonLocalHosts = nonLocalHosts.stream()
+                                         .filter(h -> !selectedHosts.contains(h) && h.isUp())
+                                         .collect(Collectors.toList());
+
+            if (nonLocalHosts.size() < requiredNonLocalHosts)
+            {
+                LOGGER.warn("Could not find enough new, up non-local hosts to meet requested number {}",
+                            requiredNonLocalHosts);
+            }
+            else
+            {
+                LOGGER.debug("Found enough new, up, non-local hosts to meet requested number {}",
+                             requiredNonLocalHosts);
+            }
+            if (nonLocalHosts.size() > requiredNonLocalHosts)
+            {
+                Collections.shuffle(nonLocalHosts, this.random);
+            }
+            int hostsToAdd = Math.min(requiredNonLocalHosts, nonLocalHosts.size());
+            for (int i = 0; i < hostsToAdd; i++)
+            {
+                selectedHosts.add(nonLocalHosts.get(i));
+            }
+        }
+    }
+
+    private boolean isLocalHost(Host host)
+    {
+        return localHostAddresses.contains(host.getEndPoint().resolve());
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java
new file mode 100644
index 0000000..6a00064
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/config/DriverConfiguration.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config;
+
+import java.net.InetSocketAddress;
+import java.util.List;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public interface DriverConfiguration
+{
+    /**
+     * A list of contact points to use for initial connection to Cassandra.
+     * At least 2 non-replica nodes are recommended.
+     * @return a list of contact points
+     */
+    List<InetSocketAddress> contactPoints();
+
+    /**
+     * The number of connections other than locally-managed nodes to use.
+     * The minimum is 2 - if your value is less than 2, the Sidecar will use 2.
+     * @return the number of connections to make to the cluster.
+     */
+    int numConnections();
+
+    /**
+     * The local datacenter to use for non-local queries to the cluster.
+     * @return the local datacenter, or null if no local datacenter is specified.
+     */
+    String localDc();
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java b/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
index 32f8718..e1d8a4c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/SidecarConfiguration.java
@@ -55,4 +55,9 @@
      * @return the configuration for Cassandra input validation
      */
     CassandraInputValidationConfiguration cassandraInputValidationConfiguration();
+
+    /**
+     * @return the Cassandra Driver parameters to use when connecting to the cluster
+     */
+    DriverConfiguration driverConfiguration();
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
new file mode 100644
index 0000000..49334ed
--- /dev/null
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/DriverConfigurationImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.config.yaml;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
+
+/**
+ * The driver configuration to use when connecting to Cassandra
+ */
+public class DriverConfigurationImpl implements DriverConfiguration
+{
+    private final List<InetSocketAddress> contactPoints = new ArrayList<>();
+    private String localDc;
+    private int numConnections;
+
+    @JsonProperty("contact_points")
+    public List<InetSocketAddress> contactPoints()
+    {
+        return contactPoints;
+    }
+
+    @JsonProperty("num_connections")
+    public int numConnections()
+    {
+        return numConnections;
+    }
+
+    @JsonProperty("local_dc")
+    public String localDc()
+    {
+        return localDc;
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
index 9da5ccf..3a08608 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/InstanceConfigurationImpl.java
@@ -81,7 +81,7 @@
         this.jmxRolePassword = null;
     }
 
-    protected InstanceConfigurationImpl(int id,
+    public InstanceConfigurationImpl(int id,
                                         String host,
                                         int port,
                                         String username,
diff --git a/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java b/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
index 4219443..148ab71 100644
--- a/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/config/yaml/SidecarConfigurationImpl.java
@@ -34,6 +34,7 @@
 import org.apache.cassandra.sidecar.common.DataObjectBuilder;
 import org.apache.cassandra.sidecar.config.CacheConfiguration;
 import org.apache.cassandra.sidecar.config.CassandraInputValidationConfiguration;
+import org.apache.cassandra.sidecar.config.DriverConfiguration;
 import org.apache.cassandra.sidecar.config.HealthCheckConfiguration;
 import org.apache.cassandra.sidecar.config.InstanceConfiguration;
 import org.apache.cassandra.sidecar.config.JmxConfiguration;
@@ -59,9 +60,11 @@
     @JsonProperty(value = "cassandra_instances")
     protected final List<InstanceConfiguration> cassandraInstances;
 
+    @JsonProperty(value = "driver_parameters")
+    protected final DriverConfiguration driverConfiguration;
+
     @JsonProperty(value = "sidecar", required = true)
     protected final ServiceConfiguration serviceConfiguration;
-
     @JsonProperty("ssl")
     protected final SslConfiguration sslConfiguration;
 
@@ -84,6 +87,7 @@
         sslConfiguration = builder.sslConfiguration;
         healthCheckConfiguration = builder.healthCheckConfiguration;
         cassandraInputValidationConfiguration = builder.cassandraInputValidationConfiguration;
+        driverConfiguration = builder.driverConfiguration;
     }
 
     /**
@@ -146,6 +150,13 @@
         return healthCheckConfiguration;
     }
 
+    @Override
+    @JsonProperty("driver_parameters")
+    public DriverConfiguration driverConfiguration()
+    {
+        return driverConfiguration;
+    }
+
     /**
      * @return the configuration for Cassandra input validation
      */
@@ -198,7 +209,9 @@
                                     .addAbstractTypeMapping(JmxConfiguration.class,
                                                             JmxConfigurationImpl.class)
                                     .addAbstractTypeMapping(TrafficShapingConfiguration.class,
-                                                            TrafficShapingConfigurationImpl.class);
+                                                            TrafficShapingConfigurationImpl.class)
+                                    .addAbstractTypeMapping(DriverConfiguration.class,
+                                                            DriverConfigurationImpl.class);
 
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory())
                               .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true)
@@ -226,6 +239,7 @@
         private HealthCheckConfiguration healthCheckConfiguration = new HealthCheckConfigurationImpl();
         private CassandraInputValidationConfiguration cassandraInputValidationConfiguration
         = new CassandraInputValidationConfigurationImpl();
+        private DriverConfiguration driverConfiguration = new DriverConfigurationImpl();
 
         protected Builder()
         {
@@ -293,6 +307,18 @@
         }
 
         /**
+         * Sets the {@code driverConfiguration} and returns a reference to this Builder enabling
+         * method chaining.
+         *
+         * @param driverConfiguration the {@code driverConfiguration} to set
+         * @return a reference to this Builder
+         */
+        public Builder driverConfiguration(DriverConfiguration driverConfiguration)
+        {
+            return update(b -> b.driverConfiguration = driverConfiguration);
+        }
+
+        /**
          * Sets the {@code cassandraInputValidationConfiguration} and returns a reference to this Builder enabling
          * method chaining.
          *
diff --git a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
index ddabae3..fec3f4c 100644
--- a/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/server/MainModule.java
@@ -27,6 +27,7 @@
 
 import com.google.common.util.concurrent.SidecarRateLimiter;
 
+import com.datastax.driver.core.NettyOptions;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
@@ -42,6 +43,7 @@
 import io.vertx.ext.web.handler.StaticHandler;
 import io.vertx.ext.web.handler.TimeoutHandler;
 import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
+import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
@@ -79,6 +81,8 @@
 import org.apache.cassandra.sidecar.utils.MD5ChecksumVerifier;
 import org.apache.cassandra.sidecar.utils.TimeProvider;
 
+import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_SERVER_STOP;
+
 /**
  * Provides main binding for more complex Guice dependencies
  */
@@ -257,14 +261,23 @@
 
     @Provides
     @Singleton
+    public CQLSessionProvider cqlSessionProvider(Vertx vertx, SidecarConfiguration sidecarConfiguration)
+    {
+        CQLSessionProviderImpl cqlSessionProvider = new CQLSessionProviderImpl(sidecarConfiguration,
+                                                                               new NettyOptions());
+        vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), message -> cqlSessionProvider.close());
+        return cqlSessionProvider;
+    }
+
+    @Provides
+    @Singleton
     public InstancesConfig instancesConfig(Vertx vertx,
                                            SidecarConfiguration configuration,
                                            CassandraVersionProvider cassandraVersionProvider,
                                            SidecarVersionProvider sidecarVersionProvider,
-                                           DnsResolver dnsResolver)
+                                           DnsResolver dnsResolver,
+                                           CQLSessionProvider cqlSessionProvider)
     {
-        int healthCheckFrequencyMillis = configuration.healthCheckConfiguration().checkIntervalMillis();
-
         List<InstanceMetadata> instanceMetadataList =
         configuration.cassandraInstances()
                      .stream()
@@ -273,9 +286,9 @@
                          return buildInstanceMetadata(vertx,
                                                       cassandraInstance,
                                                       cassandraVersionProvider,
-                                                      healthCheckFrequencyMillis,
                                                       sidecarVersionProvider.sidecarVersion(),
-                                                      jmxConfiguration);
+                                                      jmxConfiguration,
+                                                      cqlSessionProvider);
                      })
                      .collect(Collectors.toList());
 
@@ -362,25 +375,24 @@
      * Builds the {@link InstanceMetadata} from the {@link InstanceConfiguration},
      * a provided {@code  versionProvider}, and {@code healthCheckFrequencyMillis}.
      *
-     * @param vertx                      the vertx instance
-     * @param cassandraInstance          the cassandra instance configuration
-     * @param versionProvider            a Cassandra version provider
-     * @param healthCheckFrequencyMillis the health check frequency configuration in milliseconds
-     * @param sidecarVersion             the version of the Sidecar from the current binary
-     * @param jmxConfiguration           the configuration for the JMX Client
+     * @param vertx             the vertx instance
+     * @param cassandraInstance the cassandra instance configuration
+     * @param versionProvider   a Cassandra version provider
+     * @param sidecarVersion    the version of the Sidecar from the current binary
+     * @param jmxConfiguration  the configuration for the JMX Client
+     * @param session           the CQL Session provider
      * @return the build instance metadata object
      */
     private static InstanceMetadata buildInstanceMetadata(Vertx vertx,
                                                           InstanceConfiguration cassandraInstance,
                                                           CassandraVersionProvider versionProvider,
-                                                          int healthCheckFrequencyMillis,
                                                           String sidecarVersion,
-                                                          JmxConfiguration jmxConfiguration)
+                                                          JmxConfiguration jmxConfiguration,
+                                                          CQLSessionProvider session)
     {
         String host = cassandraInstance.host();
         int port = cassandraInstance.port();
 
-        CQLSessionProvider session = new CQLSessionProvider(host, port, healthCheckFrequencyMillis);
         JmxClient jmxClient = JmxClient.builder()
                                        .host(cassandraInstance.jmxHost())
                                        .port(cassandraInstance.jmxPort())
@@ -395,7 +407,9 @@
                                                                          versionProvider,
                                                                          session,
                                                                          jmxClient,
-                                                                         sidecarVersion);
+                                                                         sidecarVersion,
+                                                                         host,
+                                                                         port);
         return InstanceMetadataImpl.builder()
                                    .id(cassandraInstance.id())
                                    .host(host)
diff --git a/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
new file mode 100644
index 0000000..4628a14
--- /dev/null
+++ b/src/test/integration/org/apache/cassandra/sidecar/cluster/SidecarLoadBalancingPolicyTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cluster;
+
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.jupiter.api.Assertions;
+
+import com.datastax.driver.core.DriverUtils;
+import com.datastax.driver.core.Host;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * A test for the SidecarLoadBalancingPolicy
+ */
+public class SidecarLoadBalancingPolicyTest extends IntegrationTestBase
+{
+
+    public static final int SIDECAR_MANAGED_INSTANCES = 2;
+
+    private static List<Host> getConnectedHosts(Set<Host> hosts)
+    {
+        return hosts.stream()
+                    .filter(DriverUtils::hasActiveConnections)
+                    .collect(Collectors.toList());
+    }
+
+    protected int getNumInstancesToManage(int clusterSize)
+    {
+        return SIDECAR_MANAGED_INSTANCES; // we only want to manage the first 2 instances in the "cluster"
+    }
+
+    @CassandraIntegrationTest(nodesPerDc = 6)
+    public void shouldMaintainMinimumConnections() throws ExecutionException, InterruptedException
+    {
+        Set<Host> hosts = sidecarTestContext.session().getCluster().getMetadata().getAllHosts();
+        List<Host> connectedHosts = getConnectedHosts(hosts);
+        // We manage 2 hosts, and ask for an additional 2 (the default) for connections.
+        // Therefore, we expect 4 hosts to have connections at startup.
+        int expectedConnections = SIDECAR_MANAGED_INSTANCES + SidecarLoadBalancingPolicy.MIN_NON_LOCAL_CONNECTIONS;
+        assertThat(connectedHosts.size()).isEqualTo(expectedConnections);
+        // Now, shut down one of the hosts and make sure that we connect to a different node
+        UpgradeableCluster cluster = sidecarTestContext.cluster();
+        IUpgradeableInstance inst = shutDownNonLocalInstance(cluster, sidecarTestContext.instancesConfig().instances());
+        assertThat(inst.isShutdown()).isTrue();
+        InetSocketAddress downInstanceAddress = new InetSocketAddress(inst.broadcastAddress().getAddress(),
+                                                                      inst.config().getInt("native_transport_port"));
+        assertConnectionsWithRetry(downInstanceAddress, expectedConnections);
+    }
+
+    private void assertConnectionsWithRetry(InetSocketAddress downInstanceAddress, int expectedConnections)
+    {
+        List<Host> connectedHosts = Collections.emptyList();
+        int attempts = 0;
+        // Retry for up to 2 minutes, but passes much more quickly most of the time, so this should be safe.
+        while (attempts <= 24)
+        {
+            Set<Host> hosts = sidecarTestContext.session().getCluster().getMetadata().getAllHosts();
+            connectedHosts = getConnectedHosts(hosts);
+            List<InetSocketAddress> connectedAddresses = getAddresses(connectedHosts);
+            if (connectedHosts.size() == expectedConnections && !connectedAddresses.contains(downInstanceAddress))
+            {
+                return;
+            }
+            attempts++;
+            Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
+        }
+        List<InetSocketAddress> connectedAddresses = getAddresses(connectedHosts);
+        assertThat(connectedAddresses).doesNotContain(downInstanceAddress);
+        if (connectedHosts.size() == expectedConnections)
+        {
+            return;
+        }
+        String message =
+        String.format("Waited 2 minutes for connected hosts (%d) to be the expected number (%d) but failed",
+                      connectedHosts.size(), expectedConnections);
+        Assertions.fail(message);
+    }
+
+    private List<InetSocketAddress> getAddresses(List<Host> connectedHosts)
+    {
+        return connectedHosts.stream()
+                             .map(h -> h.getEndPoint().resolve())
+                             .collect(Collectors.toList());
+    }
+
+    private IUpgradeableInstance shutDownNonLocalInstance(UpgradeableCluster cluster,
+                                                          List<InstanceMetadata> instances)
+    throws ExecutionException, InterruptedException
+    {
+        Set<InetSocketAddress> localInstances = instances.stream().map(i -> new InetSocketAddress(i.host(), i.port()))
+                                                         .collect(Collectors.toSet());
+        for (IUpgradeableInstance inst : cluster)
+        {
+            InetSocketAddress nativeAddress = new InetSocketAddress(inst.config().broadcastAddress().getAddress(),
+                                                                    inst.config().getInt("native_transport_port"));
+            if (localInstances.contains(nativeAddress))
+            {
+                continue;
+            }
+            inst.shutdown(true).get();
+            return inst;
+        }
+        throw new RuntimeException("Could not find instance to shut down");
+    }
+}
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
new file mode 100644
index 0000000..04bddec
--- /dev/null
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/CQLSessionProviderTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Future;
+import io.vertx.core.eventbus.Message;
+import io.vertx.core.json.JsonObject;
+import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.ext.web.client.HttpResponse;
+import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.codec.BodyCodec;
+import io.vertx.junit5.VertxExtension;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
+import org.apache.cassandra.testing.CassandraIntegrationTest;
+import org.apache.cassandra.testing.CassandraTestContext;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.SERVICE_UNAVAILABLE;
+import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test CQLSessionProvider in a variety of cluster states
+ */
+@ExtendWith(VertxExtension.class)
+public class CQLSessionProviderTest extends IntegrationTestBase
+{
+
+    public static final String OK_KEYSPACE_RESPONSE_START = "{\"schema\":\"CREATE KEYSPACE ";
+    public static final String KEYSPACE_FAILED_RESPONSE_START = "{\"status\":\"Service Unavailable\",";
+
+    @CassandraIntegrationTest(nodesPerDc = 2, startCluster = false)
+    void testCqlSessionProviderWorksAsExpected(VertxTestContext context, CassandraTestContext cassandraTestContext)
+    throws Exception
+    {
+        UpgradeableCluster cluster = cassandraTestContext.getCluster();
+        testWithClient(context, false, webClient -> {
+                           // To start, both instances are stopped, so we should get 503s for both
+                           buildInstanceHealthRequest(webClient, "1")
+                           .send()
+                           .onSuccess(response -> assertHealthCheckFailed(response, context))
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "2")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckFailed(response, context)))
+                           .compose(_ignored ->
+                                    buildKeyspaceRequest(webClient)
+                                    .send()
+                                    // With no instances available in the cluster, keyspace requests should fail
+                                    .onSuccess(response -> assertKeyspaceFailed(response, context)))
+                           .compose(_ignored -> {
+                               // Start instance 1 and check both again
+                               return Future.future(promise -> {
+                                   vertx.eventBus()
+                                        .localConsumer(ON_CASSANDRA_CQL_READY.address(),
+                                                       (Message<JsonObject> message) -> {
+                                                           if (message.body().getInteger("cassandraInstanceId") == 1)
+                                                           {
+                                                               promise.complete();
+                                                           }
+                                                       });
+                                   cluster.get(1).startup();
+                               });
+                           })
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "1")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckOk(response, context)))
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "2")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckFailed(response, context))
+                           )
+                           .compose(_ignored ->
+                                    // Even with only 1 instance connected/up, we should still have keyspace metadata
+                                    buildKeyspaceRequest(webClient)
+                                    .send()
+                                    .onSuccess(response -> assertKeyspaceOk(response, context)))
+                           .compose(_ignored -> {
+                               // Start instance 2 and check both again
+                               return Future.future(promise -> {
+                                   vertx.eventBus()
+                                        .localConsumer(ON_CASSANDRA_CQL_READY.address(),
+                                                       (Message<JsonObject> message) -> {
+                                                           if (message.body().getInteger("cassandraInstanceId") == 2)
+                                                           {
+                                                               promise.complete();
+                                                           }
+                                                       });
+                                   cluster.get(2).startup();
+                               });
+                           })
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "1")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckOk(response, context)))
+                           .compose(_ignored ->
+                                    buildInstanceHealthRequest(webClient, "2")
+                                    .send()
+                                    .onSuccess(response -> assertHealthCheckOk(response, context))
+                           )
+                           .onSuccess(_ignored -> context.completeNow())
+                           .onFailure(context::failNow);
+                       }
+        );
+    }
+
+    private HttpRequest<String> buildInstanceHealthRequest(WebClient webClient, String instanceId)
+    {
+        return webClient.get(server.actualPort(),
+                             "localhost",
+                             "/api/v1/cassandra/__health?instanceId=" + instanceId)
+                        .as(BodyCodec.string());
+    }
+
+    private HttpRequest<String> buildKeyspaceRequest(WebClient webClient)
+    {
+        return webClient.get(server.actualPort(),
+                             "localhost",
+                             "/api/v1/schema/keyspaces")
+                        .as(BodyCodec.string());
+    }
+
+    private void assertHealthCheckOk(HttpResponse<String> response, VertxTestContext context)
+    {
+        context.verify(() -> {
+            assertThat(response.statusCode()).isEqualTo(OK.code());
+            assertThat(response.body()).isEqualTo("{\"status\":\"OK\"}");
+        });
+    }
+
+    private void assertHealthCheckFailed(HttpResponse<String> response, VertxTestContext context)
+    {
+        context.verify(() -> {
+            assertThat(response.statusCode()).isEqualTo(SERVICE_UNAVAILABLE.code());
+            assertThat(response.body()).isEqualTo("{\"status\":\"NOT_OK\"}");
+        });
+    }
+
+    private void assertKeyspaceOk(HttpResponse<String> response, VertxTestContext context)
+    {
+        context.verify(() -> {
+            assertThat(response.statusCode()).isEqualTo(OK.code());
+            assertThat(response.body()).startsWith(OK_KEYSPACE_RESPONSE_START);
+        });
+    }
+
+    private void assertKeyspaceFailed(HttpResponse<String> response, VertxTestContext context)
+    {
+        context.verify(() -> {
+            assertThat(response.statusCode()).isEqualTo(SERVICE_UNAVAILABLE.code());
+            assertThat(response.body()).startsWith(KEYSPACE_FAILED_RESPONSE_START);
+        });
+    }
+}
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
index 25ad96d..7586a22 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
@@ -32,8 +32,8 @@
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.distributed.api.NodeToolResult;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
@@ -73,13 +73,8 @@
                                                                      .delegate();
         assertThat(adapterDelegate.isUp()).as("health check succeeds").isTrue();
 
-        // Disable binary
-        NodeToolResult nodetoolResult = sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary");
-        assertThat(nodetoolResult.getRc())
-        .withFailMessage("Failed to disable binary:\nstdout:" + nodetoolResult.getStdout()
-                         + "\nstderr: " + nodetoolResult.getStderr())
-        .isEqualTo(0);
-
+        // Set up test listeners before disabling/enabling binary to avoid race conditions
+        // where the event happens before the consumer is registered.
         eventBus.localConsumer(ON_CASSANDRA_CQL_DISCONNECTED.address(), (Message<JsonObject> message) -> {
             int instanceId = message.body().getInteger("cassandraInstanceId");
             CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig()
@@ -100,6 +95,15 @@
                                        .isTrue();
             cqlReady.flag();
         });
+
+        // Disable binary
+        NodeToolResult nodetoolResult = sidecarTestContext.cluster().get(1).nodetoolResult("disablebinary");
+        assertThat(nodetoolResult.getRc())
+        .withFailMessage("Failed to disable binary:\nstdout:" + nodetoolResult.getStdout()
+                         + "\nstderr: " + nodetoolResult.getStderr())
+        .isEqualTo(0);
+        // NOTE: enable binary happens inside the disable binary handler above, which then will trigger the
+        // cqlReady flag.
     }
 
     @CassandraIntegrationTest(jmx = false, nodesPerDc = 3)
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
index de7c793..8a91324 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
@@ -23,8 +23,8 @@
 import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.GossipInfoResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 import org.apache.cassandra.testing.CassandraTestContext;
 
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
index 0d3000f..51f6828 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
@@ -31,10 +31,10 @@
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.RingEntry;
 import org.apache.cassandra.sidecar.common.data.RingResponse;
-import org.apache.cassandra.sidecar.test.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 import org.apache.cassandra.testing.CassandraTestContext;
 
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
index 32fabf9..c1b30f9 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/SchemaHandlerIntegrationTest.java
@@ -23,8 +23,8 @@
 import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.SchemaResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
 import static org.assertj.core.api.Assertions.assertThat;
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
index 2e2c797..3d7b371 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
@@ -30,8 +30,8 @@
 import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java
index b37099f..4798b56 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandlerIntegrationTest.java
@@ -38,10 +38,10 @@
 import io.vertx.ext.web.codec.BodyCodec;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
-import org.apache.cassandra.sidecar.test.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
 import static io.netty.handler.codec.http.HttpResponseStatus.OK;
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
index 3b7227b..b839b69 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
@@ -45,9 +45,9 @@
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.distributed.UpgradeableCluster;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
-import org.apache.cassandra.sidecar.test.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.CassandraSidecarTestContext;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.sidecar.utils.SimpleCassandraVersion;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
index a3180eb..250bdee 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/BaseTokenRangeIntegrationTest.java
@@ -40,9 +40,9 @@
 import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.TokenSupplier;
-import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.adapters.base.Partitioner;
 import org.apache.cassandra.sidecar.common.data.TokenRangeReplicasResponse;
+import org.apache.cassandra.sidecar.testing.IntegrationTestBase;
 import org.apache.cassandra.testing.AbstractCassandraTestContext;
 import org.apache.cassandra.testing.CassandraIntegrationTest;
 import org.apache.cassandra.testing.ConfigurableCassandraTestContext;
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTest.java
index 5059378..e6b1f31 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTest.java
@@ -42,7 +42,7 @@
 @ExtendWith(VertxExtension.class)
 public class JoiningTest extends JoiningBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 3, newNodesPerDc = 1, gossip = true, network = true)
+    @CassandraIntegrationTest(nodesPerDc = 3, newNodesPerDc = 1, network = true)
     void retrieveMappingWithKeyspaceWithAddNode(VertxTestContext context) throws Exception
     {
         createTestKeyspace(ImmutableMap.of("replication_factor", DEFAULT_RF));
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java
index 65b00fb..322c5ad 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestDoubleCluster.java
@@ -58,7 +58,7 @@
 @ExtendWith(VertxExtension.class)
 public class JoiningTestDoubleCluster extends JoiningBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 5, network = true, buildCluster = false)
     void retrieveMappingWithDoubleClusterSize(VertxTestContext context,
                                               ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java
index a5e603a..0f56390 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDC.java
@@ -60,7 +60,7 @@
 public class JoiningTestMultiDC extends JoiningBaseTest
 {
     @CassandraIntegrationTest(
-    nodesPerDc = 3, newNodesPerDc = 3, numDcs = 2, network = true, gossip = true, buildCluster = false)
+    nodesPerDc = 3, newNodesPerDc = 3, numDcs = 2, network = true, buildCluster = false)
     void retrieveMappingsDoubleClusterSizeMultiDC(VertxTestContext context,
                                                   ConfigurableCassandraTestContext cassandraTestContext)
     throws Exception
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java
index db24057..3db934d 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultiDCSingleReplicated.java
@@ -60,7 +60,7 @@
 public class JoiningTestMultiDCSingleReplicated extends JoiningBaseTest
 {
     @CassandraIntegrationTest(
-    nodesPerDc = 5, newNodesPerDc = 1, numDcs = 2, network = true, gossip = true, buildCluster = false)
+    nodesPerDc = 5, newNodesPerDc = 1, numDcs = 2, network = true, buildCluster = false)
     void retrieveMappingsSingleDCReplicatedKeyspace(VertxTestContext context,
                                                     ConfigurableCassandraTestContext cassandraTestContext)
     throws Exception
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java
index 65936f8..1eec7e2 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestMultipleNodes.java
@@ -58,7 +58,7 @@
 @ExtendWith(VertxExtension.class)
 public class JoiningTestMultipleNodes extends JoiningBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 3, newNodesPerDc = 2, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 3, newNodesPerDc = 2, network = true, buildCluster = false)
     void retrieveMappingWithMultipleJoiningNodes(VertxTestContext context,
                                                  ConfigurableCassandraTestContext cassandraTestContext)
     throws Exception
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java
index 6b285d8..ac36244 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/JoiningTestSingleNode.java
@@ -58,7 +58,7 @@
 @ExtendWith(VertxExtension.class)
 public class JoiningTestSingleNode extends JoiningBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 1, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 1, network = true, buildCluster = false)
     void retrieveMappingWithJoiningNode(VertxTestContext context,
                                         ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java
index b114564..d4e6a8a 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTest.java
@@ -55,7 +55,7 @@
 @ExtendWith(VertxExtension.class)
 class LeavingTest extends LeavingBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 5, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = false)
     void retrieveMappingWithKeyspaceLeavingNode(VertxTestContext context,
                                                 ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
@@ -69,7 +69,7 @@
                                generateExpectedRangeMappingSingleLeavingNode());
     }
 
-    @CassandraIntegrationTest(nodesPerDc = 5, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = false)
     void retrieveMappingWithMultipleLeavingNodes(VertxTestContext context,
                                                  ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
@@ -83,7 +83,7 @@
                                generateExpectedRangeMappingMultipleLeavingNodes());
     }
 
-    @CassandraIntegrationTest(nodesPerDc = 6, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 6, network = true, buildCluster = false)
     void retrieveMappingHalveClusterSize(VertxTestContext context,
                                          ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java
index 77173df..0e28836 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDC.java
@@ -55,7 +55,7 @@
 class LeavingTestMultiDC extends LeavingBaseTest
 {
     @CassandraIntegrationTest(
-    nodesPerDc = 5, numDcs = 2, network = true, gossip = true, buildCluster = false)
+    nodesPerDc = 5, numDcs = 2, network = true, buildCluster = false)
     void retrieveMappingWithLeavingNodesMultiDC(VertxTestContext context,
                                                 ConfigurableCassandraTestContext cassandraTestContext)
     throws Exception
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java
index 6811116..6684f9e 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/LeavingTestMultiDCHalveCluster.java
@@ -57,7 +57,7 @@
 @ExtendWith(VertxExtension.class)
 class LeavingTestMultiDCHalveCluster extends LeavingBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 6, numDcs = 2, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 6, numDcs = 2, network = true, buildCluster = false)
     void retrieveMappingMultiDCHalveClusterSize(VertxTestContext context,
                                                 ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java
index a486de6..64bf535 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingMultiDCTest.java
@@ -55,7 +55,7 @@
 @ExtendWith(VertxExtension.class)
 class MovingMultiDCTest extends MovingBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 5, numDcs = 2, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, numDcs = 2, network = true, buildCluster = false)
     void retrieveMappingWhileMovingNodeMultiDC(VertxTestContext context,
                                                ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java
index 5b0238a..3e89b8e 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/MovingTest.java
@@ -56,7 +56,7 @@
 class MovingTest extends MovingBaseTest
 {
 
-    @CassandraIntegrationTest(nodesPerDc = 5, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, network = true, buildCluster = false)
     void retrieveMappingWithKeyspaceMovingNode(VertxTestContext context,
                                                ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java
index 14e3a17..68673e1 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementBaseTest.java
@@ -193,13 +193,7 @@
         for (IUpgradeableInstance nodeToRemove : removedNodes)
         {
             ClusterUtils.stopUnchecked(nodeToRemove);
-            String remAddress = nodeToRemove.config().broadcastAddress().getAddress().getHostAddress();
-
-            List<ClusterUtils.RingInstanceDetails> ring = ClusterUtils.ring(seed);
-            List<ClusterUtils.RingInstanceDetails> match = ring.stream()
-                                                               .filter((d) -> d.getAddress().equals(remAddress))
-                                                               .collect(Collectors.toList());
-            assertThat(match.stream().anyMatch(r -> r.getStatus().equals("Down"))).isTrue();
+            ClusterUtils.awaitRingStatus(seed, nodeToRemove, "Down");
         }
     }
 
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java
index a76df99..d94dcb9 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementMultiDCTest.java
@@ -58,7 +58,7 @@
 class ReplacementMultiDCTest extends ReplacementBaseTest
 {
     @CassandraIntegrationTest(
-    nodesPerDc = 5, newNodesPerDc = 1, numDcs = 2, network = true, gossip = true, buildCluster = false)
+    nodesPerDc = 5, newNodesPerDc = 1, numDcs = 2, network = true, buildCluster = false)
     void retrieveMappingWithNodeReplacementMultiDC(VertxTestContext context,
                                                    ConfigurableCassandraTestContext cassandraTestContext)
     throws Exception
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java
index c1e822f..c5d0e2b 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/tokenrange/ReplacementTest.java
@@ -59,7 +59,7 @@
 @ExtendWith(VertxExtension.class)
 class ReplacementTest extends ReplacementBaseTest
 {
-    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 1, network = true, gossip = true, buildCluster = false)
+    @CassandraIntegrationTest(nodesPerDc = 5, newNodesPerDc = 1, network = true, buildCluster = false)
     void retrieveMappingWithNodeReplacement(VertxTestContext context,
                                             ConfigurableCassandraTestContext cassandraTestContext) throws Exception
     {
diff --git a/src/test/integration/org/apache/cassandra/sidecar/test/CassandraSidecarTestContext.java b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
similarity index 85%
rename from src/test/integration/org/apache/cassandra/sidecar/test/CassandraSidecarTestContext.java
rename to src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
index 0bfab3b..0faa420 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/test/CassandraSidecarTestContext.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/CassandraSidecarTestContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.test;
+package org.apache.cassandra.sidecar.testing;
 
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
@@ -24,8 +24,8 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.stream.Collectors;
 
-import com.datastax.driver.core.NettyOptions;
 import com.datastax.driver.core.Session;
 import io.vertx.core.Vertx;
 import org.apache.cassandra.distributed.UpgradeableCluster;
@@ -33,6 +33,7 @@
 import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.distributed.shared.JMXUtil;
 import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
+import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
@@ -53,24 +54,27 @@
  */
 public class CassandraSidecarTestContext implements AutoCloseable
 {
+    private static final SidecarVersionProvider svp = new SidecarVersionProvider("/sidecar.version");
     public final SimpleCassandraVersion version;
     private final CassandraVersionProvider versionProvider;
     private final DnsResolver dnsResolver;
     private final AbstractCassandraTestContext abstractCassandraTestContext;
     private final Vertx vertx;
-    public InstancesConfig instancesConfig;
-    private List<CQLSessionProvider> sessionProviders;
-    private List<JmxClient> jmxClients;
-    private static final SidecarVersionProvider svp = new SidecarVersionProvider("/sidecar.version");
+    private final int numInstancesToManage;
     private final List<InstanceConfigListener> instanceConfigListeners;
+    public InstancesConfig instancesConfig;
+    private List<JmxClient> jmxClients;
+    private CQLSessionProvider sessionProvider;
 
     private CassandraSidecarTestContext(Vertx vertx,
                                         AbstractCassandraTestContext abstractCassandraTestContext,
                                         SimpleCassandraVersion version,
                                         CassandraVersionProvider versionProvider,
-                                        DnsResolver dnsResolver)
+                                        DnsResolver dnsResolver,
+                                        int numInstancesToManage)
     {
         this.vertx = vertx;
+        this.numInstancesToManage = numInstancesToManage;
         this.instanceConfigListeners = new ArrayList<>();
         this.abstractCassandraTestContext = abstractCassandraTestContext;
         this.version = version;
@@ -80,7 +84,8 @@
 
     public static CassandraSidecarTestContext from(Vertx vertx,
                                                    AbstractCassandraTestContext cassandraTestContext,
-                                                   DnsResolver dnsResolver)
+                                                   DnsResolver dnsResolver,
+                                                   int numInstancesToManage)
     {
         org.apache.cassandra.testing.SimpleCassandraVersion rootVersion = cassandraTestContext.version;
         SimpleCassandraVersion versionParsed = SimpleCassandraVersion.create(rootVersion.major,
@@ -91,7 +96,8 @@
                                                cassandraTestContext,
                                                versionParsed,
                                                versionProvider,
-                                               dnsResolver);
+                                               dnsResolver,
+                                               numInstancesToManage);
     }
 
     public static CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver)
@@ -100,6 +106,18 @@
                .add(new CassandraFactory(dnsResolver, svp.sidecarVersion())).build();
     }
 
+    private static int tryGetIntConfig(IInstanceConfig config, String configName, int defaultValue)
+    {
+        try
+        {
+            return config.getInt(configName);
+        }
+        catch (NullPointerException npe)
+        {
+            return defaultValue;
+        }
+    }
+
     public void registerInstanceConfigListener(InstanceConfigListener listener)
     {
         this.instanceConfigListeners.add(listener);
@@ -127,8 +145,9 @@
 
     public InstancesConfig instancesConfig()
     {
+        // rebuild instances config if cluster changed
         if (instancesConfig == null
-            || instancesConfig.instances().size() != cluster().size()) // rebuild instances config if cluster changed
+            || instancesConfig.instances().size() != numInstancesToManage)
         {
             // clean-up any open sessions or client resources
             close();
@@ -139,18 +158,7 @@
 
     public Session session()
     {
-        return session(0);
-    }
-
-    public Session session(int instance)
-    {
-        if (sessionProviders == null)
-        {
-            setInstancesConfig();
-        }
-        CQLSessionProvider cqlSessionProvider = sessionProviders.get(instance);
-        assertThat(cqlSessionProvider).as("cqlSessionProvider for instance=" + instance).isNotNull();
-        return cqlSessionProvider.localCql();
+        return sessionProvider.get();
     }
 
     @Override
@@ -165,10 +173,6 @@
     @Override
     public void close()
     {
-        if (sessionProviders != null)
-        {
-            sessionProviders.forEach(CQLSessionProvider::close);
-        }
         if (instancesConfig != null)
         {
             instancesConfig.instances().forEach(instance -> instance.delegate().close());
@@ -189,14 +193,6 @@
         return jmxClients.get(instance);
     }
 
-    /**
-     * A listener for {@link InstancesConfig} state changes
-     */
-    public interface InstanceConfigListener
-    {
-        void onInstancesConfigChange(InstancesConfig instancesConfig);
-    }
-
     private void setInstancesConfig()
     {
         this.instancesConfig = buildInstancesConfig(versionProvider, dnsResolver);
@@ -211,18 +207,16 @@
     {
         UpgradeableCluster cluster = cluster();
         List<InstanceMetadata> metadata = new ArrayList<>();
-        sessionProviders = new ArrayList<>();
         jmxClients = new ArrayList<>();
-        for (int i = 0; i < cluster.size(); i++)
+        List<InetSocketAddress> addresses = buildContactList(cluster);
+        sessionProvider = new CQLSessionProviderImpl(addresses, addresses, 500, null,
+                                                     0, SharedExecutorNettyOptions.INSTANCE);
+        for (int i = 0; i < numInstancesToManage; i++)
         {
             IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names;
             IInstanceConfig config = instance.config();
             String hostName = JMXUtil.getJmxHost(config);
             int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
-            InetSocketAddress address = InetSocketAddress.createUnresolved(hostName,
-                                                                           nativeTransportPort);
-            CQLSessionProvider sessionProvider = new CQLSessionProvider(address, new NettyOptions());
-            this.sessionProviders.add(sessionProvider);
             // The in-jvm dtest framework sometimes returns a cluster before all the jmx infrastructure is initialized.
             // In these cases, we want to wait longer than the default retry/delay settings to connect.
             JmxClient jmxClient = JmxClient.builder()
@@ -240,12 +234,15 @@
             assertThat(dataDirParentPath).isNotNull();
             Path stagingPath = dataDirParentPath.resolve("staging");
             String stagingDir = stagingPath.toFile().getAbsolutePath();
+
             CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(vertx,
                                                                              i + 1,
                                                                              versionProvider,
                                                                              sessionProvider,
                                                                              jmxClient,
-                                                                             "1.0-TEST");
+                                                                             "1.0-TEST",
+                                                                             hostName,
+                                                                             nativeTransportPort);
             metadata.add(InstanceMetadataImpl.builder()
                                              .id(i + 1)
                                              .host(config.broadcastAddress().getAddress().getHostAddress())
@@ -258,15 +255,20 @@
         return new InstancesConfigImpl(metadata, dnsResolver);
     }
 
-    private static int tryGetIntConfig(IInstanceConfig config, String configName, int defaultValue)
+    private List<InetSocketAddress> buildContactList(UpgradeableCluster cluster)
     {
-        try
-        {
-            return config.getInt(configName);
-        }
-        catch (NullPointerException npe)
-        {
-            return defaultValue;
-        }
+        return cluster.stream()
+                      .map(i -> new InetSocketAddress(i.config().broadcastAddress().getAddress(),
+                                                      tryGetIntConfig(i.config(), "native_transport_port", 9042)))
+                      .limit(numInstancesToManage)
+                      .collect(Collectors.toList());
+    }
+
+    /**
+     * A listener for {@link InstancesConfig} state changes
+     */
+    public interface InstanceConfigListener
+    {
+        void onInstancesConfigChange(InstancesConfig instancesConfig);
     }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
similarity index 74%
rename from src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java
rename to src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
index 32dd6a8..c68eda2 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestBase.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar;
+package org.apache.cassandra.sidecar.testing;
 
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,8 +34,10 @@
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +60,6 @@
 import org.apache.cassandra.sidecar.common.dns.DnsResolver;
 import org.apache.cassandra.sidecar.server.MainModule;
 import org.apache.cassandra.sidecar.server.Server;
-import org.apache.cassandra.sidecar.test.CassandraSidecarTestContext;
 import org.apache.cassandra.testing.AbstractCassandraTestContext;
 
 import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_CASSANDRA_CQL_READY;
@@ -70,25 +72,29 @@
  */
 public abstract class IntegrationTestBase
 {
+    protected static final String TEST_KEYSPACE = "testkeyspace";
+    protected static final int DEFAULT_RF = 3;
+    private static final String TEST_TABLE_PREFIX = "testtable";
+    private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);
     protected Logger logger = LoggerFactory.getLogger(this.getClass());
     protected Vertx vertx;
     protected Server server;
     protected WebClient client;
-
-    protected static final String TEST_KEYSPACE = "testkeyspace";
-    private static final String TEST_TABLE_PREFIX = "testtable";
-
-    protected static final int DEFAULT_RF = 3;
-    private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);
     protected CassandraSidecarTestContext sidecarTestContext;
+    protected Injector injector;
 
     @BeforeEach
-    void setup(AbstractCassandraTestContext cassandraTestContext) throws InterruptedException
+    void setup(AbstractCassandraTestContext cassandraTestContext, TestInfo testInfo) throws InterruptedException
     {
         IntegrationTestModule integrationTestModule = new IntegrationTestModule();
-        Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(integrationTestModule));
+        System.setProperty("cassandra.testtag", testInfo.getTestClass().get().getCanonicalName());
+        System.setProperty("suitename", testInfo.getDisplayName() + ": " + cassandraTestContext.version);
+        int clusterSize = cassandraTestContext.clusterSize();
+        injector = Guice.createInjector(Modules.override(new MainModule()).with(integrationTestModule));
         vertx = injector.getInstance(Vertx.class);
-        sidecarTestContext = CassandraSidecarTestContext.from(vertx, cassandraTestContext, DnsResolver.DEFAULT);
+        sidecarTestContext = CassandraSidecarTestContext.from(vertx, cassandraTestContext, DnsResolver.DEFAULT,
+                                                              getNumInstancesToManage(clusterSize));
+
         integrationTestModule.setCassandraTestContext(sidecarTestContext);
 
         server = injector.getInstance(Server.class);
@@ -110,7 +116,8 @@
                   sidecarTestContext.registerInstanceConfigListener(this::healthCheck);
                   if (!sidecarTestContext.isClusterBuilt())
                   {
-                      context.completeNow();
+                      // Give everything a moment to get started and connected
+                      vertx.setTimer(TimeUnit.SECONDS.toMillis(1), id1 -> context.completeNow());
                   }
               })
               .onFailure(context::failNow);
@@ -118,6 +125,20 @@
         context.awaitCompletion(5, TimeUnit.SECONDS);
     }
 
+    /**
+     * Some tests may want to "manage" fewer instances than the complete cluster.
+     * Therefore, override this if your test wants to manage fewer than the complete cluster size.
+     * The Sidecar will be configured to manage the first N instances in the cluster by instance number.
+     * Defaults to the entire cluster.
+     *
+     * @param clusterSize the size of the cluster as defined by the integration test
+     * @return the number of instances to manage
+     */
+    protected int getNumInstancesToManage(int clusterSize)
+    {
+        return clusterSize;
+    }
+
     @AfterEach
     void tearDown() throws InterruptedException
     {
@@ -133,11 +154,19 @@
 
     protected void testWithClient(VertxTestContext context, Consumer<WebClient> tester) throws Exception
     {
+        testWithClient(context, true, tester);
+    }
+
+    protected void testWithClient(VertxTestContext context,
+                                  boolean waitForCluster,
+                                  Consumer<WebClient> tester)
+    throws Exception
+    {
         CassandraAdapterDelegate delegate = sidecarTestContext.instancesConfig()
                                                               .instanceFromId(1)
                                                               .delegate();
 
-        if (delegate.isUp())
+        if (delegate.isUp() || !waitForCluster)
         {
             tester.accept(client);
         }
@@ -162,9 +191,30 @@
 
     protected void createTestKeyspace(Map<String, Integer> rf)
     {
-        Session session = maybeGetSession();
-        session.execute("CREATE KEYSPACE " + TEST_KEYSPACE +
-                        " WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', " + generateRfString(rf) + " };");
+        int attempts = 1;
+        ArrayList<Throwable> thrown = new ArrayList<>(5);
+        while (attempts <= 5)
+        {
+            try
+            {
+                Session session = maybeGetSession();
+
+                session.execute("CREATE KEYSPACE IF NOT EXISTS " + TEST_KEYSPACE +
+                                " WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', " +
+                                generateRfString(rf) + " };");
+                return;
+            }
+            catch (Throwable t)
+            {
+                thrown.add(t);
+                logger.debug("Failed to create keyspace {} on attempt {}", TEST_KEYSPACE, attempts);
+                attempts++;
+                Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+            }
+        }
+        RuntimeException rte = new RuntimeException("Could not create test keyspace after 5 attempts.");
+        thrown.forEach(rte::addSuppressed);
+        throw rte;
     }
 
     private String generateRfString(Map<String, Integer> dcToRf)
diff --git a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
similarity index 97%
rename from src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
rename to src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
index 0564f61..9b7f1a7 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/IntegrationTestModule.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar;
+package org.apache.cassandra.sidecar.testing;
 
 import java.util.Collections;
 import java.util.List;
@@ -33,7 +33,6 @@
 import org.apache.cassandra.sidecar.config.yaml.HealthCheckConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.ServiceConfigurationImpl;
 import org.apache.cassandra.sidecar.config.yaml.SidecarConfigurationImpl;
-import org.apache.cassandra.sidecar.test.CassandraSidecarTestContext;
 import org.jetbrains.annotations.NotNull;
 
 /**
@@ -55,11 +54,27 @@
         return new WrapperInstancesConfig();
     }
 
+    @Provides
+    @Singleton
+    public SidecarConfiguration configuration()
+    {
+        ServiceConfiguration conf = ServiceConfigurationImpl.builder()
+                                                            .host("127.0.0.1")
+                                                            .port(0) // let the test find an available port
+                                                            .build();
+        HealthCheckConfiguration healthCheckConfiguration = new HealthCheckConfigurationImpl(50, 500);
+        return SidecarConfigurationImpl.builder()
+                                       .serviceConfiguration(conf)
+                                       .healthCheckConfiguration(healthCheckConfiguration)
+                                       .build();
+    }
+
     class WrapperInstancesConfig implements InstancesConfig
     {
         /**
          * @return metadata of instances owned by the sidecar
          */
+        @Override
         @NotNull
         public List<InstanceMetadata> instances()
         {
@@ -92,19 +107,4 @@
             return cassandraTestContext.instancesConfig().instanceFromHost(host);
         }
     }
-
-    @Provides
-    @Singleton
-    public SidecarConfiguration configuration()
-    {
-        ServiceConfiguration conf = ServiceConfigurationImpl.builder()
-                                                            .host("127.0.0.1")
-                                                            .port(0) // let the test find an available port
-                                                            .build();
-        HealthCheckConfiguration healthCheckConfiguration = new HealthCheckConfigurationImpl(50, 500);
-        return SidecarConfigurationImpl.builder()
-                                       .serviceConfiguration(conf)
-                                       .healthCheckConfiguration(healthCheckConfiguration)
-                                       .build();
-    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
new file mode 100644
index 0000000..3bdeead
--- /dev/null
+++ b/src/test/integration/org/apache/cassandra/sidecar/testing/SharedExecutorNettyOptions.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.testing;
+
+import java.util.concurrent.ThreadFactory;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import com.datastax.driver.core.NettyOptions;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Timer;
+
+/**
+ * This class is used to encapsulate several heavy-weight objects that can be shared across all executions
+ * of tests within a test run. It intentionally does not close these resources when the Cassandra Driver's
+ * cluster is closed, so they can be reused for the next test.
+ */
+class SharedExecutorNettyOptions extends NettyOptions
+{
+    private SharedExecutorNettyOptions()
+    {
+    }
+
+    public static final SharedExecutorNettyOptions INSTANCE = new SharedExecutorNettyOptions();
+
+    private final ThreadFactory threadFactory = new ThreadFactoryBuilder()
+                                                .setDaemon(true)
+                                                .setNameFormat("IntegrationTest-%d")
+                                                .build();
+    private final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory);
+    private final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory);
+
+    public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
+    {
+        return sharedEventLoopGroup;
+    }
+
+    public void onClusterClose(EventLoopGroup eventLoopGroup)
+    {
+    }
+
+    @Override
+    public Timer timer(ThreadFactory threadFactory)
+    {
+        return sharedHWT;
+    }
+
+    public void onClusterClose(Timer timer)
+    {
+    }
+}
diff --git a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
index 57519ca..4aaa055 100644
--- a/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
+++ b/src/test/integration/org/apache/cassandra/testing/AbstractCassandraTestContext.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.testing;
 
+import java.util.Objects;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,10 +69,24 @@
             {
                 cluster.close();
             }
-            catch (ShutdownException shutdownException)
+            // ShutdownException may be thrown from a different classloader, and therefore the standard
+            // `catch (ShutdownException)` won't always work - compare the canonical names instead.
+            catch (Throwable t)
             {
-                LOGGER.warn("Encountered shutdown exception which closing the cluster", shutdownException);
+                if (Objects.equals(t.getClass().getCanonicalName(), ShutdownException.class.getCanonicalName()))
+                {
+                    LOGGER.warn("Encountered shutdown exception which closing the cluster", t);
+                }
+                else
+                {
+                    throw t;
+                }
             }
         }
     }
+
+    public int clusterSize()
+    {
+        return annotation.numDcs() * annotation.nodesPerDc();
+    }
 }
diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
index efa7a12..c52471d 100644
--- a/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/testing/CassandraIntegrationTest.java
@@ -68,11 +68,11 @@
     int numDataDirsPerInstance() default 1;
 
     /**
-     * Returns whether gossip is enabled or disabled for the integration test. Defaults to {@code false}.
+     * Returns whether gossip is enabled or disabled for the integration test. Defaults to {@code true}.
      *
      * @return whether gossip is enabled or disabled for the integration test
      */
-    boolean gossip() default false;
+    boolean gossip() default true;
 
     /**
      * Returns whether internode networking is enabled or disabled for the integration test. Defaults to {@code false}.
diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
index d6fd852..8498776 100644
--- a/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
+++ b/src/test/integration/org/apache/cassandra/testing/CassandraTestContext.java
@@ -39,7 +39,7 @@
     public String toString()
     {
         return "CassandraTestContext{"
-               + ", version=" + version
+               + "version=" + version
                + ", cluster=" + cluster
                + '}';
     }
diff --git a/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java b/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
index 94683ba..cbd9ff6 100644
--- a/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
+++ b/src/test/integration/org/apache/cassandra/testing/CassandraTestTemplate.java
@@ -307,8 +307,13 @@
         System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
         // End gossip delay settings
         // Set the location of dtest jars
-        System.setProperty("cassandra.test.dtest_jar_path", "dtest-jars");
+        System.setProperty("cassandra.test.dtest_jar_path",
+                           System.getProperty("cassandra.test.dtest_jar_path", "dtest-jars"));
         // Disable tcnative in netty as it can cause jni issues and logs lots errors
         System.setProperty("cassandra.disable_tcactive_openssl", "true");
+        // As we enable gossip by default, make the checks happen faster
+        System.setProperty("cassandra.gossip_settle_min_wait_ms", "500"); // Default 5000
+        System.setProperty("cassandra.gossip_settle_interval_ms", "250"); // Default 1000
+        System.setProperty("cassandra.gossip_settle_poll_success_required", "6"); // Default 3
     }
 }
diff --git a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
index 8b7be5e..d7cd14b 100644
--- a/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
+++ b/src/test/integration/org/apache/cassandra/testing/SimpleCassandraVersion.java
@@ -164,9 +164,6 @@
     @Override
     public String toString()
     {
-        StringBuilder sb = new StringBuilder();
-        sb.append(major).append('.').append(minor).append('.').append(patch);
-
-        return sb.toString();
+        return major + "." + minor + "." + patch;
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
index 23d7001..62b2ac9 100644
--- a/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/config/SidecarConfigurationTest.java
@@ -19,7 +19,10 @@
 package org.apache.cassandra.sidecar.config;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -172,6 +175,21 @@
                                "valid values are (NONE,REQUEST,REQUIRED)");
     }
 
+    @Test
+    void testDriverParameters() throws IOException
+    {
+        Path yamlPath = yaml("config/sidecar_driver_params.yaml");
+        SidecarConfiguration config = SidecarConfigurationImpl.readYamlConfiguration(yamlPath);
+
+        DriverConfiguration driverConfiguration = config.driverConfiguration();
+        assertThat(driverConfiguration).isNotNull();
+        assertThat(driverConfiguration.localDc()).isEqualTo("dc1");
+        List<InetSocketAddress> endpoints = Arrays.asList(new InetSocketAddress("127.0.0.1", 9042),
+                                                                  new InetSocketAddress("127.0.0.2", 9042));
+        assertThat(driverConfiguration.contactPoints()).isEqualTo(endpoints);
+        assertThat(driverConfiguration.numConnections()).isEqualTo(6);
+    }
+
     void validateSingleInstanceSidecarConfiguration(SidecarConfiguration config)
     {
         assertThat(config.cassandraInstances()).isNotNull().hasSize(1);
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/V30.java b/src/test/java/org/apache/cassandra/sidecar/mocks/V30.java
index da23329..dbf4b78 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/V30.java
+++ b/src/test/java/org/apache/cassandra/sidecar/mocks/V30.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.mocks;
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.ICassandraFactory;
@@ -31,7 +33,9 @@
 public class V30 implements ICassandraFactory
 {
     @Override
-    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
+    public ICassandraAdapter create(CQLSessionProvider session,
+                                    JmxClient jmxClient,
+                                    InetSocketAddress localNativeTransportAddress)
     {
         return null;
     }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/V40.java b/src/test/java/org/apache/cassandra/sidecar/mocks/V40.java
index 6c419f7..fe3685b 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/V40.java
+++ b/src/test/java/org/apache/cassandra/sidecar/mocks/V40.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.mocks;
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.ICassandraFactory;
@@ -31,7 +33,9 @@
 public class V40 implements ICassandraFactory
 {
     @Override
-    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
+    public ICassandraAdapter create(CQLSessionProvider session,
+                                    JmxClient jmxClient,
+                                    InetSocketAddress localNativeTransportAddress)
     {
         return null;
     }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/V41.java b/src/test/java/org/apache/cassandra/sidecar/mocks/V41.java
index 50fb3ab..13d4c83 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/V41.java
+++ b/src/test/java/org/apache/cassandra/sidecar/mocks/V41.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.sidecar.mocks;
 
+import java.net.InetSocketAddress;
+
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
 import org.apache.cassandra.sidecar.common.ICassandraAdapter;
 import org.apache.cassandra.sidecar.common.ICassandraFactory;
@@ -31,7 +33,9 @@
 public class V41 implements ICassandraFactory
 {
     @Override
-    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
+    public ICassandraAdapter create(CQLSessionProvider session,
+                                    JmxClient jmxClient,
+                                    InetSocketAddress localNativeTransportAddress)
     {
         return null;
     }
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
index 1705c1f..86ee719 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/sstableuploads/BaseUploadsHandlerTest.java
@@ -205,7 +205,7 @@
         @Singleton
         public InstancesConfig instancesConfig(Vertx vertx)
         {
-            return mockInstancesConfig(vertx, canonicalTemporaryPath, delegate, delegate, null, null);
+            return mockInstancesConfig(vertx, canonicalTemporaryPath, delegate, null);
         }
 
         @Singleton
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
index b6a0c92..3ea3d84 100644
--- a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
+++ b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotUtils.java
@@ -29,6 +29,7 @@
 import java.util.List;
 
 import io.vertx.core.Vertx;
+import org.apache.cassandra.sidecar.cluster.CQLSessionProviderImpl;
 import org.apache.cassandra.sidecar.cluster.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
@@ -93,30 +94,24 @@
 
     public static InstancesConfig mockInstancesConfig(Vertx vertx, String rootPath)
     {
-        CQLSessionProvider mockSession1 = mock(CQLSessionProvider.class);
-        CQLSessionProvider mockSession2 = mock(CQLSessionProvider.class);
-        return mockInstancesConfig(vertx, rootPath, null, null, mockSession1, mockSession2);
+        CQLSessionProvider mockSession1 = mock(CQLSessionProviderImpl.class);
+        return mockInstancesConfig(vertx, rootPath, null, mockSession1);
     }
 
     public static InstancesConfig mockInstancesConfig(Vertx vertx,
                                                       String rootPath,
-                                                      CassandraAdapterDelegate delegate1,
-                                                      CassandraAdapterDelegate delegate2,
-                                                      CQLSessionProvider cqlSessionProvider1,
-                                                      CQLSessionProvider cqlSessionProvider2)
+                                                      CassandraAdapterDelegate delegate,
+                                                      CQLSessionProvider cqlSessionProvider1)
     {
         CassandraVersionProvider.Builder versionProviderBuilder = new CassandraVersionProvider.Builder();
         versionProviderBuilder.add(new MockCassandraFactory());
         CassandraVersionProvider versionProvider = versionProviderBuilder.build();
+        String stagingDir = makeStagingDir(rootPath);
 
-        if (delegate1 == null)
+        if (delegate == null)
         {
-            delegate1 = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, null, null);
-        }
-
-        if (delegate2 == null)
-        {
-            delegate2 = new CassandraAdapterDelegate(vertx, 2, versionProvider, cqlSessionProvider2, null, null);
+            delegate = new CassandraAdapterDelegate(vertx, 1, versionProvider, cqlSessionProvider1, null, null,
+                                                    "localhost1", 9042);
         }
 
         InstanceMetadataImpl localhost = InstanceMetadataImpl.builder()
@@ -124,16 +119,16 @@
                                                              .host("localhost")
                                                              .port(9043)
                                                              .dataDirs(Collections.singletonList(rootPath + "/d1"))
-                                                             .stagingDir(makeStagingDir(rootPath))
-                                                             .delegate(delegate1)
+                                                             .stagingDir(stagingDir)
+                                                             .delegate(delegate)
                                                              .build();
         InstanceMetadataImpl localhost2 = InstanceMetadataImpl.builder()
                                                               .id(2)
                                                               .host("localhost2")
                                                               .port(9043)
                                                               .dataDirs(Collections.singletonList(rootPath + "/d2"))
-                                                              .stagingDir(makeStagingDir(rootPath))
-                                                              .delegate(delegate2)
+                                                              .stagingDir(stagingDir)
+                                                              .delegate(delegate)
                                                               .build();
         List<InstanceMetadata> instanceMetas = Arrays.asList(localhost, localhost2);
         return new InstancesConfigImpl(instanceMetas, DnsResolver.DEFAULT);
diff --git a/src/test/resources/config/sidecar_driver_params.yaml b/src/test/resources/config/sidecar_driver_params.yaml
new file mode 100644
index 0000000..bb0e8e3
--- /dev/null
+++ b/src/test/resources/config/sidecar_driver_params.yaml
@@ -0,0 +1,6 @@
+driver_parameters:
+  contact_points:
+    - "127.0.0.1:9042"
+    - "127.0.0.2:9042"
+  num_connections: 6
+  local_dc: dc1
\ No newline at end of file