initial commit

Co-authored-by: Bhaskar Muppana <mbhaskar@apple.com>
Co-authored-by: Jeff Jirsa <jjirsa@apple.com>
Co-authored-by: Marcus Eriksson <marcuse@apache.org>
Co-authored-by: Michael Kjellman <kjellman@apple.com>
Co-authored-by: Nate McCall <zznate.m@gmail.com>
Co-authored-by: Sam Tunnicliffe <sam@beobal.com>
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7b48652
--- /dev/null
+++ b/README.md
@@ -0,0 +1,86 @@
+# Cassandra diff
+
+## Configuration
+See `spark-job/localconfig.yaml` for an example config.
+
+## Custom cluster providers
+To make it easy to run in any environment, the cluster providers are pluggable - there are two interfaces to implement:
+`ClusterProvider` and `JobConfiguration`. The `ClusterProvider` interface is used to create a connection to the clusters,
+and it is configured using `JobConfiguration#clusterConfig` (see below).
+### cluster_config
+This section has 3 parts - `source`, `target` and `metadata` - where `source` and `target` describe the clusters that
+should be compared, and `metadata` describes where information about any mismatches and the job's progress is stored.
+Metadata can be stored in one of the source/target clusters or in a separate cluster.
+
+The fields under `source`/`target`/`metadata` are passed to the `ClusterProvider` implementation (named by `impl`) as a
+map, so any custom cluster provider can be configured here.
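+
+For illustration, a minimal custom provider could look like the sketch below. The class name and the `host` config key
+are hypothetical and only illustrate the interface; the bundled `ContactPointsClusterProvider` is a complete example.
+
+```java
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+import org.apache.cassandra.diff.ClusterProvider;
+
+// Hypothetical provider that connects to a single host taken from the config map.
+public class SingleHostClusterProvider implements ClusterProvider {
+    private String name;
+    private String host;
+
+    public void initialize(Map<String, String> conf, String identifier) {
+        // conf holds the fields under cluster_config.source/target/metadata
+        name = conf.get("name");
+        host = conf.get("host");
+    }
+
+    public Cluster getCluster() {
+        return Cluster.builder().addContactPoint(host).build();
+    }
+
+    public String getClusterName() {
+        return name;
+    }
+}
+```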
+
+## Setting up clusters for diff
+One way of setting up clusters for diff is to restore a snapshot to two different clusters and then modify one
+of the clusters, to verify that queries still return the same results afterwards. The modifications could include
+upgrades, replacements, bounces, decommissions or expansions.
+
+## Environment variables
+Currently, usernames and passwords are set as environment variables when running the diff tool and the API server:
+
+* `diff.cluster.<identifier>.cql_user` - the username to use
+* `diff.cluster.<identifier>.cql_password` - the password to use
+
+where `<identifier>` is `source`, `target` or `metadata`, giving the username/password combination for the
+matching cluster in the configuration.
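+
+The bundled `ContactPointsClusterProvider` resolves these variables by combining the prefix, identifier and property
+name, roughly as in this sketch:
+
+```java
+// Sketch: getEnv("source", "cql_user") reads the diff.cluster.source.cql_user variable.
+static String getEnv(String identifier, String propName) {
+    return System.getenv(String.format("diff.cluster.%s.%s", identifier, propName));
+}
+```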
+
+## Example
+This example starts two single-node Cassandra clusters in Docker, runs cassandra-stress to populate them, and then
+runs diff to make sure the data matches:
+
+You need to have Docker and Spark set up.
+
+```shell script
+$ git clone <wherever>/cassandra-diff.git
+$ cd cassandra-diff
+$ mvn package
+$ docker run --name cas-src -d  -p 9042:9042 cassandra:3.0.18
+$ docker run --name cas-tgt -d  -p 9043:9042 cassandra:latest
+$ docker exec cas-src cassandra-stress write n=1k
+$ docker exec cas-tgt cassandra-stress write n=1k
+$ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-job/target/spark-job-0.1-SNAPSHOT.jar localconfig.yaml
+# ... logs
+INFO  DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
+## start api-server:
+$ mvn install
+$ cd api-server
+$ mvn exec:java
+$ curl -s localhost:8089/jobs/recent | python -mjson.tool
+  [
+      {
+          "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+          "buckets": 100,
+          "keyspace": "keyspace1",
+          "tables": [
+              "standard1"
+          ],
+          "sourceClusterName": "local_test_1",
+          "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "targetClusterName": "local_test_2",
+          "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "tasks": 10000,
+          "start": "2019-08-16T11:47:36.123Z"
+      }
+  ]
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | python -mjson.tool
+  [
+      {
+          "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+          "table": "standard1",
+          "matchedPartitions": 1000,
+          "mismatchedPartitions": 0,
+          "matchedRows": 1000,
+          "matchedValues": 6000,
+          "mismatchedValues": 0,
+          "onlyInSource": 0,
+          "onlyInTarget": 0,
+          "skippedPartitions": 0
+      }
+  ]
+
+```
diff --git a/api-server/README.md b/api-server/README.md
new file mode 100644
index 0000000..254601c
--- /dev/null
+++ b/api-server/README.md
@@ -0,0 +1,117 @@
+# Diff API Server
+## Configuration
+See the main project README - the API server reads `metadata_options` and `cluster_config.metadata` to connect to the cluster where job metadata is stored.
+
+## Running locally
+`mvn exec:java`
+
+## Endpoints
+### `/jobs/running/id`
+Returns the IDs of the currently running jobs.
+
+### `/jobs/running`
+Summaries of all currently running jobs
+
+### `/jobs/recent`
+Summaries of recently run jobs (starting with the last 30 days and widening the search window if fewer than 10 jobs are found).
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/recent | python -mjson.tool
+  [
+      {
+          "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+          "buckets": 100,
+          "keyspace": "keyspace1",
+          "tables": [
+              "standard1"
+          ],
+          "sourceClusterName": "local_test_1",
+          "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "targetClusterName": "local_test_2",
+          "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+          "tasks": 10000,
+          "start": "2019-08-16T11:47:36.123Z"
+      }
+  ]
+
+```
+
+### `/jobs/{jobid}`
+Summary of a single job.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a | python -mjson.tool
+  {
+      "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+      "buckets": 100,
+      "keyspace": "keyspace1",
+      "tables": [
+          "standard1"
+      ],
+      "sourceClusterName": "local_test_1",
+      "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+      "targetClusterName": "local_test_2",
+      "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+      "tasks": 10000,
+      "start": "2019-08-16T11:47:36.123Z"
+  }
+```
+
+### `/jobs/{jobid}/results`
+The results for the given job.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | python -mjson.tool
+[
+    {
+        "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+        "table": "standard1",
+        "matchedPartitions": 1000,
+        "mismatchedPartitions": 0,
+        "matchedRows": 1000,
+        "matchedValues": 6000,
+        "mismatchedValues": 0,
+        "onlyInSource": 0,
+        "onlyInTarget": 0,
+        "skippedPartitions": 0
+    }
+]
+```
+
+### `/jobs/{jobid}/status`
+Current status of a job, showing how many splits have been completed per table.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/status | python -mjson.tool
+{
+    "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+    "completedByTable": {
+        "standard1": 10000
+    }
+}
+```
+
+### `/jobs/{jobid}/mismatches`
+Mismatches detected for the job, grouped by table.
+```shell script
+$  curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/mismatches | python -mjson.tool
+  {
+      "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+      "mismatchesByTable": {}
+  }
+```
+
+### `/jobs/{jobid}/errors/summary`
+Summary of the number of errors per table for the job.
+
+### `/jobs/{jobid}/errors/ranges`
+Lists the failed token ranges for the job.
+
+### `/jobs/{jobid}/errors`
+Details of the individual partition errors for the job.
+
+### `/jobs/by-start-date/{started-after}`
+Summaries of jobs started after the given date (format `yyyy-MM-dd`).
+
+### `/jobs/by-start-date/{started-after}/{started-before}`
+Summaries of jobs started between the given dates (format `yyyy-MM-dd`).
+
+### `/jobs/by-source-cluster/{source}`
+Summaries of jobs run against the given source cluster.
+
+### `/jobs/by-target-cluster/{target}`
+Summaries of jobs run against the given target cluster.
+
+### `/jobs/by-keyspace/{keyspace}`
+Summaries of jobs run against the given keyspace.
diff --git a/api-server/pom.xml b/api-server/pom.xml
new file mode 100644
index 0000000..ab431da
--- /dev/null
+++ b/api-server/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+		 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+      <groupId>org.apache.cassandra.diff</groupId>
+      <artifactId>diff</artifactId>
+      <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>api-server</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+      <!-- compile dependencies -->
+      <dependency>
+        <groupId>org.apache.cassandra.diff</groupId>
+        <artifactId>common</artifactId>
+        <version>${project.parent.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+        <version>3.1.7</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.cxf</groupId>
+        <artifactId>cxf-rt-transports-http-jetty</artifactId>
+        <version>3.1.7</version>
+      </dependency>
+
+      <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>2.9.9</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>2.9.9.2</version>
+      </dependency>
+
+      <!-- provided dependencies -->
+      <dependency>
+        <groupId>org.jetbrains</groupId>
+        <artifactId>annotations</artifactId>
+      </dependency>
+
+      <!-- runtime dependencies -->
+      <dependency>
+        <groupId>org.apache.logging.log4j</groupId>
+        <artifactId>log4j-slf4j-impl</artifactId>
+      </dependency>
+
+      <!-- test dependencies -->
+      <dependency>
+        <groupId>junit</groupId>
+        <artifactId>junit</artifactId>
+      </dependency>
+
+      <dependency>
+        <groupId>org.assertj</groupId>
+        <artifactId>assertj-core</artifactId>
+        <version>3.4.1</version>
+        <scope>test</scope>
+      </dependency>
+
+    </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.6.0</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>java</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <mainClass>org.apache.cassandra.diff.api.DiffAPIServer</mainClass>
+          <arguments>
+            <argument>
+              ../spark-job/localconfig.yaml
+            </argument>
+          </arguments>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
new file mode 100644
index 0000000..6f14128
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api;
+
+import java.io.IOException;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.diff.YamlJobConfiguration;
+import org.apache.cassandra.diff.api.resources.DiffJobsResource;
+import org.apache.cassandra.diff.api.resources.HealthResource;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+
+public class DiffAPIServer {
+    public static void main(String[] args) throws IOException {
+        String filename = args[0];
+        JAXRSServerFactoryBean factoryBean = new JAXRSServerFactoryBean();
+
+        DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(filename));
+        factoryBean.setResourceProviders(Lists.newArrayList(new SingletonResourceProvider(diffResource),
+                                                            new SingletonResourceProvider(new HealthResource())));
+        factoryBean.setAddress("http://localhost:8089/");
+        Server server = factoryBean.create();
+
+        try {
+            server.start();
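+            // Block until something is read from stdin, then fall through to shutdown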
+            System.in.read();
+        } catch (Throwable t) {
+            t.printStackTrace(System.out);
+            throw t;
+        } finally {
+            diffResource.close();
+            if (server.isStarted())
+                server.stop();
+            System.exit(0);
+        }
+    }
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java
new file mode 100644
index 0000000..564f960
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.resources;
+
+import java.util.SortedSet;
+import java.util.UUID;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.diff.JobConfiguration;
+import org.apache.cassandra.diff.api.services.DBService;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+@Path("/jobs")
+public class DiffJobsResource {
+
+    private static final Logger logger = LoggerFactory.getLogger(DiffJobsResource.class);
+    private static final String DATE_FORMAT = "yyyy-MM-dd";
+    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+    private static final ObjectMapper MAPPER = new ObjectMapper().setVisibility(PropertyAccessor.FIELD,
+                                                                                JsonAutoDetect.Visibility.NON_PRIVATE);
+    private final DBService dbService;
+
+    public DiffJobsResource(JobConfiguration conf) {
+        dbService = new DBService(conf);
+    }
+
+    @GET
+    @Path("/running/id")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRunningJobIds() {
+        return response(dbService.fetchRunningJobs());
+    }
+
+    @GET
+    @Path("/running")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRunningJobs() {
+        return response(dbService.fetchJobSummaries(dbService.fetchRunningJobs()));
+    }
+
+    @GET
+    @Path("/recent")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getRecentJobs() {
+
+        SortedSet<DBService.JobSummary> recentJobs = Sets.newTreeSet(DBService.JobSummary.COMPARATOR.reversed());
+        DateTime now = DateTime.now(DateTimeZone.UTC);
+        DateTime maxStartDate = now;
+        DateTime minStartDate = maxStartDate.minusDays(30);
+        recentJobs.addAll(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+
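+        // If fewer than 10 jobs were found, keep sliding the 30-day window further back,
+        // stopping once its upper bound has moved past the 90-day mark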
+        while (recentJobs.size() < 10 && (maxStartDate.compareTo(now.minusDays(90)) >= 0)) {
+            maxStartDate = minStartDate;
+            minStartDate = minStartDate.minusDays(30);
+            recentJobs.addAll(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+        }
+
+        return response(recentJobs);
+    }
+
+    @GET
+    @Path("/{jobid}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJob(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchJobSummary(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/results")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobResults(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchJobResults(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/status")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobStatus(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchJobStatus(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/mismatches")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobMismatches(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchMismatches(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/errors/summary")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobErrorSummary(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchErrorSummary(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/errors/ranges")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobErrorRanges(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchErrorRanges(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/{jobid}/errors")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobErrorDetail(@PathParam("jobid") final String jobId) {
+        return response(dbService.fetchErrorDetail(UUID.fromString(jobId)));
+    }
+
+    @GET
+    @Path("/by-start-date/{started-after}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsStartedSince(@PathParam("started-after") final String minStart) {
+
+        DateTime minStartDate = parseDate(minStart);
+        if (minStartDate == null)
+            return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+
+        DateTime maxStartDate = DateTime.now(DateTimeZone.UTC);
+
+        return getJobsStartedBetween(minStartDate, maxStartDate);
+    }
+
+    @GET
+    @Path("/by-start-date/{started-after}/{started-before}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsStartedSince(@PathParam("started-after") final String minStart,
+                                        @PathParam("started-before") final String maxStart) {
+
+        DateTime minStartDate = parseDate(minStart);
+        if (minStartDate == null)
+            return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+
+        DateTime maxStartDate = null;
+        if (isNullOrEmpty(maxStart)) {
+            maxStartDate = DateTime.now(DateTimeZone.UTC);
+        }
+        else {
+            maxStartDate = parseDate(maxStart);
+            if (maxStartDate == null)
+                return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+            if (maxStartDate.compareTo(minStartDate) < 0)
+                return Response.status(400).entity("Invalid date range, started-before cannot be earlier than started-after").build();
+        }
+
+        return getJobsStartedBetween(minStartDate, maxStartDate);
+    }
+
+    @GET
+    @Path("/by-source-cluster/{source}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsBySourceCluster(@PathParam("source") final String sourceClusterName) {
+        return response(dbService.fetchJobsForSourceCluster(sourceClusterName));
+    }
+
+    @GET
+    @Path("/by-target-cluster/{target}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsByTargetCluster(@PathParam("target") final String targetClusterName) {
+        return response(dbService.fetchJobsForTargetCluster(targetClusterName));
+    }
+
+    @GET
+    @Path("/by-keyspace/{keyspace}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getJobsByKeyspace(@PathParam("keyspace") final String keyspace) {
+        return response(dbService.fetchJobsForKeyspace(keyspace));
+    }
+
+    private Response response(Object data) {
+        try {
+            return Response.ok().entity(MAPPER.writer().writeValueAsString(data)).build();
+        }
+        catch (JsonProcessingException e) {
+            logger.error("JSON Processing error", e);
+            return Response.serverError().entity("Error constructing JSON response").build();
+        }
+    }
+
+    private Response getJobsStartedBetween(DateTime minStartDate, DateTime maxStartDate) {
+        return response(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+    }
+
+    private DateTime parseDate(String s) {
+        try {
+            return DATE_FORMATTER.parseDateTime(s);
+        } catch (IllegalArgumentException e) {
+            return null;
+        }
+    }
+
+    public void close() {
+        dbService.close();
+    }
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java
new file mode 100644
index 0000000..d101615
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.resources;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Health Check: not terribly useful yet.
+ * TODO: add DB connection check
+ */
+@Path("/__health")
+public class HealthResource {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HealthResource.class);
+
+  private static Boolean overrideHealthStatus = null;
+
+  public HealthResource() {
+    LOGGER.debug("New HealthResource created.");
+  }
+
+  public static Boolean overrideHealthStatusTo(Boolean newStatus) {
+    Boolean oldStatus = overrideHealthStatus;
+    overrideHealthStatus = newStatus;
+    LOGGER.debug("Changed overrideHealthStatus from {} to {}", oldStatus, overrideHealthStatus);
+    return oldStatus;
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  public Response health() {
+    if (isHealthy()) {
+      return Response
+          .ok("All is good.")
+          .build();
+    } else {
+      return Response
+          .serverError()
+          .entity("Not Healthy.")
+          .build();
+    }
+  }
+
+  private boolean isHealthy() {
+    if (overrideHealthStatus == null) {
+      return true;
+    }
+    return overrideHealthStatus;
+  }
+
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
new file mode 100644
index 0000000..1bf1f88
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.services;
+
+import java.io.Closeable;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.diff.ClusterProvider;
+import org.apache.cassandra.diff.JobConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+
+// TODO cache jobsummary
+// TODO fix exception handling
+public class DBService implements Closeable {
+    private static final Logger logger = LoggerFactory.getLogger(DBService.class);
+    private static final long QUERY_TIMEOUT_MS = 3000;
+
+    private final Cluster cluster;
+    private final Session session;
+    private final String diffKeyspace;
+
+    private final PreparedStatement runningJobsStatement;
+    private final PreparedStatement jobSummaryStatement;
+    private final PreparedStatement jobResultStatement;
+    private final PreparedStatement jobStatusStatement;
+    private final PreparedStatement jobMismatchesStatement;
+    private final PreparedStatement jobErrorSummaryStatement;
+    private final PreparedStatement jobErrorRangesStatement;
+    private final PreparedStatement jobErrorDetailStatement;
+    private final PreparedStatement jobsStartDateStatement;
+    private final PreparedStatement jobsForSourceStatement;
+    private final PreparedStatement jobsForTargetStatement;
+    private final PreparedStatement jobsForKeyspaceStatement;
+
+    public DBService(JobConfiguration config) {
+        logger.info("Initializing DBService");
+        ClusterProvider provider = ClusterProvider.getProvider(config.clusterConfig("metadata"), "metadata");
+        cluster = provider.getCluster();
+
+        session = cluster.connect();
+        diffKeyspace = config.metadataOptions().keyspace;
+        runningJobsStatement = session.prepare(String.format(
+            " SELECT job_id " +
+            " FROM %s.running_jobs", diffKeyspace));
+        jobSummaryStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id," +
+            "   job_start_time," +
+            "   buckets," +
+            "   keyspace_name, " +
+            "   table_names, " +
+            "   source_cluster_name," +
+            "   source_cluster_desc," +
+            "   target_cluster_name," +
+            "   target_cluster_desc," +
+            "   total_tasks " +
+            " FROM %s.job_summary" +
+            " WHERE job_id = ?", diffKeyspace));
+        jobResultStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id," +
+            "   table_name, " +
+            "   matched_partitions," +
+            "   mismatched_partitions,"   +
+            "   matched_rows,"   +
+            "   matched_values,"   +
+            "   mismatched_values,"   +
+            "   partitions_only_in_source,"   +
+            "   partitions_only_in_target,"   +
+            "   skipped_partitions"   +
+            " FROM %s.job_results" +
+            " WHERE job_id = ? AND table_name = ?", diffKeyspace));
+        jobStatusStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id,"   +
+            "   bucket,"   +
+            "   table_name,"   +
+            "   completed "   +
+            " FROM %s.job_status" +
+            " WHERE job_id = ? AND bucket = ?", diffKeyspace));
+        jobMismatchesStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id," +
+            "   bucket," +
+            "   table_name," +
+            "   mismatching_token," +
+            "   mismatch_type" +
+            " FROM %s.mismatches" +
+            " WHERE job_id = ? AND bucket = ?", diffKeyspace));
+        jobErrorSummaryStatement = session.prepare(String.format(
+            " SELECT " +
+            "   count(start_token) AS error_count," +
+            "   table_name" +
+            " FROM %s.task_errors" +
+            " WHERE job_id = ? AND bucket = ?",
+            diffKeyspace));
+        jobErrorRangesStatement = session.prepare(String.format(
+            " SELECT " +
+            "   bucket,"   +
+            "   table_name,"   +
+            "   start_token,"   +
+            "   end_token"   +
+            " FROM %s.task_errors" +
+            " WHERE job_id = ? AND bucket = ?",
+            diffKeyspace));
+        jobErrorDetailStatement = session.prepare(String.format(
+            " SELECT " +
+            "   table_name,"   +
+            "   error_token"   +
+            " FROM %s.partition_errors" +
+            " WHERE job_id = ? AND bucket = ? AND table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
+        jobsStartDateStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.job_start_index" +
+            " WHERE job_start_date = ? AND job_start_hour = ?", diffKeyspace));
+        jobsForSourceStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.source_cluster_index" +
+            " WHERE source_cluster_name = ?", diffKeyspace));
+        jobsForTargetStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.target_cluster_index" +
+            " WHERE target_cluster_name = ?", diffKeyspace));
+        jobsForKeyspaceStatement = session.prepare(String.format(
+            " SELECT " +
+            "   job_id" +
+            " FROM %s.keyspace_index" +
+            " WHERE keyspace_name = ?", diffKeyspace));
+    }
+
+    public List<UUID> fetchRunningJobs() {
+        List<UUID> jobs = new ArrayList<>();
+        ResultSet rs = session.execute(runningJobsStatement.bind());
+        rs.forEach(row -> jobs.add(row.getUUID("job_id")));
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobSummaries(List<UUID> jobIds) {
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(jobIds.size());
+        jobIds.forEach(id -> futures.add(session.executeAsync(jobSummaryStatement.bind(id))));
+
+        // Oldest first
+        SortedSet<JobSummary> summaries = Sets.newTreeSet(JobSummary.COMPARATOR);
+        processFutures(futures, JobSummary::fromRow, summaries::add);
+        return summaries;
+    }
+
+    public JobSummary fetchJobSummary(UUID jobId) {
+        Row row = session.execute(jobSummaryStatement.bind(jobId)).one();
+        if (row == null)
+            throw new RuntimeException(String.format("Job %s not found", jobId));
+        return JobSummary.fromRow(row);
+    }
+
+    public Collection<JobResult> fetchJobResults(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.tables.size());
+        for (String table : summary.tables)
+            futures.add(session.executeAsync(jobResultStatement.bind(jobId, table)));
+
+        SortedSet<JobResult> results = Sets.newTreeSet();
+        processFutures(futures, JobResult::fromRow, results::add);
+        return results;
+    }
+
+    public JobStatus fetchJobStatus(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobStatusStatement.bind(jobId, i)));
+
+        Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> completedByTable.merge(row.getString("table_name"),
+                                                              row.getLong("completed"),
+                                                              Long::sum));
+        return new JobStatus(jobId, completedByTable);
+    }
+
+    public JobMismatches fetchMismatches(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobMismatchesStatement.bind(jobId, i)));
+
+        Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> mismatchesByTable.merge(row.getString("table_name"),
+                                                               Lists.newArrayList(new Mismatch(row.getString("mismatching_token"),
+                                                                                               row.getString("mismatch_type"))),
+                                                               (l1, l2) -> { l1.addAll(l2); return l1;}));
+        return new JobMismatches(jobId, mismatchesByTable);
+    }
+
+    public JobErrorSummary fetchErrorSummary(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobErrorSummaryStatement.bind(jobId, i)));
+
+        Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> {
+            String table = row.getString("table_name");
+            if (null != table) {
+                errorCountByTable.merge(row.getString("table_name"),
+                                        row.getLong("error_count"),
+                                        Long::sum);
+            }
+        });
+        return new JobErrorSummary(jobId, errorCountByTable);
+    }
+
+    public JobErrorRanges fetchErrorRanges(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++)
+            futures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
+
+        Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(futures, row -> errorRangesByTable.merge(row.getString("table_name"),
+                                                                Lists.newArrayList(new Range(row.getString("start_token"),
+                                                                                             row.getString("end_token"))),
+                                                                (l1, l2) -> { l1.addAll(l2); return l1;}));
+        return new JobErrorRanges(jobId, errorRangesByTable);
+    }
+
+    public JobErrorDetail fetchErrorDetail(UUID jobId) {
+        JobSummary summary = fetchJobSummary(jobId);
+        List<ResultSetFuture> rangeFutures = Lists.newArrayListWithCapacity(summary.buckets);
+
+        for (int i = 0; i < summary.buckets; i++ )
+            rangeFutures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
+
+        List<ResultSetFuture> errorFutures = Lists.newArrayList();
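+        // For each failed range, fetch the individual partition error tokens recorded for it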
+        processFutures(rangeFutures,
+                       row -> session.executeAsync(jobErrorDetailStatement.bind(jobId,
+                                                                                row.getInt("bucket"),
+                                                                                row.getString("table_name"),
+                                                                                row.getString("start_token"),
+                                                                                row.getString("end_token"))),
+                       errorFutures::add);
+        Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+        processFutures(errorFutures,
+                       row -> errorsByTable.merge(row.getString("table_name"),
+                                                  Lists.newArrayList(row.getString("error_token")),
+                                                  (l1, l2) -> { l1.addAll(l2); return l1;}));
+        return new JobErrorDetail(jobId, errorsByTable);
+    }
+
+    public Collection<JobSummary> fetchJobsStartedBetween(DateTime start, DateTime end) {
+        int days = Days.daysBetween(start, end).getDays();
+        List<ResultSetFuture> idFutures = Lists.newArrayListWithCapacity(days * 24);
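+        // The start-date index is keyed by date and hour, so query every hour of every day in the range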
+        for (int i = 0; i <= days; i++) {
+            DateTime date = start.plusDays(i);
+            LocalDate ld = LocalDate.fromYearMonthDay(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth());
+            for (int j = 0; j <= 23; j++) {
+                idFutures.add(session.executeAsync(jobsStartDateStatement.bind(ld, j)));
+            }
+        }
+
+        List<ResultSetFuture> jobFutures = Lists.newArrayList();
+        processFutures(idFutures,
+                       row -> session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id"))),
+                       jobFutures::add);
+
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(jobFutures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobsForSourceCluster(String sourceClusterName) {
+        ResultSet jobIds = session.execute(jobsForSourceStatement.bind(sourceClusterName));
+        List<ResultSetFuture> futures = Lists.newArrayList();
+        jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(futures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobsForTargetCluster(String targetClusterName) {
+        ResultSet jobIds = session.execute(jobsForTargetStatement.bind(targetClusterName));
+        List<ResultSetFuture> futures = Lists.newArrayList();
+        jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+        // most recent first
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(futures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    public Collection<JobSummary> fetchJobsForKeyspace(String keyspace) {
+        ResultSet jobIds = session.execute(jobsForKeyspaceStatement.bind(keyspace));
+        List<ResultSetFuture> futures = Lists.newArrayList();
+        jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+        // most recent first
+        SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+        processFutures(futures, JobSummary::fromRow, jobs::add);
+        return jobs;
+    }
+
+    private void processFutures(List<ResultSetFuture> futures, Consumer<Row> consumer) {
+        processFutures(futures, Function.identity(), consumer);
+    }
+
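+    // Wait for each future, apply the transform to every row and pass the result to the consumer;
+    // queries that fail or time out are logged and skipped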
+    private <T> void processFutures(List<ResultSetFuture> futures, Function<Row, T> transform, Consumer<T> consumer) {
+        Consumer<ResultSet> resultConsumer = resultSet -> resultSet.forEach(row -> consumer.accept(transform.apply(row)));
+        futures.forEach(f -> getResultsSafely(f).ifPresent(resultConsumer));
+    }
+
+    private Optional<ResultSet> getResultsSafely(ResultSetFuture future) {
+        try {
+            return Optional.ofNullable(future.get(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        } catch (Exception e) {
+            logger.warn("Error getting result from async query", e);
+            return Optional.empty();
+        }
+    }
+
+    public void close()
+    {
+        session.close();
+        cluster.close();
+    }
+
+    public static class Range {
+        final String start;
+        final String end;
+
+        public Range(String start, String end) {
+            this.start = start;
+            this.end = end;
+        }
+    }
+
+    public static class JobErrorSummary {
+        final UUID jobId;
+        final Map<String, Long> errorsByTable;
+
+        public JobErrorSummary(UUID jobId, Map<String, Long> errorsByTable) {
+            this.jobId = jobId;
+            this.errorsByTable = errorsByTable;
+        }
+    }
+
+    public static class JobErrorRanges {
+        final UUID jobId;
+        final Map<String, List<Range>> rangesByTable;
+
+        public JobErrorRanges(UUID jobId, Map<String, List<Range>> rangesByTable) {
+            this.jobId = jobId;
+            this.rangesByTable = rangesByTable;
+        }
+    }
+
+    public static class JobErrorDetail {
+        final UUID jobId;
+        final Map<String, List<String>> errorsByTable;
+
+        public JobErrorDetail(UUID jobId, Map<String, List<String>> errorsByTable) {
+            this.jobId = jobId;
+            this.errorsByTable = errorsByTable;
+        }
+    }
+
+    public static class JobMismatches {
+
+        final UUID jobId;
+        final Map<String, List<Mismatch>> mismatchesByTable;
+
+        public JobMismatches(UUID jobId, Map<String, List<Mismatch>> mismatchesByTable) {
+            this.jobId = jobId;
+            this.mismatchesByTable = mismatchesByTable;
+        }
+    }
+
+    public static class Mismatch {
+        final String token;
+        final String type;
+
+        public Mismatch(String token, String type) {
+            this.token = token;
+            this.type = type;
+        }
+    }
+
+    public static class JobStatus {
+
+        final UUID jobId;
+        final Map<String, Long> completedByTable;
+
+        public JobStatus(UUID jobId, Map<String, Long> completedByTable) {
+            this.jobId = jobId;
+            this.completedByTable = completedByTable;
+        }
+    }
+
+    public static class JobResult implements Comparable<JobResult> {
+
+        final UUID jobId;
+        final String table;
+        final long matchedPartitions;
+        final long mismatchedPartitions;
+        final long matchedRows;
+        final long matchedValues;
+        final long mismatchedValues;
+        final long onlyInSource;
+        final long onlyInTarget;
+        final long skippedPartitions;
+
+        public JobResult(UUID jobId,
+                         String table,
+                         long matchedPartitions,
+                         long mismatchedPartitions,
+                         long matchedRows,
+                         long matchedValues,
+                         long mismatchedValues,
+                         long onlyInSource,
+                         long onlyInTarget,
+                         long skippedPartitions) {
+            this.jobId = jobId;
+            this.table = table;
+            this.matchedPartitions = matchedPartitions;
+            this.mismatchedPartitions = mismatchedPartitions;
+            this.matchedRows = matchedRows;
+            this.matchedValues = matchedValues;
+            this.mismatchedValues = mismatchedValues;
+            this.onlyInSource = onlyInSource;
+            this.onlyInTarget = onlyInTarget;
+            this.skippedPartitions = skippedPartitions;
+        }
+
+        static JobResult fromRow(Row row) {
+            return new JobResult(row.getUUID("job_id"),
+                                 row.getString("table_name"),
+                                 row.getLong("matched_partitions"),
+                                 row.getLong("mismatched_partitions"),
+                                 row.getLong("matched_rows"),
+                                 row.getLong("matched_values"),
+                                 row.getLong("mismatched_values"),
+                                 row.getLong("partitions_only_in_source"),
+                                 row.getLong("partitions_only_in_target"),
+                                 row.getLong("skipped_partitions"));
+        }
+
+        public int compareTo(@NotNull JobResult other) {
+            return this.table.compareTo(other.table);
+        }
+    }
+
+    public static class JobSummary {
+
+        public static final Comparator<JobSummary> COMPARATOR = Comparator.comparing(j -> j.startTime);
+
+        final UUID jobId;
+        final int buckets;
+        final String keyspace;
+        final List<String> tables;
+        final String sourceClusterName;
+        final String sourceClusterDesc;
+        final String targetClusterName;
+        final String targetClusterDesc;
+        final int tasks;
+        final String start;
+
+        // private so it isn't included in json serialization
+        private final DateTime startTime;
+
+        private JobSummary(UUID jobId,
+                           DateTime startTime,
+                           int buckets,
+                           String keyspace,
+                           List<String> tables,
+                           String sourceClusterName,
+                           String sourceClusterDesc,
+                           String targetClusterName,
+                           String targetClusterDesc,
+                           int tasks)
+        {
+            this.jobId = jobId;
+            this.startTime = startTime;
+            this.start = startTime.toString();
+            this.buckets = buckets;
+            this.keyspace = keyspace;
+            this.tables = tables;
+            this.sourceClusterName = sourceClusterName;
+            this.sourceClusterDesc = sourceClusterDesc;
+            this.targetClusterName = targetClusterName;
+            this.targetClusterDesc = targetClusterDesc;
+            this.tasks = tasks;
+        }
+
+        static JobSummary fromRow(Row row) {
+            return new JobSummary(row.getUUID("job_id"),
+                                  new DateTime(UUIDs.unixTimestamp(row.getUUID("job_start_time")), DateTimeZone.UTC),
+                                  row.getInt("buckets"),
+                                  row.getString("keyspace_name"),
+                                  row.getList("table_names", String.class),
+                                  row.getString("source_cluster_name"),
+                                  row.getString("source_cluster_desc"),
+                                  row.getString("target_cluster_name"),
+                                  row.getString("target_cluster_desc"),
+                                  row.getInt("total_tasks"));
+        }
+    }
+}
diff --git a/api-server/src/main/resources/log4j2.xml b/api-server/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..9e401d6
--- /dev/null
+++ b/api-server/src/main/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration>
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..f73c4f5
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+         http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+      <groupId>org.apache.cassandra.diff</groupId>
+      <artifactId>diff</artifactId>
+      <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>common</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java b/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java
new file mode 100644
index 0000000..6c4e3e4
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+public interface ClusterProvider extends Serializable {
+    String CLUSTER_PROVIDER_CLASS = "impl";
+
+    void initialize(Map<String, String> conf, String identifier);
+    Cluster getCluster();
+    String getClusterName();
+
+    static ClusterProvider getProvider(Map<String, String> conf, String identifier) {
+        String providerImpl = conf.get(CLUSTER_PROVIDER_CLASS);
+        ClusterProvider provider;
+        try {
+            provider = (ClusterProvider)(Class.forName(providerImpl)).newInstance();
+            provider.initialize(conf, identifier);
+        } catch (Exception e) {
+            throw new RuntimeException("Could not instantiate ClusterProvider class: " + providerImpl + " " + conf, e);
+        }
+        return provider;
+    }
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java b/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java
new file mode 100644
index 0000000..b633feb
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+public class ContactPointsClusterProvider implements ClusterProvider {
+    private static final String PREFIX = "diff.cluster";
+    private static final String USERNAME_KEY = "cql_user";
+    private static final String PASSWORD_KEY = "cql_password";
+    private static final String CONTACT_POINTS_KEY = "contact_points";
+    private static final String PORT_KEY = "port";
+    private static final String CLUSTER_KEY = "name";
+    private static final String DC_KEY = "dc";
+
+    private String user;
+    private String password;
+    private String name;
+    private String dc;
+    private String[] contactPoints;
+    private int port;
+
+    public Cluster getCluster() {
+        return newCluster();
+    }
+
+    public void initialize(Map<String, String> conf, String identifier) {
+        user = getEnv(identifier, USERNAME_KEY);
+        password = getEnv(identifier, PASSWORD_KEY);
+        name = conf.get(CLUSTER_KEY);
+        dc = conf.get(DC_KEY);
+        contactPoints = conf.get(CONTACT_POINTS_KEY).split(",");
+        port = Integer.parseInt(conf.getOrDefault(PORT_KEY, "9042"));
+    }
+
+    private synchronized Cluster newCluster() {
+        // TODO add policies etc
+        Cluster.Builder builder = Cluster.builder()
+                                         .addContactPoints(contactPoints)
+                                         .withPort(port);
+
+        if (user != null)
+            builder.withCredentials(user, password);
+
+        return builder.build();
+    }
+
+    public String getClusterName() {
+        return name;
+    }
+
+    public String toString() {
+        return String.format("ContactPoints Cluster: name=%s, dc=%s, contact points= [%s]",
+                             name, dc, String.join(",", contactPoints));
+    }
+
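+    // builds environment variable names of the form "diff.cluster.<identifier>.<prop>", e.g. "diff.cluster.source.cql_user"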
+    static String getEnv(String identifier, String propName) {
+        return System.getenv(String.format("%s.%s.%s", PREFIX, identifier, propName));
+    }
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
new file mode 100644
index 0000000..f0cbb36
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+public interface JobConfiguration extends Serializable {
+    String keyspace();
+
+    List<String> tables();
+
+    int splits();
+
+    int buckets();
+
+    // the rate limit is global - an estimate of how many queries/second the source clusters can handle in total
+    int rateLimit();
+
+    default SpecificTokens specificTokens() {
+        return SpecificTokens.NONE;
+    }
+
+    Optional<UUID> jobId();
+
+    int tokenScanFetchSize();
+
+    int partitionReadFetchSize();
+
+    int readTimeoutMillis();
+
+    double reverseReadProbability();
+
+    String consistencyLevel();
+
+    MetadataKeyspaceOptions metadataOptions();
+
+    Map<String, String> clusterConfig(String identifier);
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java b/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java
new file mode 100644
index 0000000..b3e9eb0
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+
+public class MetadataKeyspaceOptions implements Serializable {
+    public String keyspace = "cassandradiff";
+    public String replication = "{'class':'SimpleStrategy', 'replication_factor':'1'}";
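+    // TTL in seconds; the default (60 * 60 * 24 * 365) is one year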
+    public int ttl = 60 * 60 * 24 * 365;
+    public boolean should_init = false;
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java b/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java
new file mode 100644
index 0000000..5400a54
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+
+public class SpecificTokens implements Serializable, Predicate<BigInteger> {
+
+    public static final SpecificTokens NONE = new SpecificTokens(Collections.emptySet(), Modifier.REJECT);
+
+    public enum Modifier {REJECT, ACCEPT}
+
+    public final ImmutableSet<BigInteger> tokens;
+    public final Modifier modifier;
+
+    public SpecificTokens(Set<BigInteger> tokens, Modifier modifier) {
+        this.tokens = ImmutableSet.copyOf(tokens);
+        this.modifier = modifier;
+    }
+
+    public boolean test(BigInteger token) {
+        if (tokens.isEmpty())
+            return true;
+
+        // if this represents a list of tokens to accept, then this token is allowed
+        // only if it is present in the list. If this is a list of tokens to reject,
+        // then the opposite is true, only allow the token if it is not in the list.
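+        // e.g. with tokens = {42}: ACCEPT passes only token 42, REJECT passes every token except 42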
+        return (modifier == Modifier.ACCEPT) == tokens.contains(token);
+    }
+
+    public boolean isEmpty() {
+        return tokens.isEmpty();
+    }
+
+    public String toString() {
+        return String.format("SpecificTokens: [ %s, %s ]", modifier, tokens);
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
new file mode 100644
index 0000000..69fc28c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
+
+public class YamlJobConfiguration implements JobConfiguration {
+    public int splits = 10000;
+    public String keyspace;
+    public List<String> tables;
+    public int buckets = 100;
+    public int rate_limit = 10000;
+    public String job_id = null;
+    public int token_scan_fetch_size;
+    public int partition_read_fetch_size;
+    public int read_timeout_millis;
+    public double reverse_read_probability;
+    public String consistency_level = "ALL";
+    public MetadataKeyspaceOptions metadata_options;
+    public Map<String, Map<String, String>> cluster_config;
+    public String specific_tokens = null;
+    public String disallowed_tokens = null;
+
+    public static YamlJobConfiguration load(String file) {
+        Yaml yaml = new Yaml(new CustomClassLoaderConstructor(YamlJobConfiguration.class,
+                                                              Thread.currentThread().getContextClassLoader()));
+        try {
+            return yaml.loadAs(new FileInputStream(file), YamlJobConfiguration.class);
+        } catch (FileNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String keyspace() {
+        return keyspace;
+    }
+
+    public List<String> tables() {
+        return tables;
+    }
+
+    public int splits() {
+        return splits;
+    }
+
+    public int buckets() {
+        return buckets;
+    }
+
+    public int rateLimit() {
+        return rate_limit;
+    }
+
+    public Optional<UUID> jobId() {
+        return job_id == null ? Optional.empty() : Optional.of(UUID.fromString(job_id));
+    }
+
+    public int tokenScanFetchSize() {
+        return token_scan_fetch_size;
+    }
+
+    public int partitionReadFetchSize() {
+        return partition_read_fetch_size;
+    }
+
+    public int readTimeoutMillis() {
+        return read_timeout_millis;
+    }
+
+    public double reverseReadProbability() {
+        return reverse_read_probability;
+    }
+
+    public String consistencyLevel() {
+        return consistency_level;
+    }
+
+    public MetadataKeyspaceOptions metadataOptions() {
+        return metadata_options;
+    }
+
+    public Map<String, String> clusterConfig(String identifier) {
+        return cluster_config.get(identifier);
+    }
+
+    public SpecificTokens specificTokens() {
+
+        if (disallowed_tokens != null && specific_tokens != null)
+            throw new RuntimeException("Cannot specify both disallowed and specific tokens");
+
+        if (disallowed_tokens != null) {
+            return new SpecificTokens(toTokens(disallowed_tokens), SpecificTokens.Modifier.REJECT);
+        } else if (specific_tokens != null) {
+            return new SpecificTokens(toTokens(specific_tokens), SpecificTokens.Modifier.ACCEPT);
+        }
+        return SpecificTokens.NONE;
+    }
+
+    public String toString() {
+        return "YamlJobConfiguration{" +
+               "splits=" + splits +
+               ", keyspace='" + keyspace + '\'' +
+               ", tables=" + tables +
+               ", buckets=" + buckets +
+               ", rate_limit=" + rate_limit +
+               ", job_id='" + job_id + '\'' +
+               ", token_scan_fetch_size=" + token_scan_fetch_size +
+               ", partition_read_fetch_size=" + partition_read_fetch_size +
+               ", read_timeout_millis=" + read_timeout_millis +
+               ", reverse_read_probability=" + reverse_read_probability +
+               ", consistency_level='" + consistency_level + '\'' +
+               ", metadata_options=" + metadata_options +
+               ", cluster_config=" + cluster_config +
+               '}';
+    }
+
+    private static Set<BigInteger> toTokens(String str) {
+        Set<BigInteger> tokens = new HashSet<>();
+        for (String token : str.split(",")) {
+            token = token.trim();
+            tokens.add(new BigInteger(token));
+        }
+        return tokens;
+    }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fce7cc1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
+		 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.cassandra.diff</groupId>
+    <artifactId>diff</artifactId>
+    <packaging>pom</packaging>
+    <version>0.1-SNAPSHOT</version>
+    <modules>
+      <module>common</module>
+      <module>spark-job</module>
+      <module>api-server</module>
+    </modules>
+
+    <properties>
+      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencyManagement>
+      <dependencies>
+
+        <dependency>
+          <groupId>com.datastax.cassandra</groupId>
+          <artifactId>cassandra-driver-core</artifactId>
+          <version>3.7.1</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.yaml</groupId>
+          <artifactId>snakeyaml</artifactId>
+          <version>1.24</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+          <version>28.0-jre</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+          <version>1.7.28</version>
+          <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.jetbrains</groupId>
+          <artifactId>annotations</artifactId>
+          <version>17.0.0</version>
+          <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+          <version>2.12.1</version>
+          <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+          <groupId>junit</groupId>
+          <artifactId>junit</artifactId>
+          <version>4.12</version>
+          <scope>test</scope>
+        </dependency>
+
+      </dependencies>
+    </dependencyManagement>
+    <build>
+      <plugins>
+        <plugin>
+          <groupId>org.apache.maven.plugins</groupId>
+          <artifactId>maven-compiler-plugin</artifactId>
+          <version>3.6.1</version>
+          <configuration>
+            <source>1.8</source>
+            <target>1.8</target>
+          </configuration>
+        </plugin>
+      </plugins>
+    </build>
+</project>
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig.yaml
new file mode 100644
index 0000000..d4f5630
--- /dev/null
+++ b/spark-job/localconfig.yaml
@@ -0,0 +1,51 @@
+# Keyspace to diff
+keyspace: keyspace1
+# List of tables to diff
+tables:
+  - standard1
+
+# This is how many parts we split the full token range into.
+# Each of these splits is then compared between the clusters
+splits: 10000
+
+# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata
+buckets: 100
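+# (with the values above, each bucket covers 10000 / 100 = 100 splits)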
+
+# global rate limit - this is how many q/s you think the target clusters can handle
+rate_limit: 10000
+
+# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits
+# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef
+
+# Fetch size to use for the query fetching the tokens in the cluster
+token_scan_fetch_size: 1000
+# Fetch size to use for the queries fetching the rows of each partition
+partition_read_fetch_size: 1000
+
+read_timeout_millis: 10000
+reverse_read_probability: 0.5
+consistency_level: ALL
+metadata_options:
+  keyspace: cassandradiff
+  replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
+  ttl: 31536000
+  should_init: true
+cluster_config:
+  source:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_1"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
+  target:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test_2"
+    contact_points: "127.0.0.1"
+    port: "9043"
+    dc: "datacenter1"
+  metadata:
+    impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+    name: "local_test"
+    contact_points: "127.0.0.1"
+    port: "9042"
+    dc: "datacenter1"
diff --git a/spark-job/pom.xml b/spark-job/pom.xml
new file mode 100644
index 0000000..4d4c8fc
--- /dev/null
+++ b/spark-job/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+		 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <groupId>org.apache.cassandra.diff</groupId>
+        <artifactId>diff</artifactId>
+        <version>0.1-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>spark-job</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+
+        <!-- compile dependencies -->
+        <dependency>
+            <groupId>org.apache.cassandra.diff</groupId>
+            <artifactId>common</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <!-- provided dependencies -->
+        <dependency>
+          <groupId>org.jetbrains</groupId>
+          <artifactId>annotations</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.11</artifactId>
+            <version>2.3.3</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.11</artifactId>
+            <version>2.3.2</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.1</version>
+                <executions>
+                    <!-- Run shade goal on package phase -->
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <relocations>
+                        <relocation>
+                            <pattern>com.google</pattern>
+                            <shadedPattern>relocated.com.google</shadedPattern>
+                        </relocation>
+                    </relocations>
+                </configuration>
+            </plugin>
+
+        </plugins>
+    </build>
+
+</project>
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
new file mode 100644
index 0000000..bdd6488
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Wrapper for an ExecutorService which provides backpressure by blocking on submission when its
+ * task queue is full.
+ *
+ * The internal ListeningExecutorService is instantiated with an unbounded work queue, but
+ * this class uses a Semaphore to ensure that this queue cannot grow unreasonably. By default,
+ * the Semaphore has 2x as many permits as the executor thread pool has threads, so at most
+ * 2 x maxConcurrentTasks may be submitted before producers are blocked. The submit method
+ * also adds success/failure callbacks which return the permits, enabling producers to make
+ * progress at a manageable rate.
+ *
+ * Callers of the submit method also provide a Phaser, which they can use to ensure that any
+ * tasks *they themselves have submitted* are completed before they proceed. This allows multiple
+ * callers to submit tasks to the same ComparisonExecutor, but only wait for their own to complete
+ * before moving onto the next stage of processing. Managing the increment and decrement of pending
+ * tasks via the Phaser is handled transparently by ComparisonExecutor, so callers should not do
+ * this externally.
+ *
+ * Submitters also provide callbacks to be run on either successful execution or failure of the
+ * task. These callbacks are executed on the same thread as the task itself, which callers should bear
+ * in mind when constructing them.
+ *
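+ * A minimal usage sketch (the comparison task, stats and errors collectors below are
+ * illustrative placeholders, not part of this class):
+ * <pre>{@code
+ *   Phaser phaser = new Phaser(1); // register the submitting thread
+ *   ComparisonExecutor executor = ComparisonExecutor.newExecutor(8, new MetricRegistry());
+ *   executor.submit(() -> comparePartition(key),          // hypothetical comparison task
+ *                   result -> stats.accumulate(result),   // runs on the comparison thread
+ *                   error -> errors.add(error),
+ *                   phaser);
+ *   phaser.arriveAndAwaitAdvance(); // wait only for the tasks this caller submitted
+ * }</pre>
+ *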
+ */
+public class ComparisonExecutor {
+
+    private final ListeningExecutorService executor;
+    private final Semaphore semaphore;
+
+    static ComparisonExecutor newExecutor(int maxConcurrentTasks, MetricRegistry metricRegistry) {
+        return new ComparisonExecutor(
+            MoreExecutors.listeningDecorator(
+                Executors.newFixedThreadPool(maxConcurrentTasks,
+                                             new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
+                                                                       .setDaemon(true)
+                                                                       .build())),
+            maxConcurrentTasks * 2,
+            metricRegistry);
+    }
+
+    @VisibleForTesting
+    ComparisonExecutor(ListeningExecutorService executor, int maxTasks, MetricRegistry metrics) {
+        this.executor = executor;
+        this.semaphore = new Semaphore(maxTasks);
+        if (metrics != null) {
+            metrics.register("BlockedTasks", (Gauge) semaphore::getQueueLength);
+            metrics.register("AvailableSlots", (Gauge) semaphore::availablePermits);
+        }
+    }
+
+    public <T> void submit(final Callable<T> callable,
+                           final Consumer<T> onSuccess,
+                           final Consumer<Throwable> onError,
+                           final Phaser phaser) {
+
+        phaser.register();
+        semaphore.acquireUninterruptibly();
+        try {
+            Futures.addCallback(executor.submit(callable), new FutureCallback<T>() {
+                public void onSuccess(T result) {
+                    fireThenReleaseAndArrive(onSuccess, result, phaser);
+                }
+
+                public void onFailure(Throwable t) {
+                    fireThenReleaseAndArrive(onError, t, phaser);
+                }
+            }, MoreExecutors.directExecutor());
+
+        } catch (RejectedExecutionException e) {
+            fireThenReleaseAndArrive(onError, e, phaser);
+        }
+
+    }
+
+    private <T> void fireThenReleaseAndArrive(Consumer<T> callback, T argument, Phaser phaser) {
+        try {
+            callback.accept(argument);
+        } finally {
+            semaphore.release();
+            phaser.arriveAndDeregister();
+        }
+    }
+
+    public void shutdown() {
+        executor.shutdown();
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
new file mode 100644
index 0000000..4e108e3
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.*;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.diff.DiffContext.cqlizedString;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+
+public class DiffCluster implements AutoCloseable
+{
+    private final static Logger logger = LoggerFactory.getLogger(DiffCluster.class);
+
+    public enum Type {SOURCE, TARGET}
+
+    private final Map<String, PreparedStatement[]> preparedStatements = new HashMap<>();
+    private final ConsistencyLevel consistencyLevel;
+    public final Cluster cluster;
+    private final Session session;
+    private final TokenHelper tokenHelper;
+    public final String keyspace;
+    public final List<BigInteger> tokenList;
+
+    public final RateLimiter getPartitionRateLimiter;
+    public final Type clusterId;
+    private final int tokenScanFetchSize;
+    private final int partitionReadFetchSize;
+    private final int readTimeoutMillis;
+
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+    public DiffCluster(Type clusterId,
+                       Cluster cluster,
+                       String keyspace,
+                       ConsistencyLevel consistencyLevel,
+                       RateLimiter getPartitionRateLimiter,
+                       int tokenScanFetchSize,
+                       int partitionReadFetchSize,
+                       int readTimeoutMillis)
+
+    {
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.cluster = cluster;
+        this.tokenHelper = TokenHelper.forPartitioner(cluster.getMetadata().getPartitioner());
+        this.clusterId = clusterId;
+        this.tokenList = Collections.emptyList();
+        this.getPartitionRateLimiter = getPartitionRateLimiter;
+        this.session = cluster.connect();
+        this.tokenScanFetchSize = tokenScanFetchSize;
+        this.partitionReadFetchSize = partitionReadFetchSize;
+        this.readTimeoutMillis = readTimeoutMillis;
+    }
+
+    public Iterator<PartitionKey> getPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+        try {
+            return Uninterruptibles.getUninterruptibly(fetchPartitionKeys(table, prevToken, token));
+        }
+        catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+        BoundStatement statement = keyReader(table).bind(tokenHelper.forBindParam(prevToken),
+                                                         tokenHelper.forBindParam(token));
+        statement.setFetchSize(tokenScanFetchSize);
+        statement.setReadTimeoutMillis(readTimeoutMillis);
+        return Futures.transform(session.executeAsync(statement),
+                                 this::toPartitionKeys,
+                                 MoreExecutors.directExecutor());
+    }
+
+    private AbstractIterator<PartitionKey> toPartitionKeys(ResultSet resultSet) {
+        return new AbstractIterator<PartitionKey>() {
+            Iterator<Row> rows = resultSet.iterator();
+
+            protected PartitionKey computeNext() {
+                if (session.isClosed())
+                    throw new RuntimeException("Session was closed, cannot get next partition key");
+
+                if (stopped.get())
+                    throw new RuntimeException("Job was stopped, cannot get next partition key");
+
+                return rows.hasNext() ? new PartitionKey(rows.next()) : endOfData();
+            }
+        };
+    }
+
+    public Iterator<Row> getPartition(TableSpec table, PartitionKey key, boolean shouldReverse) {
+        return readPartition(table.getTable(), key, shouldReverse)
+               .getUninterruptibly()
+               .iterator();
+    }
+
+    private ResultSetFuture readPartition(String table, final PartitionKey key, boolean shouldReverse) {
+        BoundStatement statement = shouldReverse
+                                   ? reverseReader(table).bind(key.getComponents().toArray())
+                                   : forwardReader(table).bind(key.getComponents().toArray());
+        statement.setFetchSize(partitionReadFetchSize);
+        statement.setReadTimeoutMillis(readTimeoutMillis);
+        getPartitionRateLimiter.acquire();
+        return session.executeAsync(statement);
+    }
+
+    public void stop() {
+        stopped.set(true);
+    }
+
+    public void close()
+    {
+        logger.info("Closing cluster {}", this.clusterId);
+        session.closeAsync();
+        cluster.closeAsync();
+    }
+
+    private PreparedStatement keyReader(String table) {
+        return getStatementForTable(table, 0);
+    }
+
+    private PreparedStatement forwardReader(String table) {
+        return getStatementForTable(table, 1);
+    }
+
+    private PreparedStatement reverseReader(String table) {
+        return getStatementForTable(table, 2);
+    }
+
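+    // statements are cached per table as: [0] partition key/token scan,
+    // [1] forward partition read, [2] reverse partition read (null for tables without clustering columns)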
+    private PreparedStatement getStatementForTable(String table, int index) {
+        if (!preparedStatements.containsKey(table)) {
+            synchronized (this) {
+                if (!preparedStatements.containsKey(table)) {
+                    PreparedStatement keyStatement = getKeyStatement(table);
+                    PreparedStatement[] partitionReadStmts = getFullStatement(table);
+                    preparedStatements.put(table, new PreparedStatement[]{ keyStatement ,
+                                                                           partitionReadStmts[0],
+                                                                           partitionReadStmts[1] });
+                }
+            }
+        }
+        return preparedStatements.get(table)[index];
+    }
+
+    private PreparedStatement getKeyStatement(@NotNull String table) {
+        final TableMetadata tableMetadata = session.getCluster()
+                                                   .getMetadata()
+                                                   .getKeyspace(cqlizedString(keyspace))
+                                                   .getTable(cqlizedString(table));
+        String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
+
+        Select.Selection selection = QueryBuilder.select().distinct().column(token(partitionKeyColumns));
+        for (String column : partitionKeyColumns)
+            selection = selection.column(column);
+
+        BuiltStatement select = selection.from(tableMetadata)
+                                         .where(gt(token(partitionKeyColumns), bindMarker()))
+                                         .and(lte(token(partitionKeyColumns), bindMarker()));
+
+        logger.debug("Partition key/token read CQL : {}", select.toString());
+        return session.prepare(select).setConsistencyLevel(consistencyLevel);
+    }
+
+    private PreparedStatement[] getFullStatement(@NotNull String table) {
+        final TableMetadata tableMetadata = session.getCluster()
+                                                   .getMetadata()
+                                                   .getKeyspace(cqlizedString(keyspace))
+                                                   .getTable(cqlizedString(table));
+        String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
+        String[] allColumns = columnNames(tableMetadata.getColumns());
+
+        Select.Selection selection = QueryBuilder.select().column(token(partitionKeyColumns));
+        for (String column : allColumns)
+            selection = selection.column(column);
+
+        Select select = selection.from(tableMetadata);
+
+        for (String column : partitionKeyColumns)
+            select.where().and(eq(column, bindMarker()));
+
+        logger.info("Partition forward read CQL : {}", select.toString());
+        PreparedStatement forwardRead = session.prepare(select).setConsistencyLevel(consistencyLevel);
+
+        List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
+        // if the table has no clustering columns a reverse read doesn't make sense
+        // and will never be executed, so just skip preparing the reverse query
+        if (clusteringColumns.isEmpty())
+            return new PreparedStatement[] {forwardRead, null};
+
+        // Depending on DiffContext.reverseReadProbability, we may attempt to read the
+        // partition in reverse order, so prepare a statement for that
+        List<ClusteringOrder> clusteringOrders = tableMetadata.getClusteringOrder();
+        Ordering[] reverseOrdering = new Ordering[clusteringColumns.size()];
+        for (int i=0; i<clusteringColumns.size(); i++) {
+            reverseOrdering[i] = clusteringOrders.get(i) == ClusteringOrder.ASC
+                                 ? desc(clusteringColumns.get(i).getName())
+                                 : asc(clusteringColumns.get(i).getName());
+        }
+
+        select.orderBy(reverseOrdering);
+        logger.info("Partition reverse read CQL : {}", select.toString());
+
+        PreparedStatement reverseRead = session.prepare(select).setConsistencyLevel(consistencyLevel);
+
+        return new PreparedStatement[] {forwardRead, reverseRead};
+    }
+
+    private static String[] columnNames(List<ColumnMetadata> columns) {
+        return columns.stream().map(ColumnMetadata::getName).map(DiffCluster::columnToString).toArray(String[]::new);
+    }
+
+    private static String columnToString(String name)
+    {
+        return '"'+name+'"';
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java
new file mode 100644
index 0000000..5a1a354
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DiffContext {
+
+    public final String keyspace;
+    public final TableSpec table;
+    public final BigInteger startToken;
+    public final BigInteger endToken;
+    public final DiffCluster source;
+    public final DiffCluster target;
+    private final SpecificTokens specificTokens;
+    private final double reverseReadProbability;
+
+    public DiffContext(final DiffCluster source,
+                       final DiffCluster target,
+                       final String keyspace,
+                       final TableSpec table,
+                       final BigInteger startToken,
+                       final BigInteger endToken,
+                       final SpecificTokens specificTokens,
+                       final double reverseReadProbability) {
+        this.keyspace = keyspace;
+        this.table = table;
+        this.startToken = startToken;
+        this.endToken = endToken;
+        this.source = source;
+        this.target = target;
+        this.specificTokens = specificTokens;
+        this.reverseReadProbability = reverseReadProbability;
+    }
+
+    public boolean shouldReverse() {
+        return ! table.getClusteringColumns().isEmpty()
+               && reverseReadProbability > ThreadLocalRandom.current().nextDouble();
+    }
+
+    public boolean isTokenAllowed(BigInteger token) {
+        return specificTokens.test(token);
+    }
+
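+    // CQL identifiers containing upper case characters must be double-quoted to preserve their case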
+    public static String cqlizedString(final String str) {
+        if (str.toLowerCase().equals(str)) {
+            return str;
+        } else {
+            return "\"" + str + "\"";
+        }
+    }
+
+    public String toString() {
+        return String.format("DiffContext: [keyspace: %s, start_token: %s, end_token: %s]",
+                             keyspace, startToken, endToken);
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
new file mode 100644
index 0000000..bb14c25
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * To run this, something like this should be executed for local runs
+ *
+ * spark-submit --files ./spark-job/localconfig.yaml
+ *              --master "local[2]"
+ *              --class org.apache.cassandra.diff.DiffJob spark-job/target/spark-job-0.1-SNAPSHOT.jar
+ *              localconfig.yaml
+ */
+
+public class DiffJob {
+    private static final Logger logger = LoggerFactory.getLogger(DiffJob.class);
+
+    public static void main(String ... args) {
+        if (args.length == 0) {
+            System.exit(-1);
+        }
+        SparkSession spark = SparkSession.builder().appName("cassandra-diff").getOrCreate();
+        JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+        String configFile = SparkFiles.get(args[0]);
+        YamlJobConfiguration configuration = YamlJobConfiguration.load(configFile);
+        DiffJob diffJob = new DiffJob();
+        diffJob.run(configuration, sc);
+        spark.stop();
+    }
+
+    public void run(JobConfiguration configuration, JavaSparkContext sc) {
+        SparkConf conf = sc.getConf();
+        // get partitioner from both clusters and verify that they match
+        ClusterProvider sourceProvider = ClusterProvider.getProvider(configuration.clusterConfig("source"), "source");
+        ClusterProvider targetProvider = ClusterProvider.getProvider(configuration.clusterConfig("target"), "target");
+        String sourcePartitioner;
+        String targetPartitioner;
+        try (Cluster sourceCluster = sourceProvider.getCluster();
+             Cluster targetCluster = targetProvider.getCluster()) {
+            sourcePartitioner = sourceCluster.getMetadata().getPartitioner();
+            targetPartitioner = targetCluster.getMetadata().getPartitioner();
+        }
+        if (!sourcePartitioner.equals(targetPartitioner)) {
+            throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s,",
+                                                          sourcePartitioner, targetPartitioner));
+        }
+        TokenHelper tokenHelper = TokenHelper.forPartitioner(sourcePartitioner);
+
+        logger.info("Configuring job metadata store");
+        ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
+        JobMetadataDb.JobLifeCycle job = null;
+        UUID jobId = null;
+        try (Cluster metadataCluster = metadataProvider.getCluster();
+             Session metadataSession = metadataCluster.connect()) {
+
+            MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
+            JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions);
+
+            // Job params, which once a job is created cannot be modified in subsequent re-runs
+            logger.info("Creating or retrieving job parameters");
+            job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace);
+            Params params = getJobParams(job, configuration);
+            logger.info("Job Params: {}", params);
+            if (null == params)
+                throw new RuntimeException("Unable to initialize job params");
+
+            jobId = params.jobId;
+            List<Split> splits = getSplits(configuration, TokenHelper.forPartitioner(sourcePartitioner));
+
+            // Job options, which may be modified per-run
+            int instances = Integer.parseInt(conf.get("spark.executor.instances", "4"));
+            int cores = Integer.parseInt(conf.get("spark.executor.cores", "2"));
+            int executors = instances * cores;
+            // according to https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections we should
+            // have 2-4 partitions per cpu in the cluster:
+            int slices = Math.min(4 * executors, splits.size());
+            int perExecutorRateLimit = configuration.rateLimit() / executors;
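+            // e.g. with the defaults above, 4 instances * 2 cores = 8 executors,
+            // slices = min(4 * 8, splits.size()) and a rate_limit of 10000 gives 1250 q/s per executor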
+
+            // Record the high level job summary info
+            job.initializeJob(params,
+                              sourceProvider.getClusterName(),
+                              sourceProvider.toString(),
+                              targetProvider.getClusterName(),
+                              targetProvider.toString());
+
+            logger.info("DiffJob {} comparing [{}] in keyspace {} on {} and {}",
+                        jobId,
+                        String.join(",", params.tables),
+                        params.keyspace,
+                        sourceProvider,
+                        targetProvider);
+
+            // Run the distributed diff and collate results
+            Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
+                                                  .map((split) -> new Differ(configuration,
+                                                                             params,
+                                                                             perExecutorRateLimit,
+                                                                             split,
+                                                                             tokenHelper,
+                                                                             sourceProvider,
+                                                                             targetProvider,
+                                                                             metadataProvider,
+                                                                             new TrackerProvider(configuration.metadataOptions().keyspace))
+                                                                  .run())
+                                                  .reduce(Differ::accumulate);
+            // Publish results. This also removes the job from the currently running list
+            job.finalizeJob(params.jobId, diffStats);
+            logger.info("FINISHED: {}", diffStats);
+        } catch (Exception e) {
+            // If the job errors out, try and mark the job as not running, so it can be restarted.
+            // If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already
+            // been marked not running, this will log a warning, but is not fatal.
+            if (job != null && jobId != null)
+                job.markNotRunning(jobId);
+            throw new RuntimeException("Diff job failed", e);
+        } finally {
+            if (sc.isLocal())
+                Differ.shutdown();
+        }
+    }
+
+    private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf) {
+        if (conf.jobId().isPresent()) {
+            return job.getJobParams(conf.jobId().get());
+        } else {
+            return new Params(UUID.randomUUID(),
+                              conf.keyspace(),
+                              conf.tables(),
+                              conf.buckets(),
+                              conf.splits());
+        }
+    }
+
+    private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) {
+        logger.info("Initializing splits");
+        List<Split> splits = calculateSplits(config.splits(), config.buckets(), tokenHelper);
+        logger.info("All Splits: {}", splits);
+        if (!config.specificTokens().isEmpty() && config.specificTokens().modifier == SpecificTokens.Modifier.ACCEPT) {
+            splits = getSplitsForTokens(config.specificTokens().tokens, splits);
+            logger.info("Splits for specific tokens ONLY: {}", splits);
+        }
+        // shuffle the splits to make sure the work is spread over the workers,
+        // important when the full cluster isn't being compared
+        Collections.shuffle(splits);
+        return splits;
+    }
+
+    @VisibleForTesting
+    static List<Split> calculateSplits(int numSplits, int numBuckets, TokenHelper tokenHelper) {
+        List<Split> splits = new ArrayList<>(numSplits);
+        BigInteger minToken = tokenHelper.min();
+        BigInteger maxToken = tokenHelper.max();
+
+        BigInteger totalTokens = maxToken.subtract(minToken);
+        BigInteger segmentSize = totalTokens.divide(BigInteger.valueOf(numSplits));
+
+        // add the first split starting at minToken without adding BigInt.ONE below
+        // Splits are grouped into buckets so we can shard the journal info across
+        // C* partitions
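+        // e.g. numSplits = 4, numBuckets = 2 over a hypothetical token range [0, 100] yields
+        // (start=0, end=25, bucket=0), (26, 50, 1), (51, 75, 0), (76, 100, 1)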
+        splits.add(new Split(0,  0, minToken, minToken.add(segmentSize)));
+        BigInteger prev = minToken.add(segmentSize);
+        for (int i = 1; i < numSplits - 1; i++) {
+            BigInteger next = prev.add(segmentSize);
+            // add ONE to avoid split overlap
+            splits.add(new Split(i, i % numBuckets, prev.add(BigInteger.ONE), next));
+            prev = next;
+        }
+        splits.add(new Split(numSplits - 1, (numSplits - 1) % numBuckets,  prev.add(BigInteger.ONE), maxToken)); // make sure we cover the whole range
+        return splits;
+    }
+
+    @VisibleForTesting
+    static List<Split> getSplitsForTokens(Set<BigInteger> tokens, List<Split> splits) {
+        return splits.stream().filter(split -> split.containsAny(tokens)).collect(Collectors.toList());
+    }
+
+    @VisibleForTesting
+    static class Split implements Serializable {
+        final int splitNumber;
+        final int bucket;
+        final BigInteger start;
+        final BigInteger end;
+
+        Split(int splitNumber, int bucket, BigInteger start, BigInteger end) {
+            this.splitNumber = splitNumber;
+            this.bucket = bucket;
+            this.start = start;
+            this.end = end;
+        }
+
+        public String toString() {
+            return "Split [" +
+                   start +
+                   ", " +
+                   end +
+                   ']';
+        }
+
+        public boolean containsAny(Set<BigInteger> specificTokens) {
+            for (BigInteger specificToken : specificTokens) {
+                if (specificToken.compareTo(start) >= 0 && specificToken.compareTo(end) <= 0)
+                    return true;
+            }
+            return false;
+        }
+    }
+
+    static class Params implements Serializable {
+        public final UUID jobId;
+        public final String keyspace;
+        public final ImmutableList<String> tables;
+        public final int buckets;
+        public final int tasks;
+
+        Params(UUID jobId, String keyspace, List<String> tables, int buckets, int tasks) {
+            this.jobId = jobId;
+            this.keyspace = keyspace;
+            this.tables = ImmutableList.copyOf(tables);
+            this.buckets = buckets;
+            this.tasks = tasks;
+        }
+
+        public String toString() {
+            return String.format("Params: [jobId: %s, keyspace: %s, tables: %s, buckets: %s, tasks: %s]",
+                                 jobId, keyspace, tables.stream().collect(Collectors.joining(",")), buckets, tasks);
+        }
+    }
+
+    static class TaskStatus {
+        public static final TaskStatus EMPTY = new TaskStatus(null, null);
+        public final BigInteger lastToken;
+        public final RangeStats stats;
+
+        TaskStatus(BigInteger lastToken, RangeStats stats) {
+            this.lastToken = lastToken;
+            this.stats = stats;
+        }
+
+        public String toString() {
+            return "TaskStatus{" +
+                   "lastToken=" + lastToken +
+                   ", stats=" + stats +
+                   '}';
+        }
+    }
+
+    public static class TrackerProvider implements Serializable {
+        private final String metadataKeyspace;
+
+        TrackerProvider(String metadataKeyspace) {
+            this.metadataKeyspace = metadataKeyspace;
+        }
+
+        public void initializeStatements(Session session) {
+            JobMetadataDb.ProgressTracker.initializeStatements(session, metadataKeyspace);
+        }
+
+        public JobMetadataDb.ProgressTracker getTracker(Session session, UUID jobId, Split split) {
+            return new JobMetadataDb.ProgressTracker(jobId, split.bucket, split.start, split.end, metadataKeyspace, session);
+        }
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
new file mode 100644
index 0000000..49576a2
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.function.BiConsumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
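+/**
+ * Executes the comparison for a single split: reads the partition keys in the split's token range
+ * from the source and target clusters, compares the partitions table by table, and records
+ * progress, mismatches and errors in the job metadata keyspace.
+ */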
+public class Differ implements Serializable
+{
+    private static final Logger logger = LoggerFactory.getLogger(Differ.class);
+
+    private static final MetricRegistry metrics = new MetricRegistry();
+
+    private static final int COMPARISON_THREADS = 8;
+    private static final ComparisonExecutor COMPARISON_EXECUTOR = ComparisonExecutor.newExecutor(COMPARISON_THREADS, metrics);
+
+    private final UUID jobId;
+    private final DiffJob.Split split;
+    private final TokenHelper tokenHelper;
+    private final String keyspace;
+    private final List<String> tables;
+    private final RateLimiter rateLimiter;
+    private final DiffJob.TrackerProvider trackerProvider;
+    private final double reverseReadProbability;
+    private final SpecificTokens specificTokens;
+
+    private static DiffCluster srcDiffCluster;
+    private static DiffCluster targetDiffCluster;
+    private static Session journalSession;
+
+    static
+    {
+        Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+            StringWriter stackTrace = new StringWriter();
+            e.printStackTrace(new PrintWriter(stackTrace));
+            System.out.println("UNCAUGHT EXCEPTION: " + stackTrace.toString());
+            throw new RuntimeException(e);
+        });
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            logger.info("In shutdown hook");
+            shutdown();
+        }));
+    }
+
+    public Differ(JobConfiguration config,
+                  DiffJob.Params params,
+                  int perExecutorRateLimit,
+                  DiffJob.Split split,
+                  TokenHelper tokenHelper,
+                  ClusterProvider sourceProvider,
+                  ClusterProvider targetProvider,
+                  ClusterProvider metadataProvider,
+                  DiffJob.TrackerProvider trackerProvider)
+    {
+        logger.info("Creating Differ for {}", split);
+        this.jobId = params.jobId;
+        this.split = split;
+        this.tokenHelper = tokenHelper;
+        this.keyspace = params.keyspace;
+        this.tables = params.tables;
+        this.trackerProvider = trackerProvider;
+        rateLimiter = RateLimiter.create(perExecutorRateLimit);
+        this.reverseReadProbability = config.reverseReadProbability();
+        this.specificTokens = config.specificTokens();
+        synchronized (Differ.class)
+        {
+            /*
+             * Spark runs tasks for each worker in the same JVM, so these must only be initialized
+             * once; otherwise we leak health checker threads and eventually run out of memory.
+             */
+            // Yes, we could have JobConfiguration return this directly, but snakeyaml doesn't like
+            // relocated classes and the driver has to be shaded.
+            ConsistencyLevel cl = ConsistencyLevel.valueOf(config.consistencyLevel());
+            if (srcDiffCluster == null)
+            {
+                srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
+                                                 sourceProvider.getCluster(),
+                                                 params.keyspace,
+                                                 cl,
+                                                 rateLimiter,
+                                                 config.tokenScanFetchSize(),
+                                                 config.partitionReadFetchSize(),
+                                                 config.readTimeoutMillis());
+            }
+
+            if (targetDiffCluster == null)
+            {
+                targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
+                                                    targetProvider.getCluster(),
+                                                    params.keyspace,
+                                                    cl,
+                                                    rateLimiter,
+                                                    config.tokenScanFetchSize(),
+                                                    config.partitionReadFetchSize(),
+                                                    config.readTimeoutMillis());
+            }
+
+            if (journalSession == null)
+            {
+                journalSession = metadataProvider.getCluster().connect();
+                trackerProvider.initializeStatements(journalSession);
+            }
+        }
+    }
+
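+    /**
+     * Diffs every table in this split that is not already complete, resuming from the last
+     * recorded token where possible, and returns the resulting statistics keyed by table name.
+     */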
+    public Map<String, RangeStats> run() {
+        JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);
+        Map<String, DiffJob.TaskStatus> tablesToDiff = filterTables(tables,
+                                                                    split,
+                                                                    journal::getLastStatus,
+                                                                    !specificTokens.isEmpty());
+
+        String metricsPrefix = String.format("%s.%s", srcDiffCluster.clusterId.name(), srcDiffCluster.keyspace);
+        logger.info("Diffing {} for tables {}", split, tablesToDiff);
+
+        for (Map.Entry<String, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
+            final String table = tableStatus.getKey();
+            DiffJob.TaskStatus status = tableStatus.getValue();
+            RangeStats diffStats = status.stats;
+
+            // if this split has already been fully processed, it's being re-run to check
+            // partitions with errors. In that case, we don't want to adjust the split
+            // start and we don't want to update the completed count when we're finished.
+            boolean isRerun = split.end.equals(status.lastToken);
+            BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
+            validateRange(startToken, split.end, tokenHelper);
+
+            TableSpec sourceTable = TableSpec.make(table, srcDiffCluster);
+            TableSpec targetTable = TableSpec.make(table, targetDiffCluster);
+            validateTableSpecs(sourceTable, targetTable);
+
+            DiffContext ctx = new DiffContext(srcDiffCluster,
+                                              targetDiffCluster,
+                                              keyspace,
+                                              sourceTable,
+                                              startToken,
+                                              split.end,
+                                              specificTokens,
+                                              reverseReadProbability);
+
+            String timerName = String.format("%s.%s.split_times", metricsPrefix, table);
+            try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
+                diffStats.accumulate(diffTable(ctx,
+                                               (error, token) -> journal.recordError(table, token, error),
+                                               (type, token) -> journal.recordMismatch(table, type, token),
+                                               (stats, token) -> journal.updateStatus(table, stats, token)));
+
+                // update the journal with the final state for the table. Use the split's ending token
+                // as the last seen token (even though we may not have actually read any partition for
+                // that token) as this effectively marks the split as done.
+                journal.finishTable(table, diffStats, !isRerun);
+            }
+        }
+
+        Map<String, RangeStats> statsByTable = tablesToDiff.entrySet()
+                                                           .stream()
+                                                           .collect(Collectors.toMap(Map.Entry::getKey,
+                                                                                     e -> e.getValue().stats));
+        updateMetrics(metricsPrefix, statsByTable);
+        return statsByTable;
+    }
+
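+    /**
+     * Compares a single table across the split's token range, reporting partition errors,
+     * mismatches and incremental progress through the supplied callbacks.
+     */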
+    public RangeStats diffTable(final DiffContext context,
+                                final BiConsumer<Throwable, BigInteger> partitionErrorReporter,
+                                final BiConsumer<MismatchType, BigInteger> mismatchReporter,
+                                final BiConsumer<RangeStats, BigInteger> journal) {
+
+        final Iterator<PartitionKey> sourceKeys = context.source.getPartitionKeys(context.table.getTable(),
+                                                                                  context.startToken,
+                                                                                  context.endToken);
+        final Iterator<PartitionKey> targetKeys = context.target.getPartitionKeys(context.table.getTable(),
+                                                                                  context.startToken,
+                                                                                  context.endToken);
+        final Function<PartitionKey, PartitionComparator> partitionTaskProvider =
+            (key) -> {
+                boolean reverse = context.shouldReverse();
+                return new PartitionComparator(context.table,
+                                               context.source.getPartition(context.table, key, reverse),
+                                               context.target.getPartition(context.table, key, reverse));
+            };
+
+        RangeComparator rangeComparator = new RangeComparator(context,
+                                                              partitionErrorReporter,
+                                                              mismatchReporter,
+                                                              journal,
+                                                              COMPARISON_EXECUTOR);
+
+        final RangeStats tableStats = rangeComparator.compare(sourceKeys, targetKeys, partitionTaskProvider);
+        logger.debug("Table [{}] stats - ({})", context.table.getTable(), tableStats);
+        return tableStats;
+    }
+
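+    /**
+     * Determines which tables still need processing for this split. Tables whose last recorded
+     * token equals the split's end token are treated as complete and skipped, unless
+     * includeCompleted is set (as it is when re-checking specific tokens).
+     */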
+    @VisibleForTesting
+    static Map<String, DiffJob.TaskStatus> filterTables(Iterable<String> tables,
+                                                        DiffJob.Split split,
+                                                        Function<String, DiffJob.TaskStatus> journal,
+                                                        boolean includeCompleted) {
+        Map<String, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
+        for (String table : tables) {
+            DiffJob.TaskStatus taskStatus = journal.apply(table);
+            RangeStats diffStats = taskStatus.stats;
+            BigInteger lastToken = taskStatus.lastToken;
+
+            // When we finish processing a split for a given table, we update the task status in journal
+            // to set the last seen token to the split's end token, to indicate that the split is complete.
+            if (!includeCompleted && lastToken != null && lastToken.equals(split.end)) {
+                logger.info("Found finished table {} for split {}", table, split);
+            }
+            else {
+                tablesToProcess.put(table, diffStats != null
+                                            ? taskStatus
+                                            : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
+            }
+        }
+        return tablesToProcess;
+    }
+
+    static void validateTableSpecs(TableSpec source, TableSpec target) {
+        Verify.verify(source.equalsNamesOnly(target),
+                      "Source and target table definitions do not match (Source: %s Target: %s)",
+                      source, target);
+    }
+
+    @VisibleForTesting
+    static void validateRange(BigInteger start, BigInteger end, TokenHelper tokens) {
+
+        Verify.verify(start != null && end != null, "Invalid token range [%s,%s]", start, end);
+
+        Verify.verify(start.compareTo(tokens.min()) >= 0 && end.compareTo(tokens.max()) <= 0 && start.compareTo(end) < 0,
+                      "Invalid token range [%s,%s] for partitioner range [%s,%s]",
+                       start, end, tokens.min(), tokens.max());
+    }
+
+    @VisibleForTesting
+    static Map<String, RangeStats> accumulate(Map<String, RangeStats> stats, Map<String, RangeStats> otherStats)
+    {
+        for (Map.Entry<String, RangeStats> otherEntry : otherStats.entrySet())
+        {
+            if (stats.containsKey(otherEntry.getKey()))
+                stats.get(otherEntry.getKey()).accumulate(otherEntry.getValue());
+            else
+                stats.put(otherEntry.getKey(), otherEntry.getValue());
+        }
+        return stats;
+    }
+
+    private static void updateMetrics(String prefix, Map<String, RangeStats> statsMap)
+    {
+        for (Map.Entry<String, RangeStats> entry : statsMap.entrySet())
+        {
+            String qualifier = String.format("%s.%s", prefix, entry.getKey());
+            RangeStats stats = entry.getValue();
+
+            metrics.meter(qualifier + ".partitions_read").mark(stats.getMatchedPartitions() + stats.getOnlyInSource() + stats.getOnlyInTarget() + stats.getMismatchedPartitions());
+            metrics.counter(qualifier + ".matched_partitions").inc(stats.getMatchedPartitions());
+            metrics.counter(qualifier + ".mismatched_partitions").inc(stats.getMismatchedPartitions());
+
+            metrics.counter(qualifier + ".partitions_only_in_source").inc(stats.getOnlyInSource());
+            metrics.counter(qualifier + ".partitions_only_in_target").inc(stats.getOnlyInTarget());
+            metrics.counter(qualifier + ".skipped_partitions").inc(stats.getSkippedPartitions());
+
+            metrics.counter(qualifier + ".matched_rows").inc(stats.getMatchedRows());
+            metrics.counter(qualifier + ".matched_values").inc(stats.getMatchedValues());
+            metrics.counter(qualifier + ".mismatched_values").inc(stats.getMismatchedValues());
+        }
+    }
+
+    public static void shutdown()
+    {
+        try
+        {
+            if (srcDiffCluster != null) {
+                srcDiffCluster.stop();
+                srcDiffCluster.close();
+            }
+            if (targetDiffCluster != null) {
+                targetDiffCluster.stop();
+                targetDiffCluster.close();
+            }
+            if (journalSession != null) {
+                journalSession.close();
+                journalSession.getCluster().close();
+            }
+            COMPARISON_EXECUTOR.shutdown();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
new file mode 100644
index 0000000..0ac6521
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.utils.UUIDs;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
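+/**
+ * Access layer for the diff job's metadata keyspace: tracks per-split progress, records
+ * mismatches and errors, and manages job-level lifecycle entries and results.
+ */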
+public class JobMetadataDb {
+    private static final Logger logger = LoggerFactory.getLogger(JobMetadataDb.class);
+
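+    /**
+     * Records progress for a single split (job id, bucket and token range) by writing task
+     * status, mismatch and error rows to the metadata keyspace.
+     */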
+    static class ProgressTracker {
+
+        private final UUID jobId;
+        private final int bucket;
+        private final String startToken;
+        private final String endToken;
+        private final String keyspace;
+        private Session session;
+
+        private static PreparedStatement updateStmt;
+        private static PreparedStatement mismatchStmt;
+        private static PreparedStatement errorSummaryStmt;
+        private static PreparedStatement errorDetailStmt;
+        private static PreparedStatement updateCompleteStmt;
+
+        public ProgressTracker(UUID jobId,
+                               int bucket,
+                               BigInteger startToken,
+                               BigInteger endToken,
+                               String keyspace,
+                               Session session) {
+            this.jobId = jobId;
+            this.bucket = bucket;
+            this.startToken = startToken.toString();
+            this.endToken = endToken.toString();
+            this.keyspace = keyspace;
+            this.session = session;
+        }
+
+        /**
+         * Runs on each executor to prepare statements shared across all instances
+         */
+        public static void initializeStatements(Session session, String keyspace) {
+            if (updateStmt == null) {
+                updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                           " job_id," +
+                                                           " bucket," +
+                                                           " table_name," +
+                                                           " start_token," +
+                                                           " end_token," +
+                                                           " matched_partitions," +
+                                                           " mismatched_partitions," +
+                                                           " partitions_only_in_source," +
+                                                           " partitions_only_in_target," +
+                                                           " matched_rows," +
+                                                           " matched_values," +
+                                                           " mismatched_values," +
+                                                           " skipped_partitions," +
+                                                           " last_token )" +
+                                                           "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                                                           keyspace, Schema.TASK_STATUS));
+            }
+            if (mismatchStmt == null) {
+                mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                             " job_id," +
+                                                             " bucket," +
+                                                             " table_name," +
+                                                             " mismatching_token," +
+                                                             " mismatch_type )" +
+                                                             "VALUES (?, ?, ?, ?, ?)",
+                                                             keyspace, Schema.MISMATCHES));
+            }
+            if (updateCompleteStmt == null) {
+                updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
+                                                                   " SET completed = completed + 1" +
+                                                                   " WHERE job_id = ? " +
+                                                                   " AND bucket = ? " +
+                                                                   " AND table_name = ? ",
+                                                                   keyspace, Schema.JOB_STATUS))
+                                            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+            }
+            if (errorSummaryStmt == null) {
+                errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                                 " job_id," +
+                                                                 " bucket," +
+                                                                 " table_name," +
+                                                                 " start_token," +
+                                                                 " end_token)" +
+                                                                 " VALUES (?, ?, ?, ?, ?)",
+                                                                 keyspace, Schema.ERROR_SUMMARY));
+            }
+            if (errorDetailStmt == null) {
+                errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+                                                                " job_id," +
+                                                                " bucket," +
+                                                                " table_name," +
+                                                                " start_token," +
+                                                                " end_token," +
+                                                                " error_token)" +
+                                                                " VALUES (?, ?, ?, ?, ?, ?)",
+                                                                keyspace, Schema.ERROR_DETAIL));
+            }
+
+        }
+
+        /**
+         * Fetches the most recently recorded progress for a table within this split.
+         *
+         * @param table the table being diffed
+         * @return the persisted status for this job/bucket/token range, or
+         *         {@link DiffJob.TaskStatus#EMPTY} if nothing has been recorded yet
+         */
+        public DiffJob.TaskStatus getLastStatus(String table) {
+            ResultSet rs = session.execute(String.format("SELECT last_token, " +
+                                                         "       matched_partitions, " +
+                                                         "       mismatched_partitions, " +
+                                                         "       partitions_only_in_source, " +
+                                                         "       partitions_only_in_target, " +
+                                                         "       matched_rows," +
+                                                         "       matched_values," +
+                                                         "       mismatched_values," +
+                                                         "       skipped_partitions " +
+                                                         " FROM %s.%s " +
+                                                         " WHERE job_id = ? " +
+                                                         " AND   bucket = ? " +
+                                                         " AND   table_name = ? " +
+                                                         " AND   start_token = ? " +
+                                                         " AND   end_token = ?",
+                                                         keyspace, Schema.TASK_STATUS),
+                                           jobId, bucket, table, startToken, endToken);
+            Row row = rs.one();
+            if (null == row)
+                return DiffJob.TaskStatus.EMPTY;
+
+            RangeStats stats = RangeStats.withValues(getOrDefaultLong(row, "matched_partitions"),
+                                                     getOrDefaultLong(row, "mismatched_partitions"),
+                                                     0L, // error counts are per-run and not persisted in the metadata db
+                                                     getOrDefaultLong(row, "skipped_partitions"),
+                                                     getOrDefaultLong(row, "partitions_only_in_source"),
+                                                     getOrDefaultLong(row, "partitions_only_in_target"),
+                                                     getOrDefaultLong(row, "matched_rows"),
+                                                     getOrDefaultLong(row, "matched_values"),
+                                                     getOrDefaultLong(row, "mismatched_values"));
+
+            BigInteger lastToken = row.isNull("last_token") ? null : new BigInteger(row.getString("last_token"));
+            return new DiffJob.TaskStatus(lastToken, stats);
+        }
+
+        /**
+         * Persists the accumulated statistics for a table in this split along with the last
+         * token that has been fully processed.
+         *
+         * @param table       the table being diffed
+         * @param diffStats   accumulated comparison statistics for this split
+         * @param latestToken the most recently processed token
+         */
+        public void updateStatus(String table, RangeStats diffStats, BigInteger latestToken) {
+            session.execute(bindUpdateStatement(table, diffStats, latestToken));
+        }
+
+        public void recordMismatch(String table, MismatchType type, BigInteger token) {
+            logger.info("Detected mismatch in table {}; partition with token {} is {}",
+                        table, token, type == MismatchType.PARTITION_MISMATCH
+                                      ? "different in source and target clusters"
+                                      : type == MismatchType.ONLY_IN_SOURCE ? "only present in source cluster"
+                                                                            : "only present in target cluster");
+            session.execute(bindMismatchesStatement(table, token, type.name()));
+        }
+
+        /**
+         * Records a partition-level comparison failure in the error summary and detail tables.
+         *
+         * @param table the table being diffed
+         * @param token the token of the partition that could not be compared
+         * @param error the cause of the failure
+         */
+        public void recordError(String table, BigInteger token, Throwable error) {
+            logger.error(String.format("Encountered error during partition comparison in table %s; " +
+                                       "error for partition with token %s", table, token), error);
+            BatchStatement batch = new BatchStatement();
+            batch.add(bindErrorSummaryStatement(table));
+            batch.add(bindErrorDetailStatement(table, token));
+            batch.setIdempotent(true);
+            session.execute(batch);
+        }
+
+        /**
+         * Writes the final statistics for a table in this split, using the split's end token as
+         * the last seen token, and optionally increments the completed task counter.
+         *
+         * @param table                the table being diffed
+         * @param stats                the final accumulated statistics for this split
+         * @param updateCompletedCount false when re-running an already completed split, so the
+         *                             completed count is not double-incremented
+         */
+        public void finishTable(String table, RangeStats stats, boolean updateCompletedCount) {
+            logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
+            // first flush out the last status.
+            session.execute(bindUpdateStatement(table, stats, endToken));
+            // then update the count of completed tasks
+            if (updateCompletedCount)
+                session.execute(updateCompleteStmt.bind(jobId, bucket, table));
+        }
+
+        private Statement bindMismatchesStatement(String table, BigInteger token, String type) {
+            return mismatchStmt.bind(jobId, bucket, table, token.toString(), type)
+                               .setIdempotent(true);
+        }
+
+        private Statement bindErrorSummaryStatement(String table) {
+            return errorSummaryStmt.bind(jobId, bucket, table, startToken, endToken)
+                                   .setIdempotent(true);
+        }
+
+        private Statement bindErrorDetailStatement(String table, BigInteger errorToken) {
+            return errorDetailStmt.bind(jobId, bucket, table, startToken, endToken, errorToken.toString())
+                                  .setIdempotent(true);
+        }
+
+        private Statement bindUpdateStatement(String table, RangeStats stats, BigInteger token) {
+           return bindUpdateStatement(table, stats, token.toString());
+        }
+
+        private Statement bindUpdateStatement(String table, RangeStats stats, String token) {
+            // We don't persist the partition error count from RangeStats as errors
+            // are likely to be transient and not data related, so we don't want to
+            // accumulate them across runs.
+            return updateStmt.bind(jobId,
+                                   bucket,
+                                   table,
+                                   startToken,
+                                   endToken,
+                                   stats.getMatchedPartitions(),
+                                   stats.getMismatchedPartitions(),
+                                   stats.getOnlyInSource(),
+                                   stats.getOnlyInTarget(),
+                                   stats.getMatchedRows(),
+                                   stats.getMatchedValues(),
+                                   stats.getMismatchedValues(),
+                                   stats.getSkippedPartitions(),
+                                   token)
+                             .setIdempotent(true);
+        }
+
+        private static long getOrDefaultLong(Row row, String column) {
+            return (null == row || row.isNull(column)) ? 0L : row.getLong(column);
+        }
+    }
+
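+    /**
+     * Manages job-level metadata: marking a job as running, storing its summary and index
+     * entries, and recording the final per-table results when it completes.
+     */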
+    static class JobLifeCycle {
+        final Session session;
+        final String keyspace;
+
+        public JobLifeCycle(Session session, String metadataKeyspace) {
+            this.session = session;
+            this.keyspace = metadataKeyspace;
+        }
+
+        public DiffJob.Params getJobParams(UUID jobId) {
+            ResultSet rs = session.execute(String.format("SELECT keyspace_name, " +
+                                                         "       table_names," +
+                                                         "       buckets," +
+                                                         "       total_tasks " +
+                                                         "FROM %s.%s " +
+                                                         "WHERE job_id = ?",
+                                                         keyspace, Schema.JOB_SUMMARY),
+                                           jobId);
+            Row row = rs.one();
+            if (null == row)
+                return null;
+
+            return new DiffJob.Params(jobId,
+                                      row.getString("keyspace_name"),
+                                      row.getList("table_names", String.class),
+                                      row.getInt("buckets"),
+                                      row.getInt("total_tasks"));
+        }
+
+
+        // Runs on Driver to insert top level job info
+        public void initializeJob(DiffJob.Params params,
+                                  String sourceClusterName,
+                                  String sourceClusterDesc,
+                                  String targetClusterName,
+                                  String targetClusterDesc) {
+
+            logger.info("Initializing job status");
+            // Mark the job as in progress. This may be a brand new job, or a re-run of a
+            // previous job to mop up any failed splits.
+            ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
+                                                         keyspace, Schema.RUNNING_JOBS),
+                                           params.jobId);
+            if (!rs.one().getBool("[applied]")) {
+                logger.info("Aborting due to inability to mark job as running. " +
+                            "Did a previous run of job id {} fail non-gracefully?",
+                            params.jobId);
+                throw new RuntimeException("Unable to mark job running, aborting");
+            }
+
+            UUID timeUUID = UUIDs.timeBased();
+            DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC);
+
+            rs = session.execute(String.format("INSERT INTO %s.%s (" +
+                                               " job_id," +
+                                               " job_start_time," +
+                                               " buckets," +
+                                               " keyspace_name," +
+                                               " table_names," +
+                                               " source_cluster_name," +
+                                               " source_cluster_desc," +
+                                               " target_cluster_name," +
+                                               " target_cluster_desc," +
+                                               " total_tasks)" +
+                                               " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +
+                                               " IF NOT EXISTS",
+                                               keyspace, Schema.JOB_SUMMARY),
+                                 params.jobId,
+                                 timeUUID,
+                                 params.buckets,
+                                 params.keyspace,
+                                 params.tables,
+                                 sourceClusterName,
+                                 sourceClusterDesc,
+                                 targetClusterName,
+                                 targetClusterDesc,
+                                 params.tasks);
+
+            // This is a brand new job, index its details including start time
+            if (rs.one().getBool("[applied]")) {
+                BatchStatement batch = new BatchStatement();
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
+                                                            keyspace, Schema.SOURCE_CLUSTER_INDEX),
+                                              sourceClusterName, params.jobId));
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
+                                                            keyspace, Schema.TARGET_CLUSTER_INDEX),
+                                              targetClusterName, params.jobId));
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
+                                                            keyspace, Schema.KEYSPACE_INDEX),
+                                              params.keyspace, params.jobId));
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
+                                                            "VALUES ('%s', ?, ?, ?)",
+                                                            keyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
+                                              startDateTime.getHourOfDay(), timeUUID, params.jobId));
+                session.execute(batch);
+            }
+        }
+
+        public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {
+            logger.info("Finalizing job status");
+
+            markNotRunning(jobId);
+
+            // Collect the per-table results into a single batch; all rows share the same
+            // partition key (job_id), so this is a single-partition batch.
+            BatchStatement batch = new BatchStatement();
+            for (Map.Entry<String, RangeStats> result : results.entrySet()) {
+                String table = result.getKey();
+                RangeStats stats = result.getValue();
+                batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (" +
+                                                            "  job_id," +
+                                                            "  table_name," +
+                                                            "  matched_partitions," +
+                                                            "  mismatched_partitions," +
+                                                            "  partitions_only_in_source," +
+                                                            "  partitions_only_in_target," +
+                                                            "  matched_rows," +
+                                                            "  matched_values," +
+                                                            "  mismatched_values," +
+                                                            "  skipped_partitions) " +
+                                                            "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+                                                            keyspace, Schema.JOB_RESULTS),
+                                              jobId,
+                                              table,
+                                              stats.getMatchedPartitions(),
+                                              stats.getMismatchedPartitions(),
+                                              stats.getOnlyInSource(),
+                                              stats.getOnlyInTarget(),
+                                              stats.getMatchedRows(),
+                                              stats.getMatchedValues(),
+                                              stats.getMismatchedValues(),
+                                              stats.getSkippedPartitions()));
+            }
+            session.execute(batch);
+        }
+
+
+        public void markNotRunning(UUID jobId) {
+            logger.info("Marking job {} as not running", jobId);
+
+            ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
+                                                         keyspace, Schema.RUNNING_JOBS),
+                                           jobId);
+            if (!rs.one().getBool("[applied]")) {
+                logger.warn("Non-fatal: Unable to mark job {} as not running, check logs for errors " +
+                            "during initialization as there may be no entry for this job in the {} table",
+                            jobId, Schema.RUNNING_JOBS);
+            }
+        }
+    }
+
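+    /**
+     * Table names and CREATE statements for the metadata keyspace, applied by
+     * {@link #maybeInitialize} when the configuration requests schema initialization.
+     */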
+    static class Schema {
+
+        public static final String TASK_STATUS = "task_status";
+        private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                         " job_id uuid," +
+                                                         " bucket int," +
+                                                         " table_name text," +
+                                                         " start_token varchar," +
+                                                         " end_token varchar," +
+                                                         " matched_partitions bigint," +
+                                                         " mismatched_partitions bigint, " +
+                                                         " partitions_only_in_source bigint," +
+                                                         " partitions_only_in_target bigint," +
+                                                         " matched_rows bigint," +
+                                                         " matched_values bigint," +
+                                                         " mismatched_values bigint," +
+                                                         " skipped_partitions bigint," +
+                                                         " last_token varchar," +
+                                                         " PRIMARY KEY((job_id, bucket), table_name, start_token, end_token))" +
+                                                         " WITH default_time_to_live = %s";
+
+        public static final String JOB_SUMMARY = "job_summary";
+        private static final String JOB_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                         " job_id uuid," +
+                                                         " job_start_time timeuuid," +
+                                                         " buckets int," +
+                                                         " keyspace_name text," +
+                                                         " table_names frozen<list<text>>," +
+                                                         " source_cluster_name text," +
+                                                         " source_cluster_desc text," +
+                                                         " target_cluster_name text," +
+                                                         " target_cluster_desc text," +
+                                                         " total_tasks int," +
+                                                         " PRIMARY KEY(job_id))" +
+                                                         " WITH default_time_to_live = %s";
+
+        public static final String JOB_RESULTS = "job_results";
+        private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                         " job_id uuid," +
+                                                         " table_name text," +
+                                                         " matched_partitions bigint," +
+                                                         " mismatched_partitions bigint," +
+                                                         " partitions_only_in_source bigint," +
+                                                         " partitions_only_in_target bigint," +
+                                                         " matched_rows bigint," +
+                                                         " matched_values bigint," +
+                                                         " mismatched_values bigint," +
+                                                         " skipped_partitions bigint," +
+                                                         " PRIMARY KEY(job_id, table_name))" +
+                                                         " WITH default_time_to_live = %s";
+
+        public static final String JOB_STATUS = "job_status";
+        private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                        " job_id uuid," +
+                                                        " bucket int," +
+                                                        " table_name text," +
+                                                        " completed counter," +
+                                                        " PRIMARY KEY ((job_id, bucket), table_name))";
+
+        public static final String MISMATCHES = "mismatches";
+        private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                        " job_id uuid," +
+                                                        " bucket int," +
+                                                        " table_name text, " +
+                                                        " mismatching_token varchar, " +
+                                                        " mismatch_type text, " +
+                                                        " PRIMARY KEY ((job_id, bucket), table_name, mismatching_token))" +
+                                                        " WITH default_time_to_live = %s";
+
+        public static final String ERROR_SUMMARY = "task_errors";
+        private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                           " job_id uuid," +
+                                                           " bucket int," +
+                                                           " table_name text," +
+                                                           " start_token varchar," +
+                                                           " end_token varchar," +
+                                                           " PRIMARY KEY ((job_id, bucket), table_name, start_token, end_token))" +
+                                                           " WITH default_time_to_live = %s";
+
+        public static final String ERROR_DETAIL = "partition_errors";
+        private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                          " job_id uuid," +
+                                                          " bucket int," +
+                                                          " table_name text," +
+                                                          " start_token varchar," +
+                                                          " end_token varchar," +
+                                                          " error_token varchar," +
+                                                          " PRIMARY KEY ((job_id, bucket, table_name, start_token, end_token), error_token))" +
+                                                          " WITH default_time_to_live = %s";
+
+        public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index";
+        private static final String SOURCE_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                                  " source_cluster_name text," +
+                                                                  " job_id uuid," +
+                                                                  " PRIMARY KEY (source_cluster_name, job_id))" +
+                                                                  " WITH default_time_to_live = %s";
+
+        public static final String TARGET_CLUSTER_INDEX = "target_cluster_index";
+        private static final String TARGET_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                                  " target_cluster_name text," +
+                                                                  " job_id uuid," +
+                                                                  " PRIMARY KEY (target_cluster_name, job_id))" +
+                                                                  " WITH default_time_to_live = %s";
+
+        public static final String KEYSPACE_INDEX = "keyspace_index";
+        private static final String KEYSPACE_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                            " keyspace_name text," +
+                                                            " job_id uuid," +
+                                                            " PRIMARY KEY(keyspace_name, job_id))" +
+                                                            " WITH default_time_to_live = %s";
+
+        public static final String JOB_START_INDEX = "job_start_index";
+        private static final String JOB_START_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                             " job_start_date date," +
+                                                             " job_start_hour int," +
+                                                             " job_start_time timeuuid," +
+                                                             " job_id uuid," +
+                                                             " PRIMARY KEY ((job_start_date, job_start_hour), job_start_time))" +
+                                                             " WITH default_time_to_live = %s";
+
+        public static final String RUNNING_JOBS = "running_jobs";
+        private static final String RUNNING_JOBS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+                                                          " job_id uuid," +
+                                                          " PRIMARY KEY (job_id))" +
+                                                          " WITH default_time_to_live = %s";
+
+        private static final String KEYSPACE_SCHEMA = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s";
+
+
+        public static void maybeInitialize(Session session, MetadataKeyspaceOptions options) {
+            if (!options.should_init)
+                return;
+
+            logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace);
+            session.execute(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication));
+            session.execute(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl));
+            session.execute(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS));
+            session.execute(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl));
+            session.execute(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl));
+            session.execute(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl));
+            session.execute(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl));
+            session.execute(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl));
+            session.execute(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl));
+            session.execute(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl));
+            session.execute(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl));
+            session.execute(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl));
+            session.execute(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl));
+            logger.info("Schema initialized");
+        }
+    }
+}
+
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java b/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java
new file mode 100644
index 0000000..b23ac35
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
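+/**
+ * The ways in which a partition can differ between the source and target clusters.
+ */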
+public enum MismatchType {
+
+    ONLY_IN_SOURCE,
+    ONLY_IN_TARGET,
+    PARTITION_MISMATCH
+
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
new file mode 100644
index 0000000..6434dc8
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.Iterator;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+
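+/**
+ * Compares the source and target versions of a single partition row by row: any difference in
+ * the clustering columns fails the whole partition, while differences in regular columns are
+ * counted as mismatched values.
+ */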
+public class PartitionComparator implements Callable<PartitionStats> {
+
+    private static final Logger logger = LoggerFactory.getLogger(PartitionComparator.class);
+
+    private final TableSpec tableSpec;
+    private final Iterator<Row> source;
+    private final Iterator<Row> target;
+
+    public PartitionComparator(TableSpec tableSpec,
+                               Iterator<Row> source,
+                               Iterator<Row> target) {
+        this.tableSpec = tableSpec;
+        this.source = source;
+        this.target = target;
+    }
+
+    public PartitionStats call() {
+        PartitionStats partitionStats = new PartitionStats();
+
+        if (source == null || target == null) {
+            logger.error("Skipping partition because one result was null (timeout despite retries)");
+            partitionStats.skipped = true;
+            return partitionStats;
+        }
+
+        while (source.hasNext() && target.hasNext()) {
+
+            Row sourceRow = source.next();
+            Row targetRow = target.next();
+
+            // if the clustering keys don't match, don't proceed any further; just mark the
+            // partition as mismatched and be done
+            if (!clusteringsEqual(sourceRow, targetRow)) {
+                partitionStats.allClusteringsMatch = false;
+                return partitionStats;
+            }
+
+            partitionStats.matchedRows++;
+
+            // if the clusterings match but some regular column values differ, we can keep
+            // processing the partition, so just count the mismatched values and move on
+            checkRegularColumnEquality(partitionStats, sourceRow, targetRow);
+        }
+
+        // if one of the iterators isn't exhausted, then there's a mismatch at the partition level
+        if (source.hasNext() || target.hasNext())
+            partitionStats.allClusteringsMatch = false;
+
+        return partitionStats;
+    }
+
+    private boolean clusteringsEqual(Row source, Row target) {
+        for (ColumnMetadata column : tableSpec.getClusteringColumns()) {
+            Object fromSource = source.getObject(column.getName());
+            Object fromTarget = target.getObject(column.getName());
+
+            if ((fromSource == null) != (fromTarget == null))
+                return false;
+
+            if (fromSource != null && !fromSource.equals(fromTarget))
+                return false;
+        }
+        return true;
+    }
+
+    private void checkRegularColumnEquality(PartitionStats stats, Row source, Row target) {
+        for (ColumnMetadata column : tableSpec.getRegularColumns()) {
+            Object fromSource = source.getObject(column.getName());
+            Object fromTarget = target.getObject(column.getName());
+            if (fromSource == null) {
+                if (fromTarget == null) {
+                    stats.matchedValues++;
+                } else {
+                    stats.mismatchedValues++;
+                }
+            } else {
+                if (fromSource.equals(fromTarget)) {
+                    stats.matchedValues++;
+                } else {
+                    stats.mismatchedValues++;
+                }
+            }
+        }
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
new file mode 100644
index 0000000..d31da4f
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datastax.driver.core.*;
+import org.jetbrains.annotations.NotNull;
+
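+/**
+ * Wraps a driver Row whose first column is the partition's token, followed by the
+ * partition key components. Ordering, equality and hashing are all based on the token.
+ */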
+public class PartitionKey implements Comparable<PartitionKey> {
+
+    private final Row row;
+
+    public PartitionKey(Row row) {
+        this.row = row;
+    }
+
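+    // The driver surfaces Murmur3 tokens as bigints (long) and RandomPartitioner tokens
+    // as varints (BigInteger); normalise both to BigInteger for range arithmetic.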
+    public BigInteger getTokenAsBigInteger() {
+        Token token = getToken();
+        if (token.getType() == DataType.bigint()) {
+            return BigInteger.valueOf((Long) token.getValue());
+        } else {
+            return (BigInteger) token.getValue();
+        }
+    }
+
+    public List<Object> getComponents() {
+        int cols = row.getColumnDefinitions().size();
+        List<Object> columns = new ArrayList<>(cols);
+        // Note we start at index=1, because index=0 is the token
+        for (int i = 1; i < cols; i++)
+            columns.add(row.getObject(i));
+        return columns;
+    }
+
+    @VisibleForTesting
+    protected Token getToken() {
+        return row.getToken(0);
+    }
+
+    public int compareTo(@NotNull PartitionKey o) {
+        return getToken().compareTo(o.getToken());
+    }
+
+    public boolean equals(Object obj) {
+        return this == obj || (obj instanceof PartitionKey && this.compareTo((PartitionKey)obj) == 0);
+    }
+
+    public int hashCode() {
+        return Objects.hash(getTokenAsBigInteger());
+    }
+
+    public String toString() {
+        return StreamSupport.stream(row.getColumnDefinitions().spliterator(), false)
+                            .map(ColumnDefinitions.Definition::getName)
+                            .map(row::getObject)
+                            .filter(Objects::nonNull)
+                            .map(Object::toString)
+                            .collect(Collectors.joining(":"));
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java
new file mode 100644
index 0000000..7f020ff
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
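+// Per-partition comparison results produced by PartitionComparator and folded into
+// RangeStats once the partition has been fully compared.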
+public class PartitionStats {
+    public boolean skipped = false;
+    public boolean allClusteringsMatch = true;
+    public long matchedRows;
+    public long matchedValues;
+    public long mismatchedValues;
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
new file mode 100644
index 0000000..36ce2b5
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.*;
+
+import com.google.common.base.Verify;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
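+/**
+ * Merge-joins two token-ordered streams of partition keys for a single token range.
+ * Keys present in only one cluster are reported as mismatches; keys present in both are
+ * handed to the ComparisonExecutor for asynchronous row-by-row comparison, with progress
+ * periodically checkpointed via the journal callback.
+ */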
+public class RangeComparator {
+
+    private static final Logger logger = LoggerFactory.getLogger(RangeComparator.class);
+
+    private final DiffContext context;
+    private final BiConsumer<Throwable, BigInteger> errorReporter;
+    private final BiConsumer<MismatchType, BigInteger> mismatchReporter;
+    private final BiConsumer<RangeStats, BigInteger> journal;
+    private final ComparisonExecutor comparisonExecutor;
+
+    public RangeComparator(DiffContext context,
+                           BiConsumer<Throwable, BigInteger> errorReporter,
+                           BiConsumer<MismatchType, BigInteger> mismatchReporter,
+                           BiConsumer<RangeStats, BigInteger> journal,
+                           ComparisonExecutor comparisonExecutor) {
+        this.context = context;
+        this.errorReporter = errorReporter;
+        this.mismatchReporter = mismatchReporter;
+        this.journal = journal;
+        this.comparisonExecutor = comparisonExecutor;
+    }
+
+    public RangeStats compare(Iterator<PartitionKey> sourceKeys,
+                              Iterator<PartitionKey> targetKeys,
+                              Function<PartitionKey, PartitionComparator> partitionTaskProvider) {
+
+        final RangeStats rangeStats = RangeStats.newStats();
+        // We can catch this condition earlier, but it doesn't hurt to also check here
+        if (context.startToken.equals(context.endToken))
+            return rangeStats;
+
+        Phaser phaser = new Phaser(1);
+        AtomicLong partitionCount = new AtomicLong(0);
+        AtomicReference<BigInteger> highestTokenSeen = new AtomicReference<>(context.startToken);
+
+        logger.info("Comparing range [{},{}]", context.startToken, context.endToken);
+        try {
+            PartitionKey sourceKey = nextKey(sourceKeys);
+            PartitionKey targetKey = nextKey(targetKeys);
+
+            // special case for start of range - handles one cluster supplying an empty range
+            if ((sourceKey == null) != (targetKey == null)) {
+                if (sourceKey == null) {
+                    logger.info("First in range, source iter is empty {}", context);
+                    onlyInTarget(rangeStats, targetKey);
+                    targetKeys.forEachRemaining(key -> onlyInTarget(rangeStats, key));
+                } else {
+                    logger.info("First in range, target iter is empty {}", context);
+                    onlyInSource(rangeStats, sourceKey);
+                    sourceKeys.forEachRemaining(key -> onlyInSource(rangeStats, key));
+                }
+                return rangeStats;
+            }
+
+            while (sourceKey != null && targetKey != null) {
+
+                int ret = sourceKey.compareTo(targetKey);
+                if (ret > 0) {
+                    onlyInTarget(rangeStats, targetKey);
+                    targetKey = nextKey(targetKeys);
+                } else if (ret < 0) {
+                    onlyInSource(rangeStats, sourceKey);
+                    sourceKey = nextKey(sourceKeys);
+                } else {
+
+                    Verify.verify(sourceKey.equals(targetKey),
+                                  "Can only compare partitions with identical keys: (%s, %s)",
+                                  sourceKey, targetKey);
+
+                    // Where the key exists in both clusters, fire off an async task to walk the
+                    // partition and compare all its rows. The result of that comparison is added to
+                    // the range totals and the highest seen token is updated in the onSuccess callback.
+
+                    if (!context.isTokenAllowed(sourceKey.getTokenAsBigInteger())) {
+                        logger.debug("Skipping disallowed token {}", sourceKey.getTokenAsBigInteger());
+                        rangeStats.skipPartition();
+                        sourceKey = nextKey(sourceKeys);
+                        targetKey = nextKey(targetKeys);
+                        continue;
+                    }
+
+                    BigInteger token = sourceKey.getTokenAsBigInteger();
+                    try {
+                        PartitionComparator comparisonTask = partitionTaskProvider.apply(sourceKey);
+                        comparisonExecutor.submit(comparisonTask,
+                                                  onSuccess(rangeStats, partitionCount, token, highestTokenSeen, mismatchReporter, journal),
+                                                  onError(rangeStats, token, errorReporter),
+                                                  phaser);
+                    } catch (Throwable t) {
+                        // Handle errors thrown when creating the comparison task. This should trap timeouts and
+                        // unavailables occurring when performing the initial query to read the full partition.
+                        // Errors thrown when paging through the partition in comparisonTask will be handled by
+                        // the onError callback.
+                        rangeStats.partitionError();
+                        errorReporter.accept(t, token);
+                    } finally {
+                        // if the cluster has been shut down because the task failed, the underlying
+                        // iterators of partition keys will return hasNext == false
+                        sourceKey = nextKey(sourceKeys);
+                        targetKey = nextKey(targetKeys);
+                    }
+                }
+            }
+
+            // handle case where only one iterator is exhausted
+            if (sourceKey != null)
+                onlyInSource(rangeStats, sourceKey);
+            else if (targetKey != null)
+                onlyInTarget(rangeStats, targetKey);
+
+            drain(sourceKeys, targetKeys, rangeStats);
+
+        } catch (Exception e) {
+            // Handles errors thrown by iteration of underlying resultsets of partition keys by
+            // calls to nextKey(). Such errors should cause the overall range comparison to fail,
+            // but we must ensure that any in-flight partition comparisons complete so that either
+            // the onSuccess or onError callback is fired for each one. This is necessary to ensure
+            // that we record the highest seen token and any failed partitions and can safely re-run.
+            logger.debug("Waiting for {} in flight tasks before propagating error", phaser.getUnarrivedParties());
+            phaser.arriveAndAwaitAdvance();
+            throw new RuntimeException(String.format("Error encountered during range comparison for [%s:%s]",
+                                       context.startToken, context.endToken), e);
+        }
+
+        logger.debug("Waiting for {} in flight tasks before returning", phaser.getUnarrivedParties());
+        phaser.arriveAndAwaitAdvance();
+
+        if (!rangeStats.allMatches())
+            logger.info("Segment [{}:{}] stats - ({})", context.startToken, context.endToken, rangeStats);
+
+        return rangeStats;
+    }
+
+    private void drain(Iterator<PartitionKey> sourceKeys,
+                       Iterator<PartitionKey> targetKeys,
+                       RangeStats rangeStats) {
+        if (sourceKeys.hasNext()) {
+            logger.info("Source keys not exhausted {}", context);
+            sourceKeys.forEachRemaining(key -> onlyInSource(rangeStats, key));
+        } else if (targetKeys.hasNext()) {
+            logger.info("Target keys not exhausted: {}", context);
+            targetKeys.forEachRemaining(key -> onlyInTarget(rangeStats, key));
+        }
+    }
+
+    private void onlyInTarget(RangeStats stats, PartitionKey key) {
+        stats.onlyInTarget();
+        mismatchReporter.accept(MismatchType.ONLY_IN_TARGET, key.getTokenAsBigInteger());
+    }
+
+    private void onlyInSource(RangeStats stats, PartitionKey key) {
+        stats.onlyInSource();
+        mismatchReporter.accept(MismatchType.ONLY_IN_SOURCE, key.getTokenAsBigInteger());
+    }
+
+    private PartitionKey nextKey(Iterator<PartitionKey> keys) {
+        return keys.hasNext() ? keys.next() : null;
+    }
+
+    private Consumer<PartitionStats> onSuccess(final RangeStats rangeStats,
+                                               final AtomicLong partitionCount,
+                                               final BigInteger currentToken,
+                                               final AtomicReference<BigInteger> highestSeenToken,
+                                               final BiConsumer<MismatchType, BigInteger> mismatchReporter,
+                                               final BiConsumer<RangeStats, BigInteger> journal) {
+        return (result) -> {
+
+            rangeStats.accumulate(result);
+            if (!result.allClusteringsMatch || result.mismatchedValues > 0) {
+                mismatchReporter.accept(MismatchType.PARTITION_MISMATCH, currentToken);
+                rangeStats.mismatchedPartition();
+            } else {
+                rangeStats.matchedPartition();
+            }
+
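+            // lock-free update of the range's high-water mark: retry the CAS until this
+            // token is recorded or another thread has recorded a higher one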
+            BigInteger highest = highestSeenToken.get();
+            while (currentToken.compareTo(highest) > 0) {
+                if (highestSeenToken.compareAndSet(highest, currentToken))
+                    break;
+
+                highest = highestSeenToken.get();
+            }
+
+            // checkpoint every 10 partitions
+            if (partitionCount.incrementAndGet() % 10 == 0)
+                journal.accept(rangeStats, highestSeenToken.get());
+        };
+    }
+
+    private Consumer<Throwable> onError(final RangeStats rangeStats,
+                                        final BigInteger currentToken,
+                                        final BiConsumer<Throwable, BigInteger> errorReporter) {
+        return (error) -> {
+            rangeStats.partitionError();
+            errorReporter.accept(error, currentToken);
+        };
+    }
+}
+
+
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
new file mode 100644
index 0000000..a0f8043
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.*;
+import java.util.Objects;
+import java.util.concurrent.atomic.LongAdder;
+
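+/**
+ * Thread-safe accumulator for the per-range comparison counters. The counters are backed
+ * by LongAdders so concurrent partition comparisons can update them cheaply; the adders
+ * are transient and their sums are written out as plain longs by the custom serialization
+ * hooks at the bottom of the class.
+ */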
+public class RangeStats implements Serializable {
+
+    private transient LongAdder matchedPartitions;
+    private transient LongAdder mismatchedPartitions;
+    private transient LongAdder errorPartitions;
+    private transient LongAdder skippedPartitions;
+    private transient LongAdder onlyInSource;
+    private transient LongAdder onlyInTarget;
+    private transient LongAdder matchedRows;
+    private transient LongAdder matchedValues;
+    private transient LongAdder mismatchedValues;
+
+    public static RangeStats newStats() {
+        return new RangeStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+    }
+
+    public static RangeStats withValues(long matchedPartitions,
+                                        long mismatchedPartitions,
+                                        long errorPartitions,
+                                        long skippedPartitions,
+                                        long onlyInSource,
+                                        long onlyInTarget,
+                                        long matchedRows,
+                                        long matchedValues,
+                                        long mismatchedValues) {
+
+        return new RangeStats(matchedPartitions,
+                              mismatchedPartitions,
+                              errorPartitions,
+                              skippedPartitions,
+                              onlyInSource,
+                              onlyInTarget,
+                              matchedRows,
+                              matchedValues,
+                              mismatchedValues);
+    }
+
+    private RangeStats(long matchedPartitions,
+                       long mismatchedPartitions,
+                       long errorPartitions,
+                       long skippedPartitions,
+                       long onlyInSource,
+                       long onlyInTarget,
+                       long matchedRows,
+                       long matchedValues,
+                       long mismatchedValues) {
+
+        this.matchedPartitions      = new LongAdder();
+        this.mismatchedPartitions   = new LongAdder();
+        this.errorPartitions        = new LongAdder();
+        this.skippedPartitions      = new LongAdder();
+        this.onlyInSource           = new LongAdder();
+        this.onlyInTarget           = new LongAdder();
+        this.matchedRows            = new LongAdder();
+        this.matchedValues          = new LongAdder();
+        this.mismatchedValues       = new LongAdder();
+
+        this.matchedPartitions.add(matchedPartitions);
+        this.mismatchedPartitions.add(mismatchedPartitions);
+        this.errorPartitions.add(errorPartitions);
+        this.skippedPartitions.add(skippedPartitions);
+        this.onlyInSource.add(onlyInSource);
+        this.onlyInTarget.add(onlyInTarget);
+        this.matchedRows.add(matchedRows);
+        this.matchedValues.add(matchedValues);
+        this.mismatchedValues.add(mismatchedValues);
+    }
+
+    public void matchedPartition() {
+        matchedPartitions.add(1L);
+    }
+
+    public long getMatchedPartitions() {
+        return matchedPartitions.sum();
+    }
+
+    public void onlyInSource() {
+        onlyInSource.add(1L);
+    }
+
+    public long getOnlyInSource() {
+        return onlyInSource.sum();
+    }
+
+    public void onlyInTarget() {
+        onlyInTarget.add(1L);
+    }
+
+    public long getOnlyInTarget() {
+        return onlyInTarget.sum();
+    }
+
+    public long getMatchedRows() {
+        return matchedRows.sum();
+    }
+
+    public long getMatchedValues() {
+        return matchedValues.sum();
+    }
+
+    public long getMismatchedValues() {
+        return mismatchedValues.sum();
+    }
+
+    public void mismatchedPartition() {
+        mismatchedPartitions.add(1L);
+    }
+
+    public long getMismatchedPartitions() {
+        return mismatchedPartitions.sum();
+    }
+
+    public void skipPartition() {
+        skippedPartitions.add(1L);
+    }
+
+    public long getSkippedPartitions() {
+        return skippedPartitions.sum();
+    }
+
+    public void partitionError() {
+        errorPartitions.add(1L);
+    }
+
+    public long getErrorPartitions() {
+        return errorPartitions.sum();
+    }
+
+    public RangeStats accumulate(final PartitionStats partitionStats) {
+        this.matchedRows.add(partitionStats.matchedRows);
+        this.matchedValues.add(partitionStats.matchedValues);
+        this.mismatchedValues.add(partitionStats.mismatchedValues);
+        if (partitionStats.skipped)
+            this.skippedPartitions.add(1L);
+
+        return this;
+    }
+
+    public RangeStats accumulate(final RangeStats rangeStats) {
+        this.matchedPartitions.add(rangeStats.matchedPartitions.sum());
+        this.mismatchedPartitions.add(rangeStats.mismatchedPartitions.sum());
+        this.errorPartitions.add(rangeStats.errorPartitions.sum());
+        this.skippedPartitions.add(rangeStats.skippedPartitions.sum());
+        this.onlyInSource.add(rangeStats.onlyInSource.sum());
+        this.onlyInTarget.add(rangeStats.onlyInTarget.sum());
+        this.matchedRows.add(rangeStats.matchedRows.sum());
+        this.matchedValues.add(rangeStats.matchedValues.sum());
+        this.mismatchedValues.add(rangeStats.mismatchedValues.sum());
+        return this;
+    }
+
+    public boolean allMatches() {
+        return mismatchedPartitions.sum() == 0
+               && onlyInSource.sum() == 0
+               && errorPartitions.sum() == 0
+               && onlyInTarget.sum() == 0
+               && mismatchedValues.sum() == 0
+               && skippedPartitions.sum() == 0;
+    }
+
+    public boolean isEmpty() {
+        return matchedPartitions.sum() == 0
+                && mismatchedPartitions.sum() == 0
+                && errorPartitions.sum() == 0
+                && skippedPartitions.sum() == 0
+                && onlyInSource.sum() == 0
+                && onlyInTarget.sum() == 0
+                && matchedRows.sum() == 0
+                && matchedValues.sum() == 0
+                && mismatchedValues.sum() == 0;
+    }
+
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof RangeStats))
+            return false;
+
+        RangeStats other = (RangeStats)o;
+        return this.matchedPartitions.sum() == other.matchedPartitions.sum()
+               && mismatchedPartitions.sum() == other.mismatchedPartitions.sum()
+               && errorPartitions.sum() == other.errorPartitions.sum()
+               && skippedPartitions.sum() == other.skippedPartitions.sum()
+               && onlyInSource.sum() == other.onlyInSource.sum()
+               && onlyInTarget.sum() == other.onlyInTarget.sum()
+               && matchedRows.sum() == other.matchedRows.sum()
+               && matchedValues.sum() == other.matchedValues.sum()
+               && mismatchedValues.sum() == other.mismatchedValues.sum();
+    }
+
+    public int hashCode() {
+        return Objects.hash(matchedPartitions.sum(),
+                            mismatchedPartitions.sum(),
+                            errorPartitions.sum(),
+                            skippedPartitions.sum(),
+                            onlyInSource.sum(),
+                            onlyInTarget.sum(),
+                            matchedRows.sum(),
+                            matchedValues.sum(),
+                            mismatchedValues.sum());
+    }
+
+    public String toString() {
+        return String.format("Matched Partitions - %d, " +
+                             "Mismatched Partitions - %d, " +
+                             "Partition Errors - %d, " +
+                             "Partitions Only In Source - %d, " +
+                             "Partitions Only In Target - %d, " +
+                             "Skipped Partitions - %d, " +
+                             "Matched Rows - %d, " +
+                             "Matched Values - %d, " +
+                             "Mismatched Values - %d ",
+                             matchedPartitions.sum(),
+                             mismatchedPartitions.sum(),
+                             errorPartitions.sum(),
+                             onlyInSource.sum(),
+                             onlyInTarget.sum(),
+                             skippedPartitions.sum(),
+                             matchedRows.sum(),
+                             matchedValues.sum(),
+                             mismatchedValues.sum());
+    }
+
+    // For serialization
+
+    private RangeStats() {}
+
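+    // The LongAdder fields are transient, so serialize and restore their sums as plain longs.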
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.writeLong(matchedPartitions.sum());
+        out.writeLong(mismatchedPartitions.sum());
+        out.writeLong(errorPartitions.sum());
+        out.writeLong(skippedPartitions.sum());
+        out.writeLong(onlyInSource.sum());
+        out.writeLong(onlyInTarget.sum());
+        out.writeLong(matchedRows.sum());
+        out.writeLong(matchedValues.sum());
+        out.writeLong(mismatchedValues.sum());
+    }
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        this.matchedPartitions      = new LongAdder();
+        this.mismatchedPartitions   = new LongAdder();
+        this.errorPartitions        = new LongAdder();
+        this.skippedPartitions      = new LongAdder();
+        this.onlyInSource           = new LongAdder();
+        this.onlyInTarget           = new LongAdder();
+        this.matchedRows            = new LongAdder();
+        this.matchedValues          = new LongAdder();
+        this.mismatchedValues       = new LongAdder();
+
+        this.matchedPartitions.add(in.readLong());
+        this.mismatchedPartitions.add(in.readLong());
+        this.errorPartitions.add(in.readLong());
+        this.skippedPartitions.add(in.readLong());
+        this.onlyInSource.add(in.readLong());
+        this.onlyInTarget.add(in.readLong());
+        this.matchedRows.add(in.readLong());
+        this.matchedValues.add(in.readLong());
+        this.mismatchedValues.add(in.readLong());
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
new file mode 100644
index 0000000..d2f0963
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import com.datastax.driver.core.*;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import static org.apache.cassandra.diff.DiffContext.cqlizedString;
+
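+/**
+ * Describes the shape of a table to be diffed: its clustering columns and the remaining
+ * (regular) columns, as read from the cluster metadata.
+ */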
+public class TableSpec {
+
+    private final String table;
+    private ImmutableList<ColumnMetadata> clusteringColumns;
+    private ImmutableList<ColumnMetadata> regularColumns;
+
+    public String getTable() {
+        return table;
+    }
+
+    public ImmutableList<ColumnMetadata> getClusteringColumns() {
+        return clusteringColumns;
+    }
+
+    public ImmutableList<ColumnMetadata> getRegularColumns() {
+        return regularColumns;
+    }
+
+
+    /**
+     * @param table the table to diff
+     * @param clusteringColumns the clustering columns, retrieved from cluster using the client
+     * @param regularColumns the non-primary key columns, retrieved from cluster using the client
+     */
+    TableSpec(final String table,
+              final List<ColumnMetadata> clusteringColumns,
+              final List<ColumnMetadata> regularColumns) {
+        this.table = table;
+        this.clusteringColumns = ImmutableList.copyOf(clusteringColumns);
+        this.regularColumns = ImmutableList.copyOf(regularColumns);
+    }
+
+    public static TableSpec make(String table, DiffCluster diffCluster) {
+        final Cluster cluster = diffCluster.cluster;
+
+        final String cqlizedKeyspace = cqlizedString(diffCluster.keyspace);
+        final String cqlizedTable = cqlizedString(table);
+
+        KeyspaceMetadata ksMetadata = cluster.getMetadata().getKeyspace(cqlizedKeyspace);
+        if (ksMetadata == null) {
+            throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", diffCluster.keyspace, diffCluster.clusterId));
+        }
+
+        TableMetadata tableMetadata = ksMetadata.getTable(cqlizedTable);
+        List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
+        List<ColumnMetadata> regularColumns = tableMetadata.getColumns()
+                                                           .stream()
+                                                           .filter(c -> !(clusteringColumns.contains(c)))
+                                                           .collect(Collectors.toList());
+        return new TableSpec(tableMetadata.getName(), clusteringColumns, regularColumns);
+    }
+
+    public boolean equalsNamesOnly(TableSpec other) {
+        return this.table.equals(other.table)
+            && columnNames(this.clusteringColumns).equals(columnNames(other.clusteringColumns))
+            && columnNames(this.regularColumns).equals(columnNames(other.regularColumns));
+    }
+
+    private static List<String> columnNames(List<ColumnMetadata> columns) {
+        return columns.stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+    }
+
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof TableSpec))
+            return false;
+
+        TableSpec other = (TableSpec)o;
+        return this.table.equals(other.table)
+               && this.clusteringColumns.equals(other.clusteringColumns)
+               && this.regularColumns.equals(other.regularColumns);
+
+    }
+
+    public int hashCode() {
+        return Objects.hash(table, clusteringColumns, regularColumns);
+    }
+
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+                          .add("table", table)
+                          .add("clusteringColumns", clusteringColumns)
+                          .add("regularColumns", regularColumns)
+                          .toString();
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java b/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java
new file mode 100644
index 0000000..3521a4b
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+
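+/**
+ * Encapsulates the token range and CQL bind type for the supported partitioners:
+ * Murmur3 tokens span the full signed 64-bit range and bind as longs, while
+ * RandomPartitioner tokens run from -1 to 2^127 - 1 and bind as BigIntegers.
+ */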
+public enum TokenHelper {
+
+    MURMUR3 {
+        @Override
+        public BigInteger min() {
+            return BigInteger.valueOf(Long.MIN_VALUE);
+        }
+
+        @Override
+        public BigInteger max() {
+            return BigInteger.valueOf(Long.MAX_VALUE);
+        }
+
+        @Override
+        public Object forBindParam(BigInteger token) {
+            return token.longValue();
+        }
+    },
+
+    RANDOM {
+        @Override
+        public BigInteger min() {
+            return BigInteger.ONE.negate();
+        }
+
+        @Override
+        public BigInteger max() {
+            return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE);
+        }
+
+        @Override
+        public Object forBindParam(BigInteger token) {
+            return token;
+        }
+    };
+
+    public abstract BigInteger min();
+    public abstract BigInteger max();
+    public abstract Object forBindParam(BigInteger token);
+
+    public static TokenHelper forPartitioner(String partitionerName) {
+        if (partitionerName.endsWith("Murmur3Partitioner")) return MURMUR3;
+        else if (partitionerName.endsWith("RandomPartitioner")) return RANDOM;
+        else throw new IllegalArgumentException("Unsupported partitioner: " + partitionerName);
+    }
+}
diff --git a/spark-job/src/main/resources/log4j2.xml b/spark-job/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..64aec61
--- /dev/null
+++ b/spark-job/src/main/resources/log4j2.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="INFO">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <!--Logger name="org.apache.cassandra.diff" level="info">
+            <AppenderRef ref="Console"/>
+        </Logger-->
+    </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java b/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java
new file mode 100644
index 0000000..cc7fb95
--- /dev/null
+++ b/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastax.driver.core;
+
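+// Test helper living in the driver's own package so fixtures can construct
+// ColumnMetadata instances via ColumnMetadata.forAlias.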
+public class ColumnMetadataHelper {
+
+    public static ColumnMetadata column(String name) {
+        return ColumnMetadata.forAlias(null, name, null);
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java
new file mode 100644
index 0000000..002769f
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.*;
+import org.junit.Test;
+
+import com.codahale.metrics.*;
+
+import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+
+public class ComparisonExecutorTest {
+
+    @Test
+    public void submitBlocksWhenMaxTasksExceeded() throws Exception {
+        // submit maxTasks, then assert that further submission blocks until tasks are processed
+        int maxTasks = 3;
+        MetricRegistry metrics = metrics();
+        final ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge waitingToSubmit = metrics.getGauges().get("BlockedTasks");
+        assertEquals(0, waitingToSubmit.getValue());
+
+        final AtomicInteger successful = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+        final AtomicInteger failed = new AtomicInteger(0);
+        final Consumer<Throwable> onError = (t) -> failed.incrementAndGet();
+        final Phaser phaser = new Phaser(1);
+
+        BlockingTask[] tasks = new BlockingTask[5];
+        for (int i=0; i<5; i++)
+            tasks[i] = new BlockingTask(i);
+
+        // Ensure that the submission itself does not block before the max number of tasks are submitted
+        executor.submit(tasks[0], onSuccess, onError, phaser);
+        executor.submit(tasks[1], onSuccess, onError, phaser);
+        executor.submit(tasks[2], onSuccess, onError, phaser);
+        assertEquals(0, waitingToSubmit.getValue());
+
+        // Now submit another pair of tasks which should block as the executor is fully occupied
+        final CountDownLatch latch = new CountDownLatch(2);
+        Thread t1 = new Thread(() -> { latch.countDown(); executor.submit(tasks[3], onSuccess, onError, phaser);});
+        Thread t2 = new Thread(() -> { latch.countDown(); executor.submit(tasks[4], onSuccess, onError, phaser);});
+        t1.start();
+        t2.start();
+        // wait for both to attempt submission
+        latch.await();
+        assertThreadWaits(t1);
+        assertThreadWaits(t2);
+        assertEquals(2, waitingToSubmit.getValue());
+
+        // Let the first waiting task complete, which should allow t1 to complete its submission
+        tasks[0].latch.countDown();
+        t1.join();
+        // the second submission should still be waiting on a slot
+        assertThreadWaits(t2);
+        assertEquals(1, waitingToSubmit.getValue());
+
+        // Let another task complete, allowing t2 to complete its submission
+        tasks[1].latch.countDown();
+        t2.join();
+        assertEquals(0, waitingToSubmit.getValue());
+
+        // Let all tasks complete, wait for them to do so then verify counters
+        for (int i=2; i<=4; i++)
+            tasks[i].latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(5, successful.get());
+        assertEquals(0, failed.get());
+    }
+
+    @Test
+    public void handleTaskFailure() {
+        // Ensure that the failure callback is fired, a permit for task submission
+        // returned and the phaser notified when a task throws
+        int maxTasks = 5;
+        MetricRegistry metrics = metrics();
+        ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final AtomicInteger successful = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+        AtomicReference<Throwable> thrown = new AtomicReference<>();
+        final Consumer<Throwable> onError = thrown::set;
+        final Phaser phaser = new Phaser(1);
+
+        assertEquals(maxTasks, availableSlots.getValue());
+
+        RuntimeException toThrow = new RuntimeException("FAIL");
+        BlockingTask task = new BlockingTask(0, toThrow);
+        executor.submit(task, onSuccess, onError, phaser);
+        assertEquals(maxTasks - 1, availableSlots.getValue());
+
+        assertEquals(2, phaser.getUnarrivedParties());
+        task.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(0, successful.get());
+        assertEquals(toThrow, thrown.get());
+    }
+
+    @Test
+    public void handleUncaughtExceptionInFailureCallback() {
+        // Ensure that if the failure callback throws, a permit for submission is
+        // still returned and the phaser notified
+        int maxTasks = 5;
+        MetricRegistry metrics = metrics();
+        ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final AtomicInteger successful = new AtomicInteger(0);
+        final AtomicInteger failures = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+        final Consumer<Throwable> onError =  (t) -> { failures.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
+        final Phaser phaser = new Phaser(1);
+
+        assertEquals(maxTasks, availableSlots.getValue());
+
+        RuntimeException toThrow = new RuntimeException("FAIL");
+        try {
+            onError.accept(toThrow);
+            fail("Failure callback should throw RuntimeException");
+        } catch (RuntimeException e) {
+            // expected - reset failure count
+            failures.set(0);
+        }
+
+        BlockingTask task = new BlockingTask(0, toThrow);
+        executor.submit(task, onSuccess, onError, phaser);
+        assertEquals(maxTasks - 1, availableSlots.getValue());
+
+        assertEquals(2, phaser.getUnarrivedParties());
+        task.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(0, successful.get());
+        assertEquals(1, failures.get());
+    }
+
+    @Test
+    public void handleUncaughtExceptionInSuccessCallback() {
+        // Ensure that if the success callback throws, a permit for submission is
+        // still returned and the phaser notified
+        int maxTasks = 5;
+        MetricRegistry metrics = metrics();
+        ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final AtomicInteger successful = new AtomicInteger(0);
+        final AtomicInteger failures = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) ->  { successful.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
+        final Consumer<Throwable> onError =  (t) -> failures.incrementAndGet();
+        final Phaser phaser = new Phaser(1);
+
+        assertEquals(maxTasks, availableSlots.getValue());
+        try {
+            onSuccess.accept(0);
+            fail("Success callback should throw RuntimeException");
+        } catch (RuntimeException e) {
+            // expected - reset failure count
+            successful.set(0);
+        }
+
+        BlockingTask task = new BlockingTask(0);
+        executor.submit(task, onSuccess, onError, phaser);
+        assertEquals(maxTasks - 1, availableSlots.getValue());
+
+        assertEquals(2, phaser.getUnarrivedParties());
+        task.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(1, successful.get());
+        assertEquals(0, failures.get());
+    }
+
+    @Test
+    public void handleRejectedExecutionException() {
+        // In the case that the underlying ExecutorService rejects a task submission, a permit
+        // should be returned and the phaser notified
+        int maxTasks = 5;
+        final AtomicInteger successful = new AtomicInteger(0);
+        final AtomicInteger failures = new AtomicInteger(0);
+        final AtomicInteger rejections = new AtomicInteger(0);
+        final Consumer<Integer> onSuccess = (i) ->  successful.incrementAndGet();
+        final Consumer<Throwable> onError =  (t) -> failures.incrementAndGet();
+
+        MetricRegistry metrics = metrics();
+        ExecutorService rejectingExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+                                                                   new LinkedBlockingQueue<>(1),
+                                                                   (r, executor) -> { rejections.incrementAndGet();
+                                                                                      throw new RejectedExecutionException("REJECTED");});
+
+        ComparisonExecutor executor = new ComparisonExecutor(MoreExecutors.listeningDecorator(rejectingExecutor), maxTasks, metrics);
+        Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+        final Phaser phaser = new Phaser(1);
+
+        // Submit an initial pair of tasks to ensure that the underlying work queue is full
+        BlockingTask t0 = new BlockingTask(0);
+        BlockingTask t1 = new BlockingTask(1);
+        executor.submit(t0, onSuccess, onError, phaser);
+        executor.submit(t1, onSuccess, onError, phaser);
+        assertEquals(3, phaser.getUnarrivedParties());
+        assertEquals(maxTasks - 2, availableSlots.getValue());
+
+        // Submit a third task which will be rejected by the executor service
+        executor.submit(new BlockingTask(2), onSuccess, onError, phaser);
+        t0.latch.countDown();
+        t1.latch.countDown();
+
+        phaser.arriveAndAwaitAdvance();
+        assertEquals(maxTasks, availableSlots.getValue());
+        assertEquals(2, successful.get());
+        assertEquals(1, failures.get());
+        assertEquals(1, rejections.get());
+    }
+
+    class BlockingTask implements Callable<Integer> {
+
+        final int id;
+        final Exception e;
+        final CountDownLatch latch;
+
+        BlockingTask(int id) {
+            this(id, null);
+        }
+
+        BlockingTask(int id, Exception toThrow) {
+            this.id = id;
+            this.e = toThrow;
+            this.latch = new CountDownLatch(1);
+        }
+
+        public Integer call() throws Exception {
+            latch.await();
+            if (e != null)
+                throw e;
+            return id;
+        }
+    }
+
+    private static ListeningExecutorService executor(int threads) {
+        return MoreExecutors.listeningDecorator(
+            Executors.newFixedThreadPool(threads,
+                                         new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
+                                                                   .setDaemon(true)
+                                                                   .build()));
+    }
+
+    private static MetricRegistry metrics() {
+        return new MetricRegistry();
+    }
+}
+
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
new file mode 100644
index 0000000..d8d92c7
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.diff.DiffJob;
+import org.apache.cassandra.diff.TokenHelper;
+
+import static org.junit.Assert.assertEquals;
+
+public class DiffJobTest
+{
+    @Test
+    public void testSplitsM3P()
+    {
+        splitTestHelper(TokenHelper.forPartitioner("Murmur3Partitioner"));
+    }
+
+    @Test
+    public void testSplitsRandom()
+    {
+        splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner"));
+    }
+
+    private void splitTestHelper(TokenHelper tokens)
+    {
+        List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens);
+        assertEquals(splits.get(0).start, tokens.min());
+        DiffJob.Split prevSplit = null;
+        for (DiffJob.Split split : splits)
+        {
+            if (prevSplit != null)
+                assertEquals(prevSplit.end, split.start.subtract(BigInteger.ONE));
+            prevSplit = split;
+        }
+        assertEquals(splits.get(splits.size() - 1).end, tokens.max());
+        for (int i = 0; i < splits.size(); i++)
+            assertEquals(i, splits.get(i).splitNumber);
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
new file mode 100644
index 0000000..73677d9
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.google.common.base.VerifyException;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.diff.DiffJob;
+import org.apache.cassandra.diff.Differ;
+import org.apache.cassandra.diff.RangeStats;
+import org.apache.cassandra.diff.TokenHelper;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class DifferTest {
+
+    @Test(expected = VerifyException.class)
+    public void rejectNullStartOfRange() {
+        Differ.validateRange(null, BigInteger.TEN, TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectNullEndOfRange() {
+        Differ.validateRange(BigInteger.TEN, null, TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectWrappingRange() {
+        Differ.validateRange(BigInteger.TEN, BigInteger.ONE, TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectRangeWithStartLessThanMinMurmurToken() {
+        Differ.validateRange(TokenHelper.MURMUR3.min().subtract(BigInteger.ONE),
+                             BigInteger.TEN,
+                             TokenHelper.MURMUR3);
+    }
+
+    @Test(expected = VerifyException.class)
+    public void rejectRangeWithEndGreaterThanMaxMurmurToken() {
+        Differ.validateRange(BigInteger.ONE,
+                             TokenHelper.MURMUR3.max().add(BigInteger.ONE),
+                             TokenHelper.MURMUR3);
+    }
+
+    @Test
+    public void filterTaskStatusForTables() {
+        // according to the journal:
+        // * t1 is already completed
+        // * t2 is started and has reported some progress, but has not completed
+        // * t3 has not reported any progress
+        DiffJob.Split split = new DiffJob.Split(1, 1, BigInteger.ONE, BigInteger.TEN);
+        Iterable<String> tables = Lists.newArrayList("t1", "t2", "t3");
+        Function<String, DiffJob.TaskStatus> journal = (table) -> {
+            switch (table) {
+                case "t1":
+                    return new DiffJob.TaskStatus(split.end, RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6));
+                case "t2":
+                    return new DiffJob.TaskStatus(BigInteger.valueOf(5L), RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5));
+                case "t3":
+                    return DiffJob.TaskStatus.EMPTY;
+                default:
+                    throw new AssertionError();
+            }
+        };
+
+        Map<String, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
+        assertEquals(2, filtered.keySet().size());
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
+        assertNull(filtered.get("t3").lastToken);
+
+        // if re-running (part of) a job because of failures or problematic partitions, we want to
+        // ignore the status of completed tasks and re-run them anyway as only specified tokens will
+        // be processed - so t1 should be included now
+        filtered = Differ.filterTables(tables, split, journal, true);
+        assertEquals(3, filtered.keySet().size());
+        assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get("t1").stats);
+        assertEquals(split.end, filtered.get("t1").lastToken);
+        assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
+        assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
+        assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
+        assertNull(filtered.get("t3").lastToken);
+    }
+
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
new file mode 100644
index 0000000..79b3638
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import org.junit.Test;
+
+import com.datastax.driver.core.*;
+import org.apache.cassandra.diff.PartitionComparator;
+import org.apache.cassandra.diff.PartitionStats;
+import org.apache.cassandra.diff.TableSpec;
+
+import static org.junit.Assert.assertEquals;
+
+public class PartitionComparatorTest {
+
+    @Test
+    public void sourceIsNull() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, null, rows(row(t, 0, 1, 2, 3)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, true, true, 0, 0, 0);
+    }
+
+    @Test
+    public void targetIsNull() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(row(t, 0, 1, 2, 3)), null);
+        PartitionStats stats = comparator.call();
+        assertStats(stats, true, true, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceIsEmpty() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(), rows(row(t, 0, 1, 2, 3)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void targetIsEmpty() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(row(t, 0, 1, 2, 3)), rows());
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceAndTargetAreEmpty() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t, rows(), rows());
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceContainsExtraRowsAtStart() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)),
+                                                    rows(row(t, 10, 11, 12, 13)));
+        PartitionStats stats = comparator.call();
+        // Comparison fails fast, so bails on the initial mismatch
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void targetContainsExtraRowsAtStart() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 10, 11, 12, 13)),
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)));
+        PartitionStats stats = comparator.call();
+        // Comparison fails fast, so bails on the initial mismatch
+        assertStats(stats, false, false, 0, 0, 0);
+    }
+
+    @Test
+    public void sourceContainsExtraRowsAtEnd() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)),
+                                                    rows(row(t, 0, 1, 2, 3)));
+        PartitionStats stats = comparator.call();
+        // The fact that the first row & all its v1 & v2 values match should be reflected in the stats
+        assertStats(stats, false, false, 1, 2, 0);
+    }
+
+    @Test
+    public void targetContainsExtraRowsAtEnd() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3)),
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 10, 11, 12, 13)));
+        PartitionStats stats = comparator.call();
+        // The fact that the first row & all its v1 & v2 values match should be reflected in the stats
+        assertStats(stats, false, false, 1, 2, 0);
+    }
+
+    @Test
+    public void withoutClusteringsAllRowsMatching() {
+        TableSpec t = spec("table1", names(), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 11),
+                                                         row(t, 2, 12)),
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 11),
+                                                         row(t, 2, 12)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 6, 0);
+    }
+
+    @Test
+    public void singleClusteringAllRowsMatching() {
+        TableSpec t = spec("table1", names("c1"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2),
+                                                         row(t, 1, 11, 12),
+                                                         row(t, 2, 21, 22)),
+                                                    rows(row(t, 0, 1, 2),
+                                                         row(t, 1, 11, 12),
+                                                         row(t, 2, 21, 22)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 6, 0);
+    }
+
+    @Test
+    public void multipleClusteringAllRowsMatching() {
+        TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 1, 11, 12, 13),
+                                                         row(t, 2, 21, 22, 23)),
+                                                    rows(row(t, 0, 1, 2, 3),
+                                                         row(t, 1, 11, 12, 13),
+                                                         row(t, 2, 21, 22, 23)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 6, 0);
+    }
+
+    @Test
+    public void withoutClusteringsWithMismatches() {
+        TableSpec t = spec("table1", names(), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 11),
+                                                         row(t, 2, 12)),
+                                                    rows(row(t, 0, 1),
+                                                         row(t, 1, 1100),
+                                                         row(t, 2, 1200)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 4, 2);
+    }
+
+    @Test
+    public void singleClusteringWithMismatches() {
+        TableSpec t = spec("table1", names("c1"), names("v1", "v2"));
+        PartitionComparator comparator = comparator(t,
+                                                    rows(row(t, 0, 1, 2),
+                                                         row(t, 1, 11, 21),
+                                                         row(t, 2, 12, 22)),
+                                                    rows(row(t, 0, 1, 20),
+                                                         row(t, 1, 1100, 21),
+                                                         row(t, 2, 12, 1200)));
+        PartitionStats stats = comparator.call();
+        assertStats(stats, false, true, 3, 3, 3);
+    }
+
+    private void assertStats(PartitionStats stats,
+                             boolean skipped,
+                             boolean clusteringsMatch,
+                             int matchedRows,
+                             int matchedValues,
+                             int mismatchedValues) {
+        assertEquals(skipped, stats.skipped);
+        assertEquals(clusteringsMatch, stats.allClusteringsMatch);
+        assertEquals(matchedRows, stats.matchedRows);
+        assertEquals(matchedValues, stats.matchedValues);
+        assertEquals(mismatchedValues, stats.mismatchedValues);
+    }
+
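+    // constructs the comparator under test from a table spec and source/target row iterators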
+    PartitionComparator comparator(TableSpec table, Iterator<Row> source, Iterator<Row> target) {
+        return new PartitionComparator(table, source, target);
+    }
+
+    List<String> names(String...names) {
+        return Lists.newArrayList(names);
+    }
+
+    TableSpec spec(String table, List<String> clusteringColumns, List<String> regularColumns) {
+        return new TableSpec(table, columns(clusteringColumns), columns(regularColumns));
+    }
+
+    List<ColumnMetadata> columns(List<String> names) {
+        return names.stream().map(ColumnMetadataHelper::column).collect(Collectors.toList());
+    }
+
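+    // wraps the supplied rows in an iterator, as consumed by PartitionComparator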
+    Iterator<Row> rows(Row...rows) {
+        return new AbstractIterator<Row>() {
+            int i = 0;
+            protected Row computeNext() {
+                if (i < rows.length)
+                    return rows[i++];
+                return endOfData();
+            }
+        };
+    }
+
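+    // builds a synthetic row whose column names are the table's clustering columns followed by its regular columns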
+    Row row(TableSpec table, Object...values) {
+        return new TestRow(Stream.concat(table.getClusteringColumns().stream(),
+                                         table.getRegularColumns().stream())
+                                 .map(ColumnMetadata::getName).toArray(String[]::new),
+                           values);
+    }
+
+    static class TestRow implements Row {
+        private final String[] names;
+        private final Object[] values;
+
+        TestRow(String[] names, Object[] values) {
+            if (names.length != values.length)
+                throw new IllegalArgumentException(String.format("Number of column names (%d) doesn't " +
+                                                                 "match number of values (%d)",
+                                                                 names.length, values.length));
+            this.names = names;
+            this.values = values;
+        }
+
+        // Only getObject(String) is used by PartitionComparator
+        public Object getObject(String s) {
+            for (int i=0; i < names.length; i++)
+                if (names[i].equals(s))
+                    return values[i];
+
+            throw new IllegalArgumentException(s + " is not a column defined in this metadata");
+        }
+
+        public boolean isNull(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean getBool(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public byte getByte(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public short getShort(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public int getInt(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getLong(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Date getTimestamp(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public LocalDate getDate(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getTime(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public float getFloat(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public double getDouble(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytesUnsafe(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytes(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public String getString(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigInteger getVarint(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigDecimal getDecimal(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UUID getUUID(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public InetAddress getInet(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(String s, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(String s, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(String s, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(String s, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(String s, Class<K> aClass, Class<V> aClass1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UDTValue getUDTValue(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public TupleValue getTupleValue(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(String s, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(String s, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(String s, TypeCodec<T> typeCodec) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ColumnDefinitions getColumnDefinitions() {
+            throw new UnsupportedOperationException();
+        }
+
+        public Token getToken(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Token getToken(String s) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Token getPartitionKeyToken() {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean isNull(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public boolean getBool(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public byte getByte(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public short getShort(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public int getInt(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getLong(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Date getTimestamp(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public LocalDate getDate(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public long getTime(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public float getFloat(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public double getDouble(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytesUnsafe(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer getBytes(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public String getString(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigInteger getVarint(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public BigDecimal getDecimal(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UUID getUUID(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public InetAddress getInet(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(int i, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> List<T> getList(int i, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(int i, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> Set<T> getSet(int i, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(int i, Class<K> aClass, Class<V> aClass1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1) {
+            throw new UnsupportedOperationException();
+        }
+
+        public UDTValue getUDTValue(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public TupleValue getTupleValue(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public Object getObject(int i) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(int i, Class<T> aClass) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(int i, TypeToken<T> typeToken) {
+            throw new UnsupportedOperationException();
+        }
+
+        public <T> T get(int i, TypeCodec<T> typeCodec) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java
new file mode 100644
index 0000000..0484212
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.*;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Token;
+import org.jetbrains.annotations.NotNull;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
+
+public class RangeComparatorTest {
+
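+    // reporters record errors, mismatches and journalled progress keyed by token so each test can assert on them;
+    // comparisons run on a single-threaded ComparisonExecutor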
+    private Multimap<BigInteger, Throwable> errors = HashMultimap.create();
+    private BiConsumer<Throwable, BigInteger> errorReporter = (e, t) -> errors.put(t, e);
+    private Multimap<BigInteger, MismatchType> mismatches = HashMultimap.create();
+    private BiConsumer<MismatchType, BigInteger> mismatchReporter = (m, t) -> mismatches.put(t, m);
+    private Multimap<BigInteger, RangeStats> journal = HashMultimap.create();
+    private BiConsumer<RangeStats, BigInteger> progressReporter = (r, t) -> journal.put(t, copyOf(r));
+    private Set<BigInteger> comparedPartitions = new HashSet<>();
+    private ComparisonExecutor executor = ComparisonExecutor.newExecutor(1, new MetricRegistry());
+
+    @Test
+    public void emptyRange() {
+        RangeComparator comparator = comparator(context(100L, 100L));
+        RangeStats stats = comparator.compare(keys(), keys(), this::alwaysMatch);
+        assertTrue(stats.isEmpty());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void sourceAndTargetKeysAreEmpty() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(), keys(), this::alwaysMatch);
+        assertTrue(stats.isEmpty());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void partitionPresentOnlyInSource() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(1, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(2, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(0, 1);
+    }
+
+    @Test
+    public void partitionPresentOnlyInTarget() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(7, 8), keys(7, 8, 9), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(1, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(9, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(7, 8);
+    }
+
+    @Test
+    public void multipleOnlyInSource() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(4, 5, 6, 7, 8), keys(4, 5), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(3, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(6, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(7, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(8, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(4, 5);
+    }
+
+    @Test
+    public void multipleOnlyInTarget() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(4, 5), keys(4, 5, 6, 7, 8), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(3, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(6, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(7, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(8, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(4, 5);
+    }
+
+    @Test
+    public void multipleOnlyInBoth() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 3, 5, 7, 9), keys(0, 2, 4, 6, 8, 9), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(4, stats.getOnlyInSource());
+        assertEquals(4, stats.getOnlyInTarget());
+        assertEquals(2, stats.getMatchedPartitions());
+        assertReported(1, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(3, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(5, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(7, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(2, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(4, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(6, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(8, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared(0, 9);
+    }
+
+    @Test
+    public void sourceKeysIsEmpty() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(), keys(4, 5), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(2, stats.getOnlyInTarget());
+        assertEquals(0, stats.getMatchedPartitions());
+        assertReported(4, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertReported(5, MismatchType.ONLY_IN_TARGET, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void targetKeysIsEmpty() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(4, 5), keys(), this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(2, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(0, stats.getMatchedPartitions());
+        assertReported(4, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertReported(5, MismatchType.ONLY_IN_SOURCE, mismatches);
+        assertNothingReported(errors, journal);
+        assertCompared();
+    }
+
+    @Test
+    public void skipComparisonOfDisallowedTokens() {
+        RangeComparator comparator = comparator(context(0L, 100L, 2, 3, 4));
+        RangeStats stats = comparator.compare(keys(0, 1, 2, 3, 4, 5, 6),
+                                              keys(0, 1, 2, 3, 4, 5, 6),
+                                              this::alwaysMatch);
+        assertFalse(stats.isEmpty());
+        assertEquals(0, stats.getOnlyInSource());
+        assertEquals(0, stats.getOnlyInTarget());
+        assertEquals(4, stats.getMatchedPartitions());
+        assertEquals(3, stats.getSkippedPartitions());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared(0, 1, 5, 6);
+    }
+
+    @Test
+    public void handleErrorReadingFirstSourceKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+        assertCompared();
+    }
+
+    @Test
+    public void handleErrorReadingFirstTargetKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, 1, 2), keys(0, toThrow, 0, 1, 2), toThrow);
+        assertCompared();
+    }
+
+    @Test
+    public void handleErrorReadingSourceKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(1, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+        assertCompared(0);
+    }
+
+    @Test
+    public void handleErrorReadingTargetKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, 1, 2), keys(1, toThrow, 0, 1, 2), toThrow);
+        assertCompared(0);
+    }
+
+    @Test
+    public void handleErrorReadingLastSourceKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(2, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+        assertCompared(0, 1);
+    }
+
+    @Test
+    public void handleErrorReadingLastTargetKey() {
+        RuntimeException toThrow = new RuntimeException("Test");
+        testErrorReadingKey(keys(0, 1, 2), keys(2, toThrow, 0, 1, 2), toThrow);
+        assertCompared(0, 1);
+    }
+
+    @Test
+    public void handleErrorConstructingFirstTask() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringConstruction(0, expected));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(1, 2);
+        assertReported(0, expected, errors);
+    }
+
+    @Test
+    public void handleErrorConstructingTask() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringConstruction(1, expected));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 2);
+        assertReported(1, expected, errors);
+    }
+
+    @Test
+    public void handleErrorConstructingLastTask() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringConstruction(2, expected));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 1);
+        assertReported(2, expected, errors);
+    }
+
+    @Test
+    public void handleTaskErrorOnFirstPartition() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringExecution(expected, 0));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(1, 2);
+        assertReported(0, expected, errors);
+    }
+
+    @Test
+    public void handleTaskErrorOnPartition() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringExecution(expected, 1));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 2);
+        assertReported(1, expected, errors);
+    }
+
+    @Test
+    public void handleTaskErrorOnLastPartition() {
+        RuntimeException expected = new RuntimeException("Test");
+        RangeStats stats = testTaskError(throwDuringExecution(expected, 2));
+        assertEquals(1, stats.getErrorPartitions());
+        assertCompared(0, 1);
+        assertReported(2, expected, errors);
+    }
+
+    @Test
+    public void checkpointEveryTenPartitions() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        comparator.compare(keys(LongStream.range(0, 25).toArray()),
+                           keys(LongStream.range(0, 25).toArray()),
+                           this::alwaysMatch);
+        assertReported(9, RangeStats.withValues(10, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertReported(19, RangeStats.withValues(20, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertEquals(2, journal.keySet().size());
+    }
+
+    @Test
+    public void recordHighestSeenPartitionWhenTasksCompleteOutOfOrder() {
+        // every 10 partitions, the highest seen token is reported to the journal. Here,
+        // the keys are supplied in a shuffled order to simulate tasks completing out of order
+        RangeComparator comparator = comparator(context(0L, 100L));
+        long[] tokens = new long[] {2, 8, 1, 4, 100, 3, 5, 7, 6, 9};
+        comparator.compare(keys(tokens),
+                           keys(tokens),
+                           this::alwaysMatch);
+        assertReported(100, RangeStats.withValues(10, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertEquals(1, journal.keySet().size());
+        assertNothingReported(errors, mismatches);
+        assertCompared(tokens);
+    }
+
+    @Test
+    public void recordHighestSeenPartitionWhenTasksCompleteOutOfOrderWithErrors() {
+        // 2 partitions will error during comparison, but after 10 successful comparisons
+        // we'll still report progress to the journal
+        RuntimeException toThrow = new RuntimeException("Test");
+        RangeComparator comparator = comparator(context(0L, 100L));
+        long[] tokens = new long[] {2, 8, 1, 11, 4, 100, 3, 5, 7, 6, 9, 0};
+        comparator.compare(keys(tokens), keys(tokens), throwDuringExecution(toThrow, 3, 2));
+        assertReported(100, RangeStats.withValues(10, 0L, 2L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+        assertEquals(1, journal.keySet().size());
+        assertReported(2, toThrow, errors);
+        assertReported(3, toThrow, errors);
+        assertEquals(2, errors.keySet().size());
+        assertNothingReported(mismatches);
+        // tasks that errored are not counted as compared in the test
+        assertCompared(8, 1, 11, 4, 100, 5, 7, 6, 9, 0);
+    }
+
+    @Test
+    public void rowLevelMismatchIncrementsPartitionMismatches() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1, 2), this::rowMismatch);
+        assertEquals(0, stats.getMatchedPartitions());
+        assertEquals(3, stats.getMismatchedPartitions());
+        assertEquals(0, stats.getMatchedValues());
+        assertEquals(0, stats.getMismatchedValues());
+        assertNothingReported(errors, journal);
+        assertReported(0, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(1, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(2, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertCompared(0, 1, 2);
+    }
+
+    @Test
+    public void valueMismatchIncrementsPartitionMismatches() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1, 2), this::valuesMismatch);
+        assertEquals(0, stats.getMatchedPartitions());
+        assertEquals(3, stats.getMismatchedPartitions());
+        assertEquals(0, stats.getMatchedValues());
+        assertEquals(30, stats.getMismatchedValues());
+        assertNothingReported(errors, journal);
+        assertReported(0, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(1, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertReported(2, MismatchType.PARTITION_MISMATCH, mismatches);
+        assertCompared(0, 1, 2);
+    }
+
+    @Test
+    public void matchingPartitionIncrementsCount() {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1, 2), this::alwaysMatch);
+        assertEquals(3, stats.getMatchedPartitions());
+        assertEquals(0, stats.getMismatchedPartitions());
+        assertEquals(0, stats.getMatchedValues());
+        assertEquals(0, stats.getMismatchedValues());
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared(0, 1, 2);
+    }
+
+    @Test
+    public void waitForAllInFlightTasksToComplete() throws InterruptedException {
+        CountDownLatch taskSubmissions = new CountDownLatch(2);
+        List<CountDownLatch> taskGates = Lists.newArrayList(new CountDownLatch(1), new CountDownLatch(1));
+        RangeComparator comparator = comparator(context(0L, 100L));
+
+        Thread t = new Thread(() -> comparator.compare(keys(0, 1),
+                                                       keys(0, 1),
+                                                       waitUntilNotified(taskSubmissions, taskGates)),
+                              "CallingThread");
+        t.setDaemon(true);
+        t.start();
+
+        // wait for both tasks to be submitted then check that the calling thread enters a waiting state
+        taskSubmissions.await();
+        assertThreadWaits(t);
+
+        // let the first task complete and check that the calling thread is still waiting
+        taskGates.get(0).countDown();
+        assertThreadWaits(t);
+
+        // let the second task run and wait for the caller to terminate
+        taskGates.get(1).countDown();
+        t.join();
+
+        assertNothingReported(errors, mismatches, journal);
+        assertCompared(0, 1);
+    }
+
+    private RangeStats testTaskError(Function<PartitionKey, PartitionComparator> taskSupplier) {
+        RangeComparator comparator = comparator(context(0L, 100L));
+        Iterator<PartitionKey> sourceKeys = keys(0, 1, 2);
+        Iterator<PartitionKey> targetKeys = keys(0, 1, 2);
+        RangeStats stats = comparator.compare(sourceKeys, targetKeys, taskSupplier);
+        assertNothingReported(mismatches, journal);
+        return stats;
+    }
+
+    private void testErrorReadingKey(Iterator<PartitionKey> sourceKeys,
+                                     Iterator<PartitionKey> targetKeys,
+                                     Exception expected) {
+
+        RangeComparator comparator = comparator(context(0L, 100L));
+        try {
+            comparator.compare(sourceKeys, targetKeys, this::alwaysMatch);
+            fail("Expected exception " + expected.getLocalizedMessage());
+        } catch (Exception e) {
+            assertEquals(expected, e.getCause());
+        }
+        assertNothingReported(errors, mismatches, journal);
+    }
+
+    private void assertCompared(long...tokens) {
+        assertEquals(comparedPartitions.size(), tokens.length);
+        for(long t : tokens)
+            assertTrue(comparedPartitions.contains(BigInteger.valueOf(t)));
+    }
+
+    private void assertNothingReported(Multimap...reported) {
+        for (Multimap m : reported)
+            assertTrue(m.isEmpty());
+    }
+
+    private <T> void assertReported(long token, T expected, Multimap<BigInteger, T> reported) {
+        Collection<T> values = reported.get(BigInteger.valueOf(token));
+        assertEquals(1, values.size());
+        assertEquals(expected, values.iterator().next());
+    }
+
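+    // yields an iterator over the supplied tokens which throws the given exception on reaching throwAtToken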
+    Iterator<PartitionKey> keys(long throwAtToken, RuntimeException e, long...tokens) {
+        return new AbstractIterator<PartitionKey>() {
+            int i = 0;
+            protected PartitionKey computeNext() {
+                if (i < tokens.length) {
+                    long t = tokens[i++];
+                    if (t == throwAtToken)
+                        throw e;
+
+                    return key(t);
+                }
+                return endOfData();
+            }
+        };
+    }
+
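+    // yields an iterator of PartitionKeys for the supplied token values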
+    Iterator<PartitionKey> keys(long...tokens) {
+        return new AbstractIterator<PartitionKey>() {
+            int i = 0;
+            protected PartitionKey computeNext() {
+                if (i < tokens.length)
+                    return key(tokens[i++]);
+                return endOfData();
+            }
+        };
+    }
+
+    // yields a PartitionComparator which always concludes that the partitions being compared are identical
+    PartitionComparator alwaysMatch(PartitionKey key) {
+        return new PartitionComparator(null, null, null) {
+            public PartitionStats call() {
+                comparedPartitions.add(key.getTokenAsBigInteger());
+                return new PartitionStats();
+            }
+        };
+    }
+
+    // yields a PartitionComparator which always determines that the partitions have a row-level mismatch
+    PartitionComparator rowMismatch(PartitionKey key) {
+        return new PartitionComparator(null, null, null) {
+            public PartitionStats call() {
+                comparedPartitions.add(key.getTokenAsBigInteger());
+                PartitionStats stats = new PartitionStats();
+                stats.allClusteringsMatch = false;
+                return stats;
+            }
+        };
+    }
+
+    // yields a PartitionComparator which always determines that the partitions have 10 mismatching values
+    PartitionComparator valuesMismatch(PartitionKey key) {
+        return new PartitionComparator(null, null, null) {
+            public PartitionStats call() {
+                comparedPartitions.add(key.getTokenAsBigInteger());
+                PartitionStats stats = new PartitionStats();
+                stats.mismatchedValues = 10;
+                return stats;
+            }
+        };
+    }
+
+    // yields a function which throws when creating a PartitionComparator for a given token,
+    // simulating an error reading the partition from the source or target cluster
+    // while constructing the task
+    Function<PartitionKey, PartitionComparator> throwDuringConstruction(long throwAt, RuntimeException toThrow) {
+        return (key) -> {
+            BigInteger t = key.getTokenAsBigInteger();
+            if (t.longValue() == throwAt)
+                throw toThrow;
+
+            return alwaysMatch(key);
+        };
+    }
+
+    // yields a PartitionComparator which throws the supplied exception when the token matches
+    // one of those specified, simulating an error during the comparison itself
+    Function<PartitionKey, PartitionComparator> throwDuringExecution(RuntimeException toThrow, long...throwAt) {
+        return (key) -> {
+            BigInteger t = key.getTokenAsBigInteger();
+            return new PartitionComparator(null, null, null) {
+                public PartitionStats call() {
+                    for (long shouldThrow : throwAt)
+                        if (t.longValue() == shouldThrow)
+                           throw toThrow;
+
+                    comparedPartitions.add(t);
+                    return new PartitionStats();
+                }
+            };
+        };
+    }
+
+    // yields a PartitionComparator which waits on a CountDownLatch before returning from call(). The latches
+    // are supplied in a list and each successive task yielded uses the next one, so that callers
+    // can control the rate of task progress.
+    // The other supplied latch, taskSubmissions, is counted down as each task is created, signalling to the
+    // caller that all tasks have been submitted before it starts releasing the task gates
+    Function<PartitionKey, PartitionComparator> waitUntilNotified(final CountDownLatch taskSubmissions,
+                                                                  final List<CountDownLatch> taskGates) {
+        final Iterator<CountDownLatch> gateIter = taskGates.iterator();
+        return (key) -> {
+
+            BigInteger t = key.getTokenAsBigInteger();
+            taskSubmissions.countDown();
+
+            return new PartitionComparator(null, null, null) {
+                public PartitionStats call() {
+                    if (!gateIter.hasNext())
+                        fail("Expected a latch to control task progress");
+
+                    try {
+                        gateIter.next().await();
+                    }
+                    catch (InterruptedException e) {
+                        e.printStackTrace();
+                        fail("Interrupted");
+                    }
+                    comparedPartitions.add(t);
+                    return new PartitionStats();
+                }
+            };
+        };
+    }
+
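+    // wraps a long token value in a synthetic PartitionKey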
+    PartitionKey key(long token) {
+        return new TestPartitionKey(token);
+    }
+
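+    // constructs the RangeComparator under test, wired to the reporters and executor defined above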
+    RangeComparator comparator(DiffContext context) {
+        return new RangeComparator(context, errorReporter, mismatchReporter, progressReporter, executor);
+    }
+
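+    // builds a context for the given token range which rejects the supplied tokens, causing them to be skipped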
+    DiffContext context(long startToken, long endToken, long...disallowedTokens) {
+        return new TestContext(BigInteger.valueOf(startToken),
+                               BigInteger.valueOf(endToken),
+                               new SpecificTokens(Arrays.stream(disallowedTokens)
+                                                        .mapToObj(BigInteger::valueOf)
+                                                        .collect(Collectors.toSet()),
+                                                  SpecificTokens.Modifier.REJECT));
+    }
+
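+    // builds a context for the given token range with no specific-token filtering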
+    DiffContext context(long startToken, long endToken) {
+        return new TestContext(BigInteger.valueOf(startToken), BigInteger.valueOf(endToken), SpecificTokens.NONE);
+    }
+
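+    // copies the current values of a RangeStats so journal entries capture the stats at the time they were reported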
+    RangeStats copyOf(RangeStats stats) {
+        return RangeStats.withValues(stats.getMatchedPartitions(),
+                                     stats.getMismatchedPartitions(),
+                                     stats.getErrorPartitions(),
+                                     stats.getSkippedPartitions(),
+                                     stats.getOnlyInSource(),
+                                     stats.getOnlyInTarget(),
+                                     stats.getMatchedRows(),
+                                     stats.getMatchedValues(),
+                                     stats.getMismatchedValues());
+    }
+
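+    // a PartitionKey backed by a synthetic bigint Token, ordered by its long value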
+    static class TestPartitionKey extends PartitionKey {
+        final Token token;
+
+        TestPartitionKey(final long tokenValue) {
+            super(null);
+            token = new Token() {
+
+                public DataType getType() {
+                    return DataType.bigint();
+                }
+
+                public Object getValue() {
+                    return tokenValue;
+                }
+
+                public ByteBuffer serialize(ProtocolVersion protocolVersion) {
+                    return null;
+                }
+
+                public int compareTo(@NotNull Token o) {
+                    assert o.getValue() instanceof Long;
+                    return Long.compare(tokenValue, (long)o.getValue());
+                }
+            };
+        }
+
+        protected Token getToken() {
+            return token;
+        }
+    }
+
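+    // a minimal DiffContext with just the token range and specific-token filter populated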
+    static class TestContext extends DiffContext {
+
+        public TestContext(BigInteger startToken,
+                           BigInteger endToken,
+                           SpecificTokens specificTokens) {
+            super(null, null, null, null, startToken, endToken, specificTokens, 0.0);
+        }
+    }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java b/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java
new file mode 100644
index 0000000..ad9e1c4
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import static org.junit.Assert.fail;
+
+public class TestUtils {
+
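+    // polls the thread's state for up to ~1 second, failing unless it enters the WAITING state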
+    public static void assertThreadWaits(Thread t) {
+        for (int i=0; i < 1000; i++) {
+            try {
+                Thread.sleep(1);
+            }
+            catch (InterruptedException e) {
+                fail("Unexpected InterruptedException");
+            }
+            if (t.getState() == Thread.State.WAITING)
+                return;
+        }
+        fail(String.format("Thread %s expected to enter WAITING state, but failed to do so", t.getName()));
+    }
+
+}
diff --git a/spark-job/src/test/resources/cql-stress-narrow1.yaml b/spark-job/src/test/resources/cql-stress-narrow1.yaml
new file mode 100644
index 0000000..dae375b
--- /dev/null
+++ b/spark-job/src/test/resources/cql-stress-narrow1.yaml
@@ -0,0 +1,62 @@
+#
+# Keyspace info
+#
+keyspace: difftest
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE difftest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: narrow1
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE narrow1 (
+        pk int,
+        v1 text,
+        v2 int,
+        PRIMARY KEY(pk)
+  ) 
+    WITH compaction = { 'class':'LeveledCompactionStrategy' }
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
+#
+columnspec:
+  - name: pk
+    population: uniform(1..1B)
+  - name: v1
+    size: fixed(20)
+  - name: v2
+    population: uniform(1..100)
+
+insert:
+  partitions: fixed(50)       # number of unique partitions to update in a single operation
+  batchtype: UNLOGGED         # type of batch to use
+
+queries:
+   simple1:
+      cql: select * from narrow1 where pk = ? LIMIT 100
+      fields: samerow         # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
diff --git a/spark-job/src/test/resources/cql-stress-wide1.yaml b/spark-job/src/test/resources/cql-stress-wide1.yaml
new file mode 100644
index 0000000..145ecce
--- /dev/null
+++ b/spark-job/src/test/resources/cql-stress-wide1.yaml
@@ -0,0 +1,69 @@
+#
+# Keyspace info
+#
+keyspace: difftest
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+  CREATE KEYSPACE difftest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: wide1
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+  CREATE TABLE wide1 (
+        pk int,
+        c1 int,
+        c2 int,
+        v1 text,
+        v2 int,
+        PRIMARY KEY(pk, c1, c2)
+  ) 
+    WITH compaction = { 'class':'LeveledCompactionStrategy' }
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows.  Supported types are
+# 
+#      EXP(min..max)                        An exponential distribution over the range [min..max]
+#      EXTREME(min..max,shape)              An extreme value (Weibull) distribution over the range [min..max]
+#      GAUSSIAN(min..max,stdvrng)           A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+#      GAUSSIAN(min..max,mean,stdev)        A gaussian/normal distribution, with explicitly defined mean and stdev
+#      UNIFORM(min..max)                    A uniform distribution over the range [min, max]
+#      FIXED(val)                           A fixed distribution, always returning the same value
+#      Aliases: extr, gauss, normal, norm, weibull
+#
+#      If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
+#
+columnspec:
+  - name: pk
+    population: uniform(1..1B)
+  - name: c1
+    cluster: uniform(1..100)
+  - name: c2
+    cluster: uniform(1..100)
+  - name: v1
+    size: fixed(20)
+  - name: v2
+    population: uniform(1..100)
+
+insert:
+  partitions: fixed(1)       # number of unique partitions to update in a single operation
+  batchtype: UNLOGGED        # type of batch to use
+
+
+queries:
+   simple1:
+      cql: select * from wide1 where pk = ? AND c1 = ? AND c2 = ? LIMIT 1
+      fields: samerow