Support for multiple Cassandra versions

This patch lays the groundwork to support multiple Cassandra versions.
New submodules were created for common libraries as well as specific
Cassandra versions.  Several dependencies are moved to the common
submodule due to their universal nature.

Most importantly, this patch introduces Junit Test Template for testing multiple
Cassandra versions.  Test should be annotated with a
@CassandraIntegrationTest.  These tests require Docker and Kubernetes be available.
A single version is currently supported, 4.0.  A dockerfile sets up the image
from the beta tarball and initializes the session.

Cassandra driver was upgraded to version 3.9, previous versions errored
with Java 11.

CircleCI tests are run using microk8s.

Patch by Jon Haddad; Reviewed by Dinesh Joshi for CASSANDRASC-23
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 893c9bc..51a8555 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -8,7 +8,7 @@
 aliases:
   base_job: &base_job
     machine:
-      image: ubuntu-1604:201903-01
+      image: ubuntu-1604:202007-01
     working_directory: ~/repo
     environment:
       TERM: dumb
@@ -26,13 +26,17 @@
     parameters:
       version:
         type: string
-
     steps:
       - run: wget -qO - https://adoptopenjdk.jfrog.io/adoptopenjdk/api/gpg/key/public | sudo apt-key add -
       - run: sudo add-apt-repository --yes https://adoptopenjdk.jfrog.io/adoptopenjdk/deb/
       - run: sudo apt-get update
       - run: sudo apt-get install -y << parameters.version>>
 
+  install_kube:
+    description: "Installs kubernetes"
+    steps:
+      - run: .circleci/setup-microk8.sh
+
   install_common:
     description: "Installs common software and certificates"
     steps:
@@ -46,7 +50,8 @@
     steps:
      - checkout
 
-     - run: ./gradlew -i clean build --stacktrace
+     # setting the docker registry here skips the internal minikube setup
+     - run: SIDECAR_DOCKER_REGISTRY="127.0.0.1" ./gradlew -i clean test  --stacktrace
 
      - store_artifacts:
          path: build/reports
@@ -58,17 +63,20 @@
   java8:
     <<: *base_job
 
+    # todo move to common command for integration tests
+    environment:
+      SIDECAR_DOCKER_REGISTRY: "http://localhost:32000"
+
     steps:
       - checkout
       - install_common
+      - install_kube
       
       - install_java:
           version: adoptopenjdk-8-hotspot
-
       - run: sudo update-java-alternatives -s adoptopenjdk-8-hotspot-amd64 && java -version
-
-      # make sure it builds with build steps like swagger docs and dist
-      - run: ./gradlew -i clean build --stacktrace
+      - run: ./gradlew :containers:pushAll
+      - run: ./gradlew -i test integrationTest --stacktrace
 
       - store_artifacts:
           path: build/reports
@@ -77,13 +85,16 @@
       - store_test_results:
           path: ~/repo/build/test-results/
 
+      - store_test_results:
+          path: ~/repo/cassandra-integration-tests/build/test-results/
+
   java11_docker:
     docker:
       - image: circleci/openjdk:11-jdk-stretch
     steps:
       - checkout
 
-      - run: ./gradlew -i clean build --stacktrace
+      - run: SIDECAR_DOCKER_REGISTRY="127.0.0.1" ./gradlew -i clean test --stacktrace
 
       - store_artifacts:
           path: build/reports
@@ -94,16 +105,20 @@
 
   java11:
     <<: *base_job
+    environment:
+      SIDECAR_DOCKER_REGISTRY: "http://localhost:32000"
+
     steps:
       - checkout
       - install_common
+      - install_kube
 
       - install_java:
           version: adoptopenjdk-11-hotspot
-
       - run: sudo update-java-alternatives -s adoptopenjdk-11-hotspot-amd64 && java -version
-
-      - run: ./gradlew -i clean build --stacktrace
+      - run: export
+      - run: ./gradlew :containers:pushAll
+      - run: ./gradlew -i test integrationTest --stacktrace
 
       - store_artifacts:
           path: build/reports
@@ -143,17 +158,21 @@
       - image: circleci/openjdk:11-jdk-stretch
     steps:
       - checkout
-      - run: ./gradlew -i clean build --stacktrace
-      - run: test -f build/docs/html5/user.html
+      - run: ./gradlew docs:asciidoctor
+      - run: test -f docs/build/html5/user.html
 
 workflows:
   version: 2
   build-and-test:
     jobs:
-      - java8
       - java8_docker
-      - java11
       - java11_docker
+      - java8:
+          requires:
+            - java8_docker
+      - java11:
+          requires:
+            - java11_docker
       - docs_build:
           requires:
             - java8
diff --git a/.circleci/setup-microk8.sh b/.circleci/setup-microk8.sh
new file mode 100755
index 0000000..d8abafa
--- /dev/null
+++ b/.circleci/setup-microk8.sh
@@ -0,0 +1,23 @@
+#!/bin/bash
+
+set -v
+
+sudo snap install microk8s --classic
+
+sudo microk8s status --wait-ready
+sudo microk8s enable registry
+sudo microk8s status --wait-ready
+sudo microk8s status -a registry -wait-ready
+
+curl http://localhost:32000/v2/_catalog
+
+sudo mkdir -p ~/.kube
+sudo chown -R circleci ~/.kube
+
+sudo microk8s config > ~/.kube/config
+sudo chown -R circleci ~/.kube
+
+cat ~/.kube/config
+
+sudo microk8s kubectl get all
+sudo iptables -P FORWARD ACCEPT
\ No newline at end of file
diff --git a/CHANGES.txt b/CHANGES.txt
index 899e4f2..114940e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 1.0.0
 -----
+ * Support multiple Cassandra versions with integration tests using Kubernetes (CASSANDRA-23)
  * RESTEasy integration with dynamically generated Swagger OpenAPI, Swagger UI and JAX-RS (CASSANDRASC-22)
  * Set up asciidoc based documentation (CASSANDRASC-15)
  * Gradle can now generate Deb packages, RPMs and Docker containers (CASSANDRASC-14)
diff --git a/README.md b/README.md
index aec6c96..17187a6 100644
--- a/README.md
+++ b/README.md
@@ -9,17 +9,58 @@
 ------------
   1. Java >= 1.8 (OpenJDK or Oracle), or Java 11
   2. Apache Cassandra 4.0.  We depend on virtual tables which is a 4.0 only feature.
+  3. A Kubernetes cluster for running integration tests.  [MiniKube](https://kubernetes.io/docs/tutorials/hello-minikube/) can be used to run Kubernetes locally.
 
-Getting started
----------------
+Getting started: Running The Sidecar
+--------------------------------------
 
 After you clone the git repo, you can use the gradle wrapper to build and run the project. Make sure you have 
 Apache Cassandra running on the host & port specified in `conf/sidecar.yaml`.
 
     $ ./gradlew run
   
+Testing
+---------
+
+We rely on Kubernetes for creating docker containers for integration tests.
+
+The easiest way to get started locally is by installing [Minikube](https://kubernetes.io/docs/tasks/tools/install-minikube/). 
+
+Start minikube with a command similar to the following.  Use a netmask appropriate for your local network, and allow minikube to use as much RAM as you can afford to:
+
+    minikube start --insecure-registry "192.168.0.0/16" --addons registry --memory 8G --cpus=4
+    
+This will create a MiniKub cluster using the default driver.  On OSX, this is hyperkit.  
+
+Enabling the tunnel is required in certain environments for tests to connect to the instances.
+
+In a separate tab (or background process) run the following:
+
+    minikube tunnel
+
+Check the dashboard to ensure your installation is working as expected:
+    
+    minikube dashboard
+
+Set the environment property for the Minikube container (we recommend you do this as part of your system profile):
+
+You can use an existing Kubernetes environment by setting the appropriate project properties either through environment variables
+
+    export SIDECAR_DOCKER_REGISTRY="http://$(minikube ip):5000"
+
+Gradle will register the required test containers with the local docker registry.    You can enable this after setting up Minikube by doing the following:
+
+*Note*: If using MiniKube, the Docker daemon will need to be configured to push to your Minikube repo insecurely.  
+This should be added to the `daemon.json` config, usually found in /etc/docker, or in the Docker Engine section of the docker preferences:
+
+      "insecure-registries": [
+        "192.168.64.14:5000"
+      ]
+    
 You can use `build`, `test` to build & test the project.
 
+Please see the developer documentation in docs/src/development.adoc for more information.
+
 CircleCI Testing
 -----------------
 
diff --git a/build.gradle b/build.gradle
index 6d13ace..8bfc590 100644
--- a/build.gradle
+++ b/build.gradle
@@ -10,10 +10,9 @@
 plugins {
     id 'java'
     id 'application'
-    id 'idea'
-    id 'checkstyle'
-    id 'jacoco'
-    id "com.github.spotbugs" version "3.0.0"
+
+    // since we're using a specific version here, we delay applying the plugin till the all projects
+    id "com.github.spotbugs" version "3.0.0" apply false
     id 'org.hidetake.swagger.generator' version '2.16.0'
     id "io.swagger.core.v3.swagger-gradle-plugin" version "2.1.2"
 
@@ -23,15 +22,83 @@
     id 'com.google.cloud.tools.jib' version '2.2.0'
 }
 
+
+
+allprojects {
+    apply plugin: 'idea'
+    apply plugin: 'jacoco'
+    apply plugin: 'checkstyle'
+    apply plugin: "com.github.spotbugs"
+
+    tasks.register("configureKubernetes") {
+        // figure out the docker registry, make this as easy as possible for new folks
+        def dockerRegistryString = System.getenv("SIDECAR_DOCKER_REGISTRY")
+        if(dockerRegistryString != null) {
+            logger.info("Using ENV SIDECAR_DOCKER_REGISTRY ${dockerRegistryString}")
+        }
+        else {
+            // SIDECAR_DOCKER_REGISTRY not set (likely a local environment), let's try minikube
+            logger.info("Looking for minikube ip")
+            new ByteArrayOutputStream().withStream { os ->
+                try {
+
+                def result = exec {
+                    executable = "minikube"
+                    args = ["ip"]
+                    standardOutput = os
+                }
+                def output = os.toString()
+                if(output != "") {
+                    dockerRegistryString = "http://${output.trim()}:5000" // minikube uses port 5000
+                    logger.info("Env variable SIDECAR_DOCKER_REGISTRY not defined, using output of minikube ip ${dockerRegistryString}")
+                } else {
+                    logger.warn("SIDECAR_DOCKER_REGISTRY not set and minikube ip failed")
+                }
+                if(result.exitValue != 0) {
+                    logger.error("Could not find minikube ip: exit code ${result.exitValue}")
+                }
+                }
+                catch (GradleException e) {
+                    // microk8s use this
+                    dockerRegistryString = "http://localhost:5000"
+                    logger.error("Could not auto configure docker registry, please set your SIDECAR_DOCKER_REGISTRY environment variable.  Using $dockerRegistryString")
+                }
+            }
+        }
+
+        ext.dockerTag = "latest"
+        ext.dockerGroup = System.getenv("SIDECAR_DOCKER_GROUP") ?: "cassandra_sidecar"
+        ext.kubernetesNamespace = System.getenv("SIDECAR_KUBERNETES_NAMESPACE") ?: "default"
+        ext.dockerRegistry = new URI(dockerRegistryString)
+        ext.dockerRegistryWithoutProtocol = dockerRegistry.getHost() + ":" + dockerRegistry.getPort()
+    }
+
+    repositories {
+        mavenCentral()
+        jcenter()
+    }
+
+    checkstyle {
+        toolVersion '7.8.1'
+        configFile file("${project.rootDir}/checkstyle.xml")
+    }
+    spotbugs {
+        toolVersion = '4.0.0'
+        excludeFilter = file("${project.rootDir}/spotbugs-exclude.xml")
+    }
+
+    tasks.withType(com.github.spotbugs.SpotBugsTask) {
+        reports.xml.enabled = false
+        reports.html.enabled = true
+    }
+
+}
+
 group 'org.apache.cassandra'
 version project.version
 
 sourceCompatibility = 1.8
 
-repositories {
-    mavenCentral()
-}
-
 // Take the application out once we're running via Cassandra
 mainClassName = "org.apache.cassandra.sidecar.CassandraSidecarDaemon"
 applicationName = 'cassandra-sidecar'
@@ -93,9 +160,6 @@
     compile 'ch.qos.logback:logback-core:1.2.3'
     compile 'ch.qos.logback:logback-classic:1.2.3'
 
-    compile 'com.datastax.cassandra:cassandra-driver-core:3.6+'
-    compile group: 'com.google.inject', name: 'guice', version: '4.2.2'
-
     compile group: 'org.apache.commons', name: 'commons-configuration2', version: '2.7'
     compile 'org.webjars:swagger-ui:3.10.0'
 
@@ -104,13 +168,17 @@
 
     jolokia 'org.jolokia:jolokia-jvm:1.6.0:agent'
 
-    testCompile group: 'org.cassandraunit', name: 'cassandra-unit-shaded', version: '3.3.0.2'
-    testCompile 'com.datastax.cassandra:cassandra-driver-core:3.6.+:tests'
-    testCompile 'org.apache.commons:commons-exec:1.3+'
+    testCompile "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
+    testCompile "org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
+    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}"
+
+    testCompile group: 'org.cassandraunit', name: 'cassandra-unit-shaded', version: '3.11.2.0'
+    testCompile 'com.datastax.cassandra:cassandra-driver-core:3.9.0:tests'
     testCompile group: 'org.mockito', name: 'mockito-all', version: '1.10.19'
     testCompile group: 'io.vertx', name: 'vertx-junit5', version: '3.8.5'
 
-    integrationTestCompile group: 'com.datastax.oss.simulacron', name: 'simulacron-driver-3x', version: '0.8.10'
+    compile project(":common")
+    compile project(":cassandra40")
 }
 
 task copyCodeStyle(type: Copy) {
@@ -141,20 +209,22 @@
 }
 
 test {
+    // ordinarily we don't need integration tests
+    // see the integrationTest task
     useJUnitPlatform()
-    systemProperty "javax.net.ssl.trustStore", "$projectDir/src/test/resources/certs/ca.p12"
-    systemProperty "javax.net.ssl.trustStorePassword", "password"
     reports {
         junitXml.enabled = true
         html.enabled = true
     }
 }
 
-task integrationTest(type: Test) {
+tasks.register("integrationTest", Test) {
     jacoco {
         enabled = false
     }
-    useJUnitPlatform()
+    useJUnitPlatform() {
+        includeTags "integrationTest"
+    }
     testClassesDirs = sourceSets.integrationTest.output.classesDirs
     classpath = sourceSets.integrationTest.runtimeClasspath
     shouldRunAfter test
@@ -169,21 +239,6 @@
     exclude "tmp"
 }
 
-checkstyle {
-    toolVersion '7.8.1'
-    configFile file("checkstyle.xml")
-}
-
-spotbugs {
-    toolVersion = '4.0.0'
-    excludeFilter = file("src/main/resources/spotbugs-exclude.xml")
-}
-
-tasks.withType(com.github.spotbugs.SpotBugsTask) {
-    reports.xml.enabled = false
-    reports.html.enabled = true
-}
-
 /**
  * General configuration for linux packages.
  * Can be overridden in the buildRpm and buildDeb configuration
@@ -227,6 +282,8 @@
     outputDir = file('build/generated/swagger')
 }
 
+
+
 // copyDist gets called on every build
 copyDist.dependsOn installDist, copyJolokia
 check.dependsOn checkstyleMain, checkstyleTest, integrationTest, jacocoTestReport
diff --git a/cassandra-integration-tests/build.gradle b/cassandra-integration-tests/build.gradle
new file mode 100644
index 0000000..2c3d88f
--- /dev/null
+++ b/cassandra-integration-tests/build.gradle
@@ -0,0 +1,44 @@
+plugins {
+    id 'java'
+}
+
+repositories {
+    mavenCentral()
+    jcenter()
+}
+
+dependencies {
+    testCompile project(":cassandra40")
+
+    testCompile "io.kubernetes:client-java:${project.kubernetesClientVersion}"
+    testCompile "io.kubernetes:client-java-extended:${project.kubernetesClientVersion}"
+
+    testCompile "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
+    testCompile "org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
+    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}"
+
+    testImplementation("org.assertj:assertj-core:3.16.0")
+}
+
+// we don't want to run the integration tests by default - they're heavy weight and won't run correctly
+// if kubernetes isn't present
+// we don't want the lack of local k8 to impede development
+// for the most part, the C* integrations should be simple, and non-integration tests should use mocks
+test {
+    useJUnitPlatform() {
+        excludeTags "integrationTest"
+    }
+}
+
+tasks.register("integrationTest", Test) {
+    jacoco {
+        enabled = false
+    }
+    useJUnitPlatform() {
+        includeTags "integrationTest"
+    }
+    systemProperty "sidecar.dockerRegistry", configureKubernetes.ext.dockerRegistry.toString()
+    systemProperty "sidecar.kubernetesNamespace", configureKubernetes.ext.kubernetesNamespace.toString()
+    systemProperty "sidecar.dockerGroup", configureKubernetes.ext.dockerGroup.toString()
+    group = "verification"
+}
\ No newline at end of file
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/StatusTest.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/StatusTest.java
new file mode 100644
index 0000000..cf27d8d
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/StatusTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.sidecar.common.testing.CassandraIntegrationTest;
+import org.apache.cassandra.sidecar.common.testing.CassandraTestContext;
+
+/**
+ * Placeholder test
+ */
+public class StatusTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(StatusTest.class);
+
+    @BeforeEach
+    void setupData(CassandraTestContext context)
+    {
+        logger.info("setup {}", context.container);
+    }
+
+    @CassandraIntegrationTest
+    @DisplayName("Ensure status returns correctly")
+    void testSomething(CassandraTestContext context)
+    {
+        logger.info("test context in test {}", context);
+        Session session = context.session.getLocalCql();
+        session.execute("SELECT * from system.peers_v2");
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java
new file mode 100644
index 0000000..87541c3
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraIntegrationTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Interface to mark an integration test which should be run against multiple Cassandra versions
+ */
+@TestTemplate
+@Target({ ElementType.ANNOTATION_TYPE, ElementType.METHOD })
+@Retention(RetentionPolicy.RUNTIME)
+@Tag("integrationTest")
+@ExtendWith(CassandraTestTemplate.class)
+public @interface CassandraIntegrationTest
+{
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java
new file mode 100644
index 0000000..9014146
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPod.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.kubernetes.client.Exec;
+
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.Configuration;
+
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1Pod;
+import io.kubernetes.client.openapi.models.V1PodBuilder;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceBuilder;
+import io.kubernetes.client.openapi.models.V1ServicePort;
+import io.kubernetes.client.openapi.models.V1ServiceSpec;
+
+import io.kubernetes.client.util.ClientBuilder;
+import okhttp3.OkHttpClient;
+import okhttp3.Protocol;
+import okhttp3.internal.Util;
+
+/**
+ * Manages a single instance of a Cassandra container
+ */
+class CassandraPod
+{
+    private final URI dockerRegistry;
+    private final String image;
+    private final String namespace;
+    private final String dockerGroup;
+    private static final Logger logger = LoggerFactory.getLogger(CassandraPod.class);
+    private final String podName;
+
+    private final CoreV1Api coreV1Api;
+    private Boolean deleted = false;
+
+    String ip;
+    Integer port;
+
+    public CassandraPod(URI dockerRegistry, String dockerGroup, String image, String namespace, CoreV1Api coreV1Api)
+    {
+        this.dockerRegistry = dockerRegistry;
+        this.image = image;
+        this.namespace = namespace;
+        this.dockerGroup = dockerGroup;
+        this.podName = String.format("cassandra-%s", UUID.randomUUID());
+        this.coreV1Api = coreV1Api;
+    }
+
+    /**
+     * Creates a single pod using the system properties passed in through Gradle
+     * @param image
+     * @return
+     * @throws Exception
+     */
+    public static CassandraPod createFromProperties(String image) throws Exception
+    {
+        URI dockerRegistry = new URI(System.getProperty("sidecar.dockerRegistry"));
+        String namespace = System.getProperty("sidecar.kubernetesNamespace");
+        String dockerGroup = System.getProperty("sidecar.dockerGroup");
+
+        logger.info("Creating pod from registry {}, namespace {}, group {}", dockerRegistry, namespace, dockerGroup);
+
+        if (dockerRegistry == null)
+        {
+            throw new Exception("Docker registry required but sidecar.dockerRegistry = null");
+        }
+        if (namespace == null)
+        {
+            throw new Exception("sidecar.kubernetesNamespace is not defined and is required for K8 testing");
+        }
+
+        ApiClient apiClient = ClientBuilder.standard().build();
+
+        // this is a workaround for socket errors that show up in certain JVM versions...
+        // without it, the tests fail in CI.
+        // we can probably get rid of this when we either move to JDK 11 only or if the Kubernetes clienti s updated
+        OkHttpClient httpClient =
+                apiClient.getHttpClient().newBuilder()
+                        .protocols(Util.immutableList(Protocol.HTTP_1_1))
+                        .readTimeout(10, TimeUnit.SECONDS)
+                        .writeTimeout(10, TimeUnit.SECONDS)
+                        .connectTimeout(10, TimeUnit.SECONDS)
+                        .callTimeout(10, TimeUnit.SECONDS)
+                        .retryOnConnectionFailure(true)
+                        .build();
+        apiClient.setHttpClient(httpClient);
+
+        Configuration.setDefaultApiClient(apiClient);
+
+        logger.info("K8 client: {}", apiClient.getBasePath());
+
+        CoreV1Api coreV1Api = new CoreV1Api(apiClient);
+
+
+        return new CassandraPod(dockerRegistry, dockerGroup, image, namespace, coreV1Api);
+    }
+
+    public void start() throws ApiException, InterruptedException, CassandraPodException
+    {
+        // create a v1 deployment spec
+        String fullImage = getFullImageName();
+
+        // similar to the spec yaml file, just programmatic
+
+        HashMap<String, String> labels = getLabels();
+
+        V1Service serviceBuilder = getService();
+
+        logger.debug("Exposing service with: {}", serviceBuilder.toString());
+
+        try
+        {
+            coreV1Api.createNamespacedService(namespace, serviceBuilder, null, null, null);
+        }
+        catch (ApiException e)
+        {
+            logger.error("Unable to create namespaced service: {}", e.getMessage());
+            throw e;
+        }
+
+        // get the service
+        V1Service namespacedService = coreV1Api.readNamespacedService(podName, namespace, null, null, null);
+        logger.debug("Service result: {}", namespacedService);
+
+        V1ServiceSpec serviceSpec = namespacedService.getSpec();
+
+        logger.info("Starting container {}", fullImage);
+        V1Pod pod = getPod(fullImage, labels);
+
+        logger.debug("Pod spec: {}", pod);
+        V1Pod podResult = coreV1Api.createNamespacedPod(namespace, pod, null, null, null);
+        logger.debug("Pod result: {}", podResult);
+
+        int maxTime = 120;
+        V1Pod namespacedPod = null;
+        Boolean started = false;
+        String response = "";
+        int sleepTimeInMs = 1000;
+
+        for (int i = 0; i < maxTime; i++)
+        {
+            // we sleep in the beginning because the pod will never be ready right away
+            // sometimes K8 seems to hang in CI as well, so this might be enough to let the pod start
+            logger.debug("Reading namespace pod - sleeping for {}ms, ID: {}", sleepTimeInMs, podName);
+            try
+            {
+                Thread.sleep(sleepTimeInMs);
+            }
+            catch (InterruptedException e)
+            {
+                logger.error("Unable to sleep: {}", e.getMessage());
+                throw e;
+            }
+            namespacedPod = coreV1Api.readNamespacedPod(podName, namespace, null, null, null);
+            response = namespacedPod.getStatus().getPhase();
+            // not ready
+
+            if (!response.contentEquals("Running"))
+            {
+                continue;
+            }
+
+            started = namespacedPod.getStatus().getContainerStatuses().get(0).getStarted();
+            if (namespacedPod.getStatus().getContainerStatuses().get(0).getReady() && started) {
+                logger.info("Pod startup OK");
+                break;
+            }
+
+            logger.info("Container not ready: {}", response);
+            Thread.sleep(1000);
+        }
+        if (!started)
+        {
+            throw new CassandraPodException("container not ready: " + response);
+        }
+        logger.debug("pod status: {}", namespacedPod);
+
+
+        ip = serviceSpec.getClusterIP();
+
+        List<V1ServicePort> ports = serviceSpec.getPorts();
+        port = ports.get(0).getPort();
+        logger.info("Cassandra pod {} running on {}:{}", podName, ip, port);
+    }
+
+    private HashMap<String, String> getLabels()
+    {
+        HashMap<String, String> labels = new HashMap<>();
+        labels.put("name", podName);
+        labels.put("purpose", "cassandra_sidecar_testing");
+        return labels;
+    }
+
+
+    private V1Service getService()
+    {
+        return new V1ServiceBuilder()
+                .withApiVersion("v1")
+                .withKind("Service")
+                .withNewMetadata()
+                .withName(podName)
+                .addToLabels("purpose", "cassandra_sidecar_testing")
+                .withNamespace(namespace)
+                .endMetadata()
+                .withNewSpec()
+                .withType("NodePort")
+                .addToPorts(new V1ServicePort().port(9042).protocol("TCP"))
+                .addToSelector("name", podName)
+                .endSpec()
+                .build();
+    }
+
+    private V1Pod getPod(String fullImage, HashMap<String, String> labels)
+    {
+        return new V1PodBuilder()
+                .withApiVersion("v1")
+                .withKind("Pod")
+                .withNewMetadata()
+                    .withName(podName)
+                    .withNamespace(namespace)
+                    .withLabels(labels)
+                    .endMetadata()
+                .withNewSpec()
+                    .addNewContainer()
+                        .withName(podName)
+                        .withImage(fullImage)
+                        .addToPorts(new V1ContainerPort().containerPort(9042))
+                        .withNewStartupProbe()
+                            .withNewTcpSocket()
+                                .withNewPort(9042)
+                                .endTcpSocket()
+                            .withInitialDelaySeconds(5)
+                            .withPeriodSeconds(3)
+                            .withFailureThreshold(30)
+                            .endStartupProbe()
+                        .endContainer()
+                    .endSpec()
+                .build();
+    }
+
+    public String getFullImageName()
+    {
+        return String.format("%s:%d/%s/%s", dockerRegistry.getHost(), dockerRegistry.getPort(), dockerGroup, image);
+    }
+
+    public void disableBinary() throws InterruptedException, ApiException, IOException
+    {
+        nodetool(new String[] {"disablebinary"});
+    }
+
+    public void enableBinary() throws InterruptedException, ApiException, IOException
+    {
+        nodetool(new String[] {"enablebinary"});
+        // temporary sleep to ensure we start back up
+        Thread.sleep(5000);
+    }
+
+    /**
+     *
+     */
+    public void nodetool(String[] args) throws IOException, ApiException, InterruptedException
+    {
+        Exec exec = new Exec();
+        List<String> command = new ArrayList<>();
+        command.add("/cassandra/bin/nodetool");
+        Collections.addAll(command, args);
+        logger.info("Executing in container {}", command);
+
+        Process proc = exec.exec(namespace, podName, command.toArray(new String[0]), false, false);
+        proc.waitFor();
+        proc.destroy();
+    }
+
+
+    /**
+     * this is done asynchronously, so technically we could connect for a small window after the pod is deleted
+     * not recommended to use mid-test
+     */
+    public void delete()
+    {
+        if (deleted)
+        {
+            logger.info("Pod already deleted, skipping");
+            return;
+        }
+        deleteService();
+        deletePod();
+        deleted = true;
+
+    }
+    public String getIp()
+    {
+        return ip;
+    }
+
+    public Integer getPort()
+    {
+        return port;
+    }
+
+    private void deleteService()
+    {
+        try
+        {
+            logger.info("Deleting service {}", podName);
+            coreV1Api.deleteNamespacedService(podName, namespace, null, null, null, null, null, null);
+
+        }
+        catch (Exception ignored)
+        {
+            logger.info("Could not delete service {}", podName);
+        }
+    }
+
+    /**
+     * Tries to delete a pod.  Might fail.
+     * There's a variety of cases that can result in a pod not being deleted properly
+     */
+    private void deletePod()
+    {
+        try
+        {
+            logger.info("Deleting pod {}", podName);
+            coreV1Api.deleteNamespacedPod(podName, namespace, null, null, null, null, null, null);
+        }
+        catch (Exception ignored)
+        {
+            logger.error("Exception when stopping pod: {}", ignored.getMessage());
+        }
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPodException.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPodException.java
new file mode 100644
index 0000000..5d4fd72
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraPodException.java
@@ -0,0 +1,12 @@
+package org.apache.cassandra.sidecar.common.testing;
+
+/**
+ * Misc exception to be thrown when we don't know what's actually wrong
+ */
+public class CassandraPodException extends Exception
+{
+    public CassandraPodException(String message)
+    {
+        super(message);
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java
new file mode 100644
index 0000000..eb4ef65
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraTestContext.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
+
+/**
+ * Passed to integration tests.
+ * See {@link CassandraIntegrationTest} for the required annotation
+ * See {@link CassandraTestTemplate} for the Test Template
+ */
+public class CassandraTestContext
+{
+    public final CQLSession session;
+    public final SimpleCassandraVersion version;
+    public final CassandraPod container;
+    public final ICassandraAdapter cassandra;
+
+    CassandraTestContext(SimpleCassandraVersion version, CassandraPod container, CQLSession session,
+                         ICassandraAdapter cassandra)
+    {
+        this.version = version;
+        this.container = container;
+        this.session = session;
+        this.cassandra = cassandra;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "CassandraTestContext{" +
+                "session=" + session +
+                ", version=" + version +
+                ", container=" + container +
+                ", cassandra=" + cassandra +
+                '}';
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java
new file mode 100644
index 0000000..e8a38a9
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/CassandraTestTemplate.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeEachCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.api.extension.ParameterContext;
+import org.junit.jupiter.api.extension.ParameterResolver;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
+
+/**
+ * Creates a test per version of Cassandra we are testing
+ * Tests must be marked with {@link CassandraIntegrationTest}
+ *
+ *  This is a mix of parameterized tests + a custom extension.  we need to be able to provide the test context
+ *  to each test (like an extension) but also need to create multiple tests (like parameterized tests).  Unfortunately
+ *  the two don't play well with each other.  You can't get access to the parameters from the extension.
+ *  This test template allows us full control of the test lifecycle and lets us tightly couple the context to each test
+ *  we generate, since the same test can be run for multiple versions of C*.
+ */
+public class CassandraTestTemplate implements TestTemplateInvocationContextProvider
+{
+
+    private static final Logger logger = LoggerFactory.getLogger(CassandraTestTemplate.class);
+
+
+    @Override
+    public boolean supportsTestTemplate(ExtensionContext context)
+    {
+        return true;
+    }
+
+    @Override
+    public Stream<TestTemplateInvocationContext> provideTestTemplateInvocationContexts(ExtensionContext context)
+    {
+        return new TestVersionSupplier().getTestVersions()
+                .map(v -> invocationContext(v, context));
+    }
+
+    /**
+     *
+     * @param version
+     * @param context
+     * @return
+     */
+    private TestTemplateInvocationContext invocationContext(TestVersion version, ExtensionContext context)
+    {
+        return new TestTemplateInvocationContext()
+        {
+            private CassandraTestContext cassandraTestContext;
+
+            /**
+             * A display name can be configured per test still - this adds the C* version we're testing automatically
+             * as a suffix to the name
+             * @param invocationIndex
+             * @return
+             */
+            @Override
+            public String getDisplayName(int invocationIndex)
+            {
+                return context.getDisplayName() + ": " + version.getVersion();
+            }
+
+            /**
+             * Used to register the extensions required to start and stop the docker environment
+             * @return
+             */
+            @Override
+            public List<Extension> getAdditionalExtensions()
+            {
+                return Arrays.asList(parameterResolver(), postProcessor(), beforeEach());
+            }
+
+            private BeforeEachCallback beforeEach()
+            {
+                return new BeforeEachCallback()
+                {
+                    @Override
+                    public void beforeEach(ExtensionContext context) throws Exception
+                    {
+                        // spin up a C* instance using Kubernetes
+                        ICassandraFactory factory = version.getFactory();
+
+                        CassandraPod container = CassandraPod.createFromProperties(version.getImage());
+                        container.start();
+                        logger.info("Testing {} against docker container", version);
+
+                        CQLSession session = new CQLSession(container.getIp(), container.getPort(), 5000);
+
+                        SimpleCassandraVersion versionParsed = SimpleCassandraVersion.create(version.getVersion());
+
+                        ICassandraAdapter cassandra = factory.create(session);
+
+                        cassandraTestContext = new CassandraTestContext(versionParsed, container, session, cassandra);
+                        logger.info("Created test context {}", cassandraTestContext);
+                    }
+                };
+            }
+
+            /**
+             * Shuts down the docker container when the test is finished
+             * @return
+             */
+            private AfterTestExecutionCallback postProcessor()
+            {
+                return new AfterTestExecutionCallback()
+                {
+                    @Override
+                    public void afterTestExecution(ExtensionContext context) throws Exception
+                    {
+                        // tear down the docker instance
+                        cassandraTestContext.container.delete();
+                    }
+                };
+            }
+
+            /**
+             * Required for Junit to know the CassandraTestContext can be used in these tests
+             * @return
+             */
+            private ParameterResolver parameterResolver()
+            {
+                return new ParameterResolver()
+                {
+                    @Override
+                    public boolean supportsParameter(ParameterContext parameterContext,
+                                                     ExtensionContext extensionContext)
+                    {
+                        return parameterContext.getParameter()
+                                .getType()
+                                .equals(CassandraTestContext.class);
+                    }
+
+                    @Override
+                    public Object resolveParameter(ParameterContext parameterContext,
+                                                   ExtensionContext extensionContext)
+                    {
+                        return cassandraTestContext;
+                    }
+                };
+            }
+
+        };
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/DelegateTest.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/DelegateTest.java
new file mode 100644
index 0000000..be775cd
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/DelegateTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import java.io.IOException;
+
+import io.kubernetes.client.openapi.ApiException;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
+import org.apache.cassandra.sidecar.common.SimpleCassandraVersion;
+import org.apache.cassandra.sidecar.mocks.V30;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Insures the Delegate works correctly
+ */
+public class DelegateTest
+{
+    @CassandraIntegrationTest
+    void testCorrectVersionIsEnabled(CassandraTestContext context)
+    {
+        CassandraVersionProvider provider = new CassandraVersionProvider.Builder().add(new V30()).build();
+        CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(provider, context.session);
+        delegate.checkSession();
+        SimpleCassandraVersion version = delegate.getVersion();
+        assertThat(version).isNotNull();
+    }
+
+    @CassandraIntegrationTest
+    void testHealthCheck(CassandraTestContext context) throws InterruptedException, ApiException, IOException
+    {
+        CassandraVersionProvider provider = new CassandraVersionProvider.Builder().add(new V30()).build();
+        CassandraAdapterDelegate delegate = new CassandraAdapterDelegate(provider, context.session);
+
+        delegate.checkSession();
+        delegate.healthCheck();
+
+        assertThat(delegate.isUp()).isTrue();
+
+        context.container.disableBinary();
+
+        delegate.healthCheck();
+        assertThat(delegate.isUp()).isFalse();
+
+        context.container.enableBinary();
+
+        delegate.healthCheck();
+
+        assertThat(delegate.isUp()).isTrue();
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/TestVersion.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/TestVersion.java
new file mode 100644
index 0000000..013c004
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/TestVersion.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+
+/**
+ * Works with {@link TestVersionSupplier}
+ */
+public class TestVersion
+{
+    private final String version;
+    private final ICassandraFactory factory;
+    private final String image;
+
+    public TestVersion(String version, ICassandraFactory factory, String image)
+    {
+        this.version = version;
+        this.factory = factory;
+        this.image = image;
+    }
+
+    public String getVersion()
+    {
+        return version;
+    }
+
+    public ICassandraFactory getFactory()
+    {
+        return factory;
+    }
+
+    public String getImage()
+    {
+        return image;
+    }
+}
diff --git a/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java
new file mode 100644
index 0000000..233c9a8
--- /dev/null
+++ b/cassandra-integration-tests/src/test/java/org/apache/cassandra/sidecar/common/testing/TestVersionSupplier.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common.testing;
+
+import java.util.stream.Stream;
+
+import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory;
+
+/**
+ * Generates the list of versions we're going to test against.
+ * Depending on future releases, we may end up running the same module (Cassandra40 for example) against multiple
+ * versions of Cassandra.  This may be due to releases that don't add new features that would affect the sidecar,
+ * but we still want to test those versions specifically to avoid the chance of regressions.
+ *
+ * At the moment, it's returning a hard coded list.  We could / should probably load this from a configuration and make
+ * it possible to override it, so teams that customize C* can run and test their own implementation
+ *
+ * Ideally, we'd probably have concurrent runs of the test infrastructure each running tests against one specific
+ * version of C*, but we don't need that yet given we only have one version.
+ */
+public class TestVersionSupplier
+{
+    Stream<TestVersion> getTestVersions()
+    {
+        return Stream.of(new TestVersion("4.0.0", new Cassandra40Factory(), "cassandra40"));
+    }
+
+}
diff --git a/cassandra40/build.gradle b/cassandra40/build.gradle
new file mode 100644
index 0000000..cf71cce
--- /dev/null
+++ b/cassandra40/build.gradle
@@ -0,0 +1,18 @@
+plugins {
+    id 'java'
+    id 'idea'
+}
+
+group 'org.apache.cassandra'
+version project.version
+
+sourceCompatibility = 1.8
+
+repositories {
+    mavenCentral()
+    jcenter()
+}
+
+dependencies {
+    compile project(":common")
+}
\ No newline at end of file
diff --git a/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40Factory.java b/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40Factory.java
new file mode 100644
index 0000000..93e5aa6
--- /dev/null
+++ b/cassandra40/src/main/java/org/apache/cassandra/sidecar/cassandra40/Cassandra40Factory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.cassandra40;
+
+import java.util.List;
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.MinimumVersion;
+import org.apache.cassandra.sidecar.common.NodeStatus;
+
+/**
+ * Factory to produce the 4.0 adapter
+ */
+@MinimumVersion("4.0.0")
+public class Cassandra40Factory implements ICassandraFactory
+{
+    @Override
+    public ICassandraAdapter create(CQLSession session)
+    {
+        return new ICassandraAdapter()
+        {
+            public List<NodeStatus> getStatus()
+            {
+                return null;
+            }
+        };
+    }
+}
diff --git a/common/build.gradle b/common/build.gradle
new file mode 100644
index 0000000..8e6760a
--- /dev/null
+++ b/common/build.gradle
@@ -0,0 +1,38 @@
+plugins {
+    id 'java'
+    id 'idea'
+
+    // todo move to all projects or subprojects.  possibly only include with java projects
+    id 'jacoco'
+    id "com.github.spotbugs"
+}
+
+group 'org.apache.cassandra'
+version project.version
+
+sourceCompatibility = 1.8
+
+repositories {
+    mavenCentral()
+    jcenter()
+}
+
+test {
+    useJUnitPlatform()
+}
+
+dependencies {
+    compile 'org.slf4j:slf4j-api:1.7.25'
+    compile 'ch.qos.logback:logback-core:1.2.3'
+    compile 'ch.qos.logback:logback-classic:1.2.3'
+    compile 'com.datastax.cassandra:cassandra-driver-core:3.9.0+'
+    compile group: 'com.google.inject', name: 'guice', version: '4.2.2'
+    compile group: 'org.apache.commons', name: 'commons-configuration2', version: '2.7'
+
+    implementation 'org.apache.commons:commons-lang3:3.10'
+
+    testCompile "org.junit.jupiter:junit-jupiter-api:${project.junitVersion}"
+    testCompile "org.junit.jupiter:junit-jupiter-params:${project.junitVersion}"
+    testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:${project.junitVersion}"
+    testImplementation("org.assertj:assertj-core:3.16.0")
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/cassandra/sidecar/CQLSession.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java
similarity index 72%
rename from src/main/java/org/apache/cassandra/sidecar/CQLSession.java
rename to common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java
index b156547..5ec3147 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CQLSession.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CQLSession.java
@@ -1,11 +1,28 @@
-package org.apache.cassandra.sidecar;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
 
 import javax.annotation.Nullable;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -18,17 +35,15 @@
 import com.datastax.driver.core.policies.ReconnectionPolicy;
 import com.datastax.driver.core.policies.RoundRobinPolicy;
 import com.datastax.driver.core.policies.WhiteListPolicy;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
 
 /**
  * Represents a connection to Cassandra cluster. Currently supports returning the local connection only as
  * defined in the Configuration.
  */
-@Singleton
 public class CQLSession
 {
     private static final Logger logger = LoggerFactory.getLogger(CQLSession.class);
+
     @Nullable
     private Session localSession;
     private final InetSocketAddress inet;
@@ -37,19 +52,20 @@
     private QueryOptions queryOptions;
     private ReconnectionPolicy reconnectionPolicy;
 
-    @Inject
-    public CQLSession(Configuration configuration)
+    public CQLSession(String host, Integer port, Integer healthCheckFrequency)
     {
-        inet = InetSocketAddress.createUnresolved(configuration.getCassandraHost(), configuration.getCassandraPort());
+        // this was originally using unresolved Inet addresses, but it would fail when trying to
+        // connect to a docker container
+        logger.info("Connecting to {} on port {}", host, port);
+        inet = new InetSocketAddress(host, port);
+
         wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
         this.nettyOptions = new NettyOptions();
         this.queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.ONE);
-        this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000,
-                                                                    configuration.getHealthCheckFrequencyMillis());
+        this.reconnectionPolicy = new ExponentialReconnectionPolicy(1000, healthCheckFrequency);
     }
 
-    @VisibleForTesting
-    CQLSession(InetSocketAddress target, NettyOptions options)
+    public CQLSession(InetSocketAddress target, NettyOptions options)
     {
         inet = target;
         wlp = new WhiteListPolicy(new RoundRobinPolicy(), Collections.singletonList(inet));
@@ -73,6 +89,7 @@
         {
             if (localSession == null)
             {
+                logger.info("Connecting to {}", inet);
                 cluster = Cluster.builder()
                                  .addContactPointsWithPorts(inet)
                                  .withLoadBalancingPolicy(wlp)
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
new file mode 100644
index 0000000..b00faf8
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraAdapterDelegate.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+
+
+/**
+ * Since it's possible for the version of Cassandra to change under us, we need this delegate to wrap the functionality
+ * of the underlying Cassandra adapter.  If a server reboots, we can swap out the right Adapter when the driver
+ * reconnects.
+ *
+ * This delegate *MUST* checkSession() before every call, because:
+ *
+ * 1. The session lazily connects
+ * 2. We might need to swap out the adapter if the version has changed
+ *
+ */
+public class CassandraAdapterDelegate implements ICassandraAdapter, Host.StateListener
+{
+    private final CQLSession cqlSession;
+    private final CassandraVersionProvider versionProvider;
+    private Session session;
+    private SimpleCassandraVersion currentVersion;
+    private ICassandraAdapter adapter;
+    private Boolean isUp = false;
+    private final int refreshRate;
+
+    private static final Logger logger = LoggerFactory.getLogger(CassandraAdapterDelegate.class);
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    private boolean registered = false;
+
+    public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession)
+    {
+        this(provider, cqlSession, 5000);
+    }
+
+    public CassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession cqlSession, int refreshRate)
+    {
+        this.cqlSession = cqlSession;
+        this.versionProvider = provider;
+        this.refreshRate = refreshRate;
+    }
+
+    public synchronized void start()
+    {
+        logger.info("Starting health check");
+        executor.scheduleWithFixedDelay(this::healthCheck, 0, refreshRate, TimeUnit.MILLISECONDS);
+        maybeRegisterHostListener();
+    }
+
+    private synchronized void maybeRegisterHostListener()
+    {
+        if (!registered)
+        {
+            checkSession();
+            if (session != null)
+            {
+                session.getCluster().register(this);
+            }
+        }
+    }
+
+    public synchronized void stop()
+    {
+        logger.info("Stopping health check");
+        executor.shutdown();
+    }
+
+    /**
+     * Need to be called before routing the request to the adapter
+     * We might end up swapping the adapter out because of a server upgrade
+     */
+    public synchronized void checkSession()
+    {
+        if (session == null)
+        {
+            session = cqlSession.getLocalCql();
+            start();
+        }
+    }
+
+    /**
+     * Should be called on initial connect as well as when a server comes back since it might be from an upgrade
+     * synchronized so we don't flood the DB with version requests
+     *
+     * If the healthcheck determines we've changed versions, it should load the proper adapter
+     */
+    public synchronized void healthCheck()
+    {
+        Preconditions.checkNotNull(session);
+        try
+        {
+            String version = session.execute("select release_version from system.local")
+                    .one()
+                    .getString("release_version");
+            isUp = true;
+            // this might swap the adapter out
+            SimpleCassandraVersion newVersion = SimpleCassandraVersion.create(version);
+            if (!newVersion.equals(currentVersion))
+            {
+                currentVersion = SimpleCassandraVersion.create(version);
+                adapter = versionProvider.getCassandra(version).create(cqlSession);
+                logger.info("Cassandra version change detected.  New adapter loaded: {}", adapter);
+            }
+            logger.info("Cassandra version {}");
+        }
+        catch (NoHostAvailableException e)
+        {
+            isUp = false;
+        }
+    }
+
+
+    @Override
+    public List<NodeStatus> getStatus()
+    {
+        checkSession();
+        return adapter.getStatus();
+    }
+
+    @Override
+    public void onAdd(Host host)
+    {
+        healthCheck();
+    }
+
+    @Override
+    public void onUp(Host host)
+    {
+        healthCheck();
+        isUp = true;
+    }
+
+    @Override
+    public void onDown(Host host)
+    {
+        isUp = false;
+    }
+
+    @Override
+    public void onRemove(Host host)
+    {
+        healthCheck();
+    }
+
+    @Override
+    public void onRegister(Cluster cluster)
+    {
+    }
+
+    @Override
+    public void onUnregister(Cluster cluster)
+    {
+    }
+
+    public boolean isUp()
+    {
+        return isUp;
+    }
+
+    public SimpleCassandraVersion getVersion()
+    {
+        healthCheck();
+        return currentVersion;
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraVersionProvider.java b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraVersionProvider.java
new file mode 100644
index 0000000..fc6e40d
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/CassandraVersionProvider.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.ArrayList;
+import java.util.List;
+import javax.annotation.concurrent.NotThreadSafe;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Manages multiple Cassandra versions
+ */
+public class CassandraVersionProvider
+{
+    final ArrayList<ICassandraFactory> versions;
+
+    public CassandraVersionProvider(ArrayList<ICassandraFactory> versions)
+    {
+        this.versions = versions;
+    }
+
+    @VisibleForTesting
+    public List<ICassandraFactory> getAllVersions()
+    {
+        return this.versions;
+    }
+
+    /**
+     * For the provided CassandraVersion, return a new ICassandraFactory instance
+     * that meets the minimum version requirements
+     * That factory can be used to create an ICassandraAdapter
+     * @param requestedVersion
+     * @return
+     */
+    public ICassandraFactory getCassandra(SimpleCassandraVersion requestedVersion)
+    {
+        ICassandraFactory result = versions.get(0);
+
+        for (ICassandraFactory f : versions)
+        {
+            SimpleCassandraVersion currentMinVersion = SimpleCassandraVersion.create(result);
+            SimpleCassandraVersion nextVersion = SimpleCassandraVersion.create(f);
+
+            // skip if we can rule this out early
+            if (nextVersion.isGreaterThan(requestedVersion)) continue;
+
+            if (requestedVersion.isGreaterThan(currentMinVersion))
+            {
+                result = f;
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Convenience method for getCassandra, converts the String version to a typed one
+     * @param requestedVersion
+     * @return
+     */
+    ICassandraFactory getCassandra(String requestedVersion)
+    {
+        SimpleCassandraVersion version = SimpleCassandraVersion.create(requestedVersion);
+        return getCassandra(version);
+    }
+
+    /**
+     * Builder for VersionProvider
+     */
+    @NotThreadSafe
+    public static class Builder
+    {
+        ArrayList<ICassandraFactory> versions;
+        public Builder()
+        {
+            versions = new ArrayList<>();
+        }
+
+        public CassandraVersionProvider build()
+        {
+            if (versions.isEmpty())
+            {
+                throw new IllegalStateException("At least one ICassandraFactory is required");
+            }
+            return new CassandraVersionProvider(versions);
+        }
+        public Builder add(ICassandraFactory version)
+        {
+            versions.add(version);
+            return this;
+        }
+
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java
similarity index 60%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java
index c37fef1..9412df6 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraAdapter.java
@@ -16,29 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.mocks;
+package org.apache.cassandra.sidecar.common;
 
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+import java.util.List;
 
 /**
- * Settable HealthCheck
+ * Core Cassandra Adapter interface
+ * For now, this is just a placeholder.  We will most likely want to define the interface to returns bits such as
+ * getCompaction(), getClusterMembership, etc, which return interfaces such as ICompaction, IClusterMembership.
+ * We will need different implementations due to the slow move away from JMX towards CQL for some, but not all, actions.
  */
-public class MockHealthCheck extends HealthCheck
+public interface ICassandraAdapter
 {
-    private volatile boolean status;
+    List<NodeStatus> getStatus();
 
-    public MockHealthCheck()
-    {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
-    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
similarity index 65%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
index c37fef1..d28a859 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/ICassandraFactory.java
@@ -16,29 +16,12 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.mocks;
-
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+package org.apache.cassandra.sidecar.common;
 
 /**
- * Settable HealthCheck
+ * A factory is used here to create instances of an Adapter.  We
  */
-public class MockHealthCheck extends HealthCheck
+public interface ICassandraFactory
 {
-    private volatile boolean status;
-
-    public MockHealthCheck()
-    {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
-    }
+    ICassandraAdapter create(CQLSession session);
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/common/MinimumVersion.java
similarity index 65%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/MinimumVersion.java
index c37fef1..376bf4a 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/MinimumVersion.java
@@ -16,29 +16,19 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.mocks;
+package org.apache.cassandra.sidecar.common;
 
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
 
 /**
- * Settable HealthCheck
+ * Indicates the minimum version requires for this Cassandra implementation
+ * The minimum version will be used to select the right Cassandra adapter
  */
-public class MockHealthCheck extends HealthCheck
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface MinimumVersion
 {
-    private volatile boolean status;
-
-    public MockHealthCheck()
-    {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
-    }
+    String value();
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java
similarity index 66%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java
index c37fef1..a3aca93 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/MockCassandraFactory.java
@@ -16,29 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.mocks;
-
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+package org.apache.cassandra.sidecar.common;
 
 /**
- * Settable HealthCheck
+ *
  */
-public class MockHealthCheck extends HealthCheck
+public class MockCassandraFactory implements ICassandraFactory
 {
-    private volatile boolean status;
-
-    public MockHealthCheck()
+    @Override
+    public ICassandraAdapter create(CQLSession session)
     {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
+        return null;
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeStatus.java
similarity index 65%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/common/NodeStatus.java
index c37fef1..d50a30f 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/NodeStatus.java
@@ -16,29 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.sidecar.mocks;
 
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+package org.apache.cassandra.sidecar.common;
 
 /**
- * Settable HealthCheck
+ * Placeholder
  */
-public class MockHealthCheck extends HealthCheck
+public class NodeStatus
 {
-    private volatile boolean status;
 
-    public MockHealthCheck()
-    {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
-    }
 }
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersion.java b/common/src/main/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersion.java
new file mode 100644
index 0000000..426f3f8
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersion.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.base.Objects;
+
+/**
+ * Implements versioning used in Cassandra and CQL.
+ * <p>
+ * Note: The following code uses a slight variation from the semver document (http://semver.org).
+ * </p>
+ *
+ * The rules here are a bit different than normal semver comparison.  For simplicity,
+ * an alpha version of 4.0 or a snapshot is equal to 4.0.  This allows us to test sidecar
+ * against alpha versions of a release.
+ *
+ * While it's possible to implement full version comparison, it's likely not very useful
+ * This is because the main testing we are going to do will be against release versions - something like 4.0.
+ * We want to list an adapter as being compatible with 4.0 - and that should include 4.0 alpha, etc.
+ */
+public class SimpleCassandraVersion implements Comparable<SimpleCassandraVersion>
+{
+    /**
+     * note: 3rd group matches to words but only allows number and checked after regexp test.
+     * this is because 3rd and the last can be identical.
+     **/
+    private static final String VERSION_REGEXP = "(\\d+)\\.(\\d+)(?:\\.(\\w+))?(\\-[.\\w]+)?([.+][.\\w]+)?";
+
+    private static final Pattern pattern = Pattern.compile(VERSION_REGEXP);
+    private static final Pattern SNAPSHOT = Pattern.compile("-SNAPSHOT");
+
+    public final int major;
+    public final int minor;
+    public final int patch;
+
+    /**
+     * Parse a version from a string.
+     *
+     * @param version the string to parse
+     * @throws IllegalArgumentException if the provided string does not
+     *                                  represent a version
+     */
+    public static SimpleCassandraVersion create(String version)
+    {
+        String stripped = SNAPSHOT.matcher(version).replaceFirst("");
+        Matcher matcher = pattern.matcher(stripped);
+        if (!matcher.matches())
+            throw new IllegalArgumentException("Invalid version value: " + version);
+
+        try
+        {
+            int major = Integer.parseInt(matcher.group(1));
+            int minor = Integer.parseInt(matcher.group(2));
+            int patch = matcher.group(3) != null ? Integer.parseInt(matcher.group(3)) : 0;
+
+            return SimpleCassandraVersion.create(major, minor, patch);
+        }
+        catch (NumberFormatException e)
+        {
+            throw new IllegalArgumentException("Invalid version value: " + version, e);
+        }
+    }
+
+    public static SimpleCassandraVersion create(int major, int minor, int patch)
+    {
+        if (major < 0 || minor < 0 || patch < 0)
+        {
+            throw new IllegalArgumentException();
+        }
+        return new SimpleCassandraVersion(major, minor, patch);
+    }
+
+    public static SimpleCassandraVersion create(ICassandraFactory factory)
+    {
+        return SimpleCassandraVersion.create(factory.getClass().getAnnotation(MinimumVersion.class).value());
+    }
+
+    private SimpleCassandraVersion(int major, int minor, int patch)
+    {
+        this.major = major;
+        this.minor = minor;
+        this.patch = patch;
+    }
+
+
+    public int compareTo(SimpleCassandraVersion other)
+    {
+        if (major < other.major)
+            return -1;
+        if (major > other.major)
+            return 1;
+
+        if (minor < other.minor)
+            return -1;
+        if (minor > other.minor)
+            return 1;
+
+        if (patch < other.patch)
+            return -1;
+        if (patch > other.patch)
+            return 1;
+        return 0;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (!(o instanceof SimpleCassandraVersion))
+            return false;
+        SimpleCassandraVersion that = (SimpleCassandraVersion) o;
+        return major == that.major
+                && minor == that.minor
+                && patch == that.patch;
+    }
+
+    /**
+     * Returns true if this > v2
+     * @param v2
+     * @return
+     */
+    public boolean isGreaterThan(SimpleCassandraVersion v2)
+    {
+        return compareTo(v2) > 0;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(major, minor, patch);
+    }
+
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append(major).append('.').append(minor).append('.').append(patch);
+
+        return sb.toString();
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/mocks/V30.java
similarity index 67%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/mocks/V30.java
index c37fef1..b14a6f3 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/mocks/V30.java
@@ -18,27 +18,20 @@
 
 package org.apache.cassandra.sidecar.mocks;
 
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.MinimumVersion;
 
 /**
- * Settable HealthCheck
+ * Placeholder for 3.0
  */
-public class MockHealthCheck extends HealthCheck
+@MinimumVersion("3.0.0")
+public class V30 implements ICassandraFactory
 {
-    private volatile boolean status;
-
-    public MockHealthCheck()
+    @Override
+    public ICassandraAdapter create(CQLSession session)
     {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
+        return null;
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/mocks/V40.java
similarity index 67%
copy from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
copy to common/src/main/java/org/apache/cassandra/sidecar/mocks/V40.java
index c37fef1..2d5e6e2 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/mocks/V40.java
@@ -18,27 +18,20 @@
 
 package org.apache.cassandra.sidecar.mocks;
 
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.MinimumVersion;
 
 /**
- * Settable HealthCheck
+ * Placeholder for 4.0
  */
-public class MockHealthCheck extends HealthCheck
+@MinimumVersion("4.0.0")
+public class V40 implements ICassandraFactory
 {
-    private volatile boolean status;
-
-    public MockHealthCheck()
+    @Override
+    public ICassandraAdapter create(CQLSession session)
     {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
+        return null;
     }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java b/common/src/main/java/org/apache/cassandra/sidecar/mocks/V41.java
similarity index 67%
rename from src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
rename to common/src/main/java/org/apache/cassandra/sidecar/mocks/V41.java
index c37fef1..35e5973 100644
--- a/src/test/java/org/apache/cassandra/sidecar/mocks/MockHealthCheck.java
+++ b/common/src/main/java/org/apache/cassandra/sidecar/mocks/V41.java
@@ -18,27 +18,20 @@
 
 package org.apache.cassandra.sidecar.mocks;
 
-import org.apache.cassandra.sidecar.routes.HealthCheck;
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.ICassandraAdapter;
+import org.apache.cassandra.sidecar.common.ICassandraFactory;
+import org.apache.cassandra.sidecar.common.MinimumVersion;
 
 /**
- * Settable HealthCheck
+ * Placeholder for 4.1
  */
-public class MockHealthCheck extends HealthCheck
+@MinimumVersion("4.1.0")
+public class V41 implements ICassandraFactory
 {
-    private volatile boolean status;
-
-    public MockHealthCheck()
+    @Override
+    public ICassandraAdapter create(CQLSession session)
     {
-        super(null);
-    }
-
-    public Boolean get()
-    {
-        return status;
-    }
-
-    public void setStatus(boolean status)
-    {
-        this.status = status;
+        return null;
     }
 }
diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionProviderTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionProviderTest.java
new file mode 100644
index 0000000..20fd109
--- /dev/null
+++ b/common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionProviderTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.cassandra.sidecar.mocks.V30;
+import org.apache.cassandra.sidecar.mocks.V40;
+import org.apache.cassandra.sidecar.mocks.V41;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class SimpleCassandraVersionProviderTest
+{
+
+    CassandraVersionProvider.Builder builder;
+    CassandraVersionProvider provider;
+
+    @BeforeEach
+    void setupBuilder()
+    {
+        builder = new CassandraVersionProvider.Builder();
+        provider = builder.add(new V30())
+                          .add(new V40())
+                          .add(new V41()).build();
+    }
+
+    @Test
+    void simpleTest()
+    {
+        ICassandraFactory cassandra = provider.getCassandra(SimpleCassandraVersion.create("3.0.1"));
+        assertThat(cassandra).hasSameClassAs(new V30());
+    }
+
+    @Test
+    void equalityTest()
+    {
+        ICassandraFactory cassandra = provider.getCassandra(SimpleCassandraVersion.create("3.0.0"));
+        assertThat(cassandra).hasSameClassAs(new V30());
+    }
+
+    @Test
+    void equalityTest2()
+    {
+        ICassandraFactory cassandra = provider.getCassandra(SimpleCassandraVersion.create("4.0.0"));
+        assertThat(cassandra).hasSameClassAs(new V40());
+    }
+
+    @Test
+    void ensureHighVersionsWork()
+    {
+        ICassandraFactory cassandra = provider.getCassandra(SimpleCassandraVersion.create("10.0.0"));
+        assertThat(cassandra).hasSameClassAs(new V41());
+    }
+
+    @Test
+    void ensureOutOfOrderInsertionWorks()
+    {
+        builder = new CassandraVersionProvider.Builder();
+        provider = builder.add(new V40())
+                          .add(new V41())
+                          .add(new V30()).build();
+
+        ICassandraFactory cassandra = provider.getCassandra(SimpleCassandraVersion.create("4.0.0"));
+        assertThat(cassandra).hasSameClassAs(new V40());
+    }
+
+}
diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionTest.java
new file mode 100644
index 0000000..7d82c9f
--- /dev/null
+++ b/common/src/test/java/org/apache/cassandra/sidecar/common/SimpleCassandraVersionTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.sidecar.common;
+
+import org.junit.jupiter.api.Test;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
+
+/**
+ * Tests that CassandraVersion comparisons work correctly
+ */
+public class SimpleCassandraVersionTest
+{
+    SimpleCassandraVersion v4 = SimpleCassandraVersion.create(4, 0, 0);
+    SimpleCassandraVersion v5 = SimpleCassandraVersion.create(5, 0, 0);
+
+    @Test
+    void testNegativeVersionsFail()
+    {
+        assertThatIllegalArgumentException()
+                .isThrownBy(() -> SimpleCassandraVersion.create(-1, 0, 0));
+        assertThatIllegalArgumentException()
+                .isThrownBy(() -> SimpleCassandraVersion.create(4, -1, 0));
+        assertThatIllegalArgumentException()
+                .isThrownBy(() -> SimpleCassandraVersion.create(0, 0, -1));
+        assertThatIllegalArgumentException()
+                .isThrownBy(() -> SimpleCassandraVersion.create("-3.0.0"));
+    }
+
+    @Test
+    void testNormalParse()
+    {
+        assertThat(v4.major).isEqualTo(4);
+        assertThat(v4.minor).isEqualTo(0);
+        assertThat(v5.patch).isEqualTo(0);
+    }
+
+    @Test
+    void testParseNonZeros()
+    {
+        SimpleCassandraVersion v6 = SimpleCassandraVersion.create("6.14.13");
+        assertThat(v6.major).isEqualTo(6);
+        assertThat(v6.minor).isEqualTo(14);
+        assertThat(v6.patch).isEqualTo(13);
+    }
+
+    @Test
+    void testMajorCompare()
+    {
+        assertThat(v5).isGreaterThan(v4);
+    }
+
+    @Test
+    void testMinorCompare()
+    {
+        SimpleCassandraVersion v41 = SimpleCassandraVersion.create(4, 1, 0);
+        assertThat(v41).isGreaterThan(v4);
+    }
+
+    @Test
+    void testBugFixCompare()
+    {
+        SimpleCassandraVersion v401 = SimpleCassandraVersion.create(4, 0, 1);
+        assertThat(v401).isGreaterThan(v4);
+    }
+
+    @Test
+    void testEqual()
+    {
+        SimpleCassandraVersion v4Alt = SimpleCassandraVersion.create(4, 0, 0);
+        assertThat(v4Alt).isEqualByComparingTo(v4);
+
+    }
+
+    @Test
+    void testAlphaParsing()
+    {
+        SimpleCassandraVersion alpha = SimpleCassandraVersion.create("4.0-alpha4");
+        assertThat(alpha).isEqualTo(v4);
+        assertThat(alpha.major).isEqualTo(4);
+        assertThat(alpha.minor).isEqualTo(0);
+        assertThat(alpha.patch).isEqualTo(0);
+
+        String parsed = alpha.toString();
+        SimpleCassandraVersion alpha2 = SimpleCassandraVersion.create(parsed);
+        assertThat(alpha).isEqualTo(alpha2);
+    }
+
+    @Test
+    void testGreaterThan()
+    {
+        assertThat(v5.isGreaterThan(v4)).isTrue();
+    }
+
+    @Test
+    void testSnapshotBuild()
+    {
+        SimpleCassandraVersion alpha = SimpleCassandraVersion.create("4.0-alpha5-SNAPSHOT");
+        assertThat(alpha.major).isEqualTo(4);
+        assertThat(alpha.minor).isEqualTo(0);
+        assertThat(alpha.patch).isEqualTo(0);
+
+    }
+
+}
diff --git a/containers/build.gradle b/containers/build.gradle
new file mode 100644
index 0000000..23ea758
--- /dev/null
+++ b/containers/build.gradle
@@ -0,0 +1,76 @@
+import com.bmuschko.gradle.docker.tasks.image.*
+
+plugins {
+    id 'com.bmuschko.docker-remote-api'
+    id "de.undercouch.download" version "4.1.0"
+}
+
+docker {
+    registryCredentials {
+        url = "${-> configureKubernetes.ext.dockerRegistry.toString()}/v2/"
+    }
+}
+
+class BuildDockerConfig extends Copy {
+
+}
+
+tasks.register("download40", Download) {
+    //src 'http://apache.mirrors.hoobly.com/cassandra/4.0-alpha4/apache-cassandra-4.0-alpha4-bin.tar.gz'
+    src 'https://downloads.apache.org/cassandra/4.0-beta1/apache-cassandra-4.0-beta1-bin.tar.gz'
+    dest "$buildDir/cassandra40/apache-cassandra-4.0-beta1-bin.tar.gz"
+    overwrite false
+}
+
+tasks.register("cassandra40", BuildDockerConfig) {
+    from("src/Cassandra40") {
+    }
+
+    from("src") {
+        include "docker-entrypoint.sh"
+        include "optimize-memory.sh"
+    }
+
+    into "$buildDir/cassandra40/"
+    dependsOn download40
+
+}
+
+tasks.register("generateDockerConfigs") {
+    doFirst {
+        mkdir "build"
+    }
+    dependsOn tasks.withType(BuildDockerConfig)
+}
+
+def getRemoteTag(name) {
+    return "${configureKubernetes.ext.dockerRegistryWithoutProtocol}/${configureKubernetes.ext.dockerGroup}/${name}:${configureKubernetes.ext.dockerTag}".toString()
+}
+
+tasks.register("buildImageCassandra40", DockerBuildImage) {
+    // the toString is required here, otherwise we get org.codehaus.groovy.runtime.GStringImpl cannot be cast to java.lang.String
+    def name = "cassandra40"
+    def localTag = "${name}:${configureKubernetes.ext.dockerTag}".toString()
+
+    // there might not be a group
+    def remoteTag = getRemoteTag(name)
+    // tag the private repo
+    tags = [localTag, remoteTag]
+    inputDir = file("$buildDir/${name}")
+    dependsOn "cassandra40"
+}
+
+tasks.register("publishCassandra40", DockerPushImage) {
+    def name = "cassandra40"
+    imageName = "${configureKubernetes.ext.dockerRegistryWithoutProtocol}/${configureKubernetes.ext.dockerGroup}/${name}"
+
+    dependsOn buildImageCassandra40
+}
+
+tasks.register("buildAll") {
+    dependsOn tasks.withType(DockerBuildImage)
+}
+tasks.register("pushAll") {
+    dependsOn tasks.withType(DockerPushImage)
+    dependsOn "buildAll"
+}
\ No newline at end of file
diff --git a/containers/src/Cassandra40/Dockerfile b/containers/src/Cassandra40/Dockerfile
new file mode 100644
index 0000000..04fd3ea
--- /dev/null
+++ b/containers/src/Cassandra40/Dockerfile
@@ -0,0 +1,18 @@
+FROM ubuntu:bionic
+RUN apt update && apt install -y openjdk-11-jdk-headless
+RUN mkdir /downloads
+WORKDIR /downloads/
+COPY apache-cassandra-4.0-beta1-bin.tar.gz .
+RUN tar zxvf apache-cassandra-4.0-beta1-bin.tar.gz
+RUN mv apache-cassandra-4.0-beta1 /cassandra
+
+ADD docker-entrypoint.sh /cassandra/
+RUN chmod +x /cassandra/docker-entrypoint.sh
+
+ADD optimize-memory.sh .
+
+RUN chmod +x optimize-memory.sh && ./optimize-memory.sh
+
+EXPOSE 9042
+ENTRYPOINT ["/cassandra/docker-entrypoint.sh"]
+
diff --git a/containers/src/docker-entrypoint.sh b/containers/src/docker-entrypoint.sh
new file mode 100644
index 0000000..ebf4e57
--- /dev/null
+++ b/containers/src/docker-entrypoint.sh
@@ -0,0 +1,15 @@
+#!/bin/bash
+set -e
+
+export CASSANDRA_ADDRESS=$(awk 'END{print $1}' /etc/hosts)
+
+# 4 tokens for faster startup (but still use tokens)
+sed -i -e "s/^\(num_tokens:\).*/\1 4 /g" /cassandra/conf/cassandra.yaml
+# default listens on localhost, can't connect to that
+sed -i -e "s/^\(rpc_address:\).*/\1 ${CASSANDRA_ADDRESS} /g" /cassandra/conf/cassandra.yaml
+
+#echo "-Xmx1G" >> /cassandra/conf/jvm-server.options
+#echo "-Xmn500M" >> /cassandra/conf/jvm-server.options
+
+cd /cassandra
+bin/cassandra -f -R
diff --git a/containers/src/optimize-memory.sh b/containers/src/optimize-memory.sh
new file mode 100644
index 0000000..999d5a0
--- /dev/null
+++ b/containers/src/optimize-memory.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+
+CONF_DIR=/cassandra/conf
+YAML=$CONF_DIR/cassandra.yaml
+
+
+sed -i 's/#MAX_HEAP_SIZE="4G"/MAX_HEAP_SIZE="512m"/' $CONF_DIR/cassandra-env.sh
+sed -i 's/#HEAP_NEWSIZE="800M"/HEAP_NEWSIZE="256m"/' $CONF_DIR/cassandra-env.sh
+sed -i 's/num_tokens: 256/num_tokens: 1/' $YAML
+echo 'phi_convict_threshold: 16' >> $YAML
+sed -i 's/concurrent_reads: 32/concurrent_reads: 4/' $YAML
+sed -i 's/concurrent_writes: 32/concurrent_writes: 4/' $YAML
+sed -i 's/concurrent_counter_writes: 32/concurrent_counter_writes: 4/' $YAML
+sed -i 's/# file_cache_size_in_mb: 512/file_cache_size_in_mb: 1/' $YAML
+
+rm -rf /cassandra/data/
\ No newline at end of file
diff --git a/docs/src/development.adoc b/docs/src/development.adoc
index ad92651..3f8b9a1 100644
--- a/docs/src/development.adoc
+++ b/docs/src/development.adoc
@@ -11,6 +11,99 @@
     ./gradlew buildDeb
     ./gradlew buildRpm
 
+## Project Layout
+
+### Common
+
+Contains the libraries which are shared between the version specific Cassandra code as well as the dependencies of Cassandra itself.
+
+### Cassandra40
+
+Implementation of ICassandraAdapter for Cassandra 4.0.
+
+### Cassandra Integration Tests
+
+Cassandra integration tests leverage Kubernetes to create Cassandra nodes and test the different implementations
+of the ICassandraAdapters against real C* nodes.
+
+The integration tests will not run by default when running `./gradlew test` since they require a bit of setup and you may not have
+Kubernetes available.
+
+#### Mac Local Setup
+
+Running kubernetes locally on a Mac can be a bit tricky as it requires several steps.
+
+First, install minikube: https://kubernetes.io/docs/tasks/tools/install-minikube/
+
+There is a convenience script, located at `scripts/setup-minikube.sh`, that can be used to create the minikube environment and enable the registry for you.
+
+You will also need Docker: https://docs.docker.com/get-docker/ to build containers locally before pushing to the minikube registry.
+
+NOTE: If you have a registry already running you can skip the above step and instead set the SIDECAR_DOCKER_REGISTRY environment variable to the full URI of your registry..
+
+If you're using the local option, make sure you can push to an insecure registry.
+The following should be added to your docker configuration (under docker preferences -> docker engine).  Be sure to use whatever ip is reported by `minikube ip`:
+
+    {
+        "insecure-registries": ["192.168.64.16:5000"]
+    }
+
+You will need to run `minikube tunnel` to ensure your Mac can connect to the IP addresses inside the hyperkit virtual machine:
+
+    minikube tunnel
+
+Enter your password when prompted.  It'll redirect part of the 10.x address space to the hyperkit virtual machine.
+
+Once all this is completed, execute `./gradlew pushAll` to create all the containers required for integration testing.
+
+Sometimes minikube can run into issues starting up new jobs, especially if there are several running pods which failed to get cleaned up.  Running the following can help clean up the old pods:
+
+    scripts/cleanup-pods.sh
+
+#### Linux Setup
+
+The most straightforward setup on Linux is to use microk8s, as this relies on non virtualized containers, which give excellent performance.
+
+https://microk8s.io
+
+You will need to configure the SIDECAR_DOCKER_REGISTRY environment variable.  If you're using the built in microk8's registry you should configure the following:
+
+    export SIDECAR_DOCKER_REGISTRY="http://localhost:32000"
+
+NOTE: The MicroK8 project uses 32000 for its registry while minikube on MacOS uses port 5000.
+
+Please see the setup in `.circleci/setup-microk8.sh`.
+
+#### Push containers to the registry
+
+    ./gradlew pushAll
+
+#### Running Integration Tests
+
+Integration tests can be run with the following:
+
+    ./gradlew integrationTest
+
+
+
+#### Tests
+
+Tests in the cassandra-integration-tests submodule should be marked as integration tests with the @CassandraIntegrationTest annotation:
+
+    @CassandraIntegrationTest
+    void myTestHere(CassandraTestContext context)
+    {
+        //
+    }
+
+The `CassandraTestTemplate` will handle starting up each of the Kubernetes services required for testing and inject the
+`CassandraTestContext`, which will have the version, CQLSession, container, and other useful information required for testing.
+
+### Main Project
+
+The main project consumes the various Cassandra versions via the CassandraAdapterDelegate.  When connecting to the server,
+the delegate will discover the version and choose the best `ICassandraAdapter` to match the server version.
+
 
 ## Testing
 
@@ -18,6 +111,8 @@
 
 We strive for high quality, thoroughly tested code.
 
+Some tests require docker to be installed locally to run Cassandra.
+
 ## Issue Tracker
 
 File bugs or look for issues to work on in our project JIRA: https://issues.apache.org/jira/browse/CASSANDRASC
diff --git a/gradle.properties b/gradle.properties
index bd60cdf..d31abca 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1 +1,4 @@
-version=1.0-SNAPSHOT
\ No newline at end of file
+version=1.0-SNAPSHOT
+junitVersion=5.4.2
+kubernetesClientVersion=9.0.0
+
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 4a6ebce..d2a1ec1 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,6 @@
+#Fri May 01 11:14:42 PDT 2020
+distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.1-all.zip
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-6.2.1-bin.zip
-zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
+zipStoreBase=GRADLE_USER_HOME
diff --git a/scripts/cleanup-pods.sh b/scripts/cleanup-pods.sh
new file mode 100755
index 0000000..e046996
--- /dev/null
+++ b/scripts/cleanup-pods.sh
@@ -0,0 +1,12 @@
+#!/usr/bin/env bash
+
+echo "Existing pods and services"
+kubectl get pods
+kubectl get services
+
+echo "Cleaning up pods:"
+kubectl get pods | awk '{print $1}' | egrep '^cassandra-' | xargs -n 1 kubectl delete pod
+
+echo "Cleaning up services:"
+kubectl get services | awk '{print $1}' | egrep '^cassandra-' | xargs -n 1 kubectl delete service
+
diff --git a/scripts/setup-minikube.sh b/scripts/setup-minikube.sh
new file mode 100755
index 0000000..42ebb0e
--- /dev/null
+++ b/scripts/setup-minikube.sh
@@ -0,0 +1,30 @@
+#!/usr/bin/env bash
+
+# Script to start minikube locally and push the containers to the local registry
+# we need to use the hyperkit driver or we can't use the registry
+
+minikube start --insecure-registry "192.168.0.0/16" --insecure-registry "10.0.0.0/24"  --memory 8G --cpus=4 --vm=true
+minikube addons enable registry
+minikube addons enable dashboard
+minikube addons list
+
+kubectl get all
+
+echo "Be sure to configure your docker registry to allow for insecure image uploads:"
+
+echo ""
+echo "{"
+echo " \"insecure-registries\": [\"$(minikube ip):5000\"]  "
+echo "}"
+
+echo "Ensure your docker configuration (Docker preferences -> Docker Engine) allows for insecure pushes then press enter to publish the test containers."
+
+read var
+
+./gradlew buildAll pushAll
+
+echo "To allow the tests to connect to your containers, please run the following:"
+
+echo "minikube tunnel"
+
+echo "Remember to do this before running integration tests!"
\ No newline at end of file
diff --git a/settings.gradle b/settings.gradle
index 6f8fc53..c28cfe4 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,4 +1,9 @@
 rootProject.name = "cassandra-sidecar"
 
+include "cassandra40"
+include "common"
+include "containers"
 include "docs"
+include "cassandra-integration-tests"
+
 
diff --git a/src/main/resources/spotbugs-exclude.xml b/spotbugs-exclude.xml
similarity index 100%
rename from src/main/resources/spotbugs-exclude.xml
rename to spotbugs-exclude.xml
diff --git a/src/integration/java/org/apache/cassandra/sidecar/HealthServiceIntegrationTest.java b/src/integration/java/org/apache/cassandra/sidecar/HealthServiceIntegrationTest.java
deleted file mode 100644
index ec8bd4d..0000000
--- a/src/integration/java/org/apache/cassandra/sidecar/HealthServiceIntegrationTest.java
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar;
-
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-
-import com.datastax.driver.core.NettyOptions;
-import com.datastax.oss.simulacron.common.cluster.ClusterSpec;
-import com.datastax.oss.simulacron.server.BoundCluster;
-import com.datastax.oss.simulacron.server.BoundNode;
-import com.datastax.oss.simulacron.server.NodePerPortResolver;
-import com.datastax.oss.simulacron.server.Server;
-import com.google.inject.AbstractModule;
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.Provides;
-import com.google.inject.Singleton;
-import com.google.inject.util.Modules;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Timer;
-import io.vertx.core.Vertx;
-import io.vertx.core.http.HttpServer;
-import io.vertx.core.logging.Logger;
-import io.vertx.core.logging.LoggerFactory;
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.codec.BodyCodec;
-import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.routes.HealthCheck;
-import org.apache.cassandra.sidecar.routes.HealthService;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-/**
- * Longer run and more intensive tests for the HealthService and HealthCheck
- */
-@DisplayName("Health Service Integration Tests")
-public class HealthServiceIntegrationTest
-{
-    private static final ThreadFactory threadFactory = new ThreadFactoryBuilder()
-                                                       .setDaemon(true)
-                                                       .setNameFormat("HealthServiceTest-%d")
-                                                       .build();
-    private static final HashedWheelTimer sharedHWT = new HashedWheelTimer(threadFactory);
-    private static final EventLoopGroup sharedEventLoopGroup = new NioEventLoopGroup(0, threadFactory);
-
-    private static final Logger logger = LoggerFactory.getLogger(HealthServiceIntegrationTest.class);
-
-    private static final NettyOptions shared = new NettyOptions()
-    {
-        public EventLoopGroup eventLoopGroup(ThreadFactory threadFactory)
-        {
-            return sharedEventLoopGroup;
-        }
-
-        public void onClusterClose(EventLoopGroup eventLoopGroup)
-        {
-        }
-
-        public Timer timer(ThreadFactory threadFactory)
-        {
-            return sharedHWT;
-        }
-
-        public void onClusterClose(Timer timer)
-        {
-        }
-    };
-
-    private Vertx vertx;
-    private int port;
-    private List<CQLSession> sessions = new LinkedList<>();
-    private Injector injector;
-
-    @BeforeEach
-    void setUp() throws InterruptedException
-    {
-        AtomicBoolean failedToListen = new AtomicBoolean(false);
-
-        do
-        {
-            injector = Guice.createInjector(Modules.override(new MainModule())
-                                                   .with(new IntegrationTestModule(1, sessions)));
-            vertx = injector.getInstance(Vertx.class);
-            HttpServer httpServer = injector.getInstance(HttpServer.class);
-            Configuration config = injector.getInstance(Configuration.class);
-            port = config.getPort();
-
-            CountDownLatch waitLatch = new CountDownLatch(1);
-            httpServer.listen(port, res ->
-            {
-                if (res.succeeded())
-                {
-                    logger.info("Succeeded to listen on port " + port);
-                }
-                else
-                {
-                    logger.error("Failed to listen on port " + port + " " + res.cause());
-                    failedToListen.set(true);
-                }
-                waitLatch.countDown();
-            });
-
-            if (waitLatch.await(60, TimeUnit.SECONDS))
-                logger.info("Listen complete before timeout.");
-            else
-                logger.error("Listen complete timed out.");
-
-            if (failedToListen.get())
-                closeClusters();
-        } while(failedToListen.get());
-    }
-
-    @AfterEach
-    void tearDown() throws InterruptedException
-    {
-        CountDownLatch waitLatch = new CountDownLatch(1);
-        vertx.close(res -> waitLatch.countDown());
-        if (waitLatch.await(60, TimeUnit.SECONDS))
-            logger.info("Close complete before timeout.");
-        else
-            logger.error("Close timed out.");
-
-    }
-
-    @AfterEach
-    public void closeClusters()
-    {
-        for (CQLSession session : sessions)
-            session.close();
-        sessions.clear();
-    }
-
-    /**
-     * This test has a race condition that can result in test failure.  Be sure to wait long enough for the server
-     * to register as up.
-     * See CASSANDRA-15615
-     */
-    @DisplayName("100 node cluster stopping, then starting")
-    @Test
-    public void testDownHost() throws InterruptedException
-    {
-        int nodeCount = 100;
-        try (Server server = Server.builder()
-                                   .withMultipleNodesPerIp(true)
-                                   .withAddressResolver(new NodePerPortResolver(new byte[]{ 127, 0, 0, 1 }, 49152))
-                                   .build())
-        {
-            ClusterSpec cluster = ClusterSpec.builder()
-                                             .withNodes(nodeCount)
-                                             .build();
-            BoundCluster bCluster = server.register(cluster);
-
-            Set<BoundNode> downNodes = new HashSet<>();
-            Map<BoundNode, HealthCheck> checks = new HashMap<>();
-
-            logger.info("Create a health check per node");
-            for (BoundNode node : bCluster.getNodes())
-                checks.put(node, healthCheckFor(node, shared));
-
-            logger.info("verify all nodes marked as up");
-            for (BoundNode node : bCluster.getNodes())
-                assertTrue(checks.get(node).get());
-
-            logger.info("shut down nodes one at a time, and verify we get correct response on all HealthChecks");
-            for (int i = 0; downNodes.size() < nodeCount; i++)
-            {
-                for (BoundNode node : bCluster.getNodes())
-                    assertEquals(checks.get(node).get(), !downNodes.contains(node));
-                bCluster.node(i).stop();
-                downNodes.add(bCluster.node(i));
-            }
-
-            logger.info("all hosts should be down");
-            for (BoundNode node : bCluster.getNodes())
-                assertFalse(checks.get(node).get());
-
-            logger.info("Starting nodes back up");
-
-            int i;
-            for (i = 0; downNodes.size() > 0; i++)
-            {
-                bCluster.node(i).start();
-                downNodes.remove(bCluster.node(i));
-            }
-            logger.info("Nodes started back up: " + i);
-
-            logger.info("verify all nodes marked as up");
-
-            long start = System.currentTimeMillis();
-
-            int checkNumber = 0;
-            for (BoundNode node : bCluster.getNodes())
-            {
-                while ((System.currentTimeMillis() - start) < 20000 && !checks.get(node).get())
-                    Thread.sleep(250);
-
-                logger.info("Started node " + checkNumber);
-                assertTrue(checks.get(node).get(), "Failed on node " + checkNumber);
-                checkNumber++;
-            }
-        }
-    }
-
-
-    @DisplayName("Down on startup, then comes up")
-    @Test
-    public void testDownHostTurnsOn() throws Throwable
-    {
-        VertxTestContext testContext = new VertxTestContext();
-        BoundCluster bc = injector.getInstance(BoundCluster.class);
-        BoundNode node = bc.node(0);
-        HealthCheck check = injector.getInstance(HealthCheck.class);
-        HealthService service = injector.getInstance(HealthService.class);
-        Server server = injector.getInstance(Server.class);
-
-        try
-        {
-            WebClient client = WebClient.create(vertx);
-            long start = System.currentTimeMillis();
-            client.get(port, "localhost", "/api/v1/__health")
-                  .as(BodyCodec.string())
-                  .send(testContext.succeeding(response -> testContext.verify(() ->
-                  {
-                      assertEquals(503, response.statusCode());
-
-                      node.start();
-                      while ((System.currentTimeMillis() - start) < (1000 * 60 * 2) && !check.get())
-                          Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
-                      service.refreshNow();
-                      client.get(port, "localhost", "/api/v1/__health")
-                            .as(BodyCodec.string())
-                            .send(testContext.succeeding(upResponse -> testContext.verify(() ->
-                            {
-                                assertEquals(200, upResponse.statusCode());
-                                testContext.completeNow();
-                            })));
-                  })));
-            assertTrue(testContext.awaitCompletion(125, TimeUnit.SECONDS));
-            if (testContext.failed())
-            {
-                throw testContext.causeOfFailure();
-            }
-        }
-        finally
-        {
-            service.stop();
-            server.close();
-        }
-    }
-
-    public HealthCheck healthCheckFor(BoundNode node, NettyOptions shared)
-    {
-        CQLSession session = new CQLSession(node.inetSocketAddress(), shared);
-        sessions.add(session);
-        return new HealthCheck(session);
-    }
-
-    private static class IntegrationTestModule extends AbstractModule
-    {
-        private final int nodeCount;
-        private final List<CQLSession> sessions;
-
-        private IntegrationTestModule(int count, List<CQLSession> sessions)
-        {
-            this.nodeCount = count;
-            this.sessions = sessions;
-        }
-
-        @Provides
-        @Singleton
-        BoundCluster cluster(Server server)
-        {
-            ClusterSpec cluster = ClusterSpec.builder()
-                                             .withNodes(nodeCount)
-                                             .build();
-            BoundCluster bc = server.register(cluster);
-            for (BoundNode n : bc.getNodes())
-                n.stop();
-
-            return bc;
-        }
-
-        @Provides
-        @Singleton
-        BoundNode node(BoundCluster bc)
-        {
-            return bc.node(0);
-        }
-
-        @Provides
-        @Singleton
-        Server server()
-        {
-            return Server.builder()
-                         .withMultipleNodesPerIp(true)
-                         .withAddressResolver(new NodePerPortResolver(new byte[]{ 127, 0, 0, 1 }, 49152))
-                         .build();
-        }
-
-        @Provides
-        @Singleton
-        HealthCheck healthCheck(BoundNode node)
-        {
-            CQLSession session = new CQLSession(node.inetSocketAddress(), shared);
-            sessions.add(session);
-            HealthCheck check = new HealthCheck(session);
-            return check;
-        }
-
-        @Provides
-        @Singleton
-        public Configuration configuration() throws IOException
-        {
-            ServerSocket socket = new ServerSocket(0);
-            int randomPort = socket.getLocalPort();
-            socket.close();
-
-            return new Configuration.Builder()
-                   .setCassandraHost("INVALID_FOR_TEST")
-                   .setCassandraPort(0)
-                   .setHost("127.0.0.1")
-                   .setPort(randomPort)
-                   .setHealthCheckFrequency(1000)
-                   .setSslEnabled(false)
-                   .build();
-        }
-    }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
index 64c5e47..63e503f 100644
--- a/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
+++ b/src/main/java/org/apache/cassandra/sidecar/CassandraSidecarDaemon.java
@@ -27,7 +27,6 @@
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import io.vertx.core.http.HttpServer;
-import org.apache.cassandra.sidecar.routes.HealthService;
 import org.apache.cassandra.sidecar.utils.SslUtils;
 
 /**
@@ -37,14 +36,12 @@
 public class CassandraSidecarDaemon
 {
     private static final Logger logger = LoggerFactory.getLogger(CassandraSidecarDaemon.class);
-    private final HealthService healthService;
     private final HttpServer server;
     private final Configuration config;
 
     @Inject
-    public CassandraSidecarDaemon(HealthService healthService, HttpServer server, Configuration config)
+    public CassandraSidecarDaemon(HttpServer server, Configuration config)
     {
-        this.healthService = healthService;
         this.server = server;
         this.config = config;
     }
@@ -54,14 +51,12 @@
         banner(System.out);
         validate();
         logger.info("Starting Cassandra Sidecar on {}:{}", config.getHost(), config.getPort());
-        healthService.start();
         server.listen(config.getPort(), config.getHost());
     }
 
     public void stop()
     {
         logger.info("Stopping Cassandra Sidecar");
-        healthService.stop();
         server.close();
     }
 
diff --git a/src/main/java/org/apache/cassandra/sidecar/MainModule.java b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
index 4db26c5..27eefd7 100644
--- a/src/main/java/org/apache/cassandra/sidecar/MainModule.java
+++ b/src/main/java/org/apache/cassandra/sidecar/MainModule.java
@@ -40,6 +40,10 @@
 import io.vertx.ext.web.Router;
 import io.vertx.ext.web.handler.LoggerHandler;
 import io.vertx.ext.web.handler.StaticHandler;
+import org.apache.cassandra.sidecar.cassandra40.Cassandra40Factory;
+import org.apache.cassandra.sidecar.common.CQLSession;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.routes.HealthService;
 import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource;
 import org.jboss.resteasy.plugins.server.vertx.VertxRegistry;
@@ -154,4 +158,30 @@
             throw new ConfigurationException("Failed reading from sidebar.config path: " + confPath, e);
         }
     }
+
+    @Provides
+    public CQLSession session(Configuration config)
+    {
+        String host = config.getCassandraHost();
+        Integer port = config.getPort();
+        Integer healthCheckFrequencyMillis = config.getHealthCheckFrequencyMillis();
+
+        return new CQLSession(host, port, healthCheckFrequencyMillis);
+    }
+
+    @Provides
+    @Singleton
+    public CassandraVersionProvider cassandraVersionProvider()
+    {
+        CassandraVersionProvider.Builder builder = new CassandraVersionProvider.Builder();
+        builder.add(new Cassandra40Factory());
+        return builder.build();
+    }
+
+    @Provides
+    public CassandraAdapterDelegate cassandraAdapterDelegate(CassandraVersionProvider provider, CQLSession session,
+                                                             Configuration config)
+    {
+        return new CassandraAdapterDelegate(provider, session, config.getHealthCheckFrequencyMillis());
+    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/HealthCheck.java b/src/main/java/org/apache/cassandra/sidecar/routes/HealthCheck.java
deleted file mode 100644
index 46c57ea..0000000
--- a/src/main/java/org/apache/cassandra/sidecar/routes/HealthCheck.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.sidecar.routes;
-
-import java.util.function.Supplier;
-
-import javax.annotation.Nullable;
-
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.exceptions.NoHostAvailableException;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import io.vertx.core.logging.Logger;
-import io.vertx.core.logging.LoggerFactory;
-import org.apache.cassandra.sidecar.CQLSession;
-
-/**
- * Basic health check to verify that a CQL connection can be established with a basic SELECT query.
- */
-@Singleton
-public class HealthCheck implements Supplier<Boolean>
-{
-    private static final Logger logger = LoggerFactory.getLogger(HealthCheck.class);
-
-    @Nullable
-    private final CQLSession session;
-
-    @Inject
-    public HealthCheck(@Nullable CQLSession session)
-    {
-        this.session = session;
-    }
-
-    /**
-     * The actual health check
-     *
-     * @return
-     */
-    private boolean check()
-    {
-        try
-        {
-            if (session != null && session.getLocalCql() != null)
-            {
-                ResultSet rs = session.getLocalCql().execute("SELECT release_version FROM system.local");
-                boolean result = (rs.one() != null);
-                logger.debug("HealthCheck status: {}", result);
-                return result;
-            }
-        }
-        catch (NoHostAvailableException nha)
-        {
-            logger.trace("NoHostAvailableException in HealthCheck - Cassandra Down");
-        }
-        catch (Exception e)
-        {
-            logger.error("Failed to reach Cassandra.", e);
-        }
-        return false;
-    }
-
-    /**
-     * Get the check value
-     *
-     * @return true or false based on whether check was successful
-     */
-    @Override
-    public Boolean get()
-    {
-        return check();
-    }
-
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/HealthService.java b/src/main/java/org/apache/cassandra/sidecar/routes/HealthService.java
index fa877c4..bd15fde 100644
--- a/src/main/java/org/apache/cassandra/sidecar/routes/HealthService.java
+++ b/src/main/java/org/apache/cassandra/sidecar/routes/HealthService.java
@@ -18,11 +18,6 @@
 
 package org.apache.cassandra.sidecar.routes;
 
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import javax.annotation.Nullable;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
 import javax.ws.rs.Produces;
@@ -31,8 +26,6 @@
 
 import com.google.common.collect.ImmutableMap;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Host;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import io.netty.handler.codec.http.HttpResponseStatus;
@@ -41,71 +34,22 @@
 import io.vertx.core.json.Json;
 import io.vertx.core.logging.Logger;
 import io.vertx.core.logging.LoggerFactory;
-import org.apache.cassandra.sidecar.CQLSession;
-import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 
 /**
- * Tracks health check[s] and provides a REST response that should match that defined by api.yaml
+ * Provides a simple REST endpoint to determine if a node is available
  */
 @Singleton
 @Path("/api/v1/__health")
-public class HealthService implements Host.StateListener
+public class HealthService
 {
     private static final Logger logger = LoggerFactory.getLogger(HealthService.class);
-    private final int checkPeriodMs;
-    private final Supplier<Boolean> check;
-    private volatile boolean registered = false;
-
-    @Nullable
-    private final CQLSession session;
-
-    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    private volatile boolean lastKnownStatus = false;
+    private final CassandraAdapterDelegate cassandra;
 
     @Inject
-    public HealthService(Configuration config, HealthCheck check, @Nullable CQLSession session)
+    public HealthService(CassandraAdapterDelegate cassandra)
     {
-        this.checkPeriodMs = config.getHealthCheckFrequencyMillis();
-        this.session = session;
-        this.check = check;
-    }
-
-    public synchronized void start()
-    {
-        logger.info("Starting health check");
-        maybeRegisterHostListener();
-        executor.scheduleWithFixedDelay(this::refreshNow, 0, checkPeriodMs, TimeUnit.MILLISECONDS);
-    }
-
-    public synchronized void refreshNow()
-    {
-        try
-        {
-            lastKnownStatus = this.check.get();
-            maybeRegisterHostListener();
-        }
-        catch (Exception e)
-        {
-            logger.error("Error while performing health check", e);
-        }
-    }
-
-    private synchronized void maybeRegisterHostListener()
-    {
-        if (!registered)
-        {
-            if (session != null && session.getLocalCql() != null)
-            {
-                session.getLocalCql().getCluster().register(this);
-                registered = true;
-            }
-        }
-    }
-
-    public synchronized void stop()
-    {
-        logger.info("Stopping health check");
-        executor.shutdown();
+        this.cassandra = cassandra;
     }
 
     @Operation(summary = "Health Check for Cassandra's status",
@@ -118,36 +62,9 @@
     @GET
     public Response doGet()
     {
-        int status = lastKnownStatus ? HttpResponseStatus.OK.code() : HttpResponseStatus.SERVICE_UNAVAILABLE.code();
-        return Response.status(status).entity(Json.encode(ImmutableMap.of("status", lastKnownStatus ?
+        Boolean up = cassandra.isUp();
+        int status = up ? HttpResponseStatus.OK.code() : HttpResponseStatus.SERVICE_UNAVAILABLE.code();
+        return Response.status(status).entity(Json.encode(ImmutableMap.of("status", up ?
                                                                                     "OK" : "NOT_OK"))).build();
     }
-
-    public void onAdd(Host host)
-    {
-        refreshNow();
-    }
-
-    public void onUp(Host host)
-    {
-        refreshNow();
-    }
-
-    public void onDown(Host host)
-    {
-        refreshNow();
-    }
-
-    public void onRemove(Host host)
-    {
-        refreshNow();
-    }
-
-    public void onRegister(Cluster cluster)
-    {
-    }
-
-    public void onUnregister(Cluster cluster)
-    {
-    }
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/AbstractHealthServiceTest.java b/src/test/java/org/apache/cassandra/sidecar/AbstractHealthServiceTest.java
index f84bb3f..946be3c 100644
--- a/src/test/java/org/apache/cassandra/sidecar/AbstractHealthServiceTest.java
+++ b/src/test/java/org/apache/cassandra/sidecar/AbstractHealthServiceTest.java
@@ -36,12 +36,14 @@
 import com.google.inject.util.Modules;
 import io.vertx.core.Vertx;
 import io.vertx.core.http.HttpServer;
+import io.vertx.core.net.JksOptions;
 import io.vertx.ext.web.client.WebClient;
+import io.vertx.ext.web.client.WebClientOptions;
 import io.vertx.ext.web.codec.BodyCodec;
 import io.vertx.junit5.VertxTestContext;
-import org.apache.cassandra.sidecar.mocks.MockHealthCheck;
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.routes.HealthService;
-
+import static org.mockito.Mockito.when;
 
 /**
  * Provides basic tests shared between SSL and normal http health services
@@ -49,11 +51,11 @@
 public abstract class AbstractHealthServiceTest
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractHealthServiceTest.class);
-    private MockHealthCheck check;
     private HealthService service;
     private Vertx vertx;
     private Configuration config;
     private HttpServer server;
+    private CassandraAdapterDelegate cassandra;
 
     public abstract boolean isSslEnabled();
 
@@ -70,8 +72,8 @@
     {
         Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(getTestModule()));
         server = injector.getInstance(HttpServer.class);
+        cassandra = injector.getInstance(CassandraAdapterDelegate.class);
 
-        check = injector.getInstance(MockHealthCheck.class);
         service = injector.getInstance(HealthService.class);
         vertx = injector.getInstance(Vertx.class);
         config = injector.getInstance(Configuration.class);
@@ -98,10 +100,8 @@
     @Test
     public void testHealthCheckReturns200OK(VertxTestContext testContext)
     {
-        check.setStatus(true);
-        service.refreshNow();
-
-        WebClient client = WebClient.create(vertx);
+        when(cassandra.isUp()).thenReturn(true);
+        WebClient client = getClient();
 
         client.get(config.getPort(), "localhost", "/api/v1/__health")
               .as(BodyCodec.string())
@@ -113,14 +113,29 @@
               })));
     }
 
+    private WebClient getClient()
+    {
+        return WebClient.create(vertx, getWebClientOptions());
+    }
+
+    private WebClientOptions getWebClientOptions()
+    {
+        WebClientOptions options = new WebClientOptions();
+        if (isSslEnabled())
+        {
+            options.setTrustStoreOptions(new JksOptions().setPath("src/test/resources/certs/ca.p12")
+                                                         .setPassword("password"));
+        }
+        return options;
+    }
+
     @DisplayName("Should return HTTP 503 Failure when check=False")
     @Test
     public void testHealthCheckReturns503Failure(VertxTestContext testContext)
     {
-        check.setStatus(false);
-        service.refreshNow();
 
-        WebClient client = WebClient.create(vertx);
+        when(cassandra.isUp()).thenReturn(false);
+        WebClient client = getClient();
 
         client.get(config.getPort(), "localhost", "/api/v1/__health")
               .as(BodyCodec.string())
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestModule.java b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
index 608ea8e..67c28d0 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestModule.java
@@ -18,31 +18,42 @@
 
 package org.apache.cassandra.sidecar;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
-import org.apache.cassandra.sidecar.mocks.MockHealthCheck;
+
+import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
+import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
+import org.apache.cassandra.sidecar.common.MockCassandraFactory;
 import org.apache.cassandra.sidecar.routes.HealthService;
 
+import static org.mockito.Mockito.mock;
+
 /**
  * Provides the basic dependencies for unit tests.
  */
 public class TestModule extends AbstractModule
 {
-    @Provides
+    private static final Logger logger = LoggerFactory.getLogger(TestModule.class);
+
     @Singleton
-    public HealthService healthService(Configuration config, MockHealthCheck check)
+    @Provides
+    public CassandraAdapterDelegate delegate()
     {
-        return new HealthService(config, check, null);
+        return mock(CassandraAdapterDelegate.class);
     }
 
-    @Provides
     @Singleton
-    public MockHealthCheck healthCheck()
+    @Provides
+    public HealthService healthService(CassandraAdapterDelegate delegate)
     {
-        return new MockHealthCheck();
+        return new HealthService(delegate);
     }
 
+
     @Provides
     @Singleton
     public Configuration configuration()
@@ -61,4 +72,18 @@
                            .setSslEnabled(false)
                            .build();
     }
+
+    /**
+     * The Mock factory is used for testing purposes, enabling us to test all failures and possible results
+     * @return
+     */
+    @Provides
+    @Singleton
+    public CassandraVersionProvider cassandraVersionProvider()
+    {
+        CassandraVersionProvider.Builder builder = new CassandraVersionProvider.Builder();
+        builder.add(new MockCassandraFactory());
+        return builder.build();
+    }
+
 }
diff --git a/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java b/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java
index b140bb7..883e04e 100644
--- a/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java
+++ b/src/test/java/org/apache/cassandra/sidecar/TestSslModule.java
@@ -18,11 +18,19 @@
 
 package org.apache.cassandra.sidecar;
 
+import java.io.File;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Changes to the TestModule to define SSL dependencies
  */
 public class TestSslModule extends TestModule
 {
+    private static final Logger logger = LoggerFactory.getLogger(TestSslModule.class);
+
+
     @Override
     public Configuration abstractConfig()
     {
@@ -32,6 +40,15 @@
         final String trustStorePath = TestSslModule.class.getClassLoader().getResource("certs/ca.p12").getPath();
         final String trustStorePassword = "password";
 
+        if (!new File(keyStorePath).exists())
+        {
+            logger.error("JMX password file not found");
+        }
+        if (!new File(trustStorePath).exists())
+        {
+            logger.error("Trust Store file not found");
+        }
+
         return new Configuration.Builder()
                            .setCassandraHost("INVALID_FOR_TEST")
                            .setCassandraPort(0)