CASSANDRASC-51: Use in-jvm dtest framework for integration tests

This commit replaces the existing TestContainers-based test template
with one that leverages Cassandra's in-jvm dtest framework now that it
has JMX support. This will allow for more complete testing of endpoints,
especially when those endpoints need to react to cluster changes like
moves, joins, and leaves.

patch by Doug Rohrer; reviewed by Dinesh Joshi, Yifan Cai, Francisco Guerrero, Bernardo Botella Corbi for CASSANDRASC-51
diff --git a/.circleci/config.yml b/.circleci/config.yml
index e27581b..124250f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -41,79 +41,39 @@
 
 jobs:
   # Runs java 8 tests on a docker image
-  java8_docker:
+  java8:
     docker:
      - image: circleci/openjdk:8-jdk-stretch
     steps:
      - checkout
 
-     - run: ./gradlew -i clean test -x integrationTest --stacktrace
+     - run: BRANCHES="cassandra-4.0 cassandra-4.1" scripts/build-dtest-jars.sh
+     - run: echo 'export DTEST_JAR=$(cd dtest-jars && find . -name 'dtest-4.1*.jar' -maxdepth 1)' >> "$BASH_ENV"
+     - run: ./gradlew -i test integrationTest --stacktrace -Dcassandra.sidecar.versions_to_test="4.0,4.1"
 
      - store_artifacts:
          path: build/reports
          destination: test-reports
 
      - store_test_results:
-         path: ~/repo/build/test-results/
-
-  # Runs java 8 tests on a VM to be able to run Docker for integration tests
-  java8:
-    <<: *base_job
-
-    steps:
-      - checkout
-      - install_common
-
-      - install_java:
-          version: adoptopenjdk-8-hotspot
-      - run: sudo update-java-alternatives -s adoptopenjdk-8-hotspot-amd64 && java -version
-      - run: ./gradlew -i clean test integrationTest --stacktrace
-
-      - store_artifacts:
-          path: build/reports
-          destination: test-reports
-
-      - store_test_results:
-          path: ~/repo/build/test-results/
-
-      - store_test_results:
-          path: ~/repo/cassandra-integration-tests/build/test-results/
+         path: build/test-results/
 
   # Runs java 11 tests on a docker image
-  java11_docker:
+  java11:
     docker:
       - image: circleci/openjdk:11-jdk-stretch
     steps:
       - checkout
 
-      - run: ./gradlew -i clean test -x integrationTest --stacktrace
+      - run: CASSANDRA_USE_JDK11=true scripts/build-dtest-jars.sh
+      - run: ./gradlew -i test integrationTest --stacktrace
 
       - store_artifacts:
           path: build/reports
           destination: test-reports
 
       - store_test_results:
-          path: ~/repo/build/test-results/
-
-  # Runs java 11 tests on a VM to be able to run Docker for integration tests
-  java11:
-    <<: *base_job
-
-    steps:
-      - checkout
-      - install_common
-
-      - install_java:
-          version: adoptopenjdk-11-hotspot
-      - run: sudo update-java-alternatives -s adoptopenjdk-11-hotspot-amd64 && java -version
-      - run: ./gradlew -i clean test integrationTest --stacktrace
-
-      - store_artifacts:
-          path: build/reports
-          destination: test-reports
-
-      - store_test_results:
-          path: ~/repo/build/test-results/
+          path: build/test-results/
 
   # ensures we can build and install deb packages
   deb_build_install:
@@ -156,41 +116,25 @@
   version: 2
   build-and-test:
     jobs:
-      - java8_docker
-      - java11_docker
-      - java8:
-          requires:
-            - java8_docker
-      - java11:
-          requires:
-            - java11_docker
+      - java8
+      - java11
       - docs_build:
           requires:
             - java8
-            - java8_docker
             - java11
-            - java11_docker
       - docker_build:
           requires:
             - java8
-            - java8_docker
             - java11
-            - java11_docker
       - rpm_build_install:
           requires:
             - java8
-            - java8_docker
             - java11
-            - java11_docker
       - deb_build_install:
           requires:
             - java8
-            - java8_docker
             - java11
-            - java11_docker
       - docker_build:
           requires:
             - java8
-            - java8_docker
             - java11
-            - java11_docker
diff --git a/.gitignore b/.gitignore
index 86800d7..fa89706 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,6 @@
 
 # Local gradle cache
 .gradle
+
+dtest-jars
+scripts/dependency-reduced-pom.xml
diff --git a/CHANGES.txt b/CHANGES.txt
index 03259f2..1bf9841 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Use in-jvm dtest framework for integration tests (CASSANDRASC-51)
  * Sidecar returns own version in node settings (CASSANDRASC-52)
  * Deprecate the sidecar cassandra health endpoint containing instance segment (CASSANDRASC-50)
  * Add an endpoint that gives information about the release version & partitioner name of a node (CASSANDRASC-48)
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index efdd437..5d61de0 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -17,6 +17,7 @@
   * [Asynchronous Handlers](#async-handlers)
   * [Future Composition](#future-composition)
   * [Failure Handling](#failure-handling)
+  * [Cassandra Adapters](#cassandra-adapters)
 * [Source Code Style](#source-code-style)
 
 ## <a name="how-to-contribute"></a>How to Contribute
@@ -329,6 +330,17 @@
 }
 ```
 
+### <a name="cassandra-adapters></a>Cassandra Adapters
+The Cassandra Sidecar is designed to support multiple Cassandra versions, including multiple, different instances on the same host.
+The `adapters` subproject contains an implementation of the Cassandra adapter for different versions of Cassandra.
+The `base` adapter supports Cassandra 4.0 and greater, including `trunk`.
+When some form of breaking change is necessary in `base`, the new functionality should be developed in the `base` adapter,
+and then any shim necessary to work with the older version(s) should be added to a new adapter package.
+The `CassandraAdapterFactory` has a version tag which represents the minimum version that particular adapter package supports.
+When adding shims, implement the minimum necessary changes in the new package and name all classes with a version number after the word `Cassandra`.
+For example, if `base`'s minimum version is moved to 5.0, a Cassandra40 adapter package/subproject should be added, with a minimum version of 4.0.0.
+Within that project, the classes should all be named `Cassandra40*`, so `Cassandra40Adapter`, `Cassandra40Factory`, etc.
+
 ## <a name="source-code-style"></a>Source Code Style
 
 The project provides an
diff --git a/README.md b/README.md
index 21991bf..3bcbe16 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,24 @@
   2. Apache Cassandra 4.0.  We depend on virtual tables which is a 4.0 only feature.
   3. [Docker](https://www.docker.com/products/docker-desktop/) for running integration tests.
 
+Build Prerequisites
+-------------------
+We depend on the Cassandra in-jvm dtest framework for testing. 
+Because these jars are not published, you must manually build the dtest jars before you can build the project.
+
+```shell
+./scripts/build-dtest-jars.sh
+```
+
+The build script supports two parameters:
+- `REPO` - the Cassandra git repository to use for the source files. This is helpful if you need to test with a fork of the Cassandra codebase.
+    - default: `git@github.com:apache/cassandra.git`
+- `BRANCHES` - a space-delimited list of branches to build.
+  -default: `"cassandra-4.1 trunk"`
+
+Remove any versions you may not want to test with. We recommend at least the latest (released) 4.X series and `trunk`.
+See Testing for more details on how to choose which Cassandra versions to use while testing.
+
 Getting started: Running The Sidecar
 --------------------------------------
 
@@ -27,9 +45,9 @@
 Testing
 -------
 
-We rely on docker containers for integration tests.
-
-The only requirement is to install and run [Docker](https://www.docker.com/products/docker-desktop/) on your test machine.
+The test framework is set up to run 4.1 and 5.0 (Trunk) tests (see `TestVersionSupplier.java`) by default.  
+You can change this via the Java property `cassandra.sidecar.versions_to_test` by supplying a comma-delimited string.
+For example, `-Dcassandra.sidecar.versions_to_test=4.0,4.1,5.0`.
 
 CircleCI Testing
 -----------------
diff --git a/cassandra40/build.gradle b/adapters/base/build.gradle
similarity index 89%
rename from cassandra40/build.gradle
rename to adapters/base/build.gradle
index 170b073..50b3d93 100644
--- a/cassandra40/build.gradle
+++ b/adapters/base/build.gradle
@@ -2,6 +2,7 @@
     id 'java-library'
     id 'idea'
     id 'maven-publish'
+    id "com.github.spotbugs"
 }
 
 group 'org.apache.cassandra.sidecar'
@@ -18,10 +19,10 @@
 }
 
 dependencies {
-    implementation(project(":common"))
+    api(project(":common"))
+    api("com.google.guava:guava:${project.rootProject.guavaVersion}")
     compileOnly('org.jetbrains:annotations:23.0.0')
     compileOnly('com.datastax.cassandra:cassandra-driver-core:3.11.3')
-    implementation("com.google.guava:guava:${project.rootProject.guavaVersion}")
     implementation("org.slf4j:slf4j-api:${project.slf4jVersion}")
 
     testImplementation "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
new file mode 100644
index 0000000..a730bfe
--- /dev/null
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraAdapter.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.ClusterMembershipOperations;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.NodeSettings;
+import org.apache.cassandra.sidecar.common.StorageOperations;
+import org.apache.cassandra.sidecar.common.TableOperations;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * A {@link ICassandraAdapter} implementation for Cassandra 4.0 and later
+ */
+public class CassandraAdapter implements ICassandraAdapter
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraAdapter.class);
+    protected final DnsResolver dnsResolver;
+    protected final JmxClient jmxClient;
+    private final CQLSessionProvider session;
+    private final String sidecarVersion;
+
+    public CassandraAdapter(DnsResolver dnsResolver, JmxClient jmxClient, CQLSessionProvider session,
+                            String sidecarVersion)
+    {
+        this.dnsResolver = dnsResolver;
+        this.jmxClient = jmxClient;
+        this.session = session;
+        this.sidecarVersion = sidecarVersion;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @Nullable
+    public Metadata metadata()
+    {
+        Session activeSession = session.localCql();
+        if (activeSession == null)
+        {
+            LOGGER.warn("There is no active session to Cassandra");
+            return null;
+        }
+
+        if (activeSession.getCluster() == null)
+        {
+            LOGGER.warn("There is no available cluster for session={}", activeSession);
+            return null;
+        }
+
+        if (activeSession.getCluster().getMetadata() == null)
+        {
+            LOGGER.warn("There is no available metadata for session={}, cluster={}",
+                        activeSession, activeSession.getCluster());
+        }
+
+        return activeSession.getCluster().getMetadata();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @Nullable
+    public NodeSettings nodeSettings()
+    {
+        Session activeSession = session.localCql();
+        if (activeSession == null)
+        {
+            return null;
+        }
+
+        Row oneResult = activeSession.execute("select release_version, partitioner from system.local")
+                                     .one();
+
+        return new NodeSettings(oneResult.getString("release_version"),
+                                oneResult.getString("partitioner"),
+                                sidecarVersion);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public StorageOperations storageOperations()
+    {
+        return new CassandraStorageOperations(jmxClient, dnsResolver);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public ClusterMembershipOperations clusterMembershipOperations()
+    {
+        return new CassandraClusterMembershipOperations(jmxClient);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public TableOperations tableOperations()
+    {
+        return new CassandraTableOperations(jmxClient);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString()
+    {
+        return "CassandraAdapter" + "@" + Integer.toHexString(hashCode());
+    }
+}
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40ClusterMembershipOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraClusterMembershipOperations.java
similarity index 80%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40ClusterMembershipOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraClusterMembershipOperations.java
index c9e274e..9205f60 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40ClusterMembershipOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraClusterMembershipOperations.java
@@ -16,17 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
 import org.apache.cassandra.sidecar.common.ClusterMembershipOperations;
 import org.apache.cassandra.sidecar.common.JmxClient;
 
-import static org.apache.cassandra.sidecar.cassandra40.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
+import static org.apache.cassandra.sidecar.adapters.base.ClusterMembershipJmxOperations.FAILURE_DETECTOR_OBJ_NAME;
 
 /**
- * An implementation of the {@link ClusterMembershipOperations} that interfaces with Cassandra 4.0
+ * An implementation of the {@link ClusterMembershipOperations} that interfaces with Cassandra 4.0 and later
  */
-public class Cassandra40ClusterMembershipOperations implements ClusterMembershipOperations
+public class CassandraClusterMembershipOperations implements ClusterMembershipOperations
 {
     private final JmxClient jmxClient;
 
@@ -35,7 +35,7 @@
      *
      * @param jmxClient the JMX client used to communicate with the Cassandra instance
      */
-    public Cassandra40ClusterMembershipOperations(JmxClient jmxClient)
+    public CassandraClusterMembershipOperations(JmxClient jmxClient)
     {
         this.jmxClient = jmxClient;
     }
diff --git a/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
new file mode 100644
index 0000000..57abe35
--- /dev/null
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.adapters.base;
+
+import org.apache.cassandra.sidecar.common.CQLSessionProvider;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.JmxClient;
+import org.apache.cassandra.sidecar.common.MinimumVersion;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+
+/**
+ * Factory to produce the 4.0 adapter
+ */
+@MinimumVersion("4.0.0")
+public class CassandraFactory implements ICassandraFactory
+{
+    protected final DnsResolver dnsResolver;
+    private final String sidecarVersion;
+
+    public CassandraFactory(DnsResolver dnsResolver, String sidecarVersion)
+    {
+        this.dnsResolver = dnsResolver;
+        this.sidecarVersion = sidecarVersion;
+    }
+
+    /**
+     * Returns a new adapter for Cassandra 4.0 clusters.
+     *
+     * @param session   the session to the Cassandra database
+     * @param jmxClient the JMX client to connect to the Cassandra database
+     * @return a new adapter for the 4.0 clusters
+     */
+    @Override
+    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
+    {
+        return new CassandraAdapter(dnsResolver, jmxClient, session, sidecarVersion);
+    }
+}
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40StorageOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
similarity index 61%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40StorageOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
index f1cdd45..dfe2198 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40StorageOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraStorageOperations.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
+import java.io.IOException;
 import java.net.UnknownHostException;
 import java.util.Map;
 
@@ -28,30 +29,37 @@
 import org.apache.cassandra.sidecar.common.StorageOperations;
 import org.apache.cassandra.sidecar.common.data.RingResponse;
 import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.common.exceptions.NodeBootstrappingException;
+import org.apache.cassandra.sidecar.common.exceptions.SnapshotAlreadyExistsException;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.cassandra.sidecar.cassandra40.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
 
 /**
- * An implementation of the {@link StorageOperations} that interfaces with Cassandra 4.0
+ * An implementation of the {@link StorageOperations} that interfaces with Cassandra 4.0 and later
  */
-public class Cassandra40StorageOperations implements StorageOperations
+public class CassandraStorageOperations implements StorageOperations
 {
-    private static final Logger LOGGER = LoggerFactory.getLogger(Cassandra40StorageOperations.class);
-    private final JmxClient jmxClient;
-    private final RingProvider ringProvider;
+    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraStorageOperations.class);
+    protected final JmxClient jmxClient;
+    protected final RingProvider ringProvider;
 
     /**
      * Creates a new instance with the provided {@link JmxClient}
      *
      * @param jmxClient the JMX client used to communicate with the Cassandra instance
      */
-    public Cassandra40StorageOperations(JmxClient jmxClient, DnsResolver dnsResolver)
+    public CassandraStorageOperations(JmxClient jmxClient, DnsResolver dnsResolver)
+    {
+        this(jmxClient, new RingProvider(jmxClient, dnsResolver));
+    }
+
+    public CassandraStorageOperations(JmxClient jmxClient, RingProvider ringProvider)
     {
         this.jmxClient = jmxClient;
-        this.ringProvider = new RingProvider(jmxClient, dnsResolver);
+        this.ringProvider = ringProvider;
     }
 
     /**
@@ -64,8 +72,31 @@
         requireNonNull(tag, "snapshot tag must be non-null");
         requireNonNull(keyspace, "keyspace for the  must be non-null");
         requireNonNull(table, "table must be non-null");
-        jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
-                 .takeSnapshot(tag, options, keyspace + "." + table);
+        try
+        {
+            jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME)
+                     .takeSnapshot(tag, options, keyspace + "." + table);
+        }
+        catch (IOException e)
+        {
+            String errorMessage = e.getMessage();
+            if (errorMessage != null)
+            {
+                if (errorMessage.contains("Snapshot " + tag + " already exists"))
+                {
+                    throw new SnapshotAlreadyExistsException(e);
+                }
+                else if (errorMessage.contains("Keyspace " + keyspace + " does not exist"))
+                {
+                    throw new IllegalArgumentException(e);
+                }
+                else if (errorMessage.contains("Cannot snapshot until bootstrap completes"))
+                {
+                    throw new NodeBootstrappingException(e);
+                }
+            }
+            throw new RuntimeException(e);
+        }
     }
 
     /**
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40TableOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
similarity index 92%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40TableOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
index f94f626..d982eef 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40TableOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/CassandraTableOperations.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
 import java.util.Collections;
 import java.util.List;
@@ -26,13 +26,13 @@
 import org.jetbrains.annotations.NotNull;
 
 /**
- * An implementation of the {@link TableOperations} that interfaces with Cassandra 4.0
+ * An implementation of the {@link TableOperations} that interfaces with Cassandra 4.0 and later
  */
-public class Cassandra40TableOperations implements TableOperations
+public class CassandraTableOperations implements TableOperations
 {
     private final JmxClient jmxClient;
 
-    public Cassandra40TableOperations(JmxClient jmxClient)
+    public CassandraTableOperations(JmxClient jmxClient)
     {
         this.jmxClient = jmxClient;
     }
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/ClusterMembershipJmxOperations.java
similarity index 95%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/ClusterMembershipJmxOperations.java
index b91aa9c..b259e9c 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/ClusterMembershipJmxOperations.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
 /**
  * An interface that pulls a method from Cassandra Gossiper and Failure Detector proxies
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/EndpointSnitchJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/EndpointSnitchJmxOperations.java
similarity index 96%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/EndpointSnitchJmxOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/EndpointSnitchJmxOperations.java
index 97f2478..96fc4dc 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/EndpointSnitchJmxOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/EndpointSnitchJmxOperations.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
 import java.net.UnknownHostException;
 
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/RingProvider.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RingProvider.java
similarity index 81%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/RingProvider.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RingProvider.java
index 92a91c9..16554d6 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/RingProvider.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/RingProvider.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
 import java.net.UnknownHostException;
 import java.text.DecimalFormat;
@@ -35,8 +35,8 @@
 import org.apache.cassandra.sidecar.common.dns.DnsResolver;
 import org.jetbrains.annotations.Nullable;
 
-import static org.apache.cassandra.sidecar.cassandra40.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
-import static org.apache.cassandra.sidecar.cassandra40.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
+import static org.apache.cassandra.sidecar.adapters.base.EndpointSnitchJmxOperations.ENDPOINT_SNITCH_INFO_OBJ_NAME;
+import static org.apache.cassandra.sidecar.adapters.base.StorageJmxOperations.STORAGE_SERVICE_OBJ_NAME;
 
 /**
  * Aggregates the ring view of cluster
@@ -54,7 +54,7 @@
     private static final String STATE_NORMAL = "Normal";
     private static final String DECIMAL_FORMAT = "##0.00%";
 
-    private final JmxClient jmxClient;
+    protected final JmxClient jmxClient;
     private final DnsResolver dnsResolver;
 
     public RingProvider(JmxClient jmxClient, DnsResolver dnsResolver)
@@ -66,32 +66,31 @@
     @SuppressWarnings("UnstableApiUsage")
     public RingResponse ring(@Nullable String keyspace) throws UnknownHostException
     {
-        StorageJmxOperations probe = jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
-        EndpointSnitchJmxOperations epSnitchInfo = jmxClient.proxy(EndpointSnitchJmxOperations.class,
-                                                                   ENDPOINT_SNITCH_INFO_OBJ_NAME);
+        StorageJmxOperations storageOps = initializeStorageOps();
+        EndpointSnitchJmxOperations epSnitchInfo = initializeEndpointProxy();
 
         // Collect required data from the probe
-        List<String> liveNodes = probe.getLiveNodesWithPort();
-        List<String> deadNodes = probe.getUnreachableNodesWithPort();
+        List<String> liveNodes = storageOps.getLiveNodesWithPort();
+        List<String> deadNodes = storageOps.getUnreachableNodesWithPort();
         Status status = new Status(liveNodes, deadNodes);
-        List<String> joiningNodes = probe.getJoiningNodesWithPort();
-        List<String> leavingNodes = probe.getLeavingNodesWithPort();
-        List<String> movingNodes = probe.getMovingNodesWithPort();
+        List<String> joiningNodes = storageOps.getJoiningNodesWithPort();
+        List<String> leavingNodes = storageOps.getLeavingNodesWithPort();
+        List<String> movingNodes = storageOps.getMovingNodesWithPort();
         State state = new State(joiningNodes, leavingNodes, movingNodes);
-        Map<String, String> loadMap = probe.getLoadMapWithPort();
-        Map<String, String> tokensToEndpoints = probe.getTokenToEndpointWithPortMap();
-        Map<String, String> endpointsToHostIds = probe.getEndpointWithPortToHostId();
+        Map<String, String> loadMap = storageOps.getLoadMapWithPort();
+        Map<String, String> tokensToEndpoints = storageOps.getTokenToEndpointWithPortMap();
+        Map<String, String> endpointsToHostIds = storageOps.getEndpointWithPortToHostId();
 
         boolean showEffectiveOwnership = true;
         // Calculate per-token ownership of the ring
         Map<String, Float> ownerships;
         try
         {
-            ownerships = probe.effectiveOwnershipWithPort(keyspace);
+            ownerships = storageOps.effectiveOwnershipWithPort(keyspace);
         }
         catch (IllegalStateException ex)
         {
-            ownerships = probe.getOwnershipWithPort();
+            ownerships = storageOps.getOwnershipWithPort();
             LOGGER.warn("Unable to retrieve effective ownership information for keyspace={}", keyspace, ex);
             showEffectiveOwnership = false;
         }
@@ -124,6 +123,16 @@
         return response;
     }
 
+    protected EndpointSnitchJmxOperations initializeEndpointProxy()
+    {
+        return jmxClient.proxy(EndpointSnitchJmxOperations.class, ENDPOINT_SNITCH_INFO_OBJ_NAME);
+    }
+
+    protected StorageJmxOperations initializeStorageOps()
+    {
+        return jmxClient.proxy(StorageJmxOperations.class, STORAGE_SERVICE_OBJ_NAME);
+    }
+
     /**
      * Resolves the endpoint to the format "IP_ADDRESS:PORT"
      *
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/StorageJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
similarity index 95%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/StorageJmxOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
index ea108c4..369630b 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/StorageJmxOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/StorageJmxOperations.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
@@ -104,8 +105,9 @@
      * @param tag      the tag given to the snapshot; may not be null or empty
      * @param options  map of options, for example ttl, skipFlush
      * @param entities list of keyspaces / tables in the form of empty | ks1 ks2 ... | ks1.cf1,ks2.cf2,...
+     * @throws IOException in certain versions of Cassandra.
      */
-    void takeSnapshot(String tag, Map<String, String> options, String... entities);
+    void takeSnapshot(String tag, Map<String, String> options, String... entities) throws IOException;
 
     /**
      * Remove the snapshot with the given name from the given keyspaces.
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/TableJmxOperations.java b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TableJmxOperations.java
similarity index 97%
rename from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/TableJmxOperations.java
rename to adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TableJmxOperations.java
index fca81d6..d320d60 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/TableJmxOperations.java
+++ b/adapters/base/src/main/java/org/apache/cassandra/sidecar/adapters/base/TableJmxOperations.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.adapters.base;
 
 import java.util.List;
 import java.util.Set;
diff --git a/build.gradle b/build.gradle
index 90bf37c..3a3764b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1,3 +1,4 @@
+import java.nio.file.Paths
 
 buildscript {
     dependencies {
@@ -25,6 +26,10 @@
     id 'com.google.cloud.tools.jib' version '2.2.0'
 }
 
+ext.dtestJar = System.getenv("DTEST_JAR") ?: "dtest-5.0.jar" // trunk is currently 5.0.jar - update when trunk moves
+println("Using DTest jar: ${ext.dtestJar}")
+
+
 allprojects {
     apply plugin: 'jacoco'
     apply plugin: 'checkstyle'
@@ -143,9 +148,6 @@
     testImplementation('com.google.guava:guava-testlib:31.1-jre') {
         exclude group: 'junit', module: 'junit'
     }
-    testImplementation('org.cassandraunit:cassandra-unit-shaded:3.11.2.0') {
-        exclude group: 'junit', module: 'junit'
-    }
 
     testImplementation('com.datastax.cassandra:cassandra-driver-core:3.9.0:tests')
     testImplementation('org.mockito:mockito-core:4.10.0')
@@ -154,7 +156,7 @@
     testImplementation(testFixtures(project(":common")))
 
     implementation(project(":common"))
-    implementation(project(":cassandra40"))
+    implementation(project(":adapters:base"))
 
     testFixturesApi(testFixtures(project(":common")))
     testFixturesImplementation("io.vertx:vertx-junit5:${project.vertxVersion}")
@@ -167,9 +169,9 @@
         exclude group: 'junit', module: 'junit'
     }
 
-    integrationTestImplementation("org.testcontainers:testcontainers:${project.testcontainersVersion}")
-    integrationTestImplementation("org.testcontainers:cassandra:${project.testcontainersVersion}")
-    integrationTestImplementation("org.testcontainers:junit-jupiter:${project.testcontainersVersion}")
+    integrationTestImplementation(files("dtest-jars/${dtestJar}"))
+    integrationTestImplementation("org.apache.cassandra:dtest-api:0.0.15")
+    integrationTestImplementation("org.junit.vintage:junit-vintage-engine:${junitVersion}")
 }
 
 jar {
@@ -234,21 +236,66 @@
     useJUnitPlatform()
     reports {
         junitXml.enabled = true
+        def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "test").toFile()
+        println("Destination directory for unit tests: ${destDir}")
+        junitXml.destination = destDir
         html.enabled = true
     }
+    testLogging {
+        events "passed", "skipped", "failed"
+    }
 }
 
 tasks.register("integrationTest", Test) {
+    if (JavaVersion.current().isJava11Compatible()) {
+        def JDK11_OPTIONS = ['-Djdk.attach.allowAttachSelf=true',
+                             '--add-exports', 'java.base/jdk.internal.misc=ALL-UNNAMED',
+                             '--add-exports', 'java.base/jdk.internal.ref=ALL-UNNAMED',
+                             '--add-exports', 'java.base/sun.nio.ch=ALL-UNNAMED',
+                             '--add-exports', 'java.management.rmi/com.sun.jmx.remote.internal.rmi=ALL-UNNAMED',
+                             '--add-exports', 'java.rmi/sun.rmi.registry=ALL-UNNAMED',
+                             '--add-exports', 'java.rmi/sun.rmi.server=ALL-UNNAMED',
+                             '--add-exports', 'java.sql/java.sql=ALL-UNNAMED',
+                             '--add-opens', 'java.base/java.lang.module=ALL-UNNAMED',
+                             '--add-opens', 'java.base/jdk.internal.loader=ALL-UNNAMED',
+                             '--add-opens', 'java.base/jdk.internal.ref=ALL-UNNAMED',
+                             '--add-opens', 'java.base/jdk.internal.reflect=ALL-UNNAMED',
+                             '--add-opens', 'java.base/jdk.internal.math=ALL-UNNAMED',
+                             '--add-opens', 'java.base/jdk.internal.module=ALL-UNNAMED',
+                             '--add-opens', 'java.base/jdk.internal.util.jar=ALL-UNNAMED',
+                             '--add-opens', 'jdk.management/com.sun.management.internal=ALL-UNNAMED']
+        jvmArgs(JDK11_OPTIONS)
+        println("JVM arguments for $project.name are $allJvmArgs")
+    }
     systemProperty "vertxweb.environment", "dev"
+    // Because tests are forked, we need to explicitly pass the system property from the
+    // Gradle JVM down to the children
+
+    def versionsToTest = System.getProperty("cassandra.sidecar.versions_to_test", null)
+    if (versionsToTest != "" && versionsToTest != null) {
+        systemProperty "cassandra.sidecar.versions_to_test", versionsToTest
+    }
     jacoco {
         enabled = false
     }
     useJUnitPlatform() {
         includeTags "integrationTest"
     }
+    reports {
+        junitXml.enabled = true
+        def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "integration").toFile()
+        println("Destination directory for integration tests: ${destDir}")
+        junitXml.destination = destDir
+        html.enabled = true
+    }
+    testLogging {
+        events "passed", "skipped", "failed"
+    }
     testClassesDirs = sourceSets.integrationTest.output.classesDirs
     classpath = sourceSets.integrationTest.runtimeClasspath
     shouldRunAfter test
+    forkEvery = 1 // DTest framework tends to have issues without forkEvery method
+    maxHeapSize = "8g"
 }
 
 // copy the user documentation to the final build
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40Factory.java b/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40Factory.java
deleted file mode 100644
index f93f074..0000000
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40Factory.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.cassandra40;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.Metadata;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import org.apache.cassandra.sidecar.common.CQLSessionProvider;
-import org.apache.cassandra.sidecar.common.ClusterMembershipOperations;
-import org.apache.cassandra.sidecar.common.ICassandraAdapter;
-import org.apache.cassandra.sidecar.common.ICassandraFactory;
-import org.apache.cassandra.sidecar.common.JmxClient;
-import org.apache.cassandra.sidecar.common.MinimumVersion;
-import org.apache.cassandra.sidecar.common.NodeSettings;
-import org.apache.cassandra.sidecar.common.StorageOperations;
-import org.apache.cassandra.sidecar.common.TableOperations;
-import org.apache.cassandra.sidecar.common.dns.DnsResolver;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Factory to produce the 4.0 adapter
- */
-@MinimumVersion("4.0.0")
-public class Cassandra40Factory implements ICassandraFactory
-{
-    private static final Logger LOGGER = LoggerFactory.getLogger(Cassandra40Factory.class);
-
-    private final DnsResolver dnsResolver;
-    private final String sidecarVersion;
-
-    public Cassandra40Factory(DnsResolver dnsResolver, String sidecarVersion)
-    {
-        this.dnsResolver = dnsResolver;
-        this.sidecarVersion = sidecarVersion;
-    }
-
-    /**
-     * Returns a new adapter for Cassandra 4.0 clusters.
-     *
-     * @param session   the session to the Cassandra database
-     * @param jmxClient the JMX client to connect to the Cassandra database
-     * @return a new adapter for the 4.0 clusters
-     */
-    @Override
-    public ICassandraAdapter create(CQLSessionProvider session, JmxClient jmxClient)
-    {
-        return new ICassandraAdapter()
-        {
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            @Nullable
-            public Metadata metadata()
-            {
-                Session activeSession = session.localCql();
-                if (activeSession == null)
-                {
-                    LOGGER.warn("There is no active session to Cassandra");
-                    return null;
-                }
-
-                if (activeSession.getCluster() == null)
-                {
-                    LOGGER.warn("There is no available cluster for session={}", activeSession);
-                    return null;
-                }
-
-                if (activeSession.getCluster().getMetadata() == null)
-                {
-                    LOGGER.warn("There is no available metadata for session={}, cluster={}",
-                                activeSession, activeSession.getCluster());
-                }
-
-                return activeSession.getCluster().getMetadata();
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            @Nullable
-            public NodeSettings nodeSettings()
-            {
-                Session activeSession = session.localCql();
-                if (activeSession == null)
-                {
-                    return null;
-                }
-
-                Row oneResult = activeSession.execute("select release_version, partitioner from system.local")
-                                             .one();
-
-                return new NodeSettings(oneResult.getString("release_version"),
-                                        oneResult.getString("partitioner"),
-                                        sidecarVersion);
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public StorageOperations storageOperations()
-            {
-                return new Cassandra40StorageOperations(jmxClient, dnsResolver);
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public ClusterMembershipOperations clusterMembershipOperations()
-            {
-                return new Cassandra40ClusterMembershipOperations(jmxClient);
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            @Override
-            public TableOperations tableOperations()
-            {
-                return new Cassandra40TableOperations(jmxClient);
-            }
-
-            /**
-             * {@inheritDoc}
-             */
-            public String toString()
-            {
-                return "Cassandra40Adapter" + "@" + Integer.toHexString(hashCode());
-            }
-        };
-    }
-}
diff --git a/client/build.gradle b/client/build.gradle
index 4e530f0..edc4c16 100644
--- a/client/build.gradle
+++ b/client/build.gradle
@@ -1,3 +1,5 @@
+import java.nio.file.Paths
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -40,6 +42,13 @@
     testLogging {
         events "passed", "skipped", "failed"
     }
+    reports {
+        junitXml.enabled = true
+        def destDir = Paths.get(rootProject.rootDir.absolutePath, "build", "test-results", "client").toFile()
+        println("Destination directory for client tests: ${destDir}")
+        junitXml.destination = destDir
+        html.enabled = true
+    }
 }
 
 configurations {
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
index d6bab9a..9f7feaa 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 
@@ -246,6 +247,15 @@
             cqlSessionProvider.close();
         }
         nodeSettings = null;
+        try
+        {
+            jmxClient.close();
+        }
+        catch (IOException e)
+        {
+            // Can't throw unchecked exceptions here, so wrap and rethrow
+            throw new RuntimeException(e);
+        }
     }
 
     public SimpleCassandraVersion version()
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
index 7315ab1..b6182d0 100644
--- a/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/JmxClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.net.MalformedURLException;
 import java.rmi.server.RMIClientSocketFactory;
@@ -41,7 +42,7 @@
 /**
  * A simple wrapper around a JMX connection that makes it easier to get proxy instances.
  */
-public class JmxClient implements NotificationListener
+public class JmxClient implements NotificationListener, Closeable
 {
     public static final String JMX_SERVICE_URL_FMT = "service:jmx:rmi://%s/jndi/rmi://%s:%d/jmxrmi";
     public static final String REGISTRY_CONTEXT_SOCKET_FACTORY = "com.sun.jndi.rmi.factory.socket";
@@ -49,6 +50,7 @@
     private MBeanServerConnection mBeanServerConnection;
     private final Map<String, Object> jmxEnv;
     private boolean connected = false;
+    private JMXConnector jmxConnector;
 
     /**
      * Creates a new client with the provided {@code host} and {@code port}.
@@ -144,7 +146,7 @@
     {
         try
         {
-            JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxServiceURL, jmxEnv);
+            jmxConnector = JMXConnectorFactory.connect(jmxServiceURL, jmxEnv);
             jmxConnector.addConnectionNotificationListener(this, null, null);
             mBeanServerConnection = jmxConnector.getMBeanServerConnection();
             connected = true;
@@ -157,6 +159,7 @@
         }
     }
 
+    @Override
     public void handleNotification(Notification notification, Object handback)
     {
         if (notification instanceof JMXConnectionNotification)
@@ -206,4 +209,15 @@
             throw new RuntimeException(errorMessage, e);
         }
     }
+
+    @Override
+    public synchronized void close() throws IOException
+    {
+        JMXConnector connector = jmxConnector;
+        if (connector != null)
+        {
+            jmxConnector = null;
+            connector.close();
+        }
+    }
 }
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java b/common/src/main/java/org/apache/cassandra/sidecar/common/exceptions/NodeBootstrappingException.java
similarity index 65%
copy from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/exceptions/NodeBootstrappingException.java
index b91aa9c..fd91441 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/exceptions/NodeBootstrappingException.java
@@ -16,18 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.common.exceptions;
 
 /**
- * An interface that pulls a method from Cassandra Gossiper and Failure Detector proxies
+ * Exception thrown when a node is bootstrapping while an operation cannot be performed when the node is bootstrapping
  */
-public interface ClusterMembershipJmxOperations
+public class NodeBootstrappingException extends RuntimeException
 {
-    String FAILURE_DETECTOR_OBJ_NAME = "org.apache.cassandra.net:type=FailureDetector";
-
-    /**
-     * Retrieves gossip info with ports included for the nodes
-     * @return gossip info text
-     */
-    String getAllEndpointStatesWithPort();
+    public NodeBootstrappingException(Exception inner)
+    {
+        super(inner);
+    }
 }
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java b/common/src/main/java/org/apache/cassandra/sidecar/common/exceptions/SnapshotAlreadyExistsException.java
similarity index 65%
copy from cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/exceptions/SnapshotAlreadyExistsException.java
index b91aa9c..e28a1f9 100644
--- a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/ClusterMembershipJmxOperations.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/exceptions/SnapshotAlreadyExistsException.java
@@ -16,18 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.cassandra40;
+package org.apache.cassandra.sidecar.common.exceptions;
 
 /**
- * An interface that pulls a method from Cassandra Gossiper and Failure Detector proxies
+ * Exception thrown when the snapshot with the same name already exists
  */
-public interface ClusterMembershipJmxOperations
+public class SnapshotAlreadyExistsException extends RuntimeException
 {
-    String FAILURE_DETECTOR_OBJ_NAME = "org.apache.cassandra.net:type=FailureDetector";
-
-    /**
-     * Retrieves gossip info with ports included for the nodes
-     * @return gossip info text
-     */
-    String getAllEndpointStatesWithPort();
+    public SnapshotAlreadyExistsException(Exception inner)
+    {
+        super(inner);
+    }
 }
diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/JmxClientTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/JmxClientTest.java
index 9a88c5b..4b716f6 100644
--- a/common/src/test/java/org/apache/cassandra/sidecar/common/JmxClientTest.java
+++ b/common/src/test/java/org/apache/cassandra/sidecar/common/JmxClientTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.net.MalformedURLException;
 import java.rmi.registry.LocateRegistry;
@@ -105,26 +106,33 @@
     }
 
     @Test
-    public void testCanCallMethodWithoutEntireInterface()
+    public void testCanCallMethodWithoutEntireInterface() throws IOException
     {
-        JmxClient client = new JmxClient(serviceURL, "controlRole", "password");
-        List<String> result = client.proxy(Import.class, objectName)
-                                    .importNewSSTables(Sets.newHashSet("foo", "bar"), true,
-                                                       true, true, true, true,
-                                                       true);
+        List<String> result;
+        try (JmxClient client = new JmxClient(serviceURL, "controlRole", "password"))
+        {
+            result = client.proxy(Import.class, objectName)
+                           .importNewSSTables(Sets.newHashSet("foo", "bar"), true,
+                                              true, true, true, true,
+                                              true);
+        }
         assertThat(result.size()).isEqualTo(0);
     }
 
     @Test
-    public void testCanCallMethodWithoutEntireInterfaceGetResults()
+    public void testCanCallMethodWithoutEntireInterfaceGetResults() throws IOException
     {
         importMBean.shouldSucceed = false;
-        JmxClient client = new JmxClient(serviceURL, "controlRole", "password");
-        final HashSet<String> srcPaths = Sets.newHashSet("foo", "bar");
-        final List<String> failedDirs = client.proxy(Import.class, objectName)
-                                              .importNewSSTables(srcPaths, true,
-                                                                 true, true, true, true,
-                                                                 true);
+        HashSet<String> srcPaths;
+        List<String> failedDirs;
+        try (JmxClient client = new JmxClient(serviceURL, "controlRole", "password"))
+        {
+            srcPaths = Sets.newHashSet("foo", "bar");
+            failedDirs = client.proxy(Import.class, objectName)
+                               .importNewSSTables(srcPaths, true,
+                                                  true, true, true, true,
+                                                  true);
+        }
         assertThat(failedDirs.size()).isEqualTo(2);
         assertThat(failedDirs.toArray()).isEqualTo(srcPaths.toArray());
     }
@@ -135,54 +143,61 @@
         assertThatExceptionOfType(SecurityException.class)
         .isThrownBy(() ->
                     {
-                        JmxClient client = new JmxClient(serviceURL);
-                        client.proxy(Import.class, objectName)
-                              .importNewSSTables(Sets.newHashSet("foo", "bar"),
-                                                 true,
-                                                 true,
-                                                 true,
-                                                 true,
-                                                 true,
-                                                 true);
+                        try (JmxClient client = new JmxClient(serviceURL))
+                        {
+                            client.proxy(Import.class, objectName)
+                                  .importNewSSTables(Sets.newHashSet("foo", "bar"),
+                                                     true,
+                                                     true,
+                                                     true,
+                                                     true,
+                                                     true,
+                                                     true);
+                        }
                     });
     }
 
     @Test
     public void testDisconnectReconnect() throws Exception
     {
-        JmxClient client = new JmxClient(serviceURL, "controlRole", "password");
-        assertThat(client.isConnected()).isFalse();
-        List<String> result = client.proxy(Import.class, objectName)
-                                    .importNewSSTables(
-                                    Sets.newHashSet("foo", "bar"), true, true, true,
-                                    true, true,
-                                    true);
-        assertThat(client.isConnected()).isTrue();
-        assertThat(result.size()).isEqualTo(0);
+        List<String> result;
+        try (JmxClient client = new JmxClient(serviceURL, "controlRole", "password"))
+        {
+            assertThat(client.isConnected()).isFalse();
+            result = client.proxy(Import.class, objectName)
+                           .importNewSSTables(
+                           Sets.newHashSet("foo", "bar"), true, true, true,
+                           true, true,
+                           true);
+            assertThat(client.isConnected()).isTrue();
+            assertThat(result.size()).isEqualTo(0);
 
-        tearDown();
-        setUp();
+            tearDown();
+            setUp();
 
-        result = client.proxy(Import.class, objectName)
-                       .importNewSSTables(
-                       Sets.newHashSet("foo", "bar"), true, true, true,
-                       true, true,
-                       true);
+            result = client.proxy(Import.class, objectName)
+                           .importNewSSTables(
+                           Sets.newHashSet("foo", "bar"), true, true, true,
+                           true, true,
+                           true);
+        }
         assertThat(result.size()).isEqualTo(0);
     }
 
     @Test
-    public void testLotsOfProxies()
+    public void testLotsOfProxies() throws IOException
     {
-        JmxClient client = new JmxClient(serviceURL, "controlRole", "password");
-        for (int i = 0; i < PROXIES_TO_TEST; i++)
+        try (JmxClient client = new JmxClient(serviceURL, "controlRole", "password"))
         {
-            List<String> result = client.proxy(Import.class, objectName)
-                                        .importNewSSTables(
-                                        Sets.newHashSet("foo", "bar"), true, true, true,
-                                        true, true,
-                                        true);
-            assertThat(result).isNotNull();
+            for (int i = 0; i < PROXIES_TO_TEST; i++)
+            {
+                List<String> result = client.proxy(Import.class, objectName)
+                                            .importNewSSTables(
+                                            Sets.newHashSet("foo", "bar"), true, true, true,
+                                            true, true,
+                                            true);
+                assertThat(result).isNotNull();
+            }
         }
     }
 
diff --git a/docs/src/development.adoc b/docs/src/development.adoc
index 0f0841e..10705b2 100644
--- a/docs/src/development.adoc
+++ b/docs/src/development.adoc
@@ -17,17 +17,19 @@
 
 Contains the libraries which are shared between the version specific Cassandra code as well as the dependencies of Cassandra itself.
 
-### Cassandra40
+### adapters
 
-Implementation of ICassandraAdapter for Cassandra 4.0.
+Implementation of ICassandraAdapter and related classes for different Cassandra versions.
+
+#### adapters/base
+
+Implementation of ICassandraAdapter and related classes for Cassandra 4.0.
 
 ### Cassandra Integration Tests
 
-Cassandra integration tests leverage Docker to create Cassandra nodes and test the different implementations
-of the ICassandraAdapters against real C* nodes.
+Cassandra integration tests leverage the in-jvm dtest framework to create Cassandra nodes and test the different implementations of the ICassandraAdapters against real C* nodes.
 
-The integration tests will not run by default when running `./gradlew test` since they require a bit of setup and you may not have
-Docker available.
+The integration tests will not run by default when running `./gradlew test` since they require a bit of setup.
 
 #### Running Integration Tests
 
@@ -38,7 +40,7 @@
 
 #### Tests
 
-Tests in the cassandra-integration-tests submodule should be marked as integration tests with the @CassandraIntegrationTest annotation:
+Integration tests should be marked as integration tests with the @CassandraIntegrationTest annotation:
 
     @CassandraIntegrationTest
     void myTestHere(CassandraTestContext context)
@@ -46,8 +48,8 @@
         //
     }
 
-The `CassandraTestTemplate` will handle starting up each of the Docker services required for testing and inject the
-`CassandraTestContext`, which will have the version, CQLSession, container, and other useful information required for testing.
+The `CassandraTestTemplate` will handle starting up each of the instance(s) for testing and inject the
+`CassandraTestContext`, which will have the version, CQLSession, and other useful information required for testing.
 
 ### Main Project
 
diff --git a/gradle.properties b/gradle.properties
index 96f28b1..d1ac405 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,7 +1,5 @@
 version=1.0-SNAPSHOT
 junitVersion=5.9.2
-testcontainersVersion=1.17.5
-cassandra40Version=4.0.5
 vertxVersion=4.2.1
 guavaVersion=27.0.1-jre
 slf4jVersion=1.7.36
diff --git a/scripts/build-dtest-jars.sh b/scripts/build-dtest-jars.sh
new file mode 100755
index 0000000..62e7857
--- /dev/null
+++ b/scripts/build-dtest-jars.sh
@@ -0,0 +1,58 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -xe
+BRANCHES=${BRANCHES:-cassandra-4.0 trunk}
+REPO=${REPO:-"git@github.com:apache/cassandra.git"}
+SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
+DTEST_JAR_DIR="$(dirname ${SCRIPT_DIR}/)/dtest-jars"
+BUILD_DIR="${DTEST_JAR_DIR}/build"
+mkdir -p $BUILD_DIR
+for branch in $BRANCHES; do
+  cd ${BUILD_DIR}
+  # check out the correct cassandra version:
+  if [ ! -d ${branch} ] ; then
+    git clone --depth 1 --single-branch --branch $branch $REPO $branch
+    cd $branch
+  else
+    cd $branch
+    git pull
+  fi
+  git checkout $branch
+  git clean -fd
+  CASSANDRA_VERSION=$(cat build.xml | grep 'property name="base.version"' | awk -F "\"" '{print $4}')
+  # Loop to prevent failure due to maven-ant-tasks not downloading a jar.
+  for x in $(seq 1 3); do
+      if [ -f "${DTEST_JAR_DIR}/dtest-${CASSANDRA_VERSION}.jar" ]; then
+          RETURN="0"
+          break
+        else
+          ${SCRIPT_DIR}/build-shaded-dtest-jar-local.sh
+          RETURN="$?"
+          if [ "${RETURN}" -eq "0" ]; then
+              break
+          fi
+      fi
+  done
+  # Exit, if we didn't build successfully
+  if [ "${RETURN}" -ne "0" ]; then
+      echo "Build failed with exit code: ${RETURN}"
+      exit ${RETURN}
+  fi
+done
\ No newline at end of file
diff --git a/scripts/build-shaded-dtest-jar-local.sh b/scripts/build-shaded-dtest-jar-local.sh
new file mode 100755
index 0000000..7be43c8
--- /dev/null
+++ b/scripts/build-shaded-dtest-jar-local.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set -xe
+
+ARTIFACT_NAME=cassandra-dtest
+REPO_DIR="$(pwd)/out"
+SCRIPT_DIR=$( dirname -- "$( readlink -f -- "$0"; )"; )
+CASSANDRA_VERSION=$(cat build.xml | grep 'property name="base.version"' | awk -F "\"" '{print $4}')
+GIT_HASH=$(git rev-parse --short HEAD)
+DTEST_ARTIFACT_ID=${ARTIFACT_NAME}-local
+DTEST_JAR_DIR="$(dirname ${SCRIPT_DIR}/)/dtest-jars"
+
+echo $CASSANDRA_VERSION
+echo $GIT_HASH
+echo $DTEST_ARTIFACT_ID
+
+ant clean
+ant dtest-jar -Dno-checkstyle=true
+
+# Install the version that will be shaded
+mvn install:install-file               \
+   -Dfile=./build/dtest-${CASSANDRA_VERSION}.jar \
+   -DgroupId=org.apache.cassandra      \
+   -DartifactId=${DTEST_ARTIFACT_ID} \
+   -Dversion=${CASSANDRA_VERSION}-${GIT_HASH}         \
+   -Dpackaging=jar                     \
+   -DgeneratePom=true                  \
+   -DlocalRepositoryPath=${REPO_DIR}
+
+# Create shaded artifact
+mvn -f ${SCRIPT_DIR}/relocate-dtest-dependencies.pom package \
+    -Drevision=${CASSANDRA_VERSION} \
+    -DskipTests \
+    -Ddtest.version=${CASSANDRA_VERSION}-${GIT_HASH} \
+    -Dmaven.repo.local=${REPO_DIR} \
+    -DoutputFilePath=${DTEST_JAR_DIR}/dtest-${CASSANDRA_VERSION}.jar \
+    -Drelocation.prefix=shaded-${GIT_HASH} \
+    -nsu -U
+
+set +xe
diff --git a/scripts/relocate-dtest-dependencies.pom b/scripts/relocate-dtest-dependencies.pom
new file mode 100644
index 0000000..f7758f3
--- /dev/null
+++ b/scripts/relocate-dtest-dependencies.pom
@@ -0,0 +1,177 @@
+<!--
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.cassandra</groupId>
+    <artifactId>cassandra-dtest-shaded</artifactId>
+    <version>${revision}</version>
+    <packaging>jar</packaging>
+
+    <name>Cassandra in-jvm dtests shaded jar</name>
+
+    <properties>
+        <project.type>library</project.type>
+        <java.version>1.8</java.version>
+        <test.source.directory>src/test/unit/java</test.source.directory>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+
+        <junit.version>4.12</junit.version>
+        <reflections.version>0.10.2</reflections.version>
+        <maven.compiler.source>${java.version}</maven.compiler.source>
+        <maven.compiler.target>${java.version}</maven.compiler.target>
+        <dtest-local.version>${dtest.version}</dtest-local.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.cassandra</groupId>
+            <artifactId>cassandra-dtest-local</artifactId>
+            <version>${dtest-local.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>${junit.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- We need to include reflections in the final shaded jar explicitly
+             as 4.0 doesn't pull it in on its own due to a not-backported fix
+             in later dtest jars
+         -->
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+            <version>${reflections.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <testSourceDirectory>${test.source.directory}</testSourceDirectory>
+
+        <plugins>
+            <!-- generate a shaded JAR -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.4.1</version>
+
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <createSourcesJar>false</createSourcesJar>
+
+                            <artifactSet>
+                                <includes>
+                                    <include>org.apache.cassandra:cassandra-dtest-local</include>
+                                    <include>org.reflections:reflections</include>
+                                </includes>
+                            </artifactSet>
+
+                            <relocations>
+                                <relocation>
+                                    <pattern>io.netty</pattern>
+                                    <shadedPattern>shaded.io.netty</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.google</pattern>
+                                    <shadedPattern>shaded.com.google</shadedPattern>
+                                    <!-- ImmutableMaps and other items in com.google.common are shipped across the
+                                    network, so we need to keep them un-relocated -->
+                                    <excludes>
+                                        <exclude>com.google.common.**.*</exclude>
+                                    </excludes>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.github.benmanes.caffeine</pattern>
+                                    <shadedPattern>shaded.com.github.benmanes.caffeine</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.datastax</pattern>
+                                    <shadedPattern>shaded.com.datastax</shadedPattern>
+                                </relocation>
+
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>shaded.com.fasterxml.jackson</shadedPattern>
+                                </relocation>
+                            </relocations>
+
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>**/Log4j2Plugins.dat</exclude>
+                                    </excludes>
+                                </filter>
+
+                                <filter>
+                                    <artifact>io.netty:netty-*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/maven/</exclude>
+                                        <exclude>META-INF/io.netty.versions.properties</exclude>
+                                    </excludes>
+                                </filter>
+
+                                <filter>
+                                    <artifact>com.google.guava:guava</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/maven/</exclude>
+                                    </excludes>
+                                </filter>
+
+                                <filter>
+                                    <artifact>com.google.guava:failureaccess</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/maven/</exclude>
+                                    </excludes>
+                                </filter>
+
+                                <filter>
+                                    <artifact>com.datastax.cassandra:cassandra-driver-core</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/maven/</exclude>
+                                    </excludes>
+                                </filter>
+
+                            </filters>
+                            <outputFile>${outputFilePath}</outputFile>
+                        </configuration>
+                    </execution>
+                </executions>
+
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/settings.gradle b/settings.gradle
index e41d48a..c723715 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,8 +1,9 @@
 rootProject.name = "cassandra-sidecar"
 
-include "cassandra40"
+include "adapters:base"
 include "common"
 include "client"
 include "vertx-client"
 include "vertx-client-shaded"
 include "docs"
+
diff --git a/spotbugs-exclude.xml b/spotbugs-exclude.xml
index d773958..7085a6e 100644
--- a/spotbugs-exclude.xml
+++ b/spotbugs-exclude.xml
@@ -30,6 +30,12 @@
         </Or>
     </Match>
 
+    <!-- Ignore GC call in CassandraTestTemplate as we want to GC after cluster shutdown -->
+    <Match>
+        <Class name="org.apache.cassandra.sidecar.common.testing.CassandraTestTemplate" />
+        <Bug pattern="DM_GC" />
+    </Match>
+
     <!-- Ignore DMI_HARDCODED_ABSOLUTE_FILENAME for testing SnapshotDirectory.of with strings that are paths -->
     <Match>
         <Class name="org.apache.cassandra.sidecar.snapshots.SnapshotDirectoryTest" />
diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
index e103113..baf5837 100644
--- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
@@ -42,7 +42,7 @@
 import io.vertx.ext.web.handler.LoggerHandler;
 import io.vertx.ext.web.handler.StaticHandler;
 import io.vertx.ext.web.handler.TimeoutHandler;
-import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory;
+import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.common.ApiEndpointsV1;
 import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
@@ -271,7 +271,7 @@
                                                              SidecarVersionProvider sidecarVersionProvider)
     {
         CassandraVersionProvider.Builder builder = new CassandraVersionProvider.Builder();
-        builder.add(new Cassandra40Factory(dnsResolver, sidecarVersionProvider.sidecarVersion()));
+        builder.add(new CassandraFactory(dnsResolver, sidecarVersionProvider.sidecarVersion()));
         return builder.build();
     }
 
diff --git a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
index a41a152..7aeb6c0 100644
--- a/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
+++ b/src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
@@ -105,4 +105,5 @@
     {
         return delegate;
     }
+
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java
index 758f15e..f9e82a1 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/SnapshotsHandler.java
@@ -19,8 +19,6 @@
 package org.apache.cassandra.sidecar.routes;
 
 import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.file.NoSuchFileException;
 import java.util.List;
 
@@ -41,6 +39,8 @@
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.StorageOperations;
 import org.apache.cassandra.sidecar.common.data.ListSnapshotFilesResponse;
+import org.apache.cassandra.sidecar.common.exceptions.NodeBootstrappingException;
+import org.apache.cassandra.sidecar.common.exceptions.SnapshotAlreadyExistsException;
 import org.apache.cassandra.sidecar.common.utils.CassandraInputValidator;
 import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
 import org.apache.cassandra.sidecar.data.SnapshotRequest;
@@ -256,38 +256,29 @@
         logger.error("SnapshotsHandler failed for request={}, remoteAddress={}, instance={}, method={}",
                      requestParams, remoteAddress, host, context.request().method(), cause);
 
-        Throwable rootCause = cause instanceof UndeclaredThrowableException
-                              ? ((UndeclaredThrowableException) cause).getUndeclaredThrowable()
-                              : cause;
-
-        if (rootCause instanceof IOException)
+        if (cause instanceof SnapshotAlreadyExistsException)
         {
-            if (StringUtils.contains(rootCause.getMessage(),
-                                     "Snapshot " + requestParams.snapshotName() + " already exists"))
-            {
-                context.fail(wrapHttpException(HttpResponseStatus.CONFLICT, rootCause.getMessage()));
-                return;
-            }
-            else if (StringUtils.contains(rootCause.getMessage(),
-                                          "Cannot snapshot until bootstrap completes"))
-            {
-                // Cassandra does not allow taking snapshots while the node is JOINING the ring
-                context.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE,
-                                               "The Cassandra instance " + host + " is not available"));
-            }
+            context.fail(wrapHttpException(HttpResponseStatus.CONFLICT, cause.getMessage()));
+            return;
         }
-        else if (rootCause instanceof IllegalArgumentException)
+        else if (cause instanceof NodeBootstrappingException)
         {
-            if (StringUtils.contains(rootCause.getMessage(),
+            // Cassandra does not allow taking snapshots while the node is JOINING the ring
+            context.fail(wrapHttpException(HttpResponseStatus.SERVICE_UNAVAILABLE,
+                                           "The Cassandra instance " + host + " is not available"));
+        }
+        else if (cause instanceof IllegalArgumentException)
+        {
+            if (StringUtils.contains(cause.getMessage(),
                                      "Keyspace " + requestParams.keyspace() + " does not exist") ||
-                StringUtils.contains(rootCause.getMessage(),
+                StringUtils.contains(cause.getMessage(),
                                      "Unknown keyspace/cf pair"))
             {
-                context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, rootCause.getMessage()));
+                context.fail(wrapHttpException(HttpResponseStatus.NOT_FOUND, cause.getMessage()));
             }
             else
             {
-                context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, rootCause.getMessage()));
+                context.fail(wrapHttpException(HttpResponseStatus.BAD_REQUEST, cause.getMessage()));
             }
             return;
         }
diff --git a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
index 8d14629..413dbab 100644
--- a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
+++ b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
@@ -366,7 +366,7 @@
                .recover(t ->
                         {
                             String errMsg = String.format("Snapshot directory '%s' does not exist", snapshotName);
-                            logger.warn("Snapshot directory {} does not exist in {}", snapshotName, snapshotDirectory);
+                            logger.warn("Snapshot directory {} does not exist in {}", snapshotName, baseDirectory);
                             return Future.failedFuture(new NoSuchFileException(errMsg));
                         });
     }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java b/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java
index 7a9fdcf..e13ac08 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestBase.java
@@ -18,10 +18,18 @@
 
 package org.apache.cassandra.sidecar;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -36,6 +44,7 @@
 import io.vertx.core.http.HttpServer;
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
 
@@ -43,8 +52,8 @@
 
 /**
  * Base class for integration test.
- * Start a docker container of cassandra at the beginning of each test, and
- * teardown the container after each test.
+ * Start an in-jvm dtest cluster at the beginning of each test, and
+ * teardown the cluster after each test.
  */
 public abstract class IntegrationTestBase
 {
@@ -58,7 +67,7 @@
     private static final AtomicInteger TEST_TABLE_ID = new AtomicInteger(0);
 
     @BeforeEach
-    public void setup(CassandraTestContext cassandraTestContext) throws InterruptedException
+    void setup(CassandraTestContext cassandraTestContext) throws InterruptedException
     {
         Injector injector = Guice.createInjector(Modules.override(new MainModule())
                                                         .with(new IntegrationTestModule(cassandraTestContext)));
@@ -119,7 +128,7 @@
 
     protected Session maybeGetSession(CassandraTestContext cassandraTestContext)
     {
-        Session session = cassandraTestContext.session.localCql();
+        Session session = cassandraTestContext.session();
         assertThat(session).isNotNull();
         return session;
     }
@@ -128,4 +137,27 @@
     {
         return new QualifiedTableName(TEST_KEYSPACE, TEST_TABLE_PREFIX + TEST_TABLE_ID.getAndIncrement());
     }
+
+    public List<Path> findChildFile(CassandraTestContext context, String hostname, String target)
+    {
+        InstanceMetadata instanceConfig = context.getInstancesConfig().instanceFromHost("127.0.0.1");
+        List<String> parentDirectories = instanceConfig.dataDirs();
+
+        return parentDirectories.stream().flatMap(s -> findChildFile(Paths.get(s), target).stream())
+                                .collect(Collectors.toList());
+    }
+
+    private List<Path> findChildFile(Path path, String target)
+    {
+        try (Stream<Path> walkStream = Files.walk(path))
+        {
+            return walkStream.filter(p -> p.toString().endsWith(target)
+                                          || p.toString().contains("/" + target + "/"))
+                             .collect(Collectors.toList());
+        }
+        catch (IOException e)
+        {
+            return Collections.emptyList();
+        }
+    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java b/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
index ccdd20e..2acf3c9 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/IntegrationTestModule.java
@@ -18,22 +18,15 @@
 
 package org.apache.cassandra.sidecar;
 
-import java.util.Collections;
-
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
-import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
-import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
-import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
-import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.common.TestValidationConfiguration;
 import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
 import org.apache.cassandra.sidecar.common.utils.ValidationConfiguration;
 import org.apache.cassandra.sidecar.config.CacheConfiguration;
 import org.apache.cassandra.sidecar.config.WorkerPoolConfiguration;
-import org.apache.cassandra.sidecar.utils.SidecarVersionProvider;
 
 /**
  * Provides the basic dependencies for integration tests
@@ -49,22 +42,9 @@
 
     @Provides
     @Singleton
-    public InstancesConfig instancesConfig(CassandraVersionProvider versionProvider,
-                                           SidecarVersionProvider sidecarVersionProvider)
+    public InstancesConfig instancesConfig()
     {
-        String dataDirectory = cassandraTestContext.dataDirectoryPath.toFile().getAbsolutePath();
-        String stagingDirectory = cassandraTestContext.dataDirectoryPath.resolve("staging")
-                                                                               .toFile().getAbsolutePath();
-        InstanceMetadata metadata = new InstanceMetadataImpl(1,
-                                                             "localhost",
-                                                             9043,
-                                                             Collections.singleton(dataDirectory),
-                                                             stagingDirectory,
-                                                             cassandraTestContext.session,
-                                                             cassandraTestContext.jmxClient,
-                                                             versionProvider,
-                                                             sidecarVersionProvider.sidecarVersion());
-        return new InstancesConfigImpl(metadata);
+        return cassandraTestContext.getInstancesConfig();
     }
 
     @Provides
@@ -76,7 +56,7 @@
                                                                            30000);
         return new Configuration.Builder()
                .setInstancesConfig(instancesConfig)
-               .setHost("localhost")
+               .setHost("127.0.0.1")
                .setPort(9043)
                .setRateLimitStreamRequestsPerSecond(1000L)
                .setValidationConfiguration(validationConfiguration)
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
index bcb081c..03f165a 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/DelegateTest.java
@@ -18,12 +18,16 @@
 
 package org.apache.cassandra.sidecar.common;
 
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
+import com.datastax.driver.core.exceptions.TransportException;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
 import org.apache.cassandra.sidecar.common.testing.CassandraIntegrationTest;
 import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
-import org.apache.cassandra.sidecar.mocks.V30;
+import org.apache.cassandra.sidecar.utils.SidecarVersionProvider;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -32,33 +36,65 @@
  */
 class DelegateTest
 {
-    @CassandraIntegrationTest
-    void testCorrectVersionIsEnabled(CassandraTestContext context)
+    private static CassandraAdapterDelegate getCassandraAdapterDelegate(CassandraTestContext context)
     {
-        CassandraVersionProvider provider = new CassandraVersionProvider.Builder().add(new V30()).build();
-        CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(
-                provider, context.session, context.jmxClient, "1.0-TEST");
-        SimpleCassandraVersion version = delegate.version();
-        assertThat(version).isNotNull();
+        SidecarVersionProvider svp = new SidecarVersionProvider("/sidecar.version");
+        CassandraVersionProvider versionProvider = new CassandraVersionProvider.Builder()
+                                                       .add(new CassandraFactory(DnsResolver.DEFAULT,
+                                                                                 svp.sidecarVersion()))
+                                                       .build();
+        InstanceMetadata instanceMetadata = context.instancesConfig.instances().get(0);
+        CQLSessionProvider sessionProvider = new CQLSessionProvider(instanceMetadata.host(),
+                                                                    instanceMetadata.port(),
+                                                                    1000);
+        CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(versionProvider,
+                                                                         sessionProvider,
+                                                                         context.jmxClient(),
+                                                                         svp.sidecarVersion());
+        return delegate;
     }
 
     @CassandraIntegrationTest
-    void testHealthCheck(CassandraTestContext context) throws IOException, InterruptedException
+    void testCorrectVersionIsEnabled(CassandraTestContext context)
     {
-        CassandraVersionProvider provider = new CassandraVersionProvider.Builder().add(new V30()).build();
-        CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(
-                provider, context.session, context.jmxClient, "1.0-TEST");
+        CassandraAdapterDelegate delegate = getCassandraAdapterDelegate(context);
+        SimpleCassandraVersion version = delegate.version();
+        assertThat(version).isNotNull();
+        assertThat(version.major).isEqualTo(context.version.major);
+        assertThat(version.minor).isEqualTo(context.version.minor);
+        assertThat(version).isGreaterThanOrEqualTo(context.version);
+    }
+
+    @CassandraIntegrationTest
+    void testHealthCheck(CassandraTestContext context) throws InterruptedException
+    {
+        CassandraAdapterDelegate delegate = getCassandraAdapterDelegate(context);
 
         delegate.healthCheck();
 
         assertThat(delegate.isUp()).as("health check succeeds").isTrue();
 
-        context.container.execInContainer("nodetool", "disablebinary");
+        NodeToolResult nodetoolResult = context.cluster.get(1).nodetoolResult("disablebinary");
+        assertThat(nodetoolResult.getRc())
+        .withFailMessage("Failed to disable binary:\nstdout:" + nodetoolResult.getStdout()
+                         + "\nstderr: " + nodetoolResult.getStderr())
+        .isEqualTo(0);
 
-        delegate.healthCheck();
+        for (int i = 0; i < 10; i++)
+        {
+            try
+            {
+                delegate.healthCheck();
+                break;
+            }
+            catch (TransportException tex)
+            {
+                Thread.sleep(1000); // Give the delegate some time to recover
+            }
+        }
         assertThat(delegate.isUp()).as("health check fails after binary has been disabled").isFalse();
 
-        context.container.execInContainer("nodetool", "enablebinary");
+        context.cluster.get(1).nodetool("enablebinary");
 
         TimeUnit.SECONDS.sleep(1);
         delegate.healthCheck();
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
index 71f4418..dcfca90 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/JmxClientTest.java
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.sidecar.common;
 
+import java.io.IOException;
 import java.util.Map;
 
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
 import org.apache.cassandra.sidecar.common.testing.CassandraIntegrationTest;
 import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
 import org.apache.cassandra.sidecar.common.utils.GossipInfoParser;
@@ -34,31 +37,40 @@
     private static final String SS_OBJ_NAME = "org.apache.cassandra.db:type=StorageService";
 
     @CassandraIntegrationTest
-    void testJmxConnectivity(CassandraTestContext context)
+    void testJmxConnectivity(CassandraTestContext context) throws IOException
     {
-        String opMode = context.jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
-                                         .getOperationMode();
-        assertThat(opMode).isNotNull();
-        assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT");
+        try (JmxClient jmxClient = getJmxClient(context))
+        {
+            String opMode = jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                                     .getOperationMode();
+            assertThat(opMode).isNotNull();
+            assertThat(opMode).isIn("LEAVING", "JOINING", "NORMAL", "DECOMMISSIONED", "CLIENT");
+        }
     }
 
     @CassandraIntegrationTest
-    void testGossipInfo(CassandraTestContext context)
+    void testGossipInfo(CassandraTestContext context) throws IOException
     {
-        SSProxy proxy = context.jmxClient.proxy(SSProxy.class,
-                                                "org.apache.cassandra.net:type=FailureDetector");
-        String rawGossipInfo = proxy.getAllEndpointStatesWithPort();
-        assertThat(rawGossipInfo).isNotEmpty();
-        Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo);
-        assertThat(gossipInfoMap).isNotEmpty();
-        gossipInfoMap.forEach((key, value) -> GossipInfoParser.isGossipInfoHostHeader(key));
+        try (JmxClient jmxClient = getJmxClient(context))
+        {
+            FailureDetector proxy = jmxClient.proxy(FailureDetector.class,
+                                                    "org.apache.cassandra.net:type=FailureDetector");
+            String rawGossipInfo = proxy.getAllEndpointStates();
+            assertThat(rawGossipInfo).isNotEmpty();
+            Map<String, ?> gossipInfoMap = GossipInfoParser.parse(rawGossipInfo);
+            assertThat(gossipInfoMap).isNotEmpty();
+            gossipInfoMap.forEach((key, value) -> GossipInfoParser.isGossipInfoHostHeader(key));
+        }
     }
 
     @CassandraIntegrationTest
-    void testConsumerCall(CassandraTestContext context)
+    void testCorrectVersion(CassandraTestContext context) throws IOException
     {
-        context.jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
-                         .refreshSizeEstimates();
+        try (JmxClient jmxClient = getJmxClient(context))
+        {
+            jmxClient.proxy(SSProxy.class, SS_OBJ_NAME)
+                     .refreshSizeEstimates();
+        }
     }
 
     /**
@@ -70,6 +82,23 @@
 
         void refreshSizeEstimates();
 
-        String getAllEndpointStatesWithPort();
+        String getReleaseVersion();
+    }
+
+    /**
+     * An interface that pulls information from the Failure Detector MBean
+     */
+    public interface FailureDetector
+    {
+        String getAllEndpointStates();
+    }
+
+
+    private static JmxClient getJmxClient(CassandraTestContext context)
+    {
+        IUpgradeableInstance instance = context.cluster.getFirstRunningInstance();
+        IInstanceConfig config = instance.config();
+        JmxClient client = new JmxClient(config.broadcastAddress().getAddress().getHostAddress(), config.jmxPort());
+        return client;
     }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/StatusTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/StatusTest.java
deleted file mode 100644
index bcc894a..0000000
--- a/src/test/integration/org/apache/cassandra/sidecar/common/StatusTest.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.common;
-
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Session;
-import org.apache.cassandra.sidecar.common.testing.CassandraIntegrationTest;
-import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
-
-import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
-
-/**
- * Placeholder test
- */
-class StatusTest
-{
-    private static final Logger logger = LoggerFactory.getLogger(StatusTest.class);
-
-    @BeforeEach
-    void setupData(CassandraTestContext context)
-    {
-        assertThat(context.container.isRunning()).isTrue();
-        logger.info("Running Cassandra on host={}", context.container.getContactPoint());
-    }
-
-    @CassandraIntegrationTest
-    @DisplayName("Ensure status returns correctly")
-    void testSomething(CassandraTestContext context)
-    {
-        logger.info("test context in test {}", context);
-        Session session = context.session.localCql();
-        assertThat(session).isNotNull();
-        assert session != null; // quiet spotbugs
-        ResultSet rs = session.execute("SELECT * from system.peers_v2");
-        assertThat(rs).isNotNull();
-    }
-}
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/containers/ExtendedCassandraContainer.java b/src/test/integration/org/apache/cassandra/sidecar/common/containers/ExtendedCassandraContainer.java
deleted file mode 100644
index ef8d41a..0000000
--- a/src/test/integration/org/apache/cassandra/sidecar/common/containers/ExtendedCassandraContainer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.common.containers;
-
-import org.testcontainers.containers.CassandraContainer;
-
-/**
- * A Cassandra container extension that adds functionality to {@link CassandraContainer} for JMX.
- */
-public class ExtendedCassandraContainer extends CassandraContainer<ExtendedCassandraContainer>
-{
-    public static final String RMI_SERVER_HOSTNAME = "127.0.0.1";
-    public static final Integer JMX_PORT = 7199;
-
-    /**
-     * Constructs a new {@link ExtendedCassandraContainer} with the {@link #JMX_PORT JMX port} exposed
-     *
-     * @param dockerImageName the name of the docker image to use for the container
-     */
-    public ExtendedCassandraContainer(String dockerImageName)
-    {
-        super(dockerImageName);
-        addFixedExposedPort(JMX_PORT, JMX_PORT);
-
-        addEnv("LOCAL_JMX", "no");
-        addEnv("JVM_EXTRA_OPTS", "-Dcom.sun.management.jmxremote=true"
-                                 + " -Djava.rmi.server.hostname=" + RMI_SERVER_HOSTNAME
-                                 + " -Dcom.sun.management.jmxremote.port=" + JMX_PORT
-                                 + " -Dcom.sun.management.jmxremote.rmi.port=" + JMX_PORT
-                                 + " -Dcom.sun.management.jmxremote.authenticate=false"
-                                 + " -Dcom.sun.management.jmxremote.local.only=false"
-                                 + " -Dcom.sun.management.jmxremote.ssl=false"
-        );
-    }
-}
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java
index 87541c3..776bf24 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java
@@ -37,4 +37,61 @@
 @ExtendWith(CassandraTestTemplate.class)
 public @interface CassandraIntegrationTest
 {
+    /**
+     * Returns the number of initial nodes per datacenter for the integration tests. Defaults to 1 node per datacenter.
+     *
+     * @return the number of nodes per datacenter for the integration tests
+     * */
+    int nodesPerDc() default 1;
+
+    /**
+     * Returns the number of nodes expected to be added by the end of the integration test. Defaults ot 0.
+     * @return the number of nodes the test expects to add for the integration test.
+     */
+    int newNodesPerDc() default 0;
+
+    /**
+     * Returns the number of datacenters to configure for the integration test. Defaults to 1 datacenter.
+     *
+     * @return the number of datacenters to configure for the integration test
+     */
+    int numDcs() default 1;
+
+    /**
+     * Returns the number of data directories to use per instance. Cassandra supports multiple data directories
+     * for each instance. Defaults to 1 data directory per instance.
+     *
+     * @return the number of data directories to use per instance
+     */
+    int numDataDirsPerInstance() default 1;
+
+    /**
+     * Returns whether gossip is enabled or disabled for the integration test. Defaults to {@code false}.
+     *
+     * @return whether gossip is enabled or disabled for the integration test
+     */
+    boolean gossip() default false;
+
+    /**
+     * Returns whether internode networking is enabled or disabled for the integration test. Defaults to {@code false}.
+     *
+     * @return whether internode networking is enabled or disabled for the integration test
+     */
+    boolean network() default false;
+
+    /**
+     * Returns whether JMX is enabled or disabled for the integration test. Defaults to {@code true}.
+     *
+     * @return whether JMX is enabled or disabled for the integration test
+     */
+    boolean jmx() default true;
+
+    /**
+     * Returns whether the native transport protocol is enabled or disabled for the integration test. Defaults to
+     * {@code true}.
+     *
+     * @return whether the native transport protocol is enabled or disabled for the integration test
+     */
+    boolean nativeTransport() default true;
+
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java b/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java
index fdef416..157c422 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java
@@ -18,52 +18,139 @@
 
 package org.apache.cassandra.sidecar.common.testing;
 
+import java.io.Closeable;
+import java.net.InetSocketAddress;
 import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
+import com.datastax.driver.core.NettyOptions;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
+import org.apache.cassandra.distributed.api.IUpgradeableInstance;
+import org.apache.cassandra.distributed.shared.JMXUtil;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
 import org.apache.cassandra.sidecar.common.CQLSessionProvider;
-import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.common.JmxClient;
 import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
-import org.apache.cassandra.sidecar.common.containers.ExtendedCassandraContainer;
+
+import static org.assertj.core.api.Assertions.assertThat;
 
 /**
  * Passed to integration tests.
  * See {@link CassandraIntegrationTest} for the required annotation
  * See {@link CassandraTestTemplate} for the Test Template
  */
-public class CassandraTestContext
+public class CassandraTestContext implements Closeable
 {
-    public final JmxClient jmxClient;
-    public final CQLSessionProvider session;
     public final SimpleCassandraVersion version;
-    public final ExtendedCassandraContainer container;
-    public final ICassandraAdapter cassandra;
-    public final Path dataDirectoryPath;
+    public final UpgradeableCluster cluster;
+    public final InstancesConfig instancesConfig;
+    private List<CQLSessionProvider> sessionProviders;
+    private List<JmxClient> jmxClients;
 
     CassandraTestContext(SimpleCassandraVersion version,
-                         ExtendedCassandraContainer container,
-                         CQLSessionProvider session,
-                         JmxClient jmxClient,
-                         ICassandraAdapter cassandra,
-                         Path dataDirectoryPath)
+                         UpgradeableCluster cluster,
+                         CassandraVersionProvider versionProvider)
     {
         this.version = version;
-        this.container = container;
-        this.session = session;
-        this.cassandra = cassandra;
-        this.jmxClient = jmxClient;
-        this.dataDirectoryPath = dataDirectoryPath;
+        this.cluster = cluster;
+        this.sessionProviders = new ArrayList<>();
+        this.jmxClients = new ArrayList<>();
+        this.instancesConfig = buildInstancesConfig(versionProvider);
+    }
+
+    private InstancesConfig buildInstancesConfig(CassandraVersionProvider versionProvider)
+    {
+        List<InstanceMetadata> metadata = new ArrayList<>();
+        for (int i = 0; i < cluster.size(); i++)
+        {
+            IUpgradeableInstance instance = cluster.get(i + 1); // 1-based indexing to match node names;
+            IInstanceConfig config = instance.config();
+            String hostName = JMXUtil.getJmxHost(config);
+            int nativeTransportPort = tryGetIntConfig(config, "native_transport_port", 9042);
+            InetSocketAddress address = InetSocketAddress.createUnresolved(hostName,
+                                                                           nativeTransportPort);
+            CQLSessionProvider sessionProvider = new CQLSessionProvider(address, new NettyOptions());
+            this.sessionProviders.add(sessionProvider);
+            JmxClient jmxClient = new JmxClient(hostName, config.jmxPort());
+            this.jmxClients.add(jmxClient);
+
+            String[] dataDirectories = (String[]) config.get("data_file_directories");
+            // Use the parent of the first data directory as the staging directory
+            Path dataDirParentPath = Paths.get(dataDirectories[0]).getParent();
+            assertThat(dataDirParentPath).isNotNull()
+                                         .exists();
+            String uploadsStagingDirectory = dataDirParentPath.resolve("staging").toFile().getAbsolutePath();
+            metadata.add(new InstanceMetadataImpl(i + 1,
+                                                  config.broadcastAddress().getAddress().getHostAddress(),
+                                                  nativeTransportPort,
+                                                  Arrays.asList(dataDirectories),
+                                                  uploadsStagingDirectory,
+                                                  sessionProvider,
+                                                  jmxClient,
+                                                  versionProvider,
+                                                  "1.0-TEST"));
+        }
+        return new InstancesConfigImpl(metadata);
+    }
+
+    private static int tryGetIntConfig(IInstanceConfig config, String configName, int defaultValue)
+    {
+        try
+        {
+            return config.getInt(configName);
+        }
+        catch (NullPointerException npe)
+        {
+            return defaultValue;
+        }
+    }
+
+
+    public InstancesConfig getInstancesConfig()
+    {
+        return this.instancesConfig;
+    }
+
+    public Session session()
+    {
+        return session(0);
+    }
+
+    public Session session(int instance)
+    {
+        return this.sessionProviders.get(instance).localCql();
     }
 
     @Override
     public String toString()
     {
         return "CassandraTestContext{" +
-               "jmxClient=" + jmxClient +
-               ", session=" + session +
                ", version=" + version +
-               ", container=" + container +
-               ", cassandra=" + cassandra +
+               ", cluster=" + cluster +
                '}';
     }
+    @Override
+    public void close()
+    {
+        instancesConfig.instances().forEach(instance -> instance.delegate().close());
+    }
+
+    public JmxClient jmxClient()
+    {
+        return jmxClient(0);
+    }
+
+    private JmxClient jmxClient(int instance)
+    {
+        return jmxClients.get(instance);
+    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java b/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java
index c3e9cd1..bac94b1 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java
@@ -18,10 +18,6 @@
 
 package org.apache.cassandra.sidecar.common.testing;
 
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
 import java.util.stream.Stream;
@@ -37,16 +33,17 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.NettyOptions;
-import org.apache.cassandra.sidecar.common.CQLSessionProvider;
-import org.apache.cassandra.sidecar.common.ICassandraAdapter;
-import org.apache.cassandra.sidecar.common.ICassandraFactory;
-import org.apache.cassandra.sidecar.common.JmxClient;
+import com.vdurmont.semver4j.Semver;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.Versions;
+import org.apache.cassandra.sidecar.adapters.base.CassandraFactory;
+import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
-import org.apache.cassandra.sidecar.common.containers.ExtendedCassandraContainer;
+import org.apache.cassandra.sidecar.common.dns.DnsResolver;
+import org.apache.cassandra.sidecar.utils.SidecarVersionProvider;
 
-import static org.apache.cassandra.sidecar.common.containers.ExtendedCassandraContainer.JMX_PORT;
-import static org.apache.cassandra.sidecar.common.containers.ExtendedCassandraContainer.RMI_SERVER_HOSTNAME;
 
 /**
  * Creates a test per version of Cassandra we are testing
@@ -62,7 +59,9 @@
 {
 
     private static final Logger logger = LoggerFactory.getLogger(CassandraTestTemplate.class);
-    public static final String CONTAINER_CASSANDRA_DATA_PATH = "/var/lib/cassandra";
+    private static SidecarVersionProvider svp = new SidecarVersionProvider("/sidecar.version");
+
+    private UpgradeableCluster cluster;
 
     @Override
     public boolean supportsTestTemplate(ExtensionContext context)
@@ -83,7 +82,7 @@
      * @param version a version for the test
      * @param context the <em>context</em> in which the current test or container is being executed.
      * @return the <em>context</em> of a single invocation of a
-     *         {@linkplain org.junit.jupiter.api.TestTemplate test template}
+     * {@linkplain org.junit.jupiter.api.TestTemplate test template}
      */
     private TestTemplateInvocationContext invocationContext(TestVersion version, ExtensionContext context)
     {
@@ -104,7 +103,7 @@
             }
 
             /**
-             * Used to register the extensions required to start and stop the docker environment
+             * Used to register the extensions required to start and stop the in-jvm dtest environment
              * @return a list of registered {@link Extension extensions}
              */
             @Override
@@ -116,64 +115,67 @@
             private BeforeEachCallback beforeEach()
             {
                 return beforeEachCtx -> {
-                    // spin up a C* instance using Testcontainers
-                    ICassandraFactory factory = version.getFactory();
+                    CassandraIntegrationTest annotation =
+                    context.getElement().map(e -> e.getAnnotation(CassandraIntegrationTest.class)).get();
+                    // spin up a C* cluster using the in-jvm dtest
+                    Versions versions = Versions.find();
+                    int nodesPerDc = annotation.nodesPerDc();
+                    int dcCount = annotation.numDcs();
+                    int newNodesPerDc = annotation.newNodesPerDc(); // if the test wants to add more nodes later
+                    int finalNodeCount = dcCount * (nodesPerDc + newNodesPerDc);
+                    Versions.Version requestedVersion = versions.getLatest(new Semver(version.version(),
+                                                                                      Semver.SemverType.LOOSE));
+                    UpgradeableCluster.Builder builder =
+                    UpgradeableCluster.build(nodesPerDc)
+                                      .withVersion(requestedVersion)
+                                      .withDCs(dcCount)
+                                      .withDataDirCount(annotation.numDataDirsPerInstance())
+                                      .withConfig(config -> {
+                                          if (annotation.nativeTransport())
+                                          {
+                                              config.with(Feature.NATIVE_PROTOCOL);
+                                          }
+                                          if (annotation.jmx())
+                                          {
+                                              config.with(Feature.JMX);
+                                          }
+                                          if (annotation.gossip())
+                                          {
+                                              config.with(Feature.GOSSIP);
+                                          }
+                                          if (annotation.network())
+                                          {
+                                              config.with(Feature.NETWORK);
+                                          }
+                                      });
+                    TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(finalNodeCount,
+                                                                                        builder.getTokenCount());
+                    builder.withTokenSupplier(tokenSupplier);
+                    cluster = builder.start();
 
-                    // Create a temp directory to be mounted inside the container
-                    Path dataDirectoryPath = Files.createTempDirectory("cassandra-sidecar-test-");
-                    ExtendedCassandraContainer container = new ExtendedCassandraContainer(version.image())
-                                                      // Mount the temp directory as the Cassandra data directory
-                                                      .withFileSystemBind(dataDirectoryPath.toFile().getAbsolutePath(),
-                                                                          CONTAINER_CASSANDRA_DATA_PATH);
-                    container.start();
-                    logger.info("Testing {} against docker container", version);
-
-                    CQLSessionProvider session = new CQLSessionProvider(container.getContactPoint(),
-                                                                        new NettyOptions());
-                    JmxClient jmxClient = new JmxClient(RMI_SERVER_HOSTNAME, JMX_PORT);
-
+                    logger.info("Testing {} against in-jvm dtest cluster", version);
+                    CassandraVersionProvider versionProvider = cassandraVersionProvider(DnsResolver.DEFAULT);
                     SimpleCassandraVersion versionParsed = SimpleCassandraVersion.create(version.version());
-
-                    ICassandraAdapter cassandra = factory.create(session, jmxClient);
-
-                    cassandraTestContext = new CassandraTestContext(versionParsed,
-                                                                    container,
-                                                                    session,
-                                                                    jmxClient,
-                                                                    cassandra,
-                                                                    dataDirectoryPath);
+                    cassandraTestContext = new CassandraTestContext(versionParsed, cluster, versionProvider);
                     logger.info("Created test context {}", cassandraTestContext);
                 };
             }
 
             /**
-             * Shuts down the docker container when the test is finished
+             * Shuts down the in-jvm dtest cluster when the test is finished
              * @return the {@link AfterTestExecutionCallback}
              */
             private AfterTestExecutionCallback postProcessor()
             {
                 return postProcessorCtx -> {
-                    // tear down the docker instance
-                    cassandraTestContext.container.stop();
-                    // cleanup temp directory
-                    deleteDataDirectory(cassandraTestContext.dataDirectoryPath);
+                    // Tear down the client-side before the cluster as we need to close some server-side connections
+                    // that can only be closed by clients?
+                    cassandraTestContext.close();
+                    // tear down the in-jvm cluster
+                    cluster.close();
                 };
             }
 
-            private void deleteDataDirectory(Path dataDirectoryPath)
-            {
-                try
-                {
-                    Files.walk(dataDirectoryPath)
-                         .map(Path::toFile)
-                         .forEach(File::delete);
-                    Files.delete(dataDirectoryPath);
-                }
-                catch (IOException ignored)
-                {
-                }
-            }
-
             /**
              * Required for Junit to know the CassandraTestContext can be used in these tests
              * @return a {@link ParameterResolver}
@@ -186,14 +188,11 @@
                     public boolean supportsParameter(ParameterContext parameterContext,
                                                      ExtensionContext extensionContext)
                     {
-                        return parameterContext.getParameter()
-                                               .getType()
-                                               .equals(CassandraTestContext.class);
+                        return parameterContext.getParameter().getType().equals(CassandraTestContext.class);
                     }
 
                     @Override
-                    public Object resolveParameter(ParameterContext parameterContext,
-                                                   ExtensionContext extensionContext)
+                    public Object resolveParameter(ParameterContext parameterContext, ExtensionContext extensionContext)
                     {
                         return cassandraTestContext;
                     }
@@ -201,4 +200,21 @@
             }
         };
     }
+
+    public CassandraVersionProvider cassandraVersionProvider(DnsResolver dnsResolver)
+    {
+        return new CassandraVersionProvider.Builder()
+               .add(new CassandraFactory(dnsResolver, svp.sidecarVersion())).build();
+    }
+
+    static
+    {
+        // Settings to reduce the test setup delay incurred if gossip is enabled
+        System.setProperty("cassandra.ring_delay_ms", "5000"); // down from 30s default
+        System.setProperty("cassandra.consistent.rangemovement", "false");
+        System.setProperty("cassandra.consistent.simultaneousmoves.allow", "true");
+        // End gossip delay settings
+        // Set the location of dtest jars
+        System.setProperty("cassandra.test.dtest_jar_path", "dtest-jars");
+    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersion.java b/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersion.java
index f835c69..956bc8a 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersion.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersion.java
@@ -18,36 +18,20 @@
 
 package org.apache.cassandra.sidecar.common.testing;
 
-import org.apache.cassandra.sidecar.common.ICassandraFactory;
-
 /**
  * Works with {@link TestVersionSupplier}
  */
 public class TestVersion
 {
     private final String version;
-    private final ICassandraFactory factory;
-    private final String image;
 
-    public TestVersion(String version, ICassandraFactory factory, String image)
+    public TestVersion(String version)
     {
         this.version = version;
-        this.factory = factory;
-        this.image = image;
     }
 
     public String version()
     {
         return version;
     }
-
-    public ICassandraFactory getFactory()
-    {
-        return factory;
-    }
-
-    public String image()
-    {
-        return image;
-    }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java b/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java
index 92a9f7d..2822a0a 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java
@@ -18,28 +18,24 @@
 
 package org.apache.cassandra.sidecar.common.testing;
 
+import java.util.Arrays;
 import java.util.stream.Stream;
 
-import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory;
-import org.apache.cassandra.sidecar.common.dns.DnsResolver;
-
 /**
  * Generates the list of versions we're going to test against.
- * Depending on future releases, we may end up running the same module (Cassandra40 for example) against multiple
- * versions of Cassandra.  This may be due to releases that don't add new features that would affect the sidecar,
+ * We will run the same module (trunk for example) against multiple versions of Cassandra.
+ * This is due to releases that don't add new features that would affect the sidecar,
  * but we still want to test those versions specifically to avoid the chance of regressions.
  *
- * <p>At the moment, it's returning a hard coded list.  We could / should probably load this from a configuration
- * and make it possible to override it, so teams that customize C* can run and test their own implementation.
- *
  * <p>Ideally, we'd probably have concurrent runs of the test infrastructure each running tests against one specific
- * version of C*, but we don't need that yet given we only have one version.
+ * version of C*, but this will require some additional work in the dtest framework so for now we run one at a time.
  */
 public class TestVersionSupplier
 {
     Stream<TestVersion> testVersions()
     {
-        return Stream.of(
-                new TestVersion("4.0.7", new Cassandra40Factory(DnsResolver.DEFAULT, "1.0-TEST"), "cassandra:4.0"));
+        // By default, we test 2 versions that will exercise oldest and newest supported versions
+        String versions = System.getProperty("cassandra.sidecar.versions_to_test", "4.0,5.0");
+        return Arrays.stream(versions.split(",")).map(String::trim).map(TestVersion::new);
     }
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
index d0ed034..70285dc 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/GossipInfoHandlerIntegrationTest.java
@@ -36,15 +36,12 @@
 @ExtendWith(VertxExtension.class)
 class GossipInfoHandlerIntegrationTest extends IntegrationTestBase
 {
-    @CassandraIntegrationTest
+    @CassandraIntegrationTest()
     void retrieveGossipInfo(VertxTestContext context, CassandraTestContext cassandraTestContext) throws Exception
     {
         String testRoute = "/api/v1/cassandra/gossip";
-        String expectedReleaseVersionSimple = String.format("%s.%s",
-                                                            cassandraTestContext.version.major,
-                                                            cassandraTestContext.version.minor);
         testWithClient(context, client -> {
-            client.get(config.getPort(), "localhost", testRoute)
+            client.get(config.getPort(), "127.0.0.1", testRoute)
                   .expect(ResponsePredicate.SC_OK)
                   .send(context.succeeding(response -> {
                       GossipInfoResponse gossipResponse = response.bodyAsJson(GossipInfoResponse.class);
@@ -55,7 +52,9 @@
                       assertThat(gossipInfo.generation()).isNotNull();
                       assertThat(gossipInfo.heartbeat()).isNotNull();
                       assertThat(gossipInfo.hostId()).isNotNull();
-                      assertThat(gossipInfo.releaseVersion()).startsWith(expectedReleaseVersionSimple);
+                      String releaseVersion = cassandraTestContext.cluster.getFirstRunningInstance()
+                                                                          .getReleaseVersionString();
+                      assertThat(gossipInfo.releaseVersion()).startsWith(releaseVersion);
                       context.completeNow();
                   }));
         });
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
index dc4d7aa..01c5cc5 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/RingHandlerIntegrationTest.java
@@ -29,6 +29,8 @@
 import io.vertx.ext.web.client.predicate.ResponsePredicate;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.sidecar.IntegrationTestBase;
 import org.apache.cassandra.sidecar.common.data.RingEntry;
 import org.apache.cassandra.sidecar.common.data.RingResponse;
@@ -43,15 +45,17 @@
 @ExtendWith(VertxExtension.class)
 class RingHandlerIntegrationTest extends IntegrationTestBase
 {
+
     @CassandraIntegrationTest
-    void retrieveRingWithoutKeyspace(VertxTestContext context) throws Exception
+    void retrieveRingWithoutKeyspace(VertxTestContext context, CassandraTestContext cassandraTestContext)
+    throws Exception
     {
         String testRoute = "/api/v1/cassandra/ring";
         testWithClient(context, client -> {
-            client.get(config.getPort(), "localhost", testRoute)
+            client.get(config.getPort(), "127.0.0.1", testRoute)
                   .expect(ResponsePredicate.SC_OK)
                   .send(context.succeeding(response -> {
-                      assertRingResponseOK(response);
+                      assertRingResponseOK(response, cassandraTestContext);
                       context.completeNow();
                   }));
         });
@@ -77,7 +81,7 @@
     {
         createTestKeyspace(cassandraTestContext);
         retrieveRingWithKeyspace(context, TEST_KEYSPACE, response -> {
-            assertRingResponseOK(response);
+            assertRingResponseOK(response, cassandraTestContext);
             context.completeNow();
         });
     }
@@ -87,29 +91,30 @@
     {
         String testRoute = "/api/v1/cassandra/ring/keyspaces/" + keyspace;
         testWithClient(context, client -> {
-            client.get(config.getPort(), "localhost", testRoute)
+            client.get(config.getPort(), "127.0.0.1", testRoute)
                   .send(context.succeeding(verifier));
         });
     }
 
-    void assertRingResponseOK(HttpResponse<Buffer> response)
+    void assertRingResponseOK(HttpResponse<Buffer> response, CassandraTestContext cassandraTestContext)
     {
+        IInstance instance = cassandraTestContext.cluster.getFirstRunningInstance();
+        IInstanceConfig config = instance.config();
         RingResponse ringResponse = response.bodyAsJson(RingResponse.class);
         assertThat(ringResponse).isNotNull()
                                 .hasSize(1);
         RingEntry entry = ringResponse.poll();
         assertThat(entry).isNotNull();
-        assertThat(entry.datacenter()).isEqualTo("datacenter1");
-        assertThat(entry.address()).isNotEmpty();
-        assertThat(entry.port()).isEqualTo(7000);
+        assertThat(entry.datacenter()).isEqualTo(config.localDatacenter());
+        assertThat(entry.address()).isEqualTo(config.broadcastAddress().getAddress().getHostAddress());
+        assertThat(entry.port()).isEqualTo(config.broadcastAddress().getPort());
         assertThat(entry.status()).isEqualTo("Up");
         assertThat(entry.state()).isEqualTo("Normal");
         assertThat(entry.load()).isNotEmpty();
         // there is just 1 node, so own 100%; the format should be right, i.e. "##0.00%"
         assertThat(entry.owns()).isEqualTo("100.00%");
-        assertThat(entry.token()).isEqualTo("0");
+        assertThat(entry.token()).isEqualTo(config.getString("initial_token"));
         assertThat(entry.fqdn()).isNotEmpty();
         assertThat(entry.hostId()).isNotEmpty();
     }
-
 }
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
index ac36c3b..46ad70f 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/SnapshotsHandlerIntegrationTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.cassandra.sidecar.routes;
 
+import java.nio.file.Path;
+import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -28,7 +31,6 @@
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
 import org.apache.cassandra.sidecar.IntegrationTestBase;
-import org.apache.cassandra.sidecar.common.containers.ExtendedCassandraContainer;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.testing.CassandraIntegrationTest;
 import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
@@ -44,7 +46,7 @@
     {
         WebClient client = WebClient.create(vertx);
         String testRoute = "/api/v1/keyspaces/non-existent/tables/testtable/snapshots/my-snapshot";
-        client.put(config.getPort(), "localhost", testRoute)
+        client.put(config.getPort(), "127.0.0.1", testRoute)
               .expect(ResponsePredicate.SC_NOT_FOUND)
               .send(context.succeedingThenComplete());
         // wait until the test completes
@@ -60,7 +62,7 @@
 
         WebClient client = WebClient.create(vertx);
         String testRoute = "/api/v1/keyspaces/testkeyspace/tables/non-existent/snapshots/my-snapshot";
-        client.put(config.getPort(), "localhost", testRoute)
+        client.put(config.getPort(), "127.0.0.1", testRoute)
               .expect(ResponsePredicate.SC_NOT_FOUND)
               .send(context.succeedingThenComplete());
         // wait until the test completes
@@ -78,16 +80,18 @@
         WebClient client = WebClient.create(vertx);
         String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot",
                                          TEST_KEYSPACE, table);
-        client.put(config.getPort(), "localhost", testRoute)
+        client.put(config.getPort(), "127.0.0.1", testRoute)
               .expect(ResponsePredicate.SC_OK)
-              .send(context.succeeding(response -> context.verify(() -> {
-                  assertThat(response.statusCode()).isEqualTo(OK.code());
+              .send(context.succeeding(response ->
+                                       context.verify(() -> {
+                                           assertThat(response.statusCode()).isEqualTo(OK.code());
 
-                  // creating the snapshot with the same name will return a 409 (Conflict) status code
-                  client.put(config.getPort(), "localhost", testRoute)
-                        .expect(ResponsePredicate.SC_CONFLICT)
-                        .send(context.succeedingThenComplete());
-              })));
+                                           // creating the snapshot with the same name will return
+                                           // a 409 (Conflict) status code
+                                           client.put(config.getPort(), "127.0.0.1", testRoute)
+                                                 .expect(ResponsePredicate.SC_CONFLICT)
+                                                 .send(context.succeedingThenComplete());
+                                       })));
         // wait until the test completes
         assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
     }
@@ -102,25 +106,20 @@
         WebClient client = WebClient.create(vertx);
         String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot",
                                          TEST_KEYSPACE, table);
-        client.put(config.getPort(), "localhost", testRoute)
+        client.put(config.getPort(), "127.0.0.1", testRoute)
               .expect(ResponsePredicate.SC_OK)
               .send(context.succeeding(response -> context.verify(() -> {
                   assertThat(response.statusCode()).isEqualTo(OK.code());
 
                   // validate that the snapshot is created
-                  ExtendedCassandraContainer container = cassandraTestContext.container;
-                  final String directory = container.execInContainer("find",
-                                                                     "/opt/cassandra/data/",
-                                                                     "-name",
-                                                                     "my-snapshot").getStdout().trim();
-                  assertThat(directory).isNotEmpty();
-                  assertThat(directory).contains("testkeyspace");
-                  assertThat(directory).contains("testtable");
-                  assertThat(directory).contains("my-snapshot");
-                  final String lsSnapshot = container.execInContainer("ls", directory).getStdout();
-                  assertThat(lsSnapshot).contains("manifest.json");
-                  assertThat(lsSnapshot).contains("schema.cql");
-                  assertThat(lsSnapshot).contains("-big-Data.db");
+                  final List<Path> found = findChildFile(cassandraTestContext, "127.0.0.1",
+                                                         "my-snapshot");
+                  assertThat(found).isNotEmpty();
+
+                  assertThat(found.stream().anyMatch(p -> p.endsWith("manifest.json")));
+                  assertThat(found.stream().anyMatch(p -> p.endsWith("schema.cql")));
+                  assertThat(found.stream().anyMatch(p -> p.endsWith("-big-Data.db")));
+
 
                   context.completeNow();
               })));
@@ -160,7 +159,8 @@
         assertNotFoundOnDeleteSnapshot(context, testRoute);
     }
 
-    @CassandraIntegrationTest
+    @CassandraIntegrationTest(numDataDirsPerInstance = 1)
+        // Set to > 1 to fail test
     void testDeleteSnapshotEndpoint(VertxTestContext context, CassandraTestContext cassandraTestContext)
     throws InterruptedException
     {
@@ -168,34 +168,38 @@
         String table = createTestTableAndPopulate(cassandraTestContext);
 
         WebClient client = WebClient.create(vertx);
-        String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/my-snapshot",
-                                         TEST_KEYSPACE, table);
+        String snapshotName = "my-snapshot" + UUID.randomUUID();
+        String testRoute = String.format("/api/v1/keyspaces/%s/tables/%s/snapshots/%s",
+                                         TEST_KEYSPACE, table, snapshotName);
 
         // first create the snapshot
-        client.put(config.getPort(), "localhost", testRoute)
-              .send(context.succeeding(createResponse -> context.verify(() -> {
+        client.put(config.getPort(), "127.0.0.1", testRoute)
+              .expect(ResponsePredicate.SC_OK)
+              .send(context.succeeding(
+              createResponse ->
+              context.verify(() -> {
                   assertThat(createResponse.statusCode()).isEqualTo(OK.code());
-                  ExtendedCassandraContainer container = cassandraTestContext.container;
-                  String directory = container.execInContainer("find",
-                                                               "/opt/cassandra/data/",
-                                                               "-name",
-                                                               "my-snapshot").getStdout().trim();
-                  // snapshot directory exists inside container
-                  assertThat(directory).isNotBlank();
+                  final List<Path> found =
+                  findChildFile(cassandraTestContext, "127.0.0.1", snapshotName);
+                  assertThat(found).isNotEmpty();
+
+                  // snapshot directory exists inside data directory
+                  assertThat(found).isNotEmpty();
 
                   // then delete the snapshot
-                  client.delete(config.getPort(), "localhost", testRoute)
-                        .send(context.succeeding(deleteResponse -> context.verify(() -> {
-                            assertThat(deleteResponse.statusCode()).isEqualTo(OK.code());
-                            // validate that the snapshot is removed
-                            String removedDir = container.execInContainer("find",
-                                                                          "/opt/cassandra/data/",
-                                                                          "-name",
-                                                                          "my-snapshot").getStdout().trim();
-                            assertThat(removedDir).isEmpty();
-
-                            context.completeNow();
-                        })));
+                  client.delete(config.getPort(), "127.0.0.1", testRoute)
+                        .expect(ResponsePredicate.SC_OK)
+                        .send(context.succeeding(
+                        deleteResponse ->
+                        context.verify(() ->
+                                       {
+                                           assertThat(deleteResponse.statusCode()).isEqualTo(OK.code());
+                                           final List<Path> found2 =
+                                           findChildFile(cassandraTestContext,
+                                                         "127.0.0.1", snapshotName);
+                                           assertThat(found2).isEmpty();
+                                           context.completeNow();
+                                       })));
               })));
         // wait until the test completes
         assertThat(context.awaitCompletion(30, TimeUnit.SECONDS)).isTrue();
@@ -216,7 +220,7 @@
     private void assertNotFoundOnDeleteSnapshot(VertxTestContext context, String testRoute) throws InterruptedException
     {
         WebClient client = WebClient.create(vertx);
-        client.delete(config.getPort(), "localhost", testRoute)
+        client.delete(config.getPort(), "127.0.0.1", testRoute)
               .expect(ResponsePredicate.SC_NOT_FOUND)
               .send(context.succeedingThenComplete());
         // wait until test completes
diff --git a/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java b/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
index 468d727..06b0b3e 100644
--- a/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
+++ b/src/test/integration/org/apache/cassandra/sidecar/routes/sstableuploads/SSTableImportHandlerIntegrationTest.java
@@ -21,6 +21,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
@@ -42,15 +44,15 @@
 import io.vertx.ext.web.client.WebClient;
 import io.vertx.junit5.VertxExtension;
 import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.distributed.UpgradeableCluster;
 import org.apache.cassandra.sidecar.IntegrationTestBase;
-import org.apache.cassandra.sidecar.common.containers.ExtendedCassandraContainer;
+import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
 import org.apache.cassandra.sidecar.common.data.QualifiedTableName;
 import org.apache.cassandra.sidecar.common.testing.CassandraIntegrationTest;
 import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
-import org.testcontainers.containers.Container;
 
-import static org.apache.cassandra.sidecar.common.testing.CassandraTestTemplate.CONTAINER_CASSANDRA_DATA_PATH;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /**
  * Integration tests for {@link SSTableImportHandler}
@@ -58,10 +60,18 @@
 @ExtendWith(VertxExtension.class)
 public class SSTableImportHandlerIntegrationTest extends IntegrationTestBase
 {
+    public static final SimpleCassandraVersion MIN_VERSION_WITH_IMPORT = SimpleCassandraVersion.create("4.0.0");
+
     @CassandraIntegrationTest
     void testSSTableImport(VertxTestContext context, CassandraTestContext cassandraTestContext)
     throws IOException, InterruptedException
     {
+        // Cassandra before 4.0 does not have the necessary JMX endpoints,
+        // so we skip if the cluster version is below 4.0
+        assumeThat(cassandraTestContext.version)
+        .withFailMessage("Import is only available in Cassandra 4.0 and later.")
+        .isGreaterThanOrEqualTo(MIN_VERSION_WITH_IMPORT);
+
         // create a table. Insert some data, create a snapshot that we'll use for import.
         // Truncate the table, insert more data.
         // Test the import SSTable endpoint by importing data that was originally truncated.
@@ -72,46 +82,43 @@
         QualifiedTableName tableName = createTestTableAndPopulate(cassandraTestContext, Arrays.asList("a", "b"));
 
         // create a snapshot called <tableName>-snapshot for tbl1
-        ExtendedCassandraContainer container = cassandraTestContext.container;
-        final String snapshotStdout =
-        container.execInContainer("nodetool", "snapshot",
-                                  "--tag", tableName.tableName() + "-snapshot",
-                                  "--table", tableName.tableName(),
-                                  "--", tableName.keyspace()).getStdout();
+        UpgradeableCluster cluster = cassandraTestContext.cluster;
+        final String snapshotStdout = cluster.get(1).nodetoolResult("snapshot",
+                                                                    "--tag", tableName.tableName() + "-snapshot",
+                                                                    "--table", tableName.tableName(),
+                                                                    "--", tableName.keyspace()).getStdout();
         assertThat(snapshotStdout).contains("Snapshot directory: " + tableName.tableName() + "-snapshot");
         // find the directory in the filesystem
-        final String directory = container.execInContainer("find",
-                                                           CONTAINER_CASSANDRA_DATA_PATH,
-                                                           "-name",
-                                                           tableName.tableName() + "-snapshot").getStdout().trim();
-        assertThat(directory).isNotEmpty();
+        final List<Path> snapshotFiles = findChildFile(cassandraTestContext,
+                                                       "127.0.0.1", tableName.tableName() + "-snapshot");
+
+        assertThat(snapshotFiles).isNotEmpty();
 
         // copy the snapshot to the expected staging directory
         final UUID uploadId = UUID.randomUUID();
-        final String stagingPathInContainer = cassandraTestContext.dataDirectoryPath.toFile().getAbsolutePath()
-                                              + File.separator + "staging" + File.separator + uploadId
+        // for the test, we need to have the same directory structure for the local data directory replicated
+        // inside the cluster. The way the endpoint works is by verifying that the directory exists, this
+        // verification happens in the host system. When calling import we use the same directory, but the
+        // directory does not exist inside the cluster. For that reason we need to do the following to
+        // ensure "import" finds the path inside the cluster
+        String uploadStagingDir = cassandraTestContext.getInstancesConfig()
+                                                      .instanceFromHost("127.0.0.1").stagingDir();
+        final String stagingPathInContainer = uploadStagingDir + File.separator + uploadId
                                               + File.separator + tableName.keyspace()
                                               + File.separator + tableName.tableName();
-        // for the test, we need to have the same directory structure for the local data directory replicated
-        // inside the container. The way the endpoint works is by verifying that the directory exists, this
-        // verification happens in the host system. When calling import we use the same directory, but the
-        // directory does not exist inside the container. For that reason we need to do the following to
-        // ensure "import" finds the path inside the container
-        container.execInContainer("mkdir", "-p", stagingPathInContainer);
-        // write/execute permission required for Cassandra import
-        container.execInContainer("chmod", "777", stagingPathInContainer);
-        // copy snapshot files into the staging path in the container
-        // we use sh -c because wildcards are not supported
-        Container.ExecResult cpCmd = container
-                                     .execInContainer("sh", "-c",
-                                                      String.format("cp %s/* %s/", directory, stagingPathInContainer));
-        assertThat(cpCmd.getExitCode()).isEqualTo(0);
-        // create the directory in the host for validation
-        Files.createDirectories(
-        cassandraTestContext.dataDirectoryPath.resolve("staging")
-                                              .resolve(uploadId.toString())
-                                              .resolve(tableName.keyspace())
-                                              .resolve(tableName.tableName()));
+        boolean mkdirs = new File(stagingPathInContainer).mkdirs();
+        assertThat(mkdirs)
+        .withFailMessage("Could not create directory " + uploadStagingDir)
+        .isTrue();
+
+        // copy snapshot files into the staging path in the cluster
+        for (Path path : snapshotFiles)
+        {
+            if (path.toFile().isFile())
+            {
+                Files.copy(path, Paths.get(stagingPathInContainer).resolve(path.getFileName()));
+            }
+        }
 
         // Now truncate the contents of the table
         truncateAndVerify(cassandraTestContext, tableName);
@@ -123,7 +130,7 @@
         String testRoute = "/api/v1/uploads/" + uploadId + "/keyspaces/" + tableName.keyspace()
                            + "/tables/" + tableName.tableName() + "/import";
         sendRequest(context,
-                    () -> client.put(config.getPort(), "localhost", testRoute),
+                    () -> client.put(config.getPort(), "127.0.0.1", testRoute),
                     context.succeeding(response -> context.verify(() -> {
                         assertThat(response.statusCode()).isEqualTo(HttpResponseStatus.OK.code());
                         assertThat(queryValues(cassandraTestContext, tableName))
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/GossipInfoHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/GossipInfoHandlerTest.java
index cb55b57..f2dfdbe 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/GossipInfoHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/GossipInfoHandlerTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.cassandra.sidecar.routes;
 
-import java.io.IOException;
 import java.util.Collections;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -126,7 +125,7 @@
     {
         @Provides
         @Singleton
-        public InstancesConfig instanceConfig() throws IOException
+        public InstancesConfig instanceConfig()
         {
             final int instanceId = 100;
             final String host = "127.0.0.1";
diff --git a/src/test/java/org/apache/cassandra/sidecar/routes/RingHandlerTest.java b/src/test/java/org/apache/cassandra/sidecar/routes/RingHandlerTest.java
index 7469414..d0467dd 100644
--- a/src/test/java/org/apache/cassandra/sidecar/routes/RingHandlerTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/routes/RingHandlerTest.java
@@ -187,9 +187,9 @@
         @Singleton
         public InstancesConfig instanceConfig() throws IOException
         {
-            final int instanceId = 100;
-            final String host = "127.0.0.1";
-            final InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
+            int instanceId = 100;
+            String host = "127.0.0.1";
+            InstanceMetadata instanceMetadata = mock(InstanceMetadata.class);
             when(instanceMetadata.host()).thenReturn(host);
             when(instanceMetadata.port()).thenReturn(9042);
             when(instanceMetadata.id()).thenReturn(instanceId);
diff --git a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
index 15ffe5b..eb716a6 100644
--- a/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/utils/SSTableImporterTest.java
@@ -67,7 +67,7 @@
         mockTableOperations1 = mock(TableOperations.class);
         TableOperations mockTableOperations2 = mock(TableOperations.class);
         when(mockConfiguration.getSSTableImportPollIntervalMillis()).thenReturn(10);
-        when(mockMetadataFetcher.delegate("127.0.0.1")).thenReturn(mockCassandraAdapterDelegate1);
+        when(mockMetadataFetcher.delegate("localhost")).thenReturn(mockCassandraAdapterDelegate1);
         when(mockMetadataFetcher.delegate("127.0.0.2")).thenReturn(mockCassandraAdapterDelegate2);
         when(mockMetadataFetcher.delegate("127.0.0.3")).thenReturn(mockCassandraAdapterDelegate3);
         when(mockCassandraAdapterDelegate1.tableOperations()).thenReturn(mockTableOperations1);
@@ -103,7 +103,7 @@
     void testImportSucceeds(VertxTestContext context)
     {
         Future<Void> importFuture = importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
-                                                            .host("127.0.0.1")
+                                                            .host("localhost")
                                                             .keyspace("ks")
                                                             .tableName("tbl")
                                                             .directory("/dir")
@@ -111,7 +111,7 @@
                                                             .build());
         importFuture.onComplete(context.succeeding(v -> {
             assertThat(importer.importQueuePerHost).isNotEmpty();
-            assertThat(importer.importQueuePerHost).containsKey("127.0.0.1$ks$tbl");
+            assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
             for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
             {
                 assertThat(queue).isEmpty();
@@ -152,7 +152,7 @@
     void testImportFailsWhenImportReturnsFailedDirectories(VertxTestContext context)
     {
         Future<Void> importFuture = importer.scheduleImport(new SSTableImporter.ImportOptions.Builder()
-                                                            .host("127.0.0.1")
+                                                            .host("localhost")
                                                             .keyspace("ks")
                                                             .tableName("tbl")
                                                             .directory("/failed-dir")
@@ -166,7 +166,7 @@
             assertThat(exception.getPayload()).isEqualTo("Failed to import from directories: [/failed-dir]");
 
             assertThat(importer.importQueuePerHost).isNotEmpty();
-            assertThat(importer.importQueuePerHost).containsKey("127.0.0.1$ks$tbl");
+            assertThat(importer.importQueuePerHost).containsKey("localhost$ks$tbl");
             for (SSTableImporter.ImportQueue queue : importer.importQueuePerHost.values())
             {
                 assertThat(queue).isEmpty();
@@ -208,7 +208,7 @@
         SSTableImporter importer = new SSTableImporter(vertx, mockMetadataFetcher, mockConfiguration, executorPools,
                                                        mockUploadPathBuilder);
         SSTableImporter.ImportOptions options = new SSTableImporter.ImportOptions.Builder()
-                                                .host("127.0.0.1")
+                                                .host("localhost")
                                                 .keyspace("ks")
                                                 .tableName("tbl")
                                                 .directory("/dir")
@@ -223,7 +223,7 @@
     void testCancelImportNoOpAfterProcessing(VertxTestContext context)
     {
         SSTableImporter.ImportOptions options = new SSTableImporter.ImportOptions.Builder()
-                                                .host("127.0.0.1")
+                                                .host("localhost")
                                                 .keyspace("ks")
                                                 .tableName("tbl")
                                                 .directory("/dir")