initial commit
Co-authored-by: Bhaskar Muppana <mbhaskar@apple.com>
Co-authored-by: Jeff Jirsa <jjirsa@apple.com>
Co-authored-by: Marcus Eriksson <marcuse@apache.org>
Co-authored-by: Michael Kjellman <kjellman@apple.com>
Co-authored-by: Nate McCall <zznate.m@gmail.com>
Co-authored-by: Sam Tunnicliffe <sam@beobal.com>
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..7b48652
--- /dev/null
+++ b/README.md
@@ -0,0 +1,86 @@
+# Cassandra diff
+
+## Configuration
+See `spark-job/localconfig.yaml` for an example config.
+
+## Custom cluster providers
+To make it easy to run in any environment, the cluster providers are pluggable - there are two interfaces to implement.
+First, the `ClusterProvider` interface is used to create a connection to the clusters, and it is configured using
+`JobConfiguration#clusterConfig` (see below).
+### cluster_config
+This section has 3 parts - `source`, `target` and `metadata` - where source and target describe the clusters to be
+compared, and metadata describes where we store information about any mismatches and the job's progress.
+The metadata can be stored in one of the source/target clusters or in a separate cluster.
+
+The fields under source/target/metadata are passed in to the `ClusterProvider` (described by `impl`) as a map, so any
+custom cluster providers can be configured here.
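+
+As a sketch (the class and package names here are hypothetical), a minimal custom provider that connects to a single
+host could look like the following - the fields under `cluster_config` arrive in `initialize` as a plain map:
+```java
+package com.example.diff;
+
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+import org.apache.cassandra.diff.ClusterProvider;
+
+public class SingleHostClusterProvider implements ClusterProvider {
+    private String name;
+    private String host;
+    private int port;
+
+    // Receives the fields under cluster_config.<identifier> from the yaml as a map
+    public void initialize(Map<String, String> conf, String identifier) {
+        name = conf.get("name");
+        host = conf.get("host");
+        port = Integer.parseInt(conf.getOrDefault("port", "9042"));
+    }
+
+    public Cluster getCluster() {
+        return Cluster.builder().addContactPoints(host).withPort(port).build();
+    }
+
+    public String getClusterName() {
+        return name;
+    }
+}
+```
+Point `impl` at the fully-qualified class name (here `com.example.diff.SingleHostClusterProvider`) to use it.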
+
+## Setting up clusters for diff
+One way of setting up clusters for diff is to restore a snapshot to two different clusters and then modify one
+of the clusters to verify that queries still return the same results afterwards. Such modifications could include
+upgrades/replacements/bounces/decommission/expansion.
+
+## Environment variables
+Currently, usernames and passwords are set as environment variables when running the diff tool and the api server:
+
+* `diff.cluster.<identifier>.cql_user` - the username to use
+* `diff.cluster.<identifier>.cql_password` - the password to use
+
+where `<identifier>` is `source`, `target` or `metadata`, giving the username/password combination for the
+matching cluster in the configuration.
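+
+For illustration, this small sketch mirrors the lookup done by `ContactPointsClusterProvider` (the class
+`CredentialLookup` is hypothetical, only the variable naming is taken from the tool):
+```java
+public class CredentialLookup {
+    // Mirrors how ContactPointsClusterProvider builds the environment variable name
+    static String getEnv(String identifier, String propName) {
+        return String.format("%s.%s.%s", "diff.cluster", identifier, propName);
+    }
+
+    public static void main(String[] args) {
+        // e.g. resolves the variable named "diff.cluster.source.cql_user"
+        System.out.println("source user: " + System.getenv(getEnv("source", "cql_user")));
+    }
+}
+```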
+
+## Example
+This example starts two single-node Cassandra clusters in docker, runs stress to populate them and then runs diff
+to make sure the data matches:
+
+You need to have docker and spark set up.
+
+```shell script
+$ git clone <wherever>/cassandra-diff.git
+$ cd cassandra-diff
+$ mvn package
+$ docker run --name cas-src -d -p 9042:9042 cassandra:3.0.18
+$ docker run --name cas-tgt -d -p 9043:9042 cassandra:latest
+$ docker exec cas-src cassandra-stress write n=1k
+$ docker exec cas-tgt cassandra-stress write n=1k
+$ spark-submit --verbose --files ./spark-job/localconfig.yaml --class org.apache.cassandra.diff.DiffJob spark-job/target/spark-job-0.1-SNAPSHOT.jar localconfig.yaml
+# ... logs
+INFO DiffJob:124 - FINISHED: {standard1=Matched Partitions - 1000, Mismatched Partitions - 0, Partition Errors - 0, Partitions Only In Source - 0, Partitions Only In Target - 0, Skipped Partitions - 0, Matched Rows - 1000, Matched Values - 6000, Mismatched Values - 0 }
+## start api-server:
+$ mvn install
+$ cd api-server
+$ mvn exec:java
+$ curl -s localhost:8089/jobs/recent | python -mjson.tool
+ [
+ {
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "buckets": 100,
+ "keyspace": "keyspace1",
+ "tables": [
+ "standard1"
+ ],
+ "sourceClusterName": "local_test_1",
+ "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+ "targetClusterName": "local_test_2",
+ "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+ "tasks": 10000,
+ "start": "2019-08-16T11:47:36.123Z"
+ }
+ ]
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | python -mjson.tool
+ [
+ {
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "table": "standard1",
+ "matchedPartitions": 1000,
+ "mismatchedPartitions": 0,
+ "matchedRows": 1000,
+ "matchedValues": 6000,
+ "mismatchedValues": 0,
+ "onlyInSource": 0,
+ "onlyInTarget": 0,
+ "skippedPartitions": 0
+ }
+ ]
+
+```
diff --git a/api-server/README.md b/api-server/README.md
new file mode 100644
index 0000000..254601c
--- /dev/null
+++ b/api-server/README.md
@@ -0,0 +1,117 @@
+# Diff API Server
+## Configuration
+See the main project README - the api server reads the `metadata_options` and `cluster_config.metadata` settings to connect.
+
+## Running locally
+`mvn exec:java`
+
+## Endpoints
+### `/jobs/running/id`
+Returns the ids of currently running jobs.
+
+### `/jobs/running`
+Summaries of all currently running jobs.
+
+### `/jobs/recent`
+Summaries of recently run jobs.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/recent | python -mjson.tool
+ [
+ {
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "buckets": 100,
+ "keyspace": "keyspace1",
+ "tables": [
+ "standard1"
+ ],
+ "sourceClusterName": "local_test_1",
+ "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+ "targetClusterName": "local_test_2",
+ "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+ "tasks": 10000,
+ "start": "2019-08-16T11:47:36.123Z"
+ }
+ ]
+
+```
+
+### `/jobs/{jobid}`
+Summary of a single job.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a | python -mjson.tool
+ {
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "buckets": 100,
+ "keyspace": "keyspace1",
+ "tables": [
+ "standard1"
+ ],
+ "sourceClusterName": "local_test_1",
+ "sourceClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+ "targetClusterName": "local_test_2",
+ "targetClusterDesc": "ContactPoints Cluster: name=name, dc=datacenter1, contact points= [127.0.0.1]",
+ "tasks": 10000,
+ "start": "2019-08-16T11:47:36.123Z"
+ }
+```
+
+### `/jobs/{jobid}/results`
+The results for the given job.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/results | python -mjson.tool
+[
+ {
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "table": "standard1",
+ "matchedPartitions": 1000,
+ "mismatchedPartitions": 0,
+ "matchedRows": 1000,
+ "matchedValues": 6000,
+ "mismatchedValues": 0,
+ "onlyInSource": 0,
+ "onlyInTarget": 0,
+ "skippedPartitions": 0
+ }
+]
+```
+
+### `/jobs/{jobid}/status`
+Current status for a job, showing how many splits have been completed.
+Example:
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/status | python -mjson.tool
+{
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "completedByTable": {
+ "standard1": 10000
+ }
+}
+```
+
+### `/jobs/{jobid}/mismatches`
+Number of mismatches for the job.
+```shell script
+$ curl -s localhost:8089/jobs/99b8d556-07ed-4bfd-b978-7d9b7b2cc21a/mismatches | python -mjson.tool
+ {
+ "jobId": "99b8d556-07ed-4bfd-b978-7d9b7b2cc21a",
+ "mismatchesByTable": {}
+ }
+```
+
+### `/jobs/{jobid}/errors/summary`
+Summary of the number of errors for the job.
+
+### `/jobs/{jobid}/errors/ranges`
+Lists failed ranges for the job.
+
+### `/jobs/{jobid}/errors`
+Details about the job errors.
+
+### `/jobs/by-start-date/{started-after}`
+### `/jobs/by-start-date/{started-after}/{started-before}`
+### `/jobs/by-source-cluster/{source}`
+### `/jobs/by-target-cluster/{target}`
+### `/jobs/by-keyspace/{keyspace}`
diff --git a/api-server/pom.xml b/api-server/pom.xml
new file mode 100644
index 0000000..ab431da
--- /dev/null
+++ b/api-server/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.cassandra.diff</groupId>
+ <artifactId>diff</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>api-server</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>org.apache.cassandra.diff</groupId>
+ <artifactId>common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-frontend-jaxrs</artifactId>
+ <version>3.1.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.cxf</groupId>
+ <artifactId>cxf-rt-transports-http-jetty</artifactId>
+ <version>3.1.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <version>2.9.9</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.9.9.2</version>
+ </dependency>
+
+ <!-- provided dependencies -->
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <!-- runtime dependencies -->
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>3.4.1</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.6.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>java</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <mainClass>org.apache.cassandra.diff.api.DiffAPIServer</mainClass>
+ <arguments>
+ <argument>
+ ../spark-job/localconfig.yaml
+ </argument>
+ </arguments>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
new file mode 100644
index 0000000..6f14128
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/DiffAPIServer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api;
+
+import java.io.IOException;
+
+import com.google.common.collect.Lists;
+
+import org.apache.cassandra.diff.YamlJobConfiguration;
+import org.apache.cassandra.diff.api.resources.DiffJobsResource;
+import org.apache.cassandra.diff.api.resources.HealthResource;
+import org.apache.cxf.endpoint.Server;
+import org.apache.cxf.jaxrs.JAXRSServerFactoryBean;
+import org.apache.cxf.jaxrs.lifecycle.SingletonResourceProvider;
+
+public class DiffAPIServer {
+ public static void main(String[] args) throws IOException {
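+        // First argument: path to the job's yaml config; only metadata_options and cluster_config.metadata are used here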
+ String filename = args[0];
+ JAXRSServerFactoryBean factoryBean = new JAXRSServerFactoryBean();
+
+ DiffJobsResource diffResource = new DiffJobsResource(YamlJobConfiguration.load(filename));
+ factoryBean.setResourceProviders(Lists.newArrayList(new SingletonResourceProvider(diffResource),
+ new SingletonResourceProvider(new HealthResource())));
+ factoryBean.setAddress("http://localhost:8089/");
+ Server server = factoryBean.create();
+
+ try {
+ server.start();
+ System.in.read();
+ } catch (Throwable t) {
+ t.printStackTrace(System.out);
+ throw t;
+ } finally {
+ diffResource.close();
+ if (server.isStarted())
+ server.stop();
+ System.exit(0);
+ }
+ }
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java
new file mode 100644
index 0000000..564f960
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/DiffJobsResource.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.resources;
+
+import java.util.SortedSet;
+import java.util.UUID;
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.diff.JobConfiguration;
+import org.apache.cassandra.diff.api.services.DBService;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+@Path("/jobs")
+public class DiffJobsResource {
+
+ private static final Logger logger = LoggerFactory.getLogger(DiffJobsResource.class);
+ private static final String DATE_FORMAT = "yyyy-MM-dd";
+ private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern(DATE_FORMAT).withZoneUTC();
+ private static final ObjectMapper MAPPER = new ObjectMapper().setVisibility(PropertyAccessor.FIELD,
+ JsonAutoDetect.Visibility.NON_PRIVATE);
+ private final DBService dbService;
+
+ public DiffJobsResource(JobConfiguration conf) {
+ dbService = new DBService(conf);
+ }
+
+ @GET
+ @Path("/running/id")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRunningJobIds() {
+ return response(dbService.fetchRunningJobs());
+ }
+
+ @GET
+ @Path("/running")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRunningJobs() {
+ return response(dbService.fetchJobSummaries(dbService.fetchRunningJobs()));
+ }
+
+ @GET
+ @Path("/recent")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getRecentJobs() {
+
+ SortedSet<DBService.JobSummary> recentJobs = Sets.newTreeSet(DBService.JobSummary.COMPARATOR.reversed());
+ DateTime now = DateTime.now(DateTimeZone.UTC);
+ DateTime maxStartDate = now;
+ DateTime minStartDate = maxStartDate.minusDays(30);
+ recentJobs.addAll(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+
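+        // Keep stepping the 30-day window back (up to ~90 days) until at least 10 recent jobs have been found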
+ while (recentJobs.size() < 10 && (maxStartDate.compareTo(now.minusDays(90)) >= 0)) {
+ maxStartDate = minStartDate;
+ minStartDate = minStartDate.minusDays(30);
+ recentJobs.addAll(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+ }
+
+ return response(recentJobs);
+ }
+
+ @GET
+ @Path("/{jobid}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJob(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchJobSummary(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/{jobid}/results")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobResults(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchJobResults(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/{jobid}/status")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobStatus(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchJobStatus(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/{jobid}/mismatches")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobMismatches(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchMismatches(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/{jobid}/errors/summary")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobErrorSummary(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchErrorSummary(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/{jobid}/errors/ranges")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobErrorRanges(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchErrorRanges(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/{jobid}/errors")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobErrorDetail(@PathParam("jobid") final String jobId) {
+ return response(dbService.fetchErrorDetail(UUID.fromString(jobId)));
+ }
+
+ @GET
+ @Path("/by-start-date/{started-after}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobsStartedSince(@PathParam("started-after") final String minStart) {
+
+ DateTime minStartDate = parseDate(minStart);
+ if (minStartDate == null)
+ return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+
+ DateTime maxStartDate = DateTime.now(DateTimeZone.UTC);
+
+ return getJobsStartedBetween(minStartDate, maxStartDate);
+ }
+
+ @GET
+ @Path("/by-start-date/{started-after}/{started-before}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobsStartedSince(@PathParam("started-after") final String minStart,
+ @PathParam("started-before") final String maxStart) {
+
+ DateTime minStartDate = parseDate(minStart);
+ if (minStartDate == null)
+ return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+
+        DateTime maxStartDate = null;
+        if (isNullOrEmpty(maxStart)) {
+            maxStartDate = DateTime.now(DateTimeZone.UTC);
+        }
+ else {
+ maxStartDate = parseDate(maxStart);
+ if (maxStartDate == null)
+ return Response.status(400).entity("Invalid date, please supply in the format yyyy-MM-dd").build();
+ if (maxStartDate.compareTo(minStartDate) < 0)
+ return Response.status(400).entity("Invalid date range, started-before cannot be earlier than started-after").build();
+ }
+
+ return getJobsStartedBetween(minStartDate, maxStartDate);
+ }
+
+ @GET
+ @Path("/by-source-cluster/{source}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobsBySourceCluster(@PathParam("source") final String sourceClusterName) {
+ return response(dbService.fetchJobsForSourceCluster(sourceClusterName));
+ }
+
+ @GET
+ @Path("/by-target-cluster/{target}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobsByTargetCluster(@PathParam("target") final String targetClusterName) {
+ return response(dbService.fetchJobsForTargetCluster(targetClusterName));
+ }
+
+ @GET
+ @Path("/by-keyspace/{keyspace}")
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response getJobsByKeyspace(@PathParam("keyspace") final String keyspace) {
+ return response(dbService.fetchJobsForKeyspace(keyspace));
+ }
+
+ private Response response(Object data) {
+ try {
+ return Response.ok().entity(MAPPER.writer().writeValueAsString(data)).build();
+ }
+ catch (JsonProcessingException e) {
+ logger.error("JSON Processing error", e);
+ return Response.serverError().entity("Error constructing JSON response").build();
+ }
+ }
+
+ private Response getJobsStartedBetween(DateTime minStartDate, DateTime maxStartDate) {
+ return response(dbService.fetchJobsStartedBetween(minStartDate, maxStartDate));
+ }
+
+ private DateTime parseDate(String s) {
+ try {
+ return DATE_FORMATTER.parseDateTime(s);
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
+
+ public void close() {
+ dbService.close();
+ }
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java
new file mode 100644
index 0000000..d101615
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/resources/HealthResource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.resources;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+/**
+ * Health Check: not terribly useful yet.
+ * TODO: add DB connection check
+ */
+@Path("/__health")
+public class HealthResource {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HealthResource.class);
+
+ private static Boolean overrideHealthStatus = null;
+
+ public HealthResource() {
+ LOGGER.debug("New HealthResource created.");
+ }
+
+ public static Boolean overrideHealthStatusTo(Boolean newStatus) {
+ Boolean oldStatus = overrideHealthStatus;
+ overrideHealthStatus = newStatus;
+ LOGGER.debug("Changed overrideHealthStatus from {} to {}", oldStatus, overrideHealthStatus);
+ return oldStatus;
+ }
+
+ @GET
+ @Produces(MediaType.APPLICATION_JSON)
+ public Response health() {
+ if (isHealthy()) {
+ return Response
+ .ok("All is good.")
+ .build();
+ } else {
+ return Response
+ .serverError()
+ .entity("Not Healthy.")
+ .build();
+ }
+ }
+
+ private boolean isHealthy() {
+ if (overrideHealthStatus == null) {
+ return true;
+ }
+ return overrideHealthStatus;
+ }
+
+}
diff --git a/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
new file mode 100644
index 0000000..1bf1f88
--- /dev/null
+++ b/api-server/src/main/java/org/apache/cassandra/diff/api/services/DBService.java
@@ -0,0 +1,546 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff.api.services;
+
+import java.io.Closeable;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.utils.UUIDs;
+import org.apache.cassandra.diff.ClusterProvider;
+import org.apache.cassandra.diff.JobConfiguration;
+import org.jetbrains.annotations.NotNull;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.Days;
+
+// TODO cache jobsummary
+// TODO fix exception handling
+public class DBService implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(DBService.class);
+ private static final long QUERY_TIMEOUT_MS = 3000;
+
+ private final Cluster cluster;
+ private final Session session;
+ private final String diffKeyspace;
+
+ private final PreparedStatement runningJobsStatement;
+ private final PreparedStatement jobSummaryStatement;
+ private final PreparedStatement jobResultStatement;
+ private final PreparedStatement jobStatusStatement;
+ private final PreparedStatement jobMismatchesStatement;
+ private final PreparedStatement jobErrorSummaryStatement;
+ private final PreparedStatement jobErrorRangesStatement;
+ private final PreparedStatement jobErrorDetailStatement;
+ private final PreparedStatement jobsStartDateStatement;
+ private final PreparedStatement jobsForSourceStatement;
+ private final PreparedStatement jobsForTargetStatement;
+ private final PreparedStatement jobsForKeyspaceStatement;
+
+ public DBService(JobConfiguration config) {
+ logger.info("Initializing DBService");
+ ClusterProvider provider = ClusterProvider.getProvider(config.clusterConfig("metadata"), "metadata");
+ cluster = provider.getCluster();
+
+ session = cluster.connect();
+ diffKeyspace = config.metadataOptions().keyspace;
+ runningJobsStatement = session.prepare(String.format(
+ " SELECT job_id " +
+ " FROM %s.running_jobs", diffKeyspace));
+ jobSummaryStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id," +
+ " job_start_time," +
+ " buckets," +
+ " keyspace_name, " +
+ " table_names, " +
+ " source_cluster_name," +
+ " source_cluster_desc," +
+ " target_cluster_name," +
+ " target_cluster_desc," +
+ " total_tasks " +
+ " FROM %s.job_summary" +
+ " WHERE job_id = ?", diffKeyspace));
+ jobResultStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id," +
+ " table_name, " +
+ " matched_partitions," +
+ " mismatched_partitions," +
+ " matched_rows," +
+ " matched_values," +
+ " mismatched_values," +
+ " partitions_only_in_source," +
+ " partitions_only_in_target," +
+ " skipped_partitions" +
+ " FROM %s.job_results" +
+ " WHERE job_id = ? AND table_name = ?", diffKeyspace));
+ jobStatusStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id," +
+ " bucket," +
+ " table_name," +
+ " completed " +
+ " FROM %s.job_status" +
+ " WHERE job_id = ? AND bucket = ?", diffKeyspace));
+ jobMismatchesStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id," +
+ " bucket," +
+ " table_name," +
+ " mismatching_token," +
+ " mismatch_type" +
+ " FROM %s.mismatches" +
+ " WHERE job_id = ? AND bucket = ?", diffKeyspace));
+ jobErrorSummaryStatement = session.prepare(String.format(
+ " SELECT " +
+ " count(start_token) AS error_count," +
+ " table_name" +
+ " FROM %s.task_errors" +
+ " WHERE job_id = ? AND bucket = ?",
+ diffKeyspace));
+ jobErrorRangesStatement = session.prepare(String.format(
+ " SELECT " +
+ " bucket," +
+ " table_name," +
+ " start_token," +
+ " end_token" +
+ " FROM %s.task_errors" +
+ " WHERE job_id = ? AND bucket = ?",
+ diffKeyspace));
+ jobErrorDetailStatement = session.prepare(String.format(
+ " SELECT " +
+ " table_name," +
+ " error_token" +
+ " FROM %s.partition_errors" +
+ " WHERE job_id = ? AND bucket = ? AND table_name = ? AND start_token = ? AND end_token = ?", diffKeyspace));
+ jobsStartDateStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id" +
+ " FROM %s.job_start_index" +
+ " WHERE job_start_date = ? AND job_start_hour = ?", diffKeyspace));
+ jobsForSourceStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id" +
+ " FROM %s.source_cluster_index" +
+ " WHERE source_cluster_name = ?", diffKeyspace));
+ jobsForTargetStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id" +
+ " FROM %s.target_cluster_index" +
+ " WHERE target_cluster_name = ?", diffKeyspace));
+ jobsForKeyspaceStatement = session.prepare(String.format(
+ " SELECT " +
+ " job_id" +
+ " FROM %s.keyspace_index" +
+ " WHERE keyspace_name = ?", diffKeyspace));
+ }
+
+ public List<UUID> fetchRunningJobs() {
+ List<UUID> jobs = new ArrayList<>();
+ ResultSet rs = session.execute(runningJobsStatement.bind());
+ rs.forEach(row -> jobs.add(row.getUUID("job_id")));
+ return jobs;
+ }
+
+ public Collection<JobSummary> fetchJobSummaries(List<UUID> jobIds) {
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(jobIds.size());
+ jobIds.forEach(id -> futures.add(session.executeAsync(jobSummaryStatement.bind(id))));
+
+ // Oldest first
+ SortedSet<JobSummary> summaries = Sets.newTreeSet(JobSummary.COMPARATOR);
+ processFutures(futures, JobSummary::fromRow, summaries::add);
+ return summaries;
+ }
+
+ public JobSummary fetchJobSummary(UUID jobId) {
+ Row row = session.execute(jobSummaryStatement.bind(jobId)).one();
+ if (row == null)
+ throw new RuntimeException(String.format("Job %s not found", jobId));
+ return JobSummary.fromRow(row);
+ }
+
+ public Collection<JobResult> fetchJobResults(UUID jobId) {
+ JobSummary summary = fetchJobSummary(jobId);
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.tables.size());
+ for (String table : summary.tables)
+ futures.add(session.executeAsync(jobResultStatement.bind(jobId, table)));
+
+ SortedSet<JobResult> results = Sets.newTreeSet();
+ processFutures(futures, JobResult::fromRow, results::add);
+ return results;
+ }
+
+ public JobStatus fetchJobStatus(UUID jobId) {
+ JobSummary summary = fetchJobSummary(jobId);
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+ for (int i = 0; i < summary.buckets; i++)
+ futures.add(session.executeAsync(jobStatusStatement.bind(jobId, i)));
+
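+        // Sum the completed split counts per table across all buckets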
+ Map<String, Long> completedByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ processFutures(futures, row -> completedByTable.merge(row.getString("table_name"),
+ row.getLong("completed"),
+ Long::sum));
+ return new JobStatus(jobId, completedByTable);
+ }
+
+ public JobMismatches fetchMismatches(UUID jobId) {
+ JobSummary summary = fetchJobSummary(jobId);
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+ for (int i = 0; i < summary.buckets; i++)
+ futures.add(session.executeAsync(jobMismatchesStatement.bind(jobId, i)));
+
+ Map<String, List<Mismatch>> mismatchesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ processFutures(futures, row -> mismatchesByTable.merge(row.getString("table_name"),
+ Lists.newArrayList(new Mismatch(row.getString("mismatching_token"),
+ row.getString("mismatch_type"))),
+ (l1, l2) -> { l1.addAll(l2); return l1;}));
+ return new JobMismatches(jobId, mismatchesByTable);
+ }
+
+ public JobErrorSummary fetchErrorSummary(UUID jobId) {
+ JobSummary summary = fetchJobSummary(jobId);
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+ for (int i = 0; i < summary.buckets; i++)
+ futures.add(session.executeAsync(jobErrorSummaryStatement.bind(jobId, i)));
+
+ Map<String, Long> errorCountByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ processFutures(futures, row -> {
+ String table = row.getString("table_name");
+ if (null != table) {
+ errorCountByTable.merge(row.getString("table_name"),
+ row.getLong("error_count"),
+ Long::sum);
+ }
+ });
+ return new JobErrorSummary(jobId, errorCountByTable);
+ }
+
+ public JobErrorRanges fetchErrorRanges(UUID jobId) {
+ JobSummary summary = fetchJobSummary(jobId);
+ List<ResultSetFuture> futures = Lists.newArrayListWithCapacity(summary.buckets);
+
+ for (int i = 0; i < summary.buckets; i++)
+ futures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
+
+ Map<String, List<Range>> errorRangesByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ processFutures(futures, row -> errorRangesByTable.merge(row.getString("table_name"),
+ Lists.newArrayList(new Range(row.getString("start_token"),
+ row.getString("end_token"))),
+ (l1, l2) -> { l1.addAll(l2); return l1;}));
+ return new JobErrorRanges(jobId, errorRangesByTable);
+ }
+
+ public JobErrorDetail fetchErrorDetail(UUID jobId) {
+ JobSummary summary = fetchJobSummary(jobId);
+ List<ResultSetFuture> rangeFutures = Lists.newArrayListWithCapacity(summary.buckets);
+
+ for (int i = 0; i < summary.buckets; i++ )
+ rangeFutures.add(session.executeAsync(jobErrorRangesStatement.bind(jobId, i)));
+
+ List<ResultSetFuture> errorFutures = Lists.newArrayList();
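+        // Phase two: for each failed range, fan out a query for the individual partition errors in that range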
+ processFutures(rangeFutures,
+ row -> session.executeAsync(jobErrorDetailStatement.bind(jobId,
+ row.getInt("bucket"),
+ row.getString("table_name"),
+ row.getString("start_token"),
+ row.getString("end_token"))),
+ errorFutures::add);
+ Map<String, List<String>> errorsByTable = Maps.newHashMapWithExpectedSize(summary.tables.size());
+ processFutures(errorFutures,
+ row -> errorsByTable.merge(row.getString("table_name"),
+ Lists.newArrayList(row.getString("error_token")),
+ (l1, l2) -> { l1.addAll(l2); return l1;}));
+ return new JobErrorDetail(jobId, errorsByTable);
+ }
+
+ public Collection<JobSummary> fetchJobsStartedBetween(DateTime start, DateTime end) {
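+        // job_start_index is partitioned by (job_start_date, job_start_hour), so issue one query per hour in the range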
+ int days = Days.daysBetween(start, end).getDays();
+ List<ResultSetFuture> idFutures = Lists.newArrayListWithCapacity(days * 24);
+ for (int i = 0; i <= days; i++) {
+ DateTime date = start.plusDays(i);
+ LocalDate ld = LocalDate.fromYearMonthDay(date.getYear(), date.getMonthOfYear(), date.getDayOfMonth());
+ for (int j = 0; j <= 23; j++) {
+ idFutures.add(session.executeAsync(jobsStartDateStatement.bind(ld, j)));
+ }
+ }
+
+ List<ResultSetFuture> jobFutures = Lists.newArrayList();
+ processFutures(idFutures,
+ row -> session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id"))),
+ jobFutures::add);
+
+ SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+ processFutures(jobFutures, JobSummary::fromRow, jobs::add);
+ return jobs;
+ }
+
+ public Collection<JobSummary> fetchJobsForSourceCluster(String sourceClusterName) {
+ ResultSet jobIds = session.execute(jobsForSourceStatement.bind(sourceClusterName));
+ List<ResultSetFuture> futures = Lists.newArrayList();
+ jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+
+ SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+ processFutures(futures, JobSummary::fromRow, jobs::add);
+ return jobs;
+ }
+
+ public Collection<JobSummary> fetchJobsForTargetCluster(String targetClusterName) {
+ ResultSet jobIds = session.execute(jobsForTargetStatement.bind(targetClusterName));
+ List<ResultSetFuture> futures = Lists.newArrayList();
+ jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+ // most recent first
+ SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+ processFutures(futures, JobSummary::fromRow, jobs::add);
+ return jobs;
+ }
+
+ public Collection<JobSummary> fetchJobsForKeyspace(String keyspace) {
+ ResultSet jobIds = session.execute(jobsForKeyspaceStatement.bind(keyspace));
+ List<ResultSetFuture> futures = Lists.newArrayList();
+ jobIds.forEach(row -> futures.add(session.executeAsync(jobSummaryStatement.bind(row.getUUID("job_id")))));
+
+ // most recent first
+ SortedSet<JobSummary> jobs = Sets.newTreeSet(JobSummary.COMPARATOR.reversed());
+ processFutures(futures, JobSummary::fromRow, jobs::add);
+ return jobs;
+ }
+
+ private void processFutures(List<ResultSetFuture> futures, Consumer<Row> consumer) {
+ processFutures(futures, Function.identity(), consumer);
+ }
+
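+    // Waits on each future (bounded by QUERY_TIMEOUT_MS via getResultsSafely), applies the transform to every row
+    // and hands the result to the consumer; failed queries are logged and skipped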
+ private <T> void processFutures(List<ResultSetFuture> futures, Function<Row, T> transform, Consumer<T> consumer) {
+ Consumer<ResultSet> resultConsumer = resultSet -> resultSet.forEach(row -> consumer.accept(transform.apply(row)));
+ futures.forEach(f -> getResultsSafely(f).ifPresent(resultConsumer));
+ }
+
+ private Optional<ResultSet> getResultsSafely(ResultSetFuture future) {
+ try {
+ return Optional.ofNullable(future.get(QUERY_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ logger.warn("Error getting result from async query", e);
+ return Optional.empty();
+ }
+ }
+
+ public void close()
+ {
+ session.close();
+ cluster.close();
+ }
+
+ public static class Range {
+ final String start;
+ final String end;
+
+ public Range(String start, String end) {
+ this.start = start;
+ this.end = end;
+ }
+ }
+
+ public static class JobErrorSummary {
+ final UUID jobId;
+ final Map<String, Long> errorsByTable;
+
+ public JobErrorSummary(UUID jobId, Map<String, Long> errorsByTable) {
+ this.jobId = jobId;
+ this.errorsByTable = errorsByTable;
+ }
+ }
+
+ public static class JobErrorRanges {
+ final UUID jobId;
+ final Map<String, List<Range>> rangesByTable;
+
+ public JobErrorRanges(UUID jobId, Map<String, List<Range>> rangesByTable) {
+ this.jobId = jobId;
+ this.rangesByTable = rangesByTable;
+ }
+ }
+
+ public static class JobErrorDetail {
+ final UUID jobId;
+ final Map<String, List<String>> errorsByTable;
+
+ public JobErrorDetail(UUID jobId, Map<String, List<String>> errorsByTable) {
+ this.jobId = jobId;
+ this.errorsByTable = errorsByTable;
+ }
+ }
+
+ public static class JobMismatches {
+
+ final UUID jobId;
+ final Map<String, List<Mismatch>> mismatchesByTable;
+
+ public JobMismatches(UUID jobId, Map<String, List<Mismatch>> mismatchesByTable) {
+ this.jobId = jobId;
+ this.mismatchesByTable = mismatchesByTable;
+ }
+ }
+
+ public static class Mismatch {
+ final String token;
+ final String type;
+
+ public Mismatch(String token, String type) {
+ this.token = token;
+ this.type = type;
+ }
+ }
+
+ public static class JobStatus {
+
+ final UUID jobId;
+ final Map<String, Long> completedByTable;
+
+ public JobStatus(UUID jobId, Map<String, Long> completedByTable) {
+ this.jobId = jobId;
+ this.completedByTable = completedByTable;
+ }
+ }
+
+ public static class JobResult implements Comparable<JobResult> {
+
+ final UUID jobId;
+ final String table;
+ final long matchedPartitions;
+ final long mismatchedPartitions;
+ final long matchedRows;
+ final long matchedValues;
+ final long mismatchedValues;
+ final long onlyInSource;
+ final long onlyInTarget;
+ final long skippedPartitions;
+
+ public JobResult(UUID jobId,
+ String table,
+ long matchedPartitions,
+ long mismatchedPartitions,
+ long matchedRows,
+ long matchedValues,
+ long mismatchedValues,
+ long onlyInSource,
+ long onlyInTarget,
+ long skippedPartitions) {
+ this.jobId = jobId;
+ this.table = table;
+ this.matchedPartitions = matchedPartitions;
+ this.mismatchedPartitions = mismatchedPartitions;
+ this.matchedRows = matchedRows;
+ this.matchedValues = matchedValues;
+ this.mismatchedValues = mismatchedValues;
+ this.onlyInSource = onlyInSource;
+ this.onlyInTarget = onlyInTarget;
+ this.skippedPartitions = skippedPartitions;
+ }
+
+ static JobResult fromRow(Row row) {
+ return new JobResult(row.getUUID("job_id"),
+ row.getString("table_name"),
+ row.getLong("matched_partitions"),
+ row.getLong("mismatched_partitions"),
+ row.getLong("matched_rows"),
+ row.getLong("matched_values"),
+ row.getLong("mismatched_values"),
+ row.getLong("partitions_only_in_source"),
+ row.getLong("partitions_only_in_target"),
+ row.getLong("skipped_partitions"));
+ }
+
+ public int compareTo(@NotNull JobResult other) {
+ return this.table.compareTo(other.table);
+ }
+ }
+
+ public static class JobSummary {
+
+ public static final Comparator<JobSummary> COMPARATOR = Comparator.comparing(j -> j.startTime);
+
+ final UUID jobId;
+ final int buckets;
+ final String keyspace;
+ final List<String> tables;
+ final String sourceClusterName;
+ final String sourceClusterDesc;
+ final String targetClusterName;
+ final String targetClusterDesc;
+ final int tasks;
+ final String start;
+
+ // private so it isn't included in json serialization
+ private final DateTime startTime;
+
+ private JobSummary(UUID jobId,
+ DateTime startTime,
+ int buckets,
+ String keyspace,
+ List<String> tables,
+ String sourceClusterName,
+ String sourceClusterDesc,
+ String targetClusterName,
+ String targetClusterDesc,
+ int tasks)
+ {
+ this.jobId = jobId;
+ this.startTime = startTime;
+ this.start = startTime.toString();
+ this.buckets = buckets;
+ this.keyspace = keyspace;
+ this.tables = tables;
+ this.sourceClusterName = sourceClusterName;
+ this.sourceClusterDesc = sourceClusterDesc;
+ this.targetClusterName = targetClusterName;
+ this.targetClusterDesc = targetClusterDesc;
+ this.tasks = tasks;
+ }
+
+ static JobSummary fromRow(Row row) {
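+            // job_start_time is a timeuuid - extract its unix timestamp to build a UTC DateTime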
+ return new JobSummary(row.getUUID("job_id"),
+ new DateTime(UUIDs.unixTimestamp(row.getUUID("job_start_time")), DateTimeZone.UTC),
+ row.getInt("buckets"),
+ row.getString("keyspace_name"),
+ row.getList("table_names", String.class),
+ row.getString("source_cluster_name"),
+ row.getString("source_cluster_desc"),
+ row.getString("target_cluster_name"),
+ row.getString("target_cluster_desc"),
+ row.getInt("total_tasks"));
+ }
+ }
+}
diff --git a/api-server/src/main/resources/log4j2.xml b/api-server/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..9e401d6
--- /dev/null
+++ b/api-server/src/main/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<Configuration>
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
diff --git a/common/pom.xml b/common/pom.xml
new file mode 100644
index 0000000..f73c4f5
--- /dev/null
+++ b/common/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.cassandra.diff</groupId>
+ <artifactId>diff</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>common</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ </dependency>
+
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java b/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java
new file mode 100644
index 0000000..6c4e3e4
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/ClusterProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+public interface ClusterProvider extends Serializable {
+ String CLUSTER_PROVIDER_CLASS = "impl";
+
+ void initialize(Map<String, String> conf, String identifier);
+ Cluster getCluster();
+ String getClusterName();
+
+ static ClusterProvider getProvider(Map<String, String> conf, String identifier) {
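+        // Load the implementation class named by the 'impl' key, instantiate it reflectively
+        // and initialize it with the raw config map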
+ String providerImpl = conf.get(CLUSTER_PROVIDER_CLASS);
+ ClusterProvider provider;
+ try {
+ provider = (ClusterProvider)(Class.forName(providerImpl)).newInstance();
+ provider.initialize(conf, identifier);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate ClusterProvider class: " + providerImpl +" " + conf, e);
+ }
+ return provider;
+ }
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java b/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java
new file mode 100644
index 0000000..b633feb
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/ContactPointsClusterProvider.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.Map;
+
+import com.datastax.driver.core.Cluster;
+
+public class ContactPointsClusterProvider implements ClusterProvider {
+ private static final String PREFIX = "diff.cluster";
+ private static final String USERNAME_KEY = "cql_user";
+ private static final String PASSWORD_KEY = "cql_password";
+ private static final String CONTACT_POINTS_KEY = "contact_points";
+ private static final String PORT_KEY = "port";
+ private static final String CLUSTER_KEY = "name";
+ private static final String DC_KEY = "dc";
+
+ private String user;
+ private String password;
+ private String name;
+ private String dc;
+ private String[] contactPoints;
+ private int port;
+
+ public Cluster getCluster() {
+ return newCluster();
+ }
+
+ public void initialize(Map<String, String> conf, String identifier) {
+ user = getEnv(identifier, USERNAME_KEY);
+ password = getEnv(identifier, PASSWORD_KEY);
+ name = conf.get(CLUSTER_KEY);
+ dc = conf.get(DC_KEY);
+ contactPoints = conf.get(CONTACT_POINTS_KEY).split(",");
+ port = Integer.parseInt(conf.getOrDefault(PORT_KEY, "9042"));
+ }
+
+ private synchronized Cluster newCluster() {
+ // TODO add policies etc
+ Cluster.Builder builder = Cluster.builder()
+ .addContactPoints(contactPoints)
+ .withPort(port);
+
+ if (user != null)
+ builder.withCredentials(user, password);
+
+ return builder.build();
+ }
+
+ public String getClusterName() {
+ return name;
+ }
+
+    public String toString() {
+        return String.format("ContactPoints Cluster: name=%s, dc=%s, contact points= [%s]",
+                             name, dc, String.join(",", contactPoints));
+    }
+
+ static String getEnv(String identifier, String propName) {
+ return System.getenv(String.format("%s.%s.%s", PREFIX, identifier, propName));
+ }
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
new file mode 100644
index 0000000..f0cbb36
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/JobConfiguration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+
+public interface JobConfiguration extends Serializable {
+ String keyspace();
+
+ List<String> tables();
+
+ int splits();
+
+ int buckets();
+
+    // The rate limit is global - how many queries/second we estimate the source clusters can handle in total
+ int rateLimit();
+
+ default SpecificTokens specificTokens() {
+ return SpecificTokens.NONE;
+ }
+
+ Optional<UUID> jobId();
+
+ int tokenScanFetchSize();
+
+ int partitionReadFetchSize();
+
+ int readTimeoutMillis();
+
+ double reverseReadProbability();
+
+ String consistencyLevel();
+
+ MetadataKeyspaceOptions metadataOptions();
+
+ Map<String, String> clusterConfig(String identifier);
+
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java b/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java
new file mode 100644
index 0000000..b3e9eb0
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/MetadataKeyspaceOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+
+public class MetadataKeyspaceOptions implements Serializable {
+ public String keyspace = "cassandradiff";
+ public String replication = "{'class':'SimpleStrategy', 'replication_factor':'1'}";
+ public int ttl = 60 * 60 * 24 * 365;
+ public boolean should_init = false;
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java b/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java
new file mode 100644
index 0000000..5400a54
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/SpecificTokens.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.collect.ImmutableSet;
+
+public class SpecificTokens implements Serializable, Predicate<BigInteger> {
+
+ public static final SpecificTokens NONE = new SpecificTokens(Collections.emptySet(), Modifier.REJECT);
+
+ public enum Modifier {REJECT, ACCEPT}
+
+ public final ImmutableSet<BigInteger> tokens;
+ public final Modifier modifier;
+
+ public SpecificTokens(Set<BigInteger> tokens, Modifier modifier) {
+ this.tokens = ImmutableSet.copyOf(tokens);
+ this.modifier = modifier;
+ }
+
+ public boolean test(BigInteger token) {
+ if (tokens.isEmpty())
+ return true;
+
+ // if this represents a list of tokens to accept, then this token is allowed
+ // only if it is present in the list. If this is a list of tokens to reject,
+ // then the opposite is true, only allow the token if it is not in the list.
+ return (modifier == Modifier.ACCEPT) == tokens.contains(token);
+ }
+
+ public boolean isEmpty() {
+ return tokens.isEmpty();
+ }
+
+ public String toString() {
+ return String.format("SpecificTokens: [ %s, %s ]", modifier, tokens);
+ }
+}
diff --git a/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
new file mode 100644
index 0000000..69fc28c
--- /dev/null
+++ b/common/src/main/java/org/apache/cassandra/diff/YamlJobConfiguration.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.math.BigInteger;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.constructor.CustomClassLoaderConstructor;
+
+public class YamlJobConfiguration implements JobConfiguration {
+ public int splits = 10000;
+ public String keyspace;
+ public List<String> tables;
+ public int buckets = 100;
+ public int rate_limit = 10000;
+ public String job_id = null;
+ public int token_scan_fetch_size;
+ public int partition_read_fetch_size;
+ public int read_timeout_millis;
+ public double reverse_read_probability;
+ public String consistency_level = "ALL";
+ public MetadataKeyspaceOptions metadata_options;
+ public Map<String, Map<String, String>> cluster_config;
+ public String specific_tokens = null;
+ public String disallowed_tokens = null;
+
+ public static YamlJobConfiguration load(String file) {
+ Yaml yaml = new Yaml(new CustomClassLoaderConstructor(YamlJobConfiguration.class,
+ Thread.currentThread().getContextClassLoader()));
+ try {
+ return yaml.loadAs(new FileInputStream(file), YamlJobConfiguration.class);
+ } catch (FileNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String keyspace() {
+ return keyspace;
+ }
+
+ public List<String> tables() {
+ return tables;
+ }
+
+ public int splits() {
+ return splits;
+ }
+
+ public int buckets() {
+ return buckets;
+ }
+
+ public int rateLimit() {
+ return rate_limit;
+ }
+
+ public Optional<UUID> jobId() {
+ return job_id == null ? Optional.empty() : Optional.of(UUID.fromString(job_id));
+ }
+
+ public int tokenScanFetchSize() {
+ return token_scan_fetch_size;
+ }
+
+ public int partitionReadFetchSize() {
+ return partition_read_fetch_size;
+ }
+
+ public int readTimeoutMillis() {
+ return read_timeout_millis;
+ }
+
+ public double reverseReadProbability() {
+ return reverse_read_probability;
+ }
+
+ public String consistencyLevel() {
+ return consistency_level;
+ }
+
+ public MetadataKeyspaceOptions metadataOptions() {
+ return metadata_options;
+ }
+
+ public Map<String, String> clusterConfig(String identifier) {
+ return cluster_config.get(identifier);
+ }
+
+ public SpecificTokens specificTokens() {
+
+ if (disallowed_tokens != null && specific_tokens != null)
+ throw new RuntimeException("Cannot specify both disallowed and specific tokens");
+
+ if (disallowed_tokens != null) {
+ return new SpecificTokens(toTokens(disallowed_tokens), SpecificTokens.Modifier.REJECT);
+ } else if (specific_tokens != null) {
+ return new SpecificTokens(toTokens(specific_tokens), SpecificTokens.Modifier.ACCEPT);
+ }
+ return SpecificTokens.NONE;
+ }
+
+ public String toString() {
+ return "YamlJobConfiguration{" +
+ "splits=" + splits +
+ ", keyspace='" + keyspace + '\'' +
+ ", tables=" + tables +
+ ", buckets=" + buckets +
+ ", rate_limit=" + rate_limit +
+ ", job_id='" + job_id + '\'' +
+ ", token_scan_fetch_size=" + token_scan_fetch_size +
+ ", partition_read_fetch_size=" + partition_read_fetch_size +
+ ", read_timeout_millis=" + read_timeout_millis +
+ ", reverse_read_probability=" + reverse_read_probability +
+ ", consistency_level='" + consistency_level + '\'' +
+ ", metadata_options=" + metadata_options +
+ ", cluster_config=" + cluster_config +
+ '}';
+ }
+
+ private static Set<BigInteger> toTokens(String str) {
+ Set<BigInteger> tokens = new HashSet<>();
+ for (String token : str.split(",")) {
+ token = token.trim();
+ tokens.add(new BigInteger(token));
+ }
+ return tokens;
+ }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..fce7cc1
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.cassandra.diff</groupId>
+ <artifactId>diff</artifactId>
+ <packaging>pom</packaging>
+ <version>0.1-SNAPSHOT</version>
+ <modules>
+ <module>common</module>
+ <module>spark-job</module>
+ <module>api-server</module>
+ </modules>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ </properties>
+
+ <dependencyManagement>
+ <dependencies>
+
+ <dependency>
+ <groupId>com.datastax.cassandra</groupId>
+ <artifactId>cassandra-driver-core</artifactId>
+ <version>3.7.1</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.yaml</groupId>
+ <artifactId>snakeyaml</artifactId>
+ <version>1.24</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>28.0-jre</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.28</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ <version>17.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>2.12.1</version>
+ <scope>runtime</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+ </dependencyManagement>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.6.1</version>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/spark-job/localconfig.yaml b/spark-job/localconfig.yaml
new file mode 100644
index 0000000..d4f5630
--- /dev/null
+++ b/spark-job/localconfig.yaml
@@ -0,0 +1,51 @@
+# Keyspace to diff
+keyspace: keyspace1
+# List of tables to diff
+tables:
+ - standard1
+
+# This is how many parts we split the full token range into.
+# Each of these splits is then compared between the clusters
+splits: 10000
+
+# Number of buckets - splits / buckets should be under 100k to avoid wide partitions when storing the metadata
+buckets: 100
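+# e.g. with the values above: 10000 splits / 100 buckets = 100 splits per bucket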
+
+# global rate limit - this is how many q/s you think the target clusters can handle
+rate_limit: 10000
+
+# optional job id - if restarting a job, set the correct job_id here to avoid re-diffing old splits
+# job_id: 4e2c6c6b-bed7-4c4e-bd4c-28bef89c3cef
+
+# Fetch size to use for the query fetching the tokens in the cluster
+token_scan_fetch_size: 1000
+# Fetch size to use for the queries fetching the rows of each partition
+partition_read_fetch_size: 1000
+
+read_timeout_millis: 10000
+reverse_read_probability: 0.5
+consistency_level: ALL
+metadata_options:
+ keyspace: cassandradiff
+ replication: "{'class':'SimpleStrategy', 'replication_factor':'1'}"
+ ttl: 31536000
+ should_init: true
+cluster_config:
+ source:
+ impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+ name: "local_test_1"
+ contact_points: "127.0.0.1"
+ port: "9042"
+ dc: "datacenter1"
+ target:
+ impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+ name: "local_test_2"
+ contact_points: "127.0.0.1"
+ port: "9043"
+ dc: "datacenter1"
+ metadata:
+ impl: "org.apache.cassandra.diff.ContactPointsClusterProvider"
+ name: "local_test"
+ contact_points: "127.0.0.1"
+ port: "9042"
+ dc: "datacenter1"
diff --git a/spark-job/pom.xml b/spark-job/pom.xml
new file mode 100644
index 0000000..4d4c8fc
--- /dev/null
+++ b/spark-job/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.cassandra.diff</groupId>
+ <artifactId>diff</artifactId>
+ <version>0.1-SNAPSHOT</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>spark-job</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- compile dependencies -->
+ <dependency>
+ <groupId>org.apache.cassandra.diff</groupId>
+ <artifactId>common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <!-- provided dependencies -->
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.11</artifactId>
+ <version>2.3.3</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.11</artifactId>
+            <version>2.3.3</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.1</version>
+ <executions>
+ <!-- Run shade goal on package phase -->
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google</pattern>
+ <shadedPattern>relocated.com.google</shadedPattern>
+ </relocation>
+ </relocations>
+ </configuration>
+ </plugin>
+
+ </plugins>
+ </build>
+
+</project>
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
new file mode 100644
index 0000000..bdd6488
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ComparisonExecutor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.*;
+import java.util.function.Consumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+
+/**
+ * Wrapper for an ExecutorService which provides backpressure by blocking on submission when its
+ * task queue is full.
+ *
+ * The internal ListeningExecutorService is instantiated with an unbounded work queue, but
+ * this class uses a Semaphore to ensure that this queue cannot grow unreasonably. By default,
+ * the Semaphore has 2x as many permits as the executor thread pool has threads, so at most
+ * 2 x maxConcurrentTasks may be submitted before producers are blocked. The submit method
+ * also adds success/failure callbacks which return the permits, enabling producers to make
+ * progress at a manageable rate.
+ *
+ * Callers of the submit method also provide a Phaser, which they can use to ensure that any
+ * tasks *they themselves have submitted* are completed before they proceed. This allows multiple
+ * callers to submit tasks to the same ComparisonExecutor, but only wait for their own to complete
+ * before moving onto the next stage of processing. Managing the increment and decrement of pending
+ * tasks via the Phaser is handled transparently by ComparisonExecutor, so callers should not do
+ * this externally.
+ *
+ * Submitters also provide callbacks to be run on either successful execution or failure of the
+ * task. These callbacks are executed on the same thread as the task itself, which callers should bear
+ * in mind when constructing them.
+ *
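+ * A minimal caller-side sketch (the names compare, stats and journal are hypothetical,
+ * for illustration only):
+ * <pre>
+ *     Phaser phaser = new Phaser(1);                      // register the submitting thread
+ *     executor.submit(() -> compare(key),                 // the comparison task
+ *                     result -> stats.accumulate(result), // onSuccess callback
+ *                     error -> journal.record(error),     // onError callback
+ *                     phaser);
+ *     phaser.arriveAndAwaitAdvance();                     // wait only for our own submissions
+ * </pre>
+ *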
+ */
+public class ComparisonExecutor {
+
+ private final ListeningExecutorService executor;
+ private final Semaphore semaphore;
+
+ static ComparisonExecutor newExecutor(int maxConcurrentTasks, MetricRegistry metricRegistry) {
+ return new ComparisonExecutor(
+ MoreExecutors.listeningDecorator(
+ Executors.newFixedThreadPool(maxConcurrentTasks,
+ new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
+ .setDaemon(true)
+ .build())),
+ maxConcurrentTasks * 2,
+ metricRegistry);
+ }
+
+ @VisibleForTesting
+ ComparisonExecutor(ListeningExecutorService executor, int maxTasks, MetricRegistry metrics) {
+ this.executor = executor;
+ this.semaphore = new Semaphore(maxTasks);
+ if (metrics != null) {
+ metrics.register("BlockedTasks", (Gauge) semaphore::getQueueLength);
+ metrics.register("AvailableSlots", (Gauge) semaphore::availablePermits);
+ }
+ }
+
+ public <T> void submit(final Callable<T> callable,
+ final Consumer<T> onSuccess,
+ final Consumer<Throwable> onError,
+ final Phaser phaser) {
+
+ phaser.register();
+ semaphore.acquireUninterruptibly();
+ try {
+ Futures.addCallback(executor.submit(callable), new FutureCallback<T>() {
+ public void onSuccess(T result) {
+ fireThenReleaseAndArrive(onSuccess, result, phaser);
+ }
+
+ public void onFailure(Throwable t) {
+ fireThenReleaseAndArrive(onError, t, phaser);
+ }
+ }, MoreExecutors.directExecutor());
+
+ } catch (RejectedExecutionException e) {
+ fireThenReleaseAndArrive(onError, e, phaser);
+ }
+
+ }
+
+ private <T> void fireThenReleaseAndArrive(Consumer<T> callback, T argument, Phaser phaser) {
+ try {
+ callback.accept(argument);
+ } finally {
+ semaphore.release();
+ phaser.arriveAndDeregister();
+ }
+ }
+
+ public void shutdown() {
+ executor.shutdown();
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
new file mode 100644
index 0000000..4e108e3
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffCluster.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.querybuilder.*;
+import org.jetbrains.annotations.NotNull;
+
+import static org.apache.cassandra.diff.DiffContext.cqlizedString;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.*;
+
+public class DiffCluster implements AutoCloseable
+{
+ private final static Logger logger = LoggerFactory.getLogger(DiffCluster.class);
+
+ public enum Type {SOURCE, TARGET}
+
+ private final Map<String, PreparedStatement[]> preparedStatements = new HashMap<>();
+ private final ConsistencyLevel consistencyLevel;
+ public final Cluster cluster;
+ private final Session session;
+ private final TokenHelper tokenHelper;
+ public final String keyspace;
+ public final List<BigInteger> tokenList;
+
+ public final RateLimiter getPartitionRateLimiter;
+ public final Type clusterId;
+ private final int tokenScanFetchSize;
+ private final int partitionReadFetchSize;
+ private final int readTimeoutMillis;
+
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+
+ public DiffCluster(Type clusterId,
+ Cluster cluster,
+ String keyspace,
+ ConsistencyLevel consistencyLevel,
+ RateLimiter getPartitionRateLimiter,
+ int tokenScanFetchSize,
+ int partitionReadFetchSize,
+ int readTimeoutMillis)
+
+ {
+ this.keyspace = keyspace;
+ this.consistencyLevel = consistencyLevel;
+ this.cluster = cluster;
+ this.tokenHelper = TokenHelper.forPartitioner(cluster.getMetadata().getPartitioner());
+ this.clusterId = clusterId;
+ this.tokenList = Collections.emptyList();
+ this.getPartitionRateLimiter = getPartitionRateLimiter;
+ this.session = cluster.connect();
+ this.tokenScanFetchSize = tokenScanFetchSize;
+ this.partitionReadFetchSize = partitionReadFetchSize;
+ this.readTimeoutMillis = readTimeoutMillis;
+ }
+
+ public Iterator<PartitionKey> getPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+ try {
+ return Uninterruptibles.getUninterruptibly(fetchPartitionKeys(table, prevToken, token));
+ }
+ catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ListenableFuture<Iterator<PartitionKey>> fetchPartitionKeys(String table, final BigInteger prevToken, final BigInteger token) {
+ BoundStatement statement = keyReader(table).bind(tokenHelper.forBindParam(prevToken),
+ tokenHelper.forBindParam(token));
+ statement.setFetchSize(tokenScanFetchSize);
+ statement.setReadTimeoutMillis(readTimeoutMillis);
+ return Futures.transform(session.executeAsync(statement),
+ this::toPartitionKeys,
+ MoreExecutors.directExecutor());
+ }
+
+ private AbstractIterator<PartitionKey> toPartitionKeys(ResultSet resultSet) {
+ return new AbstractIterator<PartitionKey>() {
+ Iterator<Row> rows = resultSet.iterator();
+
+ protected PartitionKey computeNext() {
+ if (session.isClosed())
+ throw new RuntimeException("Session was closed, cannot get next partition key");
+
+ if (stopped.get())
+ throw new RuntimeException("Job was stopped, cannot get next partition key");
+
+ return rows.hasNext() ? new PartitionKey(rows.next()) : endOfData();
+ }
+ };
+ }
+
+ public Iterator<Row> getPartition(TableSpec table, PartitionKey key, boolean shouldReverse) {
+ return readPartition(table.getTable(), key, shouldReverse)
+ .getUninterruptibly()
+ .iterator();
+ }
+
+ private ResultSetFuture readPartition(String table, final PartitionKey key, boolean shouldReverse) {
+ BoundStatement statement = shouldReverse
+ ? reverseReader(table).bind(key.getComponents().toArray())
+ : forwardReader(table).bind(key.getComponents().toArray());
+ statement.setFetchSize(partitionReadFetchSize);
+ statement.setReadTimeoutMillis(readTimeoutMillis);
+ getPartitionRateLimiter.acquire();
+ return session.executeAsync(statement);
+ }
+
+ public void stop() {
+ stopped.set(true);
+ }
+
+ public void close()
+ {
+ logger.info("Closing cluster {}", this.clusterId);
+ session.closeAsync();
+ cluster.closeAsync();
+ }
+
+ private PreparedStatement keyReader(String table) {
+ return getStatementForTable(table, 0);
+ }
+
+ private PreparedStatement forwardReader(String table) {
+ return getStatementForTable(table, 1);
+ }
+
+ private PreparedStatement reverseReader(String table) {
+ return getStatementForTable(table, 2);
+ }
+
+ private PreparedStatement getStatementForTable(String table, int index) {
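+        // double-checked locking: prepare each table's statements at most once, while keeping
+        // the common, already-prepared path free of synchronization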
+ if (!preparedStatements.containsKey(table)) {
+ synchronized (this) {
+ if (!preparedStatements.containsKey(table)) {
+ PreparedStatement keyStatement = getKeyStatement(table);
+ PreparedStatement[] partitionReadStmts = getFullStatement(table);
+ preparedStatements.put(table, new PreparedStatement[]{ keyStatement ,
+ partitionReadStmts[0],
+ partitionReadStmts[1] });
+ }
+ }
+ }
+ return preparedStatements.get(table)[index];
+ }
+
+ private PreparedStatement getKeyStatement(@NotNull String table) {
+ final TableMetadata tableMetadata = session.getCluster()
+ .getMetadata()
+ .getKeyspace(cqlizedString(keyspace))
+ .getTable(cqlizedString(table));
+ String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
+
+ Select.Selection selection = QueryBuilder.select().distinct().column(token(partitionKeyColumns));
+ for (String column : partitionKeyColumns)
+ selection = selection.column(column);
+
+ BuiltStatement select = selection.from(tableMetadata)
+ .where(gt(token(partitionKeyColumns), bindMarker()))
+ .and(lte(token(partitionKeyColumns), bindMarker()));
+
+ logger.debug("Partition key/token read CQL : {}", select.toString());
+ return session.prepare(select).setConsistencyLevel(consistencyLevel);
+ }
+
+ private PreparedStatement[] getFullStatement(@NotNull String table) {
+ final TableMetadata tableMetadata = session.getCluster()
+ .getMetadata()
+ .getKeyspace(cqlizedString(keyspace))
+ .getTable(cqlizedString(table));
+ String[] partitionKeyColumns = columnNames(tableMetadata.getPartitionKey());
+ String[] allColumns = columnNames(tableMetadata.getColumns());
+
+ Select.Selection selection = QueryBuilder.select().column(token(partitionKeyColumns));
+ for (String column : allColumns)
+ selection = selection.column(column);
+
+ Select select = selection.from(tableMetadata);
+
+ for (String column : partitionKeyColumns)
+ select.where().and(eq(column, bindMarker()));
+
+ logger.info("Partition forward read CQL : {}", select.toString());
+ PreparedStatement forwardRead = session.prepare(select).setConsistencyLevel(consistencyLevel);
+
+ List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
+ // if the table has no clustering columns a reverse read doesn't make sense
+ // and will never be executed, so just skip preparing the reverse query
+ if (clusteringColumns.isEmpty())
+ return new PreparedStatement[] {forwardRead, null};
+
+ // Depending on DiffContext.reverseReadProbability, we may attempt to read the
+ // partition in reverse order, so prepare a statement for that
+ List<ClusteringOrder> clusteringOrders = tableMetadata.getClusteringOrder();
+ Ordering[] reverseOrdering = new Ordering[clusteringColumns.size()];
+ for (int i=0; i<clusteringColumns.size(); i++) {
+ reverseOrdering[i] = clusteringOrders.get(i) == ClusteringOrder.ASC
+ ? desc(clusteringColumns.get(i).getName())
+ : asc(clusteringColumns.get(i).getName());
+ }
+
+ select.orderBy(reverseOrdering);
+ logger.info("Partition reverse read CQL : {}", select.toString());
+
+ PreparedStatement reverseRead = session.prepare(select).setConsistencyLevel(consistencyLevel);
+
+ return new PreparedStatement[] {forwardRead, reverseRead};
+ }
+
+ private static String[] columnNames(List<ColumnMetadata> columns) {
+ return columns.stream().map(ColumnMetadata::getName).map(DiffCluster::columnToString).toArray(String[]::new);
+ }
+
+ private static String columnToString(String name)
+ {
+ return '"'+name+'"';
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java
new file mode 100644
index 0000000..5a1a354
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffContext.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class DiffContext {
+
+ public final String keyspace;
+ public final TableSpec table;
+ public final BigInteger startToken;
+ public final BigInteger endToken;
+ public final DiffCluster source;
+ public final DiffCluster target;
+ private final SpecificTokens specificTokens;
+ private final double reverseReadProbability;
+
+ public DiffContext(final DiffCluster source,
+ final DiffCluster target,
+ final String keyspace,
+ final TableSpec table,
+ final BigInteger startToken,
+ final BigInteger endToken,
+ final SpecificTokens specificTokens,
+ final double reverseReadProbability) {
+ this.keyspace = keyspace;
+ this.table = table;
+ this.startToken = startToken;
+ this.endToken = endToken;
+ this.source = source;
+ this.target = target;
+ this.specificTokens = specificTokens;
+ this.reverseReadProbability = reverseReadProbability;
+ }
+
+ public boolean shouldReverse() {
+ return ! table.getClusteringColumns().isEmpty()
+ && reverseReadProbability > ThreadLocalRandom.current().nextDouble();
+ }
+
+ public boolean isTokenAllowed(BigInteger token) {
+ return specificTokens.test(token);
+ }
+
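+    /**
+     * Unquoted CQL identifiers are folded to lowercase, so any identifier containing
+     * uppercase characters must be quoted to preserve its case in queries;
+     * e.g. keyspace1 stays as-is, while Keyspace1 becomes "Keyspace1".
+     */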
+ public static String cqlizedString(final String str) {
+ if (str.toLowerCase().equals(str)) {
+ return str;
+ } else {
+ return "\"" + str + "\"";
+ }
+ }
+
+ public String toString() {
+ return String.format("DiffContext: [keyspace: %s, start_token: %s, end_token: %s]",
+ keyspace, startToken, endToken);
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
new file mode 100644
index 0000000..bb14c25
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.Serializable;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkFiles;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * To run this job locally, execute something like:
+ *
+ * spark-submit --files ./spark-job/localconfig.yaml
+ *              --master "local[2]"
+ *              --class org.apache.cassandra.diff.DiffJob spark-job/target/spark-job-0.1-SNAPSHOT.jar
+ *              localconfig.yaml
+ */
+
+public class DiffJob {
+ private static final Logger logger = LoggerFactory.getLogger(DiffJob.class);
+
+ public static void main(String ... args) {
+ if (args.length == 0) {
+ System.exit(-1);
+ }
+ SparkSession spark = SparkSession.builder().appName("cassandra-diff").getOrCreate();
+ JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
+ String configFile = SparkFiles.get(args[0]);
+ YamlJobConfiguration configuration = YamlJobConfiguration.load(configFile);
+ DiffJob diffJob = new DiffJob();
+ diffJob.run(configuration, sc);
+ spark.stop();
+ }
+
+ public void run(JobConfiguration configuration, JavaSparkContext sc) {
+ SparkConf conf = sc.getConf();
+ // get partitioner from both clusters and verify that they match
+ ClusterProvider sourceProvider = ClusterProvider.getProvider(configuration.clusterConfig("source"), "source");
+ ClusterProvider targetProvider = ClusterProvider.getProvider(configuration.clusterConfig("target"), "target");
+ String sourcePartitioner;
+ String targetPartitioner;
+ try (Cluster sourceCluster = sourceProvider.getCluster();
+ Cluster targetCluster = targetProvider.getCluster()) {
+ sourcePartitioner = sourceCluster.getMetadata().getPartitioner();
+ targetPartitioner = targetCluster.getMetadata().getPartitioner();
+ }
+ if (!sourcePartitioner.equals(targetPartitioner)) {
+ throw new IllegalStateException(String.format("Cluster partitioners do not match; Source: %s, Target: %s,",
+ sourcePartitioner, targetPartitioner));
+ }
+ TokenHelper tokenHelper = TokenHelper.forPartitioner(sourcePartitioner);
+
+ logger.info("Configuring job metadata store");
+ ClusterProvider metadataProvider = ClusterProvider.getProvider(configuration.clusterConfig("metadata"), "metadata");
+ JobMetadataDb.JobLifeCycle job = null;
+ UUID jobId = null;
+ try (Cluster metadataCluster = metadataProvider.getCluster();
+ Session metadataSession = metadataCluster.connect()) {
+
+ MetadataKeyspaceOptions metadataOptions = configuration.metadataOptions();
+ JobMetadataDb.Schema.maybeInitialize(metadataSession, metadataOptions);
+
+ // Job params, which once a job is created cannot be modified in subsequent re-runs
+ logger.info("Creating or retrieving job parameters");
+ job = new JobMetadataDb.JobLifeCycle(metadataSession, metadataOptions.keyspace);
+ Params params = getJobParams(job, configuration);
+ logger.info("Job Params: {}", params);
+ if (null == params)
+ throw new RuntimeException("Unable to initialize job params");
+
+ jobId = params.jobId;
+ List<Split> splits = getSplits(configuration, TokenHelper.forPartitioner(sourcePartitioner));
+
+ // Job options, which may be modified per-run
+ int instances = Integer.parseInt(conf.get("spark.executor.instances", "4"));
+ int cores = Integer.parseInt(conf.get("spark.executor.cores", "2"));
+ int executors = instances * cores;
+ // according to https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections we should
+ // have 2-4 partitions per cpu in the cluster:
+ int slices = Math.min(4 * executors, splits.size());
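+            // e.g. with the defaults above (4 instances x 2 cores = 8 executors): slices = min(32, number of splits)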
+ int perExecutorRateLimit = configuration.rateLimit() / executors;
+
+ // Record the high level job summary info
+ job.initializeJob(params,
+ sourceProvider.getClusterName(),
+ sourceProvider.toString(),
+ targetProvider.getClusterName(),
+ targetProvider.toString());
+
+ logger.info("DiffJob {} comparing [{}] in keyspace {} on {} and {}",
+ jobId,
+ String.join(",", params.tables),
+ params.keyspace,
+ sourceProvider,
+ targetProvider);
+
+ // Run the distributed diff and collate results
+ Map<String, RangeStats> diffStats = sc.parallelize(splits, slices)
+ .map((split) -> new Differ(configuration,
+ params,
+ perExecutorRateLimit,
+ split,
+ tokenHelper,
+ sourceProvider,
+ targetProvider,
+ metadataProvider,
+ new TrackerProvider(configuration.metadataOptions().keyspace))
+ .run())
+ .reduce(Differ::accumulate);
+ // Publish results. This also removes the job from the currently running list
+ job.finalizeJob(params.jobId, diffStats);
+ logger.info("FINISHED: {}", diffStats);
+ } catch (Exception e) {
+ // If the job errors out, try and mark the job as not running, so it can be restarted.
+ // If the error was thrown from JobMetadataDb.finalizeJob *after* the job had already
+ // been marked not running, this will log a warning, but is not fatal.
+ if (job != null && jobId != null)
+ job.markNotRunning(jobId);
+ throw new RuntimeException("Diff job failed", e);
+ } finally {
+ if (sc.isLocal())
+ Differ.shutdown();
+ }
+ }
+
+ private static Params getJobParams(JobMetadataDb.JobLifeCycle job, JobConfiguration conf) {
+ if (conf.jobId().isPresent()) {
+ return job.getJobParams(conf.jobId().get());
+ } else {
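+    /**
+     * Load a job configuration from a YAML file; top-level YAML keys map directly onto
+     * the public fields above (see spark-job/localconfig.yaml for a complete example).
+     */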
+ return new Params(UUID.randomUUID(),
+ conf.keyspace(),
+ conf.tables(),
+ conf.buckets(),
+ conf.splits());
+ }
+ }
+
+ private static List<Split> getSplits(JobConfiguration config, TokenHelper tokenHelper) {
+ logger.info("Initializing splits");
+ List<Split> splits = calculateSplits(config.splits(), config.buckets(), tokenHelper);
+ logger.info("All Splits: {}", splits);
+ if (!config.specificTokens().isEmpty() && config.specificTokens().modifier == SpecificTokens.Modifier.ACCEPT) {
+ splits = getSplitsForTokens(config.specificTokens().tokens, splits);
+ logger.info("Splits for specific tokens ONLY: {}", splits);
+ }
+ // shuffle the splits to make sure the work is spread over the workers,
+        // important when less than the full token range is being compared
+ Collections.shuffle(splits);
+ return splits;
+ }
+
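+    /**
+     * Divide the partitioner's full token range into numSplits contiguous, non-overlapping
+     * splits, assigning split i to bucket i % numBuckets.
+     *
+     * Worked example with illustrative numbers (not a real partitioner range): for a token
+     * range of [0, 100] and numSplits = 2, segmentSize = 50, yielding Split [0, 50] and
+     * Split [51, 100].
+     */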
+ @VisibleForTesting
+ static List<Split> calculateSplits(int numSplits, int numBuckets, TokenHelper tokenHelper) {
+ List<Split> splits = new ArrayList<>(numSplits);
+ BigInteger minToken = tokenHelper.min();
+ BigInteger maxToken = tokenHelper.max();
+
+ BigInteger totalTokens = maxToken.subtract(minToken);
+ BigInteger segmentSize = totalTokens.divide(BigInteger.valueOf(numSplits));
+
+        // Add the first split starting exactly at minToken; subsequent splits start at the
+        // previous end plus BigInteger.ONE to avoid overlap. Splits are grouped into buckets
+        // so we can shard the journal info across C* partitions.
+ splits.add(new Split(0, 0, minToken, minToken.add(segmentSize)));
+ BigInteger prev = minToken.add(segmentSize);
+ for (int i = 1; i < numSplits - 1; i++) {
+ BigInteger next = prev.add(segmentSize);
+ // add ONE to avoid split overlap
+ splits.add(new Split(i, i % numBuckets, prev.add(BigInteger.ONE), next));
+ prev = next;
+ }
+ splits.add(new Split(numSplits - 1, (numSplits - 1) % numBuckets, prev.add(BigInteger.ONE), maxToken)); // make sure we cover the whole range
+ return splits;
+ }
+
+ @VisibleForTesting
+ static List<Split> getSplitsForTokens(Set<BigInteger> tokens, List<Split> splits) {
+ return splits.stream().filter(split -> split.containsAny(tokens)).collect(Collectors.toList());
+ }
+
+ @VisibleForTesting
+ static class Split implements Serializable {
+ final int splitNumber;
+ final int bucket;
+ final BigInteger start;
+ final BigInteger end;
+
+ Split(int splitNumber, int bucket, BigInteger start, BigInteger end) {
+ this.splitNumber = splitNumber;
+ this.bucket = bucket;
+ this.start = start;
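+    /**
+     * Both fields hold comma-separated token lists, e.g. (illustrative values)
+     * specific_tokens: "123,456" limits the diff to those tokens only, while
+     * disallowed_tokens: "123,456" skips them; configuring both is an error.
+     */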
+ this.end = end;
+ }
+
+ public String toString() {
+ return "Split [" +
+ start +
+ ", " +
+ end +
+ ']';
+ }
+
+ public boolean containsAny(Set<BigInteger> specificTokens) {
+ for (BigInteger specificToken : specificTokens) {
+ if (specificToken.compareTo(start) >= 0 && specificToken.compareTo(end) <= 0)
+ return true;
+ }
+ return false;
+ }
+ }
+
+ static class Params implements Serializable {
+ public final UUID jobId;
+ public final String keyspace;
+ public final ImmutableList<String> tables;
+ public final int buckets;
+ public final int tasks;
+
+ Params(UUID jobId, String keyspace, List<String> tables, int buckets, int tasks) {
+ this.jobId = jobId;
+ this.keyspace = keyspace;
+ this.tables = ImmutableList.copyOf(tables);
+ this.buckets = buckets;
+ this.tasks = tasks;
+ }
+
+ public String toString() {
+ return String.format("Params: [jobId: %s, keyspace: %s, tables: %s, buckets: %s, tasks: %s]",
+ jobId, keyspace, tables.stream().collect(Collectors.joining(",")), buckets, tasks);
+ }
+ }
+
+ static class TaskStatus {
+ public static final TaskStatus EMPTY = new TaskStatus(null, null);
+ public final BigInteger lastToken;
+ public final RangeStats stats;
+
+ TaskStatus(BigInteger lastToken, RangeStats stats) {
+ this.lastToken = lastToken;
+ this.stats = stats;
+ }
+
+ public String toString() {
+ return "TaskStatus{" +
+ "lastToken=" + lastToken +
+ ", stats=" + stats +
+ '}';
+ }
+ }
+
+ public static class TrackerProvider implements Serializable {
+ private final String metadataKeyspace;
+
+ TrackerProvider(String metadataKeyspace) {
+ this.metadataKeyspace = metadataKeyspace;
+ }
+
+ public void initializeStatements(Session session) {
+ JobMetadataDb.ProgressTracker.initializeStatements(session, metadataKeyspace);
+ }
+
+ public JobMetadataDb.ProgressTracker getTracker(Session session, UUID jobId, Split split) {
+ return new JobMetadataDb.ProgressTracker(jobId, split.bucket, split.start, split.end, metadataKeyspace, session);
+ }
+ }
+ }
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
new file mode 100644
index 0000000..49576a2
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.math.BigInteger;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.function.BiConsumer;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Verify;
+import com.google.common.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Session;
+
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+public class Differ implements Serializable
+{
+ private static final Logger logger = LoggerFactory.getLogger(Differ.class);
+
+ private static final MetricRegistry metrics = new MetricRegistry();
+
+ private static final int COMPARISON_THREADS = 8;
+ private static final ComparisonExecutor COMPARISON_EXECUTOR = ComparisonExecutor.newExecutor(COMPARISON_THREADS, metrics);
+
+ private final UUID jobId;
+ private final DiffJob.Split split;
+ private final TokenHelper tokenHelper;
+ private final String keyspace;
+ private final List<String> tables;
+ private final RateLimiter rateLimiter;
+ private final DiffJob.TrackerProvider trackerProvider;
+ private final double reverseReadProbability;
+ private final SpecificTokens specificTokens;
+
+ private static DiffCluster srcDiffCluster;
+ private static DiffCluster targetDiffCluster;
+ private static Session journalSession;
+
+ static
+ {
+ Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+ StringWriter stackTrace = new StringWriter();
+ e.printStackTrace(new PrintWriter(stackTrace));
+ System.out.println("UNCAUGHT EXCEPTION: " + stackTrace.toString());
+ throw new RuntimeException(e);
+ });
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ logger.info("In shutdown hook");
+ shutdown();
+ }));
+ }
+
+ public Differ(JobConfiguration config,
+ DiffJob.Params params,
+ int perExecutorRateLimit,
+ DiffJob.Split split,
+ TokenHelper tokenHelper,
+ ClusterProvider sourceProvider,
+ ClusterProvider targetProvider,
+ ClusterProvider metadataProvider,
+ DiffJob.TrackerProvider trackerProvider)
+ {
+ logger.info("Creating Differ for {}", split);
+ this.jobId = params.jobId;
+ this.split = split;
+ this.tokenHelper = tokenHelper;
+ this.keyspace = params.keyspace;
+ this.tables = params.tables;
+ this.trackerProvider = trackerProvider;
+ rateLimiter = RateLimiter.create(perExecutorRateLimit);
+ this.reverseReadProbability = config.reverseReadProbability();
+ this.specificTokens = config.specificTokens();
+ synchronized (Differ.class)
+ {
+ /*
+            Spark runs jobs on each worker in the same JVM, so we need to initialize these only once;
+            otherwise we run out of memory from the accumulation of health checker threads
+ */
+            // we could have JobConfiguration return the ConsistencyLevel directly, but snakeyaml
+            // doesn't handle relocated classes and the driver has to be shaded
+ ConsistencyLevel cl = ConsistencyLevel.valueOf(config.consistencyLevel());
+ if (srcDiffCluster == null)
+ {
+ srcDiffCluster = new DiffCluster(DiffCluster.Type.SOURCE,
+ sourceProvider.getCluster(),
+ params.keyspace,
+ cl,
+ rateLimiter,
+ config.tokenScanFetchSize(),
+ config.partitionReadFetchSize(),
+ config.readTimeoutMillis());
+ }
+
+ if (targetDiffCluster == null)
+ {
+ targetDiffCluster = new DiffCluster(DiffCluster.Type.TARGET,
+ targetProvider.getCluster(),
+ params.keyspace,
+ cl,
+ rateLimiter,
+ config.tokenScanFetchSize(),
+ config.partitionReadFetchSize(),
+ config.readTimeoutMillis());
+ }
+
+ if (journalSession == null)
+ {
+ journalSession = metadataProvider.getCluster().connect();
+ trackerProvider.initializeStatements(journalSession);
+ }
+ }
+ }
+
+ public Map<String, RangeStats> run() {
+ JobMetadataDb.ProgressTracker journal = trackerProvider.getTracker(journalSession, jobId, split);
+ Map<String, DiffJob.TaskStatus> tablesToDiff = filterTables(tables,
+ split,
+ journal::getLastStatus,
+ !specificTokens.isEmpty());
+
+ String metricsPrefix = String.format("%s.%s", srcDiffCluster.clusterId.name(), srcDiffCluster.keyspace);
+ logger.info("Diffing {} for tables {}", split, tablesToDiff);
+
+ for (Map.Entry<String, DiffJob.TaskStatus> tableStatus : tablesToDiff.entrySet()) {
+ final String table = tableStatus.getKey();
+ DiffJob.TaskStatus status = tableStatus.getValue();
+ RangeStats diffStats = status.stats;
+
+ // if this split has already been fully processed, it's being re-run to check
+ // partitions with errors. In that case, we don't want to adjust the split
+ // start and we don't want to update the completed count when we're finished.
+ boolean isRerun = split.end.equals(status.lastToken);
+ BigInteger startToken = status.lastToken == null || isRerun ? split.start : status.lastToken;
+ validateRange(startToken, split.end, tokenHelper);
+
+ TableSpec sourceTable = TableSpec.make(table, srcDiffCluster);
+ TableSpec targetTable = TableSpec.make(table, targetDiffCluster);
+ validateTableSpecs(sourceTable, targetTable);
+
+ DiffContext ctx = new DiffContext(srcDiffCluster,
+ targetDiffCluster,
+ keyspace,
+ sourceTable,
+ startToken,
+ split.end,
+ specificTokens,
+ reverseReadProbability);
+
+ String timerName = String.format("%s.%s.split_times", metricsPrefix, table);
+ try (@SuppressWarnings("unused") Timer.Context timer = metrics.timer(timerName).time()) {
+ diffStats.accumulate(diffTable(ctx,
+ (error, token) -> journal.recordError(table, token, error),
+ (type, token) -> journal.recordMismatch(table, type, token),
+ (stats, token) -> journal.updateStatus(table, stats, token)));
+
+ // update the journal with the final state for the table. Use the split's ending token
+ // as the last seen token (even though we may not have actually read any partition for
+ // that token) as this effectively marks the split as done.
+ journal.finishTable(table, diffStats, !isRerun);
+ }
+ }
+
+ Map<String, RangeStats> statsByTable = tablesToDiff.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getValue().stats));
+ updateMetrics(metricsPrefix, statsByTable);
+ return statsByTable;
+ }
+
+ public RangeStats diffTable(final DiffContext context,
+ final BiConsumer<Throwable, BigInteger> partitionErrorReporter,
+ final BiConsumer<MismatchType, BigInteger> mismatchReporter,
+ final BiConsumer<RangeStats, BigInteger> journal) {
+
+ final Iterator<PartitionKey> sourceKeys = context.source.getPartitionKeys(context.table.getTable(),
+ context.startToken,
+ context.endToken);
+ final Iterator<PartitionKey> targetKeys = context.target.getPartitionKeys(context.table.getTable(),
+ context.startToken,
+ context.endToken);
+ final Function<PartitionKey, PartitionComparator> partitionTaskProvider =
+ (key) -> {
+ boolean reverse = context.shouldReverse();
+ return new PartitionComparator(context.table,
+ context.source.getPartition(context.table, key, reverse),
+ context.target.getPartition(context.table, key, reverse));
+ };
+
+ RangeComparator rangeComparator = new RangeComparator(context,
+ partitionErrorReporter,
+ mismatchReporter,
+ journal,
+ COMPARISON_EXECUTOR);
+
+ final RangeStats tableStats = rangeComparator.compare(sourceKeys, targetKeys, partitionTaskProvider);
+ logger.debug("Table [{}] stats - ({})", context.table.getTable(), tableStats);
+ return tableStats;
+ }
+
+ @VisibleForTesting
+ static Map<String, DiffJob.TaskStatus> filterTables(Iterable<String> tables,
+ DiffJob.Split split,
+ Function<String, DiffJob.TaskStatus> journal,
+ boolean includeCompleted) {
+ Map<String, DiffJob.TaskStatus> tablesToProcess = new HashMap<>();
+ for (String table : tables) {
+ DiffJob.TaskStatus taskStatus = journal.apply(table);
+ RangeStats diffStats = taskStatus.stats;
+ BigInteger lastToken = taskStatus.lastToken;
+
+ // When we finish processing a split for a given table, we update the task status in journal
+ // to set the last seen token to the split's end token, to indicate that the split is complete.
+ if (!includeCompleted && lastToken != null && lastToken.equals(split.end)) {
+ logger.info("Found finished table {} for split {}", table, split);
+ }
+ else {
+ tablesToProcess.put(table, diffStats != null
+ ? taskStatus
+ : new DiffJob.TaskStatus(taskStatus.lastToken, RangeStats.newStats()));
+ }
+ }
+ return tablesToProcess;
+ }
+
+ static void validateTableSpecs(TableSpec source, TableSpec target) {
+ Verify.verify(source.equalsNamesOnly(target),
+ "Source and target table definitions do not match (Source: %s Target: %s)",
+ source, target);
+ }
+
+ @VisibleForTesting
+ static void validateRange(BigInteger start, BigInteger end, TokenHelper tokens) {
+
+ Verify.verify(start != null && end != null, "Invalid token range [%s,%s]", start, end);
+
+ Verify.verify(start.compareTo(tokens.min()) >= 0 && end.compareTo(tokens.max()) <= 0 && start.compareTo(end) < 0,
+ "Invalid token range [%s,%s] for partitioner range [%s,%s]",
+ start, end, tokens.min(), tokens.max());
+ }
+
+ @VisibleForTesting
+ static Map<String, RangeStats> accumulate(Map<String, RangeStats> stats, Map<String, RangeStats> otherStats)
+ {
+ for (Map.Entry<String, RangeStats> otherEntry : otherStats.entrySet())
+ {
+ if (stats.containsKey(otherEntry.getKey()))
+ stats.get(otherEntry.getKey()).accumulate(otherEntry.getValue());
+ else
+ stats.put(otherEntry.getKey(), otherEntry.getValue());
+ }
+ return stats;
+ }
+
+ private static void updateMetrics(String prefix, Map<String, RangeStats> statsMap)
+ {
+ for (Map.Entry<String, RangeStats> entry : statsMap.entrySet())
+ {
+ String qualifier = String.format("%s.%s", prefix, entry.getKey());
+ RangeStats stats = entry.getValue();
+
+ metrics.meter(qualifier + ".partitions_read").mark(stats.getMatchedPartitions() + stats.getOnlyInSource() + stats.getOnlyInTarget() + stats.getMismatchedPartitions());
+ metrics.counter(qualifier + ".matched_partitions").inc(stats.getMatchedPartitions());
+ metrics.counter(qualifier + ".mismatched_partitions").inc(stats.getMismatchedPartitions());
+
+ metrics.counter(qualifier + ".partitions_only_in_source").inc(stats.getOnlyInSource());
+ metrics.counter(qualifier + ".partitions_only_in_target").inc(stats.getOnlyInTarget());
+ metrics.counter(qualifier + ".skipped_partitions").inc(stats.getSkippedPartitions());
+
+ metrics.counter(qualifier + ".matched_rows").inc(stats.getMatchedRows());
+ metrics.counter(qualifier + ".matched_values").inc(stats.getMatchedValues());
+ metrics.counter(qualifier + ".mismatched_values").inc(stats.getMismatchedValues());
+ }
+ }
+
+ public static void shutdown()
+ {
+ try
+ {
+ if (srcDiffCluster != null) {
+ srcDiffCluster.stop();
+ srcDiffCluster.close();
+ }
+ if (targetDiffCluster != null) {
+ targetDiffCluster.stop();
+ targetDiffCluster.close();
+ }
+ if (journalSession != null) {
+ journalSession.close();
+ journalSession.getCluster().close();
+ }
+ COMPARISON_EXECUTOR.shutdown();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
new file mode 100644
index 0000000..0ac6521
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -0,0 +1,567 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.UUID;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.utils.UUIDs;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+public class JobMetadataDb {
+ private static final Logger logger = LoggerFactory.getLogger(JobMetadataDb.class);
+
+ static class ProgressTracker {
+
+ private final UUID jobId;
+ private final int bucket;
+ private final String startToken;
+ private final String endToken;
+ private final String keyspace;
+ private Session session;
+
+ private static PreparedStatement updateStmt;
+ private static PreparedStatement mismatchStmt;
+ private static PreparedStatement errorSummaryStmt;
+ private static PreparedStatement errorDetailStmt;
+ private static PreparedStatement updateCompleteStmt;
+
+ public ProgressTracker(UUID jobId,
+ int bucket,
+ BigInteger startToken,
+ BigInteger endToken,
+ String keyspace,
+ Session session) {
+ this.jobId = jobId;
+ this.bucket = bucket;
+ this.startToken = startToken.toString();
+ this.endToken = endToken.toString();
+ this.keyspace = keyspace;
+ this.session = session;
+ }
+
+ /**
+ * Runs on each executor to prepare statements shared across all instances
+ */
+ public static void initializeStatements(Session session, String keyspace) {
+ if (updateStmt == null) {
+ updateStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+ " job_id," +
+ " bucket," +
+ " table_name," +
+ " start_token," +
+ " end_token," +
+ " matched_partitions," +
+ " mismatched_partitions," +
+ " partitions_only_in_source," +
+ " partitions_only_in_target," +
+ " matched_rows," +
+ " matched_values," +
+ " mismatched_values," +
+ " skipped_partitions," +
+ " last_token )" +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ keyspace, Schema.TASK_STATUS));
+ }
+ if (mismatchStmt == null) {
+ mismatchStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+ " job_id," +
+ " bucket," +
+ " table_name," +
+ " mismatching_token," +
+ " mismatch_type )" +
+ "VALUES (?, ?, ?, ?, ?)",
+ keyspace, Schema.MISMATCHES));
+ }
+ if (updateCompleteStmt == null) {
+ updateCompleteStmt = session.prepare(String.format("UPDATE %s.%s " +
+ " SET completed = completed + 1" +
+ " WHERE job_id = ? " +
+ " AND bucket = ? " +
+ " AND table_name = ? ",
+ keyspace, Schema.JOB_STATUS))
+ .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
+ }
+ if (errorSummaryStmt == null) {
+ errorSummaryStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+ " job_id," +
+ " bucket," +
+ " table_name," +
+ " start_token," +
+ " end_token)" +
+ " VALUES (?, ?, ?, ?, ?)",
+ keyspace, Schema.ERROR_SUMMARY));
+ }
+ if (errorDetailStmt == null) {
+ errorDetailStmt = session.prepare(String.format("INSERT INTO %s.%s (" +
+ " job_id," +
+ " bucket," +
+ " table_name," +
+ " start_token," +
+ " end_token," +
+ " error_token)" +
+ " VALUES (?, ?, ?, ?, ?, ?)",
+ keyspace, Schema.ERROR_DETAIL));
+ }
+
+ }
+
+        /**
+         * Fetch the most recently recorded status for the given table in this split.
+         *
+         * @param table the table being diffed
+         * @return the last recorded task status, or TaskStatus.EMPTY if none has been recorded yet
+         */
+ public DiffJob.TaskStatus getLastStatus(String table) {
+ ResultSet rs = session.execute(String.format("SELECT last_token, " +
+ " matched_partitions, " +
+ " mismatched_partitions, " +
+ " partitions_only_in_source, " +
+ " partitions_only_in_target, " +
+ " matched_rows," +
+ " matched_values," +
+ " mismatched_values," +
+ " skipped_partitions " +
+ " FROM %s.%s " +
+ " WHERE job_id = ? " +
+ " AND bucket = ? " +
+ " AND table_name = ? " +
+ " AND start_token = ? " +
+ " AND end_token = ?",
+ keyspace, Schema.TASK_STATUS),
+ jobId, bucket, table, startToken, endToken);
+ Row row = rs.one();
+ if (null == row)
+ return DiffJob.TaskStatus.EMPTY;
+
+ RangeStats stats = RangeStats.withValues(getOrDefaultLong(row, "matched_partitions"),
+ getOrDefaultLong(row, "mismatched_partitions"),
+ 0L, // error counts are per-run and not persisted in the metadata db
+ getOrDefaultLong(row, "skipped_partitions"),
+ getOrDefaultLong(row, "partitions_only_in_source"),
+ getOrDefaultLong(row, "partitions_only_in_target"),
+ getOrDefaultLong(row, "matched_rows"),
+ getOrDefaultLong(row, "matched_values"),
+ getOrDefaultLong(row, "mismatched_values"));
+
+ BigInteger lastToken = row.isNull("last_token") ? null : new BigInteger(row.getString("last_token"));
+ return new DiffJob.TaskStatus(lastToken, stats);
+ }
+
+        /**
+         * Checkpoint the running totals for this tracker's token range of the given table.
+         *
+         * @param table the table being compared
+         * @param diffStats the stats accumulated for the range so far
+         * @param latestToken the highest token processed so far, enabling resumption on re-run
+         */
+ public void updateStatus(String table, RangeStats diffStats, BigInteger latestToken) {
+ session.execute(bindUpdateStatement(table, diffStats, latestToken));
+ }
+
+ public void recordMismatch(String table, MismatchType type, BigInteger token) {
+ logger.info("Detected mismatch in table {}; partition with token {} is {}",
+ table, token, type == MismatchType.PARTITION_MISMATCH
+ ? " different in source and target clusters"
+ : type == MismatchType.ONLY_IN_SOURCE ? "only present in source cluster"
+ : "only present in target cluster");
+ session.execute(bindMismatchesStatement(table, token, type.name()));
+ }
+
+        /**
+         * Record a failed partition-level comparison in both the per-range error
+         * summary and the per-partition error detail tables.
+         *
+         * @param table the table being compared
+         * @param token the token of the partition whose comparison failed
+         * @param error the cause of the failure
+         */
+ public void recordError(String table, BigInteger token, Throwable error) {
+ logger.error(String.format("Encountered error during partition comparison in table %s; " +
+ "error for partition with token %s", table, token), error);
+ BatchStatement batch = new BatchStatement();
+ batch.add(bindErrorSummaryStatement(table));
+ batch.add(bindErrorDetailStatement(table, token));
+ batch.setIdempotent(true);
+ session.execute(batch);
+ }
+
+        /**
+         * Flush the final stats for this tracker's token range of the given table
+         * and, if requested, bump the completed task counter for the (job, bucket, table).
+         *
+         * @param table the table being compared
+         * @param stats the final stats for the range
+         * @param updateCompletedCount whether to increment the count of completed tasks
+         */
+ public void finishTable(String table, RangeStats stats, boolean updateCompletedCount) {
+ logger.info("Finishing range [{}, {}] for table {}", startToken, endToken, table);
+ // first flush out the last status.
+ session.execute(bindUpdateStatement(table, stats, endToken));
+ // then update the count of completed tasks
+ if (updateCompletedCount)
+ session.execute(updateCompleteStmt.bind(jobId, bucket, table));
+ }
+
+ private Statement bindMismatchesStatement(String table, BigInteger token, String type) {
+ return mismatchStmt.bind(jobId, bucket, table, token.toString(), type)
+ .setIdempotent(true);
+ }
+
+ private Statement bindErrorSummaryStatement(String table) {
+ return errorSummaryStmt.bind(jobId, bucket, table, startToken, endToken)
+ .setIdempotent(true);
+ }
+
+ private Statement bindErrorDetailStatement(String table, BigInteger errorToken) {
+ return errorDetailStmt.bind(jobId, bucket, table, startToken, endToken, errorToken.toString())
+ .setIdempotent(true);
+ }
+
+ private Statement bindUpdateStatement(String table, RangeStats stats, BigInteger token) {
+ return bindUpdateStatement(table, stats, token.toString());
+ }
+
+ private Statement bindUpdateStatement(String table, RangeStats stats, String token) {
+ // We don't persist the partition error count from RangeStats as errors
+ // are likely to be transient and not data related, so we don't want to
+ // accumulate them across runs.
+ return updateStmt.bind(jobId,
+ bucket,
+ table,
+ startToken,
+ endToken,
+ stats.getMatchedPartitions(),
+ stats.getMismatchedPartitions(),
+ stats.getOnlyInSource(),
+ stats.getOnlyInTarget(),
+ stats.getMatchedRows(),
+ stats.getMatchedValues(),
+ stats.getMismatchedValues(),
+ stats.getSkippedPartitions(),
+ token)
+ .setIdempotent(true);
+ }
+
+ private static long getOrDefaultLong(Row row, String column) {
+ return (null == row || row.isNull(column)) ? 0L : row.getLong(column);
+ }
+ }
+
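+    /**
+     * Driver-side job lifecycle: registers the job as running, records its top
+     * level parameters and, on completion, persists per-table results. A hedged
+     * sketch of the expected call sequence (the names below are illustrative):
+     *
+     * <pre>{@code
+     * JobLifeCycle lifecycle = new JobLifeCycle(session, "diff_metadata");
+     * lifecycle.initializeJob(params, "src_name", "src desc", "tgt_name", "tgt desc");
+     * // ... run the comparison ...
+     * lifecycle.finalizeJob(params.jobId, resultsByTable); // also marks the job as not running
+     * }</pre>
+     */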
+ static class JobLifeCycle {
+ final Session session;
+ final String keyspace;
+
+ public JobLifeCycle(Session session, String metadataKeyspace) {
+ this.session = session;
+ this.keyspace = metadataKeyspace;
+ }
+
+ public DiffJob.Params getJobParams(UUID jobId) {
+ ResultSet rs = session.execute(String.format("SELECT keyspace_name, " +
+ " table_names," +
+ " buckets," +
+ " total_tasks " +
+ "FROM %s.%s " +
+ "WHERE job_id = ?",
+ keyspace, Schema.JOB_SUMMARY),
+ jobId);
+ Row row = rs.one();
+ if (null == row)
+ return null;
+
+ return new DiffJob.Params(jobId,
+ row.getString("keyspace_name"),
+ row.getList("table_names", String.class),
+ row.getInt("buckets"),
+ row.getInt("total_tasks"));
+ }
+
+
+ // Runs on Driver to insert top level job info
+ public void initializeJob(DiffJob.Params params,
+ String sourceClusterName,
+ String sourceClusterDesc,
+ String targetClusterName,
+ String targetClusterDesc) {
+
+ logger.info("Initializing job status");
+            // Mark the job as running using LWT. If an entry for this job id already
+            // exists then either the job is currently running or a previous run failed
+            // non-gracefully, so abort rather than risk concurrent runs.
+ ResultSet rs = session.execute(String.format("INSERT INTO %s.%s (job_id) VALUES (?) IF NOT EXISTS",
+ keyspace, Schema.RUNNING_JOBS),
+ params.jobId);
+ if (!rs.one().getBool("[applied]")) {
+ logger.info("Aborting due to inability to mark job as running. " +
+ "Did a previous run of job id {} fail non-gracefully?",
+ params.jobId);
+ throw new RuntimeException("Unable to mark job running, aborting");
+ }
+
+ UUID timeUUID = UUIDs.timeBased();
+ DateTime startDateTime = new DateTime(UUIDs.unixTimestamp(timeUUID), DateTimeZone.UTC);
+
+ rs = session.execute(String.format("INSERT INTO %s.%s (" +
+ " job_id," +
+ " job_start_time," +
+ " buckets," +
+ " keyspace_name," +
+ " table_names," +
+ " source_cluster_name," +
+ " source_cluster_desc," +
+ " target_cluster_name," +
+ " target_cluster_desc," +
+ " total_tasks)" +
+ " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" +
+ " IF NOT EXISTS",
+ keyspace, Schema.JOB_SUMMARY),
+ params.jobId,
+ timeUUID,
+ params.buckets,
+ params.keyspace,
+ params.tables,
+ sourceClusterName,
+ sourceClusterDesc,
+ targetClusterName,
+ targetClusterDesc,
+ params.tasks);
+
+ // This is a brand new job, index its details including start time
+ if (rs.one().getBool("[applied]")) {
+ BatchStatement batch = new BatchStatement();
+ batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (source_cluster_name, job_id) VALUES (?, ?)",
+ keyspace, Schema.SOURCE_CLUSTER_INDEX),
+ sourceClusterName, params.jobId));
+ batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (target_cluster_name, job_id) VALUES (?, ?)",
+ keyspace, Schema.TARGET_CLUSTER_INDEX),
+ targetClusterName, params.jobId));
+ batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (keyspace_name, job_id) VALUES (?, ?)",
+ keyspace, Schema.KEYSPACE_INDEX),
+                                          params.keyspace, params.jobId));
+ batch.add(new SimpleStatement(String.format("INSERT INTO %s.%s (job_start_date, job_start_hour, job_start_time, job_id) " +
+ "VALUES ('%s', ?, ?, ?)",
+ keyspace, Schema.JOB_START_INDEX, startDateTime.toString("yyyy-MM-dd")),
+ startDateTime.getHourOfDay(), timeUUID, params.jobId));
+ session.execute(batch);
+ }
+ }
+
+ public void finalizeJob(UUID jobId, Map<String, RangeStats> results) {
+ logger.info("Finalizing job status");
+
+ markNotRunning(jobId);
+
+ for (Map.Entry<String, RangeStats> result : results.entrySet()) {
+ String table = result.getKey();
+ RangeStats stats = result.getValue();
+ session.execute(String.format("INSERT INTO %s.%s (" +
+ " job_id," +
+ " table_name," +
+ " matched_partitions," +
+ " mismatched_partitions," +
+ " partitions_only_in_source," +
+ " partitions_only_in_target," +
+ " matched_rows," +
+ " matched_values," +
+ " mismatched_values," +
+ " skipped_partitions) " +
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
+ keyspace, Schema.JOB_RESULTS),
+ jobId,
+ table,
+ stats.getMatchedPartitions(),
+ stats.getMismatchedPartitions(),
+ stats.getOnlyInSource(),
+ stats.getOnlyInTarget(),
+ stats.getMatchedRows(),
+ stats.getMatchedValues(),
+ stats.getMismatchedValues(),
+ stats.getSkippedPartitions());
+ }
+ }
+
+
+ public void markNotRunning(UUID jobId) {
+ logger.info("Marking job {} as not running", jobId);
+
+ ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
+ keyspace, Schema.RUNNING_JOBS),
+ jobId);
+ if (!rs.one().getBool("[applied]")) {
+ logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
+ "during initialization as there may be no entry for this job in the {} table",
+ jobId, Schema.RUNNING_JOBS);
+ }
+ }
+ }
+
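+    /**
+     * Table names and DDL for the metadata keyspace. All tables are created
+     * idempotently by maybeInitialize, which is a no-op unless the metadata
+     * keyspace options set should_init to true.
+     */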
+ static class Schema {
+
+ public static final String TASK_STATUS = "task_status";
+ private static final String TASK_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " bucket int," +
+ " table_name text," +
+ " start_token varchar," +
+ " end_token varchar," +
+ " matched_partitions bigint," +
+ " mismatched_partitions bigint, " +
+ " partitions_only_in_source bigint," +
+ " partitions_only_in_target bigint," +
+ " matched_rows bigint," +
+ " matched_values bigint," +
+ " mismatched_values bigint," +
+ " skipped_partitions bigint," +
+ " last_token varchar," +
+ " PRIMARY KEY((job_id, bucket), table_name, start_token, end_token))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String JOB_SUMMARY = "job_summary";
+ private static final String JOB_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " job_start_time timeuuid," +
+ " buckets int," +
+ " keyspace_name text," +
+ " table_names frozen<list<text>>," +
+ " source_cluster_name text," +
+ " source_cluster_desc text," +
+ " target_cluster_name text," +
+ " target_cluster_desc text," +
+ " total_tasks int," +
+ " PRIMARY KEY(job_id))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String JOB_RESULTS = "job_results";
+ private static final String JOB_RESULTS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " table_name text," +
+ " matched_partitions bigint," +
+ " mismatched_partitions bigint," +
+ " partitions_only_in_source bigint," +
+ " partitions_only_in_target bigint," +
+ " matched_rows bigint," +
+ " matched_values bigint," +
+ " mismatched_values bigint," +
+ " skipped_partitions bigint," +
+ " PRIMARY KEY(job_id, table_name))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String JOB_STATUS = "job_status";
+ private static final String JOB_STATUS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " bucket int," +
+ " table_name text," +
+ " completed counter," +
+ " PRIMARY KEY ((job_id, bucket), table_name))";
+
+ public static final String MISMATCHES = "mismatches";
+ private static final String MISMATCHES_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " bucket int," +
+ " table_name text, " +
+ " mismatching_token varchar, " +
+ " mismatch_type text, " +
+ " PRIMARY KEY ((job_id, bucket), table_name, mismatching_token))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String ERROR_SUMMARY = "task_errors";
+ private static final String ERROR_SUMMARY_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " bucket int," +
+ " table_name text," +
+ " start_token varchar," +
+ " end_token varchar," +
+ " PRIMARY KEY ((job_id, bucket), table_name, start_token, end_token))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String ERROR_DETAIL = "partition_errors";
+ private static final String ERROR_DETAIL_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " bucket int," +
+ " table_name text," +
+ " start_token varchar," +
+ " end_token varchar," +
+ " error_token varchar," +
+ " PRIMARY KEY ((job_id, bucket, table_name, start_token, end_token), error_token))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String SOURCE_CLUSTER_INDEX = "source_cluster_index";
+ private static final String SOURCE_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " source_cluster_name text," +
+ " job_id uuid," +
+ " PRIMARY KEY (source_cluster_name, job_id))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String TARGET_CLUSTER_INDEX = "target_cluster_index";
+ private static final String TARGET_CLUSTER_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " target_cluster_name text," +
+ " job_id uuid," +
+ " PRIMARY KEY (target_cluster_name, job_id))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String KEYSPACE_INDEX = "keyspace_index";
+ private static final String KEYSPACE_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " keyspace_name text," +
+ " job_id uuid," +
+ " PRIMARY KEY(keyspace_name, job_id))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String JOB_START_INDEX = "job_start_index";
+ private static final String JOB_START_INDEX_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_start_date date," +
+ " job_start_hour int," +
+ " job_start_time timeuuid," +
+ " job_id uuid," +
+ " PRIMARY KEY ((job_start_date, job_start_hour), job_start_time))" +
+ " WITH default_time_to_live = %s";
+
+ public static final String RUNNING_JOBS = "running_jobs";
+ private static final String RUNNING_JOBS_SCHEMA = "CREATE TABLE IF NOT EXISTS %s.%s (" +
+ " job_id uuid," +
+ " PRIMARY KEY (job_id))" +
+ " WITH default_time_to_live = %s";
+
+ private static final String KEYSPACE_SCHEMA = "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = %s";
+
+
+ public static void maybeInitialize(Session session, MetadataKeyspaceOptions options) {
+ if (!options.should_init)
+ return;
+
+ logger.info("Initializing cassandradiff journal schema in \"{}\" keyspace", options.keyspace);
+ session.execute(String.format(KEYSPACE_SCHEMA, options.keyspace, options.replication));
+ session.execute(String.format(JOB_SUMMARY_SCHEMA, options.keyspace, JOB_SUMMARY, options.ttl));
+ session.execute(String.format(JOB_STATUS_SCHEMA, options.keyspace, JOB_STATUS));
+ session.execute(String.format(JOB_RESULTS_SCHEMA, options.keyspace, JOB_RESULTS, options.ttl));
+ session.execute(String.format(TASK_STATUS_SCHEMA, options.keyspace, TASK_STATUS, options.ttl));
+ session.execute(String.format(MISMATCHES_SCHEMA, options.keyspace, MISMATCHES, options.ttl));
+ session.execute(String.format(ERROR_SUMMARY_SCHEMA, options.keyspace, ERROR_SUMMARY, options.ttl));
+ session.execute(String.format(ERROR_DETAIL_SCHEMA, options.keyspace, ERROR_DETAIL, options.ttl));
+ session.execute(String.format(SOURCE_CLUSTER_INDEX_SCHEMA, options.keyspace, SOURCE_CLUSTER_INDEX, options.ttl));
+ session.execute(String.format(TARGET_CLUSTER_INDEX_SCHEMA, options.keyspace, TARGET_CLUSTER_INDEX, options.ttl));
+ session.execute(String.format(KEYSPACE_INDEX_SCHEMA, options.keyspace, KEYSPACE_INDEX, options.ttl));
+ session.execute(String.format(JOB_START_INDEX_SCHEMA, options.keyspace, JOB_START_INDEX, options.ttl));
+ session.execute(String.format(RUNNING_JOBS_SCHEMA, options.keyspace, RUNNING_JOBS, options.ttl));
+ logger.info("Schema initialized");
+ }
+ }
+}
+
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java b/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java
new file mode 100644
index 0000000..b23ac35
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/MismatchType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+public enum MismatchType {
+
+ ONLY_IN_SOURCE,
+ ONLY_IN_TARGET,
+ PARTITION_MISMATCH
+
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
new file mode 100644
index 0000000..6434dc8
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.Iterator;
+import java.util.concurrent.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.*;
+
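+/**
+ * Compares two fully-read partitions row by row. The source and target iterators
+ * are assumed to yield rows in the same clustering order: any divergence in
+ * clustering values (or in row counts) fails the whole partition, while
+ * mismatching regular column values are counted and iteration continues.
+ */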
+public class PartitionComparator implements Callable<PartitionStats> {
+
+ private static final Logger logger = LoggerFactory.getLogger(PartitionComparator.class);
+
+ private final TableSpec tableSpec;
+ private final Iterator<Row> source;
+ private final Iterator<Row> target;
+
+ public PartitionComparator(TableSpec tableSpec,
+ Iterator<Row> source,
+ Iterator<Row> target) {
+ this.tableSpec = tableSpec;
+ this.source = source;
+ this.target = target;
+ }
+
+ public PartitionStats call() {
+ PartitionStats partitionStats = new PartitionStats();
+
+ if (source == null || target == null) {
+ logger.error("Skipping partition because one result was null (timeout despite retries)");
+ partitionStats.skipped = true;
+ return partitionStats;
+ }
+
+ while (source.hasNext() && target.hasNext()) {
+
+ Row sourceRow = source.next();
+ Row targetRow = target.next();
+
+            // if the clustering keys don't match, don't proceed any further; just mark
+            // the partition as mismatched and be done
+ if (!clusteringsEqual(sourceRow, targetRow)) {
+ partitionStats.allClusteringsMatch = false;
+ return partitionStats;
+ }
+
+ partitionStats.matchedRows++;
+
+ // if the rows match, but there are mismatching values in the regular columns
+ // we can continue processing the partition, so just flag it as mismatched and continue
+ checkRegularColumnEquality(partitionStats, sourceRow, targetRow);
+ }
+
+ // if one of the iterators isn't exhausted, then there's a mismatch at the partition level
+ if (source.hasNext() || target.hasNext())
+ partitionStats.allClusteringsMatch = false;
+
+ return partitionStats;
+ }
+
+ private boolean clusteringsEqual(Row source, Row target) {
+ for (ColumnMetadata column : tableSpec.getClusteringColumns()) {
+ Object fromSource = source.getObject(column.getName());
+ Object fromTarget = target.getObject(column.getName());
+
+ if ((fromSource == null) != (fromTarget == null))
+ return false;
+
+ if (fromSource != null && !fromSource.equals(fromTarget))
+ return false;
+ }
+ return true;
+ }
+
+ private void checkRegularColumnEquality(PartitionStats stats, Row source, Row target) {
+ for (ColumnMetadata column : tableSpec.getRegularColumns()) {
+ Object fromSource = source.getObject(column.getName());
+ Object fromTarget = target.getObject(column.getName());
+ if (fromSource == null) {
+ if (fromTarget == null) {
+ stats.matchedValues++;
+ } else {
+ stats.mismatchedValues++;
+ }
+ } else {
+ if (fromSource.equals(fromTarget)) {
+ stats.matchedValues++;
+ } else {
+ stats.mismatchedValues++;
+ }
+ }
+ }
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
new file mode 100644
index 0000000..d31da4f
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionKey.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import com.datastax.driver.core.*;
+import org.jetbrains.annotations.NotNull;
+
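+/**
+ * Wraps a driver Row representing a partition key. The row is assumed to come
+ * from a query whose first selected column is token(...) over the partition key,
+ * followed by the key columns themselves; comparison, equality and hashing are
+ * all derived from that token.
+ */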
+public class PartitionKey implements Comparable<PartitionKey> {
+
+ private final Row row;
+
+ public PartitionKey(Row row) {
+ this.row = row;
+ }
+
+    public BigInteger getTokenAsBigInteger() {
+ Token token = getToken();
+ if (token.getType() == DataType.bigint()) {
+ return BigInteger.valueOf((Long) token.getValue());
+ } else {
+ return (BigInteger) token.getValue();
+ }
+ }
+
+ public List<Object> getComponents() {
+ int cols = row.getColumnDefinitions().size();
+ List<Object> columns = new ArrayList<>(cols);
+ // Note we start at index=1, because index=0 is the token
+ for (int i = 1; i < cols; i++)
+ columns.add(row.getObject(i));
+ return columns;
+ }
+
+ @VisibleForTesting
+ protected Token getToken() {
+ return row.getToken(0);
+ }
+
+ public int compareTo(@NotNull PartitionKey o) {
+ return getToken().compareTo(o.getToken());
+ }
+
+ public boolean equals(Object obj) {
+ return this == obj || (obj instanceof PartitionKey && this.compareTo((PartitionKey)obj) == 0);
+ }
+
+ public int hashCode() {
+ return Objects.hash(getTokenAsBigInteger());
+ }
+
+ public String toString() {
+ return StreamSupport.stream(row.getColumnDefinitions().spliterator(), false)
+ .map(ColumnDefinitions.Definition::getName)
+ .map(row::getObject)
+ .filter(Objects::nonNull)
+ .map(Object::toString)
+ .collect(Collectors.joining(":"));
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java
new file mode 100644
index 0000000..7f020ff
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionStats.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+public class PartitionStats {
+ public boolean skipped = false;
+ public boolean allClusteringsMatch = true;
+ public long matchedRows;
+ public long matchedValues;
+ public long mismatchedValues;
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
new file mode 100644
index 0000000..36ce2b5
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.*;
+
+import com.google.common.base.Verify;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
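+/**
+ * Streaming merge of two token-ordered streams of partition keys, one per
+ * cluster. Keys present on only one side are reported as mismatches directly;
+ * keys present on both sides are submitted to the ComparisonExecutor for
+ * asynchronous row-level comparison, and a Phaser is used to wait for all
+ * in-flight comparisons before the range result is returned.
+ */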
+public class RangeComparator {
+
+ private static final Logger logger = LoggerFactory.getLogger(RangeComparator.class);
+
+ private final DiffContext context;
+ private final BiConsumer<Throwable, BigInteger> errorReporter;
+ private final BiConsumer<MismatchType, BigInteger> mismatchReporter;
+ private final BiConsumer<RangeStats, BigInteger> journal;
+ private final ComparisonExecutor comparisonExecutor;
+
+ public RangeComparator(DiffContext context,
+ BiConsumer<Throwable, BigInteger> errorReporter,
+                           BiConsumer<MismatchType, BigInteger> mismatchReporter,
+ BiConsumer<RangeStats, BigInteger> journal,
+ ComparisonExecutor comparisonExecutor) {
+ this.context = context;
+ this.errorReporter = errorReporter;
+ this.mismatchReporter = mismatchReporter;
+ this.journal = journal;
+ this.comparisonExecutor = comparisonExecutor;
+ }
+
+ public RangeStats compare(Iterator<PartitionKey> sourceKeys,
+ Iterator<PartitionKey> targetKeys,
+ Function<PartitionKey, PartitionComparator> partitionTaskProvider) {
+
+ final RangeStats rangeStats = RangeStats.newStats();
+ // We can catch this condition earlier, but it doesn't hurt to also check here
+ if (context.startToken.equals(context.endToken))
+ return rangeStats;
+
+ Phaser phaser = new Phaser(1);
+ AtomicLong partitionCount = new AtomicLong(0);
+ AtomicReference<BigInteger> highestTokenSeen = new AtomicReference<>(context.startToken);
+
+ logger.info("Comparing range [{},{}]", context.startToken, context.endToken);
+ try {
+ PartitionKey sourceKey = nextKey(sourceKeys);
+ PartitionKey targetKey = nextKey(targetKeys);
+
+ // special case for start of range - handles one cluster supplying an empty range
+ if ((sourceKey == null) != (targetKey == null)) {
+ if (sourceKey == null) {
+ logger.info("First in range, source iter is empty {}", context);
+ onlyInTarget(rangeStats, targetKey);
+ targetKeys.forEachRemaining(key -> onlyInTarget(rangeStats, key));
+ } else {
+ logger.info("First in range, target iter is empty {}", context);
+ onlyInSource(rangeStats, sourceKey);
+ sourceKeys.forEachRemaining(key -> onlyInSource(rangeStats, key));
+ }
+ return rangeStats;
+ }
+
+ while (sourceKey != null && targetKey != null) {
+
+ int ret = sourceKey.compareTo(targetKey);
+ if (ret > 0) {
+ onlyInTarget(rangeStats, targetKey);
+ targetKey = nextKey(targetKeys);
+ } else if (ret < 0) {
+ onlyInSource(rangeStats, sourceKey);
+ sourceKey = nextKey(sourceKeys);
+ } else {
+
+ Verify.verify(sourceKey.equals(targetKey),
+ "Can only compare partitions with identical keys: (%s, %s)",
+ sourceKey, targetKey);
+
+ // For results where the key exists in both, we'll fire off an async task to walk the
+ // partition and compare all the rows. The result of that comparison is added to the
+ // totals for the range and the highest seen token updated in the onSuccess callback
+
+ if (!context.isTokenAllowed(sourceKey.getTokenAsBigInteger())) {
+ logger.debug("Skipping disallowed token {}", sourceKey.getTokenAsBigInteger());
+ rangeStats.skipPartition();
+ sourceKey = nextKey(sourceKeys);
+ targetKey = nextKey(targetKeys);
+ continue;
+ }
+
+ BigInteger token = sourceKey.getTokenAsBigInteger();
+ try {
+ PartitionComparator comparisonTask = partitionTaskProvider.apply(sourceKey);
+ comparisonExecutor.submit(comparisonTask,
+ onSuccess(rangeStats, partitionCount, token, highestTokenSeen, mismatchReporter, journal),
+ onError(rangeStats, token, errorReporter),
+ phaser);
+ } catch (Throwable t) {
+ // Handle errors thrown when creating the comparison task. This should trap timeouts and
+ // unavailables occurring when performing the initial query to read the full partition.
+ // Errors thrown when paging through the partition in comparisonTask will be handled by
+ // the onError callback.
+ rangeStats.partitionError();
+ errorReporter.accept(t, token);
+ } finally {
+                        // if the cluster has been shut down because a task failed, the underlying
+                        // iterators of partition keys will return hasNext == false
+ sourceKey = nextKey(sourceKeys);
+ targetKey = nextKey(targetKeys);
+ }
+ }
+ }
+
+ // handle case where only one iterator is exhausted
+ if (sourceKey != null)
+ onlyInSource(rangeStats, sourceKey);
+ else if (targetKey != null)
+ onlyInTarget(rangeStats, targetKey);
+
+ drain(sourceKeys, targetKeys, rangeStats);
+
+ } catch (Exception e) {
+ // Handles errors thrown by iteration of underlying resultsets of partition keys by
+ // calls to nextKey(). Such errors should cause the overall range comparison to fail,
+ // but we must ensure that any in-flight partition comparisons complete so that either
+ // the onSuccess or onError callback is fired for each one. This is necessary to ensure
+ // that we record the highest seen token and any failed partitions and can safely re-run.
+ logger.debug("Waiting for {} in flight tasks before propagating error", phaser.getUnarrivedParties());
+ phaser.arriveAndAwaitAdvance();
+ throw new RuntimeException(String.format("Error encountered during range comparison for [%s:%s]",
+ context.startToken, context.endToken), e);
+ }
+
+ logger.debug("Waiting for {} in flight tasks before returning", phaser.getUnarrivedParties());
+ phaser.arriveAndAwaitAdvance();
+
+ if (!rangeStats.allMatches())
+ logger.info("Segment [{}:{}] stats - ({})", context.startToken, context.endToken, rangeStats);
+
+ return rangeStats;
+ }
+
+ private void drain(Iterator<PartitionKey> sourceKeys,
+ Iterator<PartitionKey> targetKeys,
+ RangeStats rangeStats) {
+ if (sourceKeys.hasNext()) {
+ logger.info("Source keys not exhausted {}", context);
+ sourceKeys.forEachRemaining(key -> onlyInSource(rangeStats, key));
+ } else if (targetKeys.hasNext()) {
+ logger.info("Target keys not exhausted: {}", context);
+ targetKeys.forEachRemaining(key -> onlyInTarget(rangeStats, key));
+ }
+ }
+
+ private void onlyInTarget(RangeStats stats, PartitionKey key) {
+ stats.onlyInTarget();
+ mismatchReporter.accept(MismatchType.ONLY_IN_TARGET, key.getTokenAsBigInteger());
+ }
+
+ private void onlyInSource(RangeStats stats, PartitionKey key) {
+ stats.onlyInSource();
+ mismatchReporter.accept(MismatchType.ONLY_IN_SOURCE, key.getTokenAsBigInteger());
+ }
+
+ private PartitionKey nextKey(Iterator<PartitionKey> keys) {
+ return keys.hasNext() ? keys.next() : null;
+ }
+
+ private Consumer<PartitionStats> onSuccess(final RangeStats rangeStats,
+ final AtomicLong partitionCount,
+ final BigInteger currentToken,
+ final AtomicReference<BigInteger> highestSeenToken,
+ final BiConsumer<MismatchType, BigInteger> mismatchReporter,
+ final BiConsumer<RangeStats, BigInteger> journal) {
+ return (result) -> {
+
+            rangeStats.accumulate(result);
+            if (!result.allClusteringsMatch || result.mismatchedValues > 0) {
+                mismatchReporter.accept(MismatchType.PARTITION_MISMATCH, currentToken);
+                rangeStats.mismatchedPartition();
+            } else if (!result.skipped) {
+                // a skipped partition is already counted by accumulate(), so only
+                // partitions that were genuinely compared are counted as matched
+                rangeStats.matchedPartition();
+            }
+
+ BigInteger highest = highestSeenToken.get();
+ while (currentToken.compareTo(highest) > 0) {
+ if (highestSeenToken.compareAndSet(highest, currentToken))
+ break;
+
+ highest = highestSeenToken.get();
+ }
+
+            // checkpoint every 10 partitions
+ if (partitionCount.incrementAndGet() % 10 == 0)
+ journal.accept(rangeStats, highestSeenToken.get());
+ };
+ }
+
+ private Consumer<Throwable> onError(final RangeStats rangeStats,
+ final BigInteger currentToken,
+ final BiConsumer<Throwable, BigInteger> errorReporter) {
+ return (error) -> {
+ rangeStats.partitionError();
+ errorReporter.accept(error, currentToken);
+ };
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
new file mode 100644
index 0000000..a0f8043
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeStats.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.io.*;
+import java.util.Objects;
+import java.util.concurrent.atomic.LongAdder;
+
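+/**
+ * Thread-safe accumulator for the outcome of comparing a token range. Counters
+ * are LongAdders for cheap concurrent updates from partition comparison tasks;
+ * they are declared transient and their sums are written and read explicitly in
+ * writeObject/readObject, giving a compact serialized form when instances move
+ * between Spark executors and the driver.
+ */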
+public class RangeStats implements Serializable {
+
+ private transient LongAdder matchedPartitions;
+ private transient LongAdder mismatchedPartitions;
+ private transient LongAdder errorPartitions;
+ private transient LongAdder skippedPartitions;
+ private transient LongAdder onlyInSource;
+ private transient LongAdder onlyInTarget;
+ private transient LongAdder matchedRows;
+ private transient LongAdder matchedValues;
+ private transient LongAdder mismatchedValues;
+
+ public static RangeStats newStats() {
+ return new RangeStats(0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L);
+ }
+
+ public static RangeStats withValues(long matchedPartitions,
+ long mismatchedPartitions,
+ long errorPartitions,
+ long skippedPartitions,
+ long onlyInSource,
+ long onlyInTarget,
+ long matchedRows,
+ long matchedValues,
+ long mismatchedValues) {
+
+ return new RangeStats(matchedPartitions,
+ mismatchedPartitions,
+ errorPartitions,
+ skippedPartitions,
+ onlyInSource,
+ onlyInTarget,
+ matchedRows,
+ matchedValues,
+ mismatchedValues);
+ }
+
+ private RangeStats(long matchedPartitions,
+ long mismatchedPartitions,
+ long errorPartitions,
+ long skippedPartitions,
+ long onlyInSource,
+ long onlyInTarget,
+ long matchedRows,
+ long matchedValues,
+ long mismatchedValues) {
+
+ this.matchedPartitions = new LongAdder();
+ this.mismatchedPartitions = new LongAdder();
+ this.errorPartitions = new LongAdder();
+ this.skippedPartitions = new LongAdder();
+ this.onlyInSource = new LongAdder();
+ this.onlyInTarget = new LongAdder();
+ this.matchedRows = new LongAdder();
+ this.matchedValues = new LongAdder();
+ this.mismatchedValues = new LongAdder();
+
+ this.matchedPartitions.add(matchedPartitions);
+ this.mismatchedPartitions.add(mismatchedPartitions);
+ this.errorPartitions.add(errorPartitions);
+ this.skippedPartitions.add(skippedPartitions);
+ this.onlyInSource.add(onlyInSource);
+ this.onlyInTarget.add(onlyInTarget);
+ this.matchedRows.add(matchedRows);
+ this.matchedValues.add(matchedValues);
+ this.mismatchedValues.add(mismatchedValues);
+ }
+
+ public void matchedPartition() {
+ matchedPartitions.add(1L);
+ }
+
+ public long getMatchedPartitions() {
+ return matchedPartitions.sum();
+ }
+
+ public void onlyInSource() {
+ onlyInSource.add(1L);
+ }
+
+ public long getOnlyInSource() {
+ return onlyInSource.sum();
+ }
+
+ public void onlyInTarget() {
+ onlyInTarget.add(1L);
+ }
+
+ public long getOnlyInTarget() {
+ return onlyInTarget.sum();
+ }
+
+ public long getMatchedRows() {
+ return matchedRows.sum();
+ }
+
+ public long getMatchedValues() {
+ return matchedValues.sum();
+ }
+
+ public long getMismatchedValues() {
+ return mismatchedValues.sum();
+ }
+
+ public void mismatchedPartition() {
+ mismatchedPartitions.add(1L);
+ }
+
+ public long getMismatchedPartitions() {
+ return mismatchedPartitions.sum();
+ }
+
+ public void skipPartition() {
+ skippedPartitions.add(1L);
+ }
+
+ public long getSkippedPartitions() {
+ return skippedPartitions.sum();
+ }
+
+ public void partitionError() {
+ errorPartitions.add(1L);
+ }
+
+ public long getErrorPartitions() {
+ return errorPartitions.sum();
+ }
+
+ public RangeStats accumulate(final PartitionStats partitionStats) {
+ this.matchedRows.add(partitionStats.matchedRows);
+ this.matchedValues.add(partitionStats.matchedValues);
+ this.mismatchedValues.add(partitionStats.mismatchedValues);
+ if (partitionStats.skipped)
+ this.skippedPartitions.add(1L);
+
+ return this;
+ }
+
+ public RangeStats accumulate(final RangeStats rangeStats) {
+ this.matchedPartitions.add(rangeStats.matchedPartitions.sum());
+ this.mismatchedPartitions.add(rangeStats.mismatchedPartitions.sum());
+ this.errorPartitions.add(rangeStats.errorPartitions.sum());
+ this.skippedPartitions.add(rangeStats.skippedPartitions.sum());
+ this.onlyInSource.add(rangeStats.onlyInSource.sum());
+ this.onlyInTarget.add(rangeStats.onlyInTarget.sum());
+ this.matchedRows.add(rangeStats.matchedRows.sum());
+ this.matchedValues.add(rangeStats.matchedValues.sum());
+ this.mismatchedValues.add(rangeStats.mismatchedValues.sum());
+ return this;
+ }
+
+    public boolean allMatches() {
+        // mismatchedPartitions is checked explicitly because a clustering-level
+        // mismatch increments it without contributing any mismatched values
+        return mismatchedPartitions.sum() == 0
+               && errorPartitions.sum() == 0
+               && onlyInSource.sum() == 0
+               && onlyInTarget.sum() == 0
+               && mismatchedValues.sum() == 0
+               && skippedPartitions.sum() == 0;
+    }
+
+ public boolean isEmpty() {
+ return matchedPartitions.sum() == 0
+ && mismatchedPartitions.sum() == 0
+ && errorPartitions.sum() == 0
+ && skippedPartitions.sum() == 0
+ && onlyInSource.sum() == 0
+ && onlyInTarget.sum() == 0
+ && matchedRows.sum() == 0
+ && matchedValues.sum() == 0
+ && mismatchedValues.sum() == 0;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof RangeStats))
+ return false;
+
+ RangeStats other = (RangeStats)o;
+ return this.matchedPartitions.sum() == other.matchedPartitions.sum()
+ && mismatchedPartitions.sum() == other.mismatchedPartitions.sum()
+ && errorPartitions.sum() == other.errorPartitions.sum()
+ && skippedPartitions.sum() == other.skippedPartitions.sum()
+ && onlyInSource.sum() == other.onlyInSource.sum()
+ && onlyInTarget.sum() == other.onlyInTarget.sum()
+ && matchedRows.sum() == other.matchedRows.sum()
+ && matchedValues.sum() == other.matchedValues.sum()
+ && mismatchedValues.sum() == other.mismatchedValues.sum();
+ }
+
+ public int hashCode() {
+ return Objects.hash(matchedPartitions.sum(),
+ mismatchedPartitions.sum(),
+ errorPartitions.sum(),
+ skippedPartitions.sum(),
+ onlyInSource.sum(),
+ onlyInTarget.sum(),
+ matchedRows.sum(),
+ matchedValues.sum(),
+ mismatchedValues.sum());
+ }
+
+ public String toString() {
+ return String.format("Matched Partitions - %d, " +
+ "Mismatched Partitions - %d, " +
+ "Partition Errors - %d, " +
+ "Partitions Only In Source - %d, " +
+ "Partitions Only In Target - %d, " +
+ "Skipped Partitions - %d, " +
+ "Matched Rows - %d, " +
+ "Matched Values - %d, " +
+ "Mismatched Values - %d ",
+ matchedPartitions.sum(),
+ mismatchedPartitions.sum(),
+ errorPartitions.sum(),
+ onlyInSource.sum(),
+ onlyInTarget.sum(),
+ skippedPartitions.sum(),
+ matchedRows.sum(),
+ matchedValues.sum(),
+ mismatchedValues.sum());
+ }
+
+ // For serialization
+
+ private RangeStats() {}
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.writeLong(matchedPartitions.sum());
+ out.writeLong(mismatchedPartitions.sum());
+ out.writeLong(errorPartitions.sum());
+ out.writeLong(skippedPartitions.sum());
+ out.writeLong(onlyInSource.sum());
+ out.writeLong(onlyInTarget.sum());
+ out.writeLong(matchedRows.sum());
+ out.writeLong(matchedValues.sum());
+ out.writeLong(mismatchedValues.sum());
+ }
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ this.matchedPartitions = new LongAdder();
+ this.mismatchedPartitions = new LongAdder();
+ this.errorPartitions = new LongAdder();
+ this.skippedPartitions = new LongAdder();
+ this.onlyInSource = new LongAdder();
+ this.onlyInTarget = new LongAdder();
+ this.matchedRows = new LongAdder();
+ this.matchedValues = new LongAdder();
+ this.mismatchedValues = new LongAdder();
+
+ this.matchedPartitions.add(in.readLong());
+ this.mismatchedPartitions.add(in.readLong());
+ this.errorPartitions.add(in.readLong());
+ this.skippedPartitions.add(in.readLong());
+ this.onlyInSource.add(in.readLong());
+ this.onlyInTarget.add(in.readLong());
+ this.matchedRows.add(in.readLong());
+ this.matchedValues.add(in.readLong());
+ this.mismatchedValues.add(in.readLong());
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
new file mode 100644
index 0000000..d2f0963
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TableSpec.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import com.datastax.driver.core.*;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+
+import static org.apache.cassandra.diff.DiffContext.cqlizedString;
+
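+/**
+ * Captures the column structure of a table on one cluster so that the same
+ * table on source and target can be checked for a matching shape via
+ * equalsNamesOnly before their data is compared.
+ */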
+public class TableSpec {
+
+ private final String table;
+ private ImmutableList<ColumnMetadata> clusteringColumns;
+ private ImmutableList<ColumnMetadata> regularColumns;
+
+
+    public String getTable() {
+        return table;
+    }
+
+
+ public ImmutableList<ColumnMetadata> getClusteringColumns() {
+ return clusteringColumns;
+ }
+
+ public ImmutableList<ColumnMetadata> getRegularColumns() {
+ return regularColumns;
+ }
+
+
+    /**
+     * @param table the table to diff
+     * @param clusteringColumns the clustering columns, retrieved from the cluster using the client
+     * @param regularColumns all columns other than the clustering columns (partition key
+     *                       columns included, as they trivially match for partitions with
+     *                       identical keys), retrieved from the cluster using the client
+     */
+ TableSpec(final String table,
+ final List<ColumnMetadata> clusteringColumns,
+ final List<ColumnMetadata> regularColumns) {
+ this.table = table;
+ this.clusteringColumns = ImmutableList.copyOf(clusteringColumns);
+ this.regularColumns = ImmutableList.copyOf(regularColumns);
+ }
+
+ public static TableSpec make(String table, DiffCluster diffCluster) {
+ final Cluster cluster = diffCluster.cluster;
+
+ final String cqlizedKeyspace = cqlizedString(diffCluster.keyspace);
+ final String cqlizedTable = cqlizedString(table);
+
+ KeyspaceMetadata ksMetadata = cluster.getMetadata().getKeyspace(cqlizedKeyspace);
+ if (ksMetadata == null) {
+ throw new IllegalArgumentException(String.format("Keyspace %s not found in %s cluster", diffCluster.keyspace, diffCluster.clusterId));
+ }
+
+ TableMetadata tableMetadata = ksMetadata.getTable(cqlizedTable);
+ List<ColumnMetadata> clusteringColumns = tableMetadata.getClusteringColumns();
+ List<ColumnMetadata> regularColumns = tableMetadata.getColumns()
+ .stream()
+ .filter(c -> !(clusteringColumns.contains(c)))
+ .collect(Collectors.toList());
+ return new TableSpec(tableMetadata.getName(), clusteringColumns, regularColumns);
+ }
+
+ public boolean equalsNamesOnly(TableSpec other) {
+ return this.table.equals(other.table)
+ && columnNames(this.clusteringColumns).equals(columnNames(other.clusteringColumns))
+ && columnNames(this.regularColumns).equals(columnNames(other.regularColumns));
+ }
+
+ private static List<String> columnNames(List<ColumnMetadata> columns) {
+ return columns.stream().map(ColumnMetadata::getName).collect(Collectors.toList());
+ }
+
+ public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof TableSpec))
+ return false;
+
+ TableSpec other = (TableSpec)o;
+ return this.table.equals(other.table)
+ && this.clusteringColumns.equals(other.clusteringColumns)
+ && this.regularColumns.equals(other.regularColumns);
+
+ }
+
+ public int hashCode() {
+ return Objects.hash(table, clusteringColumns, regularColumns);
+ }
+
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("table", table)
+ .add("clusteringColumns", clusteringColumns)
+ .add("regularColumns", regularColumns)
+ .toString();
+ }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java b/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java
new file mode 100644
index 0000000..3521a4b
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/TokenHelper.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+
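+/**
+ * Token-domain constants and conversions for the two supported partitioners.
+ * A small illustrative example (using the fully qualified partitioner name):
+ *
+ * <pre>{@code
+ * TokenHelper helper = TokenHelper.forPartitioner("org.apache.cassandra.dht.Murmur3Partitioner");
+ * BigInteger min = helper.min();               // Long.MIN_VALUE as a BigInteger
+ * Object bind = helper.forBindParam(min);      // a Long for Murmur3, a BigInteger for Random
+ * }</pre>
+ */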
+public enum TokenHelper {
+
+ MURMUR3 {
+ @Override
+ public BigInteger min() {
+ return BigInteger.valueOf(Long.MIN_VALUE);
+ }
+
+ @Override
+ public BigInteger max() {
+ return BigInteger.valueOf(Long.MAX_VALUE);
+ }
+
+ @Override
+ public Object forBindParam(BigInteger token) {
+ return token.longValue();
+ }
+ },
+
+ RANDOM {
+ @Override
+ public BigInteger min() {
+ return BigInteger.ONE.negate();
+ }
+
+ @Override
+ public BigInteger max() {
+ return BigInteger.valueOf(2).pow(127).subtract(BigInteger.ONE);
+ }
+
+ @Override
+ public Object forBindParam(BigInteger token) {
+ return token;
+ }
+ };
+
+ public abstract BigInteger min();
+ public abstract BigInteger max();
+ public abstract Object forBindParam(BigInteger token);
+
+ public static TokenHelper forPartitioner(String partitionerName) {
+ if (partitionerName.endsWith("Murmur3Partitioner")) return MURMUR3;
+ else if (partitionerName.endsWith("RandomPartitioner")) return RANDOM;
+        else throw new IllegalArgumentException("Unsupported partitioner: " + partitionerName);
+ }
+}
diff --git a/spark-job/src/main/resources/log4j2.xml b/spark-job/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..64aec61
--- /dev/null
+++ b/spark-job/src/main/resources/log4j2.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="INFO">
+ <AppenderRef ref="Console"/>
+ </Root>
+ <!--Logger name="org.apache.cassandra.diff" level="info">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ <Logger name="org.apache.cassandra.diff" level="info">
+ <AppenderRef ref="Console"/>
+ </Logger-->
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java b/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java
new file mode 100644
index 0000000..cc7fb95
--- /dev/null
+++ b/spark-job/src/test/java/com/datastax/driver/core/ColumnMetadataHelper.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.datastax.driver.core;
+
+public class ColumnMetadataHelper {
+
+ public static ColumnMetadata column(String name) {
+ return ColumnMetadata.forAlias(null, name, null);
+ }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java
new file mode 100644
index 0000000..002769f
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/ComparisonExecutorTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+import com.google.common.util.concurrent.*;
+import org.junit.Test;
+
+import com.codahale.metrics.*;
+
+import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+
+public class ComparisonExecutorTest {
+
+ @Test
+ public void submitBlocksWhenMaxTasksExceeded() throws Exception {
+ // submit maxTasks, then assert that further submission blocks until tasks are processed
+ int maxTasks = 3;
+ MetricRegistry metrics = metrics();
+ final ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+ Gauge waitingToSubmit = metrics.getGauges().get("BlockedTasks");
+ assertEquals(0, waitingToSubmit.getValue());
+
+ final AtomicInteger successful = new AtomicInteger(0);
+ final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+ final AtomicInteger failed = new AtomicInteger(0);
+ final Consumer<Throwable> onError = (t) -> failed.incrementAndGet();
+ final Phaser phaser = new Phaser(1);
+
+ BlockingTask[] tasks = new BlockingTask[5];
+ for (int i=0; i<5; i++)
+ tasks[i] = new BlockingTask(i);
+
+        // Ensure that submission itself does not block until the max number of tasks has been submitted
+ executor.submit(tasks[0], onSuccess, onError, phaser);
+ executor.submit(tasks[1], onSuccess, onError, phaser);
+ executor.submit(tasks[2], onSuccess, onError, phaser);
+ assertEquals(0, waitingToSubmit.getValue());
+
+ // Now submit another pair of tasks which should block as the executor is fully occupied
+ final CountDownLatch latch = new CountDownLatch(2);
+ Thread t1 = new Thread(() -> { latch.countDown(); executor.submit(tasks[3], onSuccess, onError, phaser);});
+ Thread t2 = new Thread(() -> { latch.countDown(); executor.submit(tasks[4], onSuccess, onError, phaser);});
+ t1.start();
+ t2.start();
+ // wait for both to attempt submission
+ latch.await();
+ assertThreadWaits(t1);
+ assertThreadWaits(t2);
+ assertEquals(2, waitingToSubmit.getValue());
+
+ // Let the first waiting task complete, which should allow t1 to complete its submission
+ tasks[0].latch.countDown();
+ t1.join();
+ // the second submission should still be waiting on a slot
+ assertThreadWaits(t2);
+ assertEquals(1, waitingToSubmit.getValue());
+
+ // Let another task complete, allowing t2 to complete its submission
+ tasks[1].latch.countDown();
+ t2.join();
+ assertEquals(0, waitingToSubmit.getValue());
+
+ // Let all tasks complete, wait for them to do so then verify counters
+ for (int i=2; i<=4; i++)
+ tasks[i].latch.countDown();
+
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(5, successful.get());
+ assertEquals(0, failed.get());
+ }
+
+ @Test
+ public void handleTaskFailure() {
+ // Ensure that the failure callback is fired, a permit for task submission
+ // returned and the phaser notified when a task throws
+ int maxTasks = 5;
+ MetricRegistry metrics = metrics();
+ ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+ Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+ final AtomicInteger successful = new AtomicInteger(0);
+ final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+ AtomicReference<Throwable> thrown = new AtomicReference<>();
+ final Consumer<Throwable> onError = thrown::set;
+ final Phaser phaser = new Phaser(1);
+
+ assertEquals(maxTasks, availableSlots.getValue());
+
+ RuntimeException toThrow = new RuntimeException("FAIL");
+ BlockingTask task = new BlockingTask(0, toThrow);
+ executor.submit(task, onSuccess, onError, phaser);
+ assertEquals(maxTasks - 1, availableSlots.getValue());
+
+ assertEquals(2, phaser.getUnarrivedParties());
+ task.latch.countDown();
+
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(maxTasks, availableSlots.getValue());
+ assertEquals(0, successful.get());
+ assertEquals(toThrow, thrown.get());
+ }
+
+ @Test
+ public void handleUncaughtExceptionInFailureCallback() {
+ // Ensure that if the failure callback throws, a permit for submission is
+ // still returned and the phaser notified
+ int maxTasks = 5;
+ MetricRegistry metrics = metrics();
+ ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+ Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+ final AtomicInteger successful = new AtomicInteger(0);
+ final AtomicInteger failures = new AtomicInteger(0);
+ final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+ final Consumer<Throwable> onError = (t) -> { failures.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
+ final Phaser phaser = new Phaser(1);
+
+ assertEquals(maxTasks, availableSlots.getValue());
+
+ RuntimeException toThrow = new RuntimeException("FAIL");
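+ // sanity check: the error callback itself must throw before we rely on that behaviour below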
+ try {
+ onError.accept(toThrow);
+ fail("Failure callback should throw RuntimeException");
+ } catch (RuntimeException e) {
+ // expected - reset failure count
+ failures.set(0);
+ }
+
+ BlockingTask task = new BlockingTask(0, toThrow);
+ executor.submit(task, onSuccess, onError, phaser);
+ assertEquals(maxTasks - 1, availableSlots.getValue());
+
+ assertEquals(2, phaser.getUnarrivedParties());
+ task.latch.countDown();
+
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(maxTasks, availableSlots.getValue());
+ assertEquals(0, successful.get());
+ assertEquals(1, failures.get());
+ }
+
+ @Test
+ public void handleUncaughtExceptionInSuccessCallback() {
+ // Ensure that if the success callback throws, a permit for submission is
+ // still returned and the phaser notified
+ int maxTasks = 5;
+ MetricRegistry metrics = metrics();
+ ComparisonExecutor executor = new ComparisonExecutor(executor(1), maxTasks, metrics);
+ Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+ final AtomicInteger successful = new AtomicInteger(0);
+ final AtomicInteger failures = new AtomicInteger(0);
+ final Consumer<Integer> onSuccess = (i) -> { successful.incrementAndGet(); throw new RuntimeException("UNCAUGHT"); };
+ final Consumer<Throwable> onError = (t) -> failures.incrementAndGet();
+ final Phaser phaser = new Phaser(1);
+
+ assertEquals(maxTasks, availableSlots.getValue());
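+ // sanity check: the success callback itself must throw before we rely on that behaviour below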
+ try {
+ onSuccess.accept(0);
+ fail("Success callback should throw RuntimeException");
+ } catch (RuntimeException e) {
+ // expected - reset success count
+ successful.set(0);
+ }
+
+ BlockingTask task = new BlockingTask(0);
+ executor.submit(task, onSuccess, onError, phaser);
+ assertEquals(maxTasks - 1, availableSlots.getValue());
+
+ assertEquals(2, phaser.getUnarrivedParties());
+ task.latch.countDown();
+
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(maxTasks, availableSlots.getValue());
+ assertEquals(1, successful.get());
+ assertEquals(0, failures.get());
+ }
+
+ @Test
+ public void handleRejectedExecutionException() {
+ // In the case that the underlying ExecutorService rejects a task submission, a permit
+ // should be returned and the phaser notified
+ int maxTasks = 5;
+ final AtomicInteger successful = new AtomicInteger(0);
+ final AtomicInteger failures = new AtomicInteger(0);
+ final AtomicInteger rejections = new AtomicInteger(0);
+ final Consumer<Integer> onSuccess = (i) -> successful.incrementAndGet();
+ final Consumer<Throwable> onError = (t) -> failures.incrementAndGet();
+
+ MetricRegistry metrics = metrics();
+ ExecutorService rejectingExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(1),
+ (r, executor) -> { rejections.incrementAndGet();
+ throw new RejectedExecutionException("REJECTED");});
+
+ ComparisonExecutor executor = new ComparisonExecutor(MoreExecutors.listeningDecorator(rejectingExecutor), maxTasks, metrics);
+ Gauge availableSlots = metrics.getGauges().get("AvailableSlots");
+ final Phaser phaser = new Phaser(1);
+
+ // Submit an initial pair of tasks to ensure that the underlying work queue is full
+ BlockingTask t0 = new BlockingTask(0);
+ BlockingTask t1 = new BlockingTask(1);
+ executor.submit(t0, onSuccess, onError, phaser);
+ executor.submit(t1, onSuccess, onError, phaser);
+ assertEquals(3, phaser.getUnarrivedParties());
+ assertEquals(maxTasks - 2, availableSlots.getValue());
+
+ // Submit a third task which will be rejected by the executor service
+ executor.submit(new BlockingTask(2), onSuccess, onError, phaser);
+ t0.latch.countDown();
+ t1.latch.countDown();
+
+ phaser.arriveAndAwaitAdvance();
+ assertEquals(maxTasks, availableSlots.getValue());
+ assertEquals(2, successful.get());
+ assertEquals(1, failures.get());
+ assertEquals(1, rejections.get());
+ }
+
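+ // Callable which blocks on its latch until released, then either throws the supplied exception or returns its id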
+ class BlockingTask implements Callable<Integer> {
+
+ final int id;
+ final Exception e;
+ final CountDownLatch latch;
+
+ BlockingTask(int id) {
+ this(id, null);
+ }
+
+ BlockingTask(int id, Exception toThrow) {
+ this.id = id;
+ this.e = toThrow;
+ this.latch = new CountDownLatch(1);
+ }
+
+ public Integer call() throws Exception {
+ latch.await();
+ if (e != null)
+ throw e;
+ return id;
+ }
+ }
+
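+ // wraps a fixed-size pool of daemon threads in the ListeningExecutorService required by ComparisonExecutor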
+ private static ListeningExecutorService executor(int threads) {
+ return MoreExecutors.listeningDecorator(
+ Executors.newFixedThreadPool(threads,
+ new ThreadFactoryBuilder().setNameFormat("partition-comparison-%d")
+ .setDaemon(true)
+ .build()));
+ }
+
+ private static MetricRegistry metrics() {
+ return new MetricRegistry();
+ }
+}
+
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
new file mode 100644
index 0000000..d8d92c7
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DiffJobTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.List;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DiffJobTest
+{
+ @Test
+ public void testSplitsM3P()
+ {
+ splitTestHelper(TokenHelper.forPartitioner("Murmur3Partitioner"));
+ }
+
+ @Test
+ public void testSplitsRandom()
+ {
+ splitTestHelper(TokenHelper.forPartitioner("RandomPartitioner"));
+ }
+
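+ // verify that splits start at the partitioner's min token, are contiguous, end at the max token
+ // and are numbered sequentially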
+ private void splitTestHelper(TokenHelper tokens)
+ {
+ List<DiffJob.Split> splits = DiffJob.calculateSplits(50, 1, tokens);
+ assertEquals(splits.get(0).start, tokens.min());
+ DiffJob.Split prevSplit = null;
+ for (DiffJob.Split split : splits)
+ {
+ if (prevSplit != null)
+ assertEquals(prevSplit.end, split.start.subtract(BigInteger.ONE));
+ prevSplit = split;
+ }
+ assertEquals(splits.get(splits.size() - 1).end, tokens.max());
+ for (int i = 0; i < splits.size(); i++)
+ assertEquals(i, splits.get(i).splitNumber);
+ }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
new file mode 100644
index 0000000..73677d9
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/DifferTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.util.Map;
+import java.util.function.Function;
+
+import com.google.common.base.VerifyException;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class DifferTest {
+
+ @Test(expected = VerifyException.class)
+ public void rejectNullStartOfRange() {
+ Differ.validateRange(null, BigInteger.TEN, TokenHelper.MURMUR3);
+ }
+
+ @Test(expected = VerifyException.class)
+ public void rejectNullEndOfRange() {
+ Differ.validateRange(BigInteger.TEN, null, TokenHelper.MURMUR3);
+ }
+
+ @Test(expected = VerifyException.class)
+ public void rejectWrappingRange() {
+ Differ.validateRange(BigInteger.TEN, BigInteger.ONE, TokenHelper.MURMUR3);
+ }
+
+ @Test(expected = VerifyException.class)
+ public void rejectRangeWithStartLessThanMinMurmurToken() {
+ Differ.validateRange(TokenHelper.MURMUR3.min().subtract(BigInteger.ONE),
+ BigInteger.TEN,
+ TokenHelper.MURMUR3);
+ }
+
+ @Test(expected = VerifyException.class)
+ public void rejectRangeWithEndGreaterThanMaxMurmurToken() {
+ Differ.validateRange(BigInteger.ONE,
+ TokenHelper.MURMUR3.max().add(BigInteger.ONE),
+ TokenHelper.MURMUR3);
+ }
+
+ @Test
+ public void filterTaskStatusForTables() {
+ // according to the journal:
+ // * t1 is already completed
+ // * t2 is started and has reported some progress, but has not completed
+ // * t3 has not reported any progress
+ DiffJob.Split split = new DiffJob.Split(1, 1, BigInteger.ONE, BigInteger.TEN);
+ Iterable<String> tables = Lists.newArrayList("t1", "t2", "t3");
+ Function<String, DiffJob.TaskStatus> journal = (table) -> {
+ switch (table) {
+ case "t1":
+ return new DiffJob.TaskStatus(split.end, RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6));
+ case "t2":
+ return new DiffJob.TaskStatus(BigInteger.valueOf(5L), RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5));
+ case "t3":
+ return DiffJob.TaskStatus.EMPTY;
+ default:
+ throw new AssertionError();
+ }
+ };
+
+ Map<String, DiffJob.TaskStatus> filtered = Differ.filterTables(tables, split, journal, false);
+ assertEquals(2, filtered.keySet().size());
+ assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
+ assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
+ assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
+ assertNull(filtered.get("t3").lastToken);
+
+ // if re-running (part of) a job because of failures or problematic partitions, we want to
+ // ignore the status of completed tasks and re-run them anyway as only specified tokens will
+ // be processed - so t1 should be included now
+ filtered = Differ.filterTables(tables, split, journal, true);
+ assertEquals(3, filtered.keySet().size());
+ assertEquals(RangeStats.withValues(6, 6, 6, 6, 6, 6, 6, 6, 6), filtered.get("t1").stats);
+ assertEquals(split.end, filtered.get("t1").lastToken);
+ assertEquals(RangeStats.withValues(5, 5, 5, 5, 5, 5, 5, 5, 5), filtered.get("t2").stats);
+ assertEquals(BigInteger.valueOf(5L), filtered.get("t2").lastToken);
+ assertEquals(RangeStats.newStats(), filtered.get("t3").stats);
+ assertNull(filtered.get("t3").lastToken);
+ }
+
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
new file mode 100644
index 0000000..79b3638
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/PartitionComparatorTest.java
@@ -0,0 +1,524 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import com.google.common.reflect.TypeToken;
+import org.junit.Test;
+
+import com.datastax.driver.core.*;
+import static org.junit.Assert.assertEquals;
+
+public class PartitionComparatorTest {
+
+ @Test
+ public void sourceIsNull() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t, null, rows(row(t, 0, 1, 2, 3)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, true, true, 0, 0, 0);
+ }
+
+ @Test
+ public void targetIsNull() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t, rows(row(t, 0, 1, 2, 3)), null);
+ PartitionStats stats = comparator.call();
+ assertStats(stats, true, true, 0, 0, 0);
+ }
+
+ @Test
+ public void sourceIsEmpty() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t, rows(), rows(row(t, 0, 1, 2, 3)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, false, 0, 0, 0);
+ }
+
+ @Test
+ public void targetIsEmpty() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t, rows(row(t, 0, 1, 2, 3)), rows());
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, false, 0, 0, 0);
+ }
+
+ @Test
+ public void sourceAndTargetAreEmpty() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t, rows(), rows());
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, true, 0, 0, 0);
+ }
+
+ @Test
+ public void sourceContainsExtraRowsAtStart() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1, 2, 3),
+ row(t, 10, 11, 12, 13)),
+ rows(row(t, 10, 11, 12, 13)));
+ PartitionStats stats = comparator.call();
+ // Comparison fails fast, so bails on the initial mismatch
+ assertStats(stats, false, false, 0, 0, 0);
+ }
+
+ @Test
+ public void targetContainsExtraRowsAtStart() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 10, 11, 12, 13)),
+ rows(row(t, 0, 1, 2, 3),
+ row(t, 10, 11, 12, 13)));
+ PartitionStats stats = comparator.call();
+ // Comparison fails fast, so bails on the initial mismatch
+ assertStats(stats, false, false, 0, 0, 0);
+ }
+
+ @Test
+ public void sourceContainsExtraRowsAtEnd() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1, 2, 3),
+ row(t, 10, 11, 12, 13)),
+ rows(row(t, 0, 1, 2, 3)));
+ PartitionStats stats = comparator.call();
+ // The fact that the first row & all its v1 & v2 values match should be reflected in the stats
+ assertStats(stats, false, false, 1, 2, 0);
+ }
+
+ @Test
+ public void targetContainsExtraRowsAtEnd() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1, 2, 3)),
+ rows(row(t, 0, 1, 2, 3),
+ row(t, 10, 11, 12, 13)));
+ PartitionStats stats = comparator.call();
+ // The fact that the first row & all its v1 & v2 values match should be reflected in the stats
+ assertStats(stats, false, false, 1, 2, 0);
+ }
+
+ @Test
+ public void withoutClusteringsAllRowsMatching() {
+ TableSpec t = spec("table1", names(), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1),
+ row(t, 1, 11),
+ row(t, 2, 12)),
+ rows(row(t, 0, 1),
+ row(t, 1, 11),
+ row(t, 2, 12)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, true, 3, 6, 0);
+ }
+
+ @Test
+ public void singleClusteringAllRowsMatching() {
+ TableSpec t = spec("table1", names("c1"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1, 2),
+ row(t, 1, 11, 12),
+ row(t, 2, 21, 22)),
+ rows(row(t, 0, 1, 2),
+ row(t, 1, 11, 12),
+ row(t, 2, 21, 22)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, true, 3, 6, 0);
+ }
+
+ @Test
+ public void multipleClusteringAllRowsMatching() {
+ TableSpec t = spec("table1", names("c1", "c2"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1, 2, 3),
+ row(t, 1, 11, 12, 13),
+ row(t, 2, 21, 22, 23)),
+ rows(row(t, 0, 1, 2, 3),
+ row(t, 1, 11, 12, 13),
+ row(t, 2, 21, 22, 23)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, true, 3, 6, 0);
+ }
+
+ @Test
+ public void withoutClusteringsWithMismatches() {
+ TableSpec t = spec("table1", names(), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1),
+ row(t, 1, 11),
+ row(t, 2, 12)),
+ rows(row(t, 0, 1),
+ row(t, 1, 1100),
+ row(t, 2, 1200)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, true, 3, 4, 2);
+ }
+
+ @Test
+ public void singleClusteringWithMismatches() {
+ TableSpec t = spec("table1", names("c1"), names("v1", "v2"));
+ PartitionComparator comparator = comparator(t,
+ rows(row(t, 0, 1, 2),
+ row(t, 1, 11, 21),
+ row(t, 2, 12, 22)),
+ rows(row(t, 0, 1, 20),
+ row(t, 1, 1100, 21),
+ row(t, 2, 12, 1200)));
+ PartitionStats stats = comparator.call();
+ assertStats(stats, false, true, 3, 3, 3);
+ }
+
+ private void assertStats(PartitionStats stats,
+ boolean skipped,
+ boolean clusteringsMatch,
+ int matchedRows,
+ int matchedValues,
+ int mismatchedValues) {
+ assertEquals(skipped, stats.skipped);
+ assertEquals(clusteringsMatch, stats.allClusteringsMatch);
+ assertEquals(matchedRows, stats.matchedRows);
+ assertEquals(matchedValues, stats.matchedValues);
+ assertEquals(mismatchedValues, stats.mismatchedValues);
+ }
+
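+ // Helpers for constructing the table spec, stub rows and row iterators used in the comparisons above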
+ PartitionComparator comparator(TableSpec table, Iterator<Row> source, Iterator<Row> target) {
+ return new PartitionComparator(table, source, target);
+ }
+
+ List<String> names(String...names) {
+ return Lists.newArrayList(names);
+ }
+
+ TableSpec spec(String table, List<String> clusteringColumns, List<String> regularColumns) {
+ return new TableSpec(table, columns(clusteringColumns), columns(regularColumns));
+ }
+
+ List<ColumnMetadata> columns(List<String> names) {
+ return names.stream().map(ColumnMetadataHelper::column).collect(Collectors.toList());
+ }
+
+ Iterator<Row> rows(Row...rows) {
+ return new AbstractIterator<Row>() {
+ int i = 0;
+ protected Row computeNext() {
+ if (i < rows.length)
+ return rows[i++];
+ return endOfData();
+ }
+ };
+ }
+
+ Row row(TableSpec table, Object...values) {
+ return new TestRow(Stream.concat(table.getClusteringColumns().stream(),
+ table.getRegularColumns().stream())
+ .map(ColumnMetadata::getName).toArray(String[]::new),
+ values);
+ }
+
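+ // Stub Row backed by parallel arrays of column names and values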
+ static class TestRow implements Row {
+ private final String[] names;
+ private final Object[] values;
+
+ TestRow(String[] names, Object[] values) {
+ if (names.length != values.length)
+ throw new IllegalArgumentException(String.format("Number of column names (%d) doesn't " +
+ "match number of values(%d)",
+ names.length, values.length));
+ this.names = names;
+ this.values = values;
+ }
+
+ // Only getObject(String) is used by PartitionComparator
+ public Object getObject(String s) {
+ for (int i=0; i < names.length; i++)
+ if (names[i].equals(s))
+ return values[i];
+
+ throw new IllegalArgumentException(s + " is not a column defined in this metadata");
+ }
+
+ public boolean isNull(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean getBool(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public byte getByte(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public short getShort(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public int getInt(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getLong(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Date getTimestamp(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public LocalDate getDate(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getTime(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public float getFloat(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public double getDouble(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBytesUnsafe(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBytes(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getString(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public BigInteger getVarint(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public BigDecimal getDecimal(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public UUID getUUID(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public InetAddress getInet(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<T> getList(String s, Class<T> aClass) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<T> getList(String s, TypeToken<T> typeToken) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Set<T> getSet(String s, Class<T> aClass) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Set<T> getSet(String s, TypeToken<T> typeToken) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <K, V> Map<K, V> getMap(String s, Class<K> aClass, Class<V> aClass1) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <K, V> Map<K, V> getMap(String s, TypeToken<K> typeToken, TypeToken<V> typeToken1) {
+ throw new UnsupportedOperationException();
+ }
+
+ public UDTValue getUDTValue(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public TupleValue getTupleValue(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T get(String s, Class<T> aClass) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T get(String s, TypeToken<T> typeToken) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T get(String s, TypeCodec<T> typeCodec) {
+ throw new UnsupportedOperationException();
+ }
+
+ public ColumnDefinitions getColumnDefinitions() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Token getToken(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Token getToken(String s) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Token getPartitionKeyToken() {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean isNull(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean getBool(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public byte getByte(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public short getShort(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public int getInt(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getLong(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Date getTimestamp(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public LocalDate getDate(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public long getTime(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public float getFloat(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public double getDouble(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBytesUnsafe(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer getBytes(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public String getString(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public BigInteger getVarint(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public BigDecimal getDecimal(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public UUID getUUID(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public InetAddress getInet(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<T> getList(int i, Class<T> aClass) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<T> getList(int i, TypeToken<T> typeToken) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Set<T> getSet(int i, Class<T> aClass) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Set<T> getSet(int i, TypeToken<T> typeToken) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <K, V> Map<K, V> getMap(int i, Class<K> aClass, Class<V> aClass1) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <K, V> Map<K, V> getMap(int i, TypeToken<K> typeToken, TypeToken<V> typeToken1) {
+ throw new UnsupportedOperationException();
+ }
+
+ public UDTValue getUDTValue(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public TupleValue getTupleValue(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public Object getObject(int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T get(int i, Class<T> aClass) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T get(int i, TypeToken<T> typeToken) {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T get(int i, TypeCodec<T> typeCodec) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java
new file mode 100644
index 0000000..0484212
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/RangeComparatorTest.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import com.google.common.collect.*;
+import org.junit.Test;
+
+import com.codahale.metrics.MetricRegistry;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.Token;
+import org.jetbrains.annotations.NotNull;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.apache.cassandra.diff.TestUtils.assertThreadWaits;
+
+public class RangeComparatorTest {
+
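+ // Each reporter records what RangeComparator passes to it, keyed by token, so tests can assert
+ // on errors, mismatches and journalled progress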
+ private Multimap<BigInteger, Throwable> errors = HashMultimap.create();
+ private BiConsumer<Throwable, BigInteger> errorReporter = (e, t) -> errors.put(t, e);
+ private Multimap<BigInteger, MismatchType> mismatches = HashMultimap.create();
+ private BiConsumer<MismatchType, BigInteger> mismatchReporter = (m, t) -> mismatches.put(t, m);
+ private Multimap<BigInteger, RangeStats> journal = HashMultimap.create();
+ private BiConsumer<RangeStats, BigInteger> progressReporter = (r, t) -> journal.put(t, copyOf(r));
+ private Set<BigInteger> comparedPartitions = new HashSet<>();
+ private ComparisonExecutor executor = ComparisonExecutor.newExecutor(1, new MetricRegistry());
+
+ @Test
+ public void emptyRange() {
+ RangeComparator comparator = comparator(context(100L, 100L));
+ RangeStats stats = comparator.compare(keys(), keys(), this::alwaysMatch);
+ assertTrue(stats.isEmpty());
+ assertNothingReported(errors, mismatches, journal);
+ assertCompared();
+ }
+
+ @Test
+ public void sourceAndTargetKeysAreEmpty() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(), keys(), this::alwaysMatch);
+ assertTrue(stats.isEmpty());
+ assertNothingReported(errors, mismatches, journal);
+ assertCompared();
+ }
+
+ @Test
+ public void partitionPresentOnlyInSource() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(1, stats.getOnlyInSource());
+ assertEquals(0, stats.getOnlyInTarget());
+ assertEquals(2, stats.getMatchedPartitions());
+ assertReported(2, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared(0, 1);
+ }
+
+ @Test
+ public void partitionPresentOnlyInTarget() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(7, 8), keys(7, 8, 9), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(0, stats.getOnlyInSource());
+ assertEquals(1, stats.getOnlyInTarget());
+ assertEquals(2, stats.getMatchedPartitions());
+ assertReported(9, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared(7, 8);
+ }
+
+ @Test
+ public void multipleOnlyInSource() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(4, 5, 6, 7, 8), keys(4, 5), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(3, stats.getOnlyInSource());
+ assertEquals(0, stats.getOnlyInTarget());
+ assertEquals(2, stats.getMatchedPartitions());
+ assertReported(6, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(7, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(8, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared(4, 5);
+ }
+
+ @Test
+ public void multipleOnlyInTarget() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(4, 5), keys(4, 5, 6, 7, 8), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(0, stats.getOnlyInSource());
+ assertEquals(3, stats.getOnlyInTarget());
+ assertEquals(2, stats.getMatchedPartitions());
+ assertReported(6, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertReported(7, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertReported(8, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared(4, 5);
+ }
+
+ @Test
+ public void multipleOnlyInBoth() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(0, 1, 3, 5, 7, 9), keys(0, 2, 4, 6, 8, 9), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(4, stats.getOnlyInSource());
+ assertEquals(4, stats.getOnlyInTarget());
+ assertEquals(2, stats.getMatchedPartitions());
+ assertReported(1, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(3, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(5, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(7, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(2, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertReported(4, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertReported(6, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertReported(8, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared(0, 9);
+ }
+
+ @Test
+ public void sourceKeysIsEmpty() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(), keys(4, 5), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(0, stats.getOnlyInSource());
+ assertEquals(2, stats.getOnlyInTarget());
+ assertEquals(0, stats.getMatchedPartitions());
+ assertReported(4, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertReported(5, MismatchType.ONLY_IN_TARGET, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared();
+ }
+
+ @Test
+ public void targetKeysIsEmpty() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(4, 5), keys(), this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(2, stats.getOnlyInSource());
+ assertEquals(0, stats.getOnlyInTarget());
+ assertEquals(0, stats.getMatchedPartitions());
+ assertReported(4, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertReported(5, MismatchType.ONLY_IN_SOURCE, mismatches);
+ assertNothingReported(errors, journal);
+ assertCompared();
+ }
+
+ @Test
+ public void skipComparisonOfDisallowedTokens() {
+ RangeComparator comparator = comparator(context(0L, 100L, 2, 3, 4));
+ RangeStats stats = comparator.compare(keys(0, 1, 2, 3, 4, 5, 6),
+ keys(0, 1, 2, 3, 4, 5, 6),
+ this::alwaysMatch);
+ assertFalse(stats.isEmpty());
+ assertEquals(0, stats.getOnlyInSource());
+ assertEquals(0, stats.getOnlyInTarget());
+ assertEquals(4, stats.getMatchedPartitions());
+ assertEquals(3, stats.getSkippedPartitions());
+ assertNothingReported(errors, mismatches, journal);
+ assertCompared(0, 1, 5, 6);
+ }
+
+ @Test
+ public void handleErrorReadingFirstSourceKey() {
+ RuntimeException toThrow = new RuntimeException("Test");
+ testErrorReadingKey(keys(0, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+ assertCompared();
+ }
+
+ @Test
+ public void handleErrorReadingFirstTargetKey() {
+ RuntimeException toThrow = new RuntimeException("Test");
+ testErrorReadingKey(keys(0, 1, 2), keys(0, toThrow, 0, 1, 2), toThrow);
+ assertCompared();
+ }
+
+ @Test
+ public void handleErrorReadingSourceKey() {
+ RuntimeException toThrow = new RuntimeException("Test");
+ testErrorReadingKey(keys(1, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+ assertCompared(0);
+ }
+
+ @Test
+ public void handleErrorReadingTargetKey() {
+ RuntimeException toThrow = new RuntimeException("Test");
+ testErrorReadingKey(keys(0, 1, 2), keys(1, toThrow, 0, 1, 2), toThrow);
+ assertCompared(0);
+ }
+
+ @Test
+ public void handleErrorReadingLastSourceKey() {
+ RuntimeException toThrow = new RuntimeException("Test");
+ testErrorReadingKey(keys(2, toThrow, 0, 1, 2), keys(0, 1, 2), toThrow);
+ assertCompared(0, 1);
+ }
+
+ @Test
+ public void handleErrorReadingLastTargetKey() {
+ RuntimeException toThrow = new RuntimeException("Test");
+ testErrorReadingKey(keys(0, 1, 2), keys(2, toThrow, 0, 1, 2), toThrow);
+ assertCompared(0, 1);
+ }
+
+ @Test
+ public void handleErrorConstructingFirstTask() {
+ RuntimeException expected = new RuntimeException("Test");
+ RangeStats stats = testTaskError(throwDuringConstruction(0, expected));
+ assertEquals(1, stats.getErrorPartitions());
+ assertCompared(1, 2);
+ assertReported(0, expected, errors);
+ }
+
+ @Test
+ public void handleErrorConstructingTask() {
+ RuntimeException expected = new RuntimeException("Test");
+ RangeStats stats = testTaskError(throwDuringConstruction(1, expected));
+ assertEquals(1, stats.getErrorPartitions());
+ assertCompared(0, 2);
+ assertReported(1, expected, errors);
+ }
+
+ @Test
+ public void handleErrorConstructingLastTask() {
+ RuntimeException expected = new RuntimeException("Test");
+ RangeStats stats = testTaskError(throwDuringConstruction(2, expected));
+ assertEquals(1, stats.getErrorPartitions());
+ assertCompared(0, 1);
+ assertReported(2, expected, errors);
+ }
+
+ @Test
+ public void handleTaskErrorOnFirstPartition() {
+ RuntimeException expected = new RuntimeException("Test");
+ RangeStats stats = testTaskError(throwDuringExecution(expected, 0));
+ assertEquals(1, stats.getErrorPartitions());
+ assertCompared(1, 2);
+ assertReported(0, expected, errors);
+ }
+
+ @Test
+ public void handleTaskErrorOnPartition() {
+ RuntimeException expected = new RuntimeException("Test");
+ RangeStats stats = testTaskError(throwDuringExecution(expected, 1));
+ assertEquals(1, stats.getErrorPartitions());
+ assertCompared(0, 2);
+ assertReported(1, expected, errors);
+ }
+
+ @Test
+ public void handleTaskErrorOnLastPartition() {
+ RuntimeException expected = new RuntimeException("Test");
+ RangeStats stats = testTaskError(throwDuringExecution(expected, 2));
+ assertEquals(1, stats.getErrorPartitions());
+ assertCompared(0, 1);
+ assertReported(2, expected, errors);
+ }
+
+ @Test
+ public void checkpointEveryTenPartitions() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ comparator.compare(keys(LongStream.range(0, 25).toArray()),
+ keys(LongStream.range(0, 25).toArray()),
+ this::alwaysMatch);
+ assertReported(9, RangeStats.withValues(10, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+ assertReported(19, RangeStats.withValues(20, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+ assertEquals(2, journal.keySet().size());
+ }
+
+ @Test
+ public void recordHighestSeenPartitionWhenTasksCompleteOutOfOrder() {
+ // every 10 partitions, the highest seen token is reported to the journal. Here,
+ // randomise the iteration of the keys to simulate tasks completing out of order
+ RangeComparator comparator = comparator(context(0L, 100L));
+ long[] tokens = new long[] {2, 8, 1, 4, 100, 3, 5, 7, 6, 9};
+ comparator.compare(keys(tokens),
+ keys(tokens),
+ this::alwaysMatch);
+ assertReported(100, RangeStats.withValues(10, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+ assertEquals(1, journal.keySet().size());
+ assertNothingReported(errors, mismatches);
+ assertCompared(tokens);
+ }
+
+ @Test
+ public void recordHighestSeenPartitionWhenTasksCompleteOutOfOrderWithErrors() {
+ // 2 partitions will error during comparison, but after 10 successful comparisons
+ // we'll still report progress to the journal
+ RuntimeException toThrow = new RuntimeException("Test");
+ RangeComparator comparator = comparator(context(0L, 100L));
+ long[] tokens = new long[] {2, 8, 1, 11, 4, 100, 3, 5, 7, 6, 9, 0};
+ comparator.compare(keys(tokens), keys(tokens), throwDuringExecution(toThrow, 3, 2));
+ assertReported(100, RangeStats.withValues(10, 0L, 2L, 0L, 0L, 0L, 0L, 0L, 0L), journal);
+ assertEquals(1, journal.keySet().size());
+ assertReported(2, toThrow, errors);
+ assertReported(3, toThrow, errors);
+ assertEquals(2, errors.keySet().size());
+ assertNothingReported(mismatches);
+ // the erroring tasks are never recorded as compared
+ assertCompared(8, 1, 11, 4, 100, 5, 7, 6, 9, 0);
+ }
+
+ @Test
+ public void rowLevelMismatchIncrementsPartitionMismatches() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1, 2), this::rowMismatch);
+ assertEquals(0, stats.getMatchedPartitions());
+ assertEquals(3, stats.getMismatchedPartitions());
+ assertEquals(0, stats.getMatchedValues());
+ assertEquals(0, stats.getMismatchedValues());
+ assertNothingReported(errors, journal);
+ assertReported(0, MismatchType.PARTITION_MISMATCH, mismatches);
+ assertReported(1, MismatchType.PARTITION_MISMATCH, mismatches);
+ assertReported(2, MismatchType.PARTITION_MISMATCH, mismatches);
+ assertCompared(0, 1, 2);
+ }
+
+ @Test
+ public void valueMismatchIncrementsPartitionMismatches() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1, 2), this::valuesMismatch);
+ assertEquals(0, stats.getMatchedPartitions());
+ assertEquals(3, stats.getMismatchedPartitions());
+ assertEquals(0, stats.getMatchedValues());
+ assertEquals(30, stats.getMismatchedValues());
+ assertNothingReported(errors, journal);
+ assertReported(0, MismatchType.PARTITION_MISMATCH, mismatches);
+ assertReported(1, MismatchType.PARTITION_MISMATCH, mismatches);
+ assertReported(2, MismatchType.PARTITION_MISMATCH, mismatches);
+ assertCompared(0, 1, 2);
+ }
+
+ @Test
+ public void matchingPartitionIncrementsCount() {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ RangeStats stats = comparator.compare(keys(0, 1, 2), keys(0, 1, 2), this::alwaysMatch);
+ assertEquals(3, stats.getMatchedPartitions());
+ assertEquals(0, stats.getMismatchedPartitions());
+ assertEquals(0, stats.getMatchedValues());
+ assertEquals(0, stats.getMismatchedValues());
+ assertNothingReported(errors, mismatches, journal);
+ assertCompared(0, 1, 2);
+ }
+
+ @Test
+ public void waitForAllInFlightTasksToComplete() throws InterruptedException {
+ CountDownLatch taskSubmissions = new CountDownLatch(2);
+ List<CountDownLatch> taskGates = Lists.newArrayList(new CountDownLatch(1), new CountDownLatch(1));
+ RangeComparator comparator = comparator(context(0L, 100L));
+
+ Thread t = new Thread(() -> comparator.compare(keys(0, 1),
+ keys(0, 1),
+ waitUntilNotified(taskSubmissions, taskGates)),
+ "CallingThread");
+ t.setDaemon(true);
+ t.start();
+
+ // wait for both tasks to be submitted then check that the calling thread enters a waiting state
+ taskSubmissions.await();
+ assertThreadWaits(t);
+
+ // let the first task complete and check that the calling thread is still waiting
+ taskGates.get(0).countDown();
+ assertThreadWaits(t);
+
+ // let the second task run and wait for the caller to terminate
+ taskGates.get(1).countDown();
+ t.join();
+
+ assertNothingReported(errors, mismatches, journal);
+ assertCompared(0, 1);
+ }
+
+ private RangeStats testTaskError(Function<PartitionKey, PartitionComparator> taskSupplier) {
+ RangeComparator comparator = comparator(context(0L, 100L));
+ Iterator<PartitionKey> sourceKeys = keys(0, 1, 2);
+ Iterator<PartitionKey> targetKeys = keys(0, 1, 2);
+ RangeStats stats = comparator.compare(sourceKeys, targetKeys, taskSupplier);
+ assertNothingReported(mismatches, journal);
+ return stats;
+ }
+
+ private void testErrorReadingKey(Iterator<PartitionKey> sourceKeys,
+ Iterator<PartitionKey> targetKeys,
+ Exception expected) {
+
+ RangeComparator comparator = comparator(context(0L, 100L));
+ try {
+ comparator.compare(sourceKeys, targetKeys, this::alwaysMatch);
+ fail("Expected exception " + expected.getLocalizedMessage());
+ } catch (Exception e) {
+ assertEquals(expected, e.getCause());
+ }
+ assertNothingReported(errors, mismatches, journal);
+ }
+
+ private void assertCompared(long...tokens) {
+ assertEquals(tokens.length, comparedPartitions.size());
+ for (long t : tokens)
+ assertTrue(comparedPartitions.contains(BigInteger.valueOf(t)));
+ }
+
+ private void assertNothingReported(Multimap...reported) {
+ for (Multimap m : reported)
+ assertTrue(m.isEmpty());
+ }
+
+ private <T> void assertReported(long token, T expected, Multimap<BigInteger, T> reported) {
+ Collection<T> values = reported.get(BigInteger.valueOf(token));
+ assertEquals(1, values.size());
+ assertEquals(expected, values.iterator().next());
+ }
+
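+ // yields an iterator over the given tokens which throws the supplied exception on reaching throwAtToken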
+ Iterator<PartitionKey> keys(long throwAtToken, RuntimeException e, long...tokens) {
+ return new AbstractIterator<PartitionKey>() {
+ int i = 0;
+ protected PartitionKey computeNext() {
+ if (i < tokens.length) {
+ long t = tokens[i++];
+ if (t == throwAtToken)
+ throw e;
+
+ return key(t);
+ }
+ return endOfData();
+ }
+ };
+ }
+
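+ // yields an iterator of PartitionKeys for the given tokens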
+ Iterator<PartitionKey> keys(long...tokens) {
+ return new AbstractIterator<PartitionKey>() {
+ int i = 0;
+ protected PartitionKey computeNext() {
+ if (i < tokens.length)
+ return key(tokens[i++]);
+ return endOfData();
+ }
+ };
+ }
+
+ // yield a PartitionComparator which always concludes that partitions being compared are identical
+ PartitionComparator alwaysMatch(PartitionKey key) {
+ return new PartitionComparator(null, null, null) {
+ public PartitionStats call() {
+ comparedPartitions.add(key.getTokenAsBigInteger());
+ return new PartitionStats();
+ }
+ };
+ }
+
+ // yield a PartitionComparator which always determines that the partitions have a row-level mismatch
+ PartitionComparator rowMismatch(PartitionKey key) {
+ return new PartitionComparator(null, null, null) {
+ public PartitionStats call() {
+ comparedPartitions.add(key.getTokenAsBigInteger());
+ PartitionStats stats = new PartitionStats();
+ stats.allClusteringsMatch = false;
+ return stats;
+ }
+ };
+ }
+
+ // yield a PartitionComparator which always determines that the partitions have 10 mismatching values
+ PartitionComparator valuesMismatch(PartitionKey key) {
+ return new PartitionComparator(null, null, null) {
+ public PartitionStats call() {
+ comparedPartitions.add(key.getTokenAsBigInteger());
+ PartitionStats stats = new PartitionStats();
+ stats.mismatchedValues = 10;
+ return stats;
+ }
+ };
+ }
+
+ // yields a function which throws when creating a PartitionComparator for a given token,
+ // simulating an error reading the partition from the source or target cluster
+ // when constructing the task
+ Function<PartitionKey, PartitionComparator> throwDuringConstruction(long throwAt, RuntimeException toThrow) {
+ return (key) -> {
+ BigInteger t = key.getTokenAsBigInteger();
+ if (t.longValue() == throwAt)
+ throw toThrow;
+
+ return alwaysMatch(key);
+ };
+ }
+
+ // yields a PartitionComparator which throws the supplied exception when the token matches
+ // the one specified to simulate an error when processing the comparison
+ Function<PartitionKey, PartitionComparator> throwDuringExecution(RuntimeException toThrow, long...throwAt) {
+ return (key) -> {
+ BigInteger t = key.getTokenAsBigInteger();
+ return new PartitionComparator(null, null, null) {
+ public PartitionStats call() {
+ for (long shouldThrow : throwAt)
+ if (t.longValue() == shouldThrow)
+ throw toThrow;
+
+ comparedPartitions.add(t);
+ return new PartitionStats();
+ }
+ };
+ };
+ }
+
+ // yields PartitionComparators which wait on a CountDownLatch before returning from call(). The latches
+ // are taken in order from the supplied list, so each successive task uses the next latch and callers
+ // can control the rate of task progress.
+ // The other latch, taskSubmissions, is counted down as each task is created, signalling to the caller
+ // that all tasks have been submitted before it begins asserting on the state of the calling thread
+ Function<PartitionKey, PartitionComparator> waitUntilNotified(final CountDownLatch taskSubmissions,
+ final List<CountDownLatch> taskGates) {
+ final Iterator<CountDownLatch> gateIter = taskGates.iterator();
+ return (key) -> {
+
+ BigInteger t = key.getTokenAsBigInteger();
+ taskSubmissions.countDown();
+
+ return new PartitionComparator(null, null, null) {
+ public PartitionStats call() {
+ if (!gateIter.hasNext())
+ fail("Expected a latch to control task progress");
+
+ try {
+ gateIter.next().await();
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ fail("Interrupted");
+ }
+ comparedPartitions.add(t);
+ return new PartitionStats();
+ }
+ };
+ };
+ }
+
+ PartitionKey key(long token) {
+ return new TestPartitionKey(token);
+ }
+
+ RangeComparator comparator(DiffContext context) {
+ return new RangeComparator(context, errorReporter, mismatchReporter, progressReporter, executor);
+ }
+
+ DiffContext context(long startToken, long endToken, long...disallowedTokens) {
+ return new TestContext(BigInteger.valueOf(startToken),
+ BigInteger.valueOf(endToken),
+ new SpecificTokens(Arrays.stream(disallowedTokens)
+ .mapToObj(BigInteger::valueOf)
+ .collect(Collectors.toSet()),
+ SpecificTokens.Modifier.REJECT));
+ }
+
+ DiffContext context(long startToken, long endToken) {
+ return new TestContext(BigInteger.valueOf(startToken), BigInteger.valueOf(endToken), SpecificTokens.NONE);
+ }
+
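+ // snapshot the stats values, so journal entries aren't affected by later updates to the same RangeStats instance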
+ RangeStats copyOf(RangeStats stats) {
+ return RangeStats.withValues(stats.getMatchedPartitions(),
+ stats.getMismatchedPartitions(),
+ stats.getErrorPartitions(),
+ stats.getSkippedPartitions(),
+ stats.getOnlyInSource(),
+ stats.getOnlyInTarget(),
+ stats.getMatchedRows(),
+ stats.getMatchedValues(),
+ stats.getMismatchedValues());
+ }
+
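+ // PartitionKey stub wrapping a fixed bigint token; only comparison and value retrieval are needed by the tests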
+ static class TestPartitionKey extends PartitionKey {
+ final Token token;
+
+ TestPartitionKey(final long tokenValue) {
+ super(null);
+ token = new Token() {
+
+ public DataType getType() {
+ return DataType.bigint();
+ }
+
+ public Object getValue() {
+ return tokenValue;
+ }
+
+ public ByteBuffer serialize(ProtocolVersion protocolVersion) {
+ return null;
+ }
+
+ public int compareTo(@NotNull Token o) {
+ assert o.getValue() instanceof Long;
+ return Long.compare(tokenValue, (long)o.getValue());
+ }
+ };
+ }
+
+ protected Token getToken() {
+ return token;
+ }
+ }
+
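+ // DiffContext carrying only the token range and specific tokens; the cluster and table references are unused here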
+ static class TestContext extends DiffContext {
+
+ public TestContext(BigInteger startToken,
+ BigInteger endToken,
+ SpecificTokens specificTokens) {
+ super(null, null, null, null, startToken, endToken, specificTokens, 0.0);
+ }
+ }
+}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java b/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java
new file mode 100644
index 0000000..ad9e1c4
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/TestUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.diff;
+
+import static org.junit.Assert.fail;
+
+public class TestUtils {
+
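+ // poll for up to a second for the thread to enter the WAITING state, failing the test if it never does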
+ public static void assertThreadWaits(Thread t) {
+ for (int i=0; i < 1000; i++) {
+ try {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException e) {
+ fail("Unexpected InterruptedException");
+ }
+ if (t.getState() == Thread.State.WAITING)
+ return;
+ }
+ fail(String.format("Thread %s expected to enter WAITING state, but failed to do so", t.getName()));
+ }
+
+}
diff --git a/spark-job/src/test/resources/cql-stress-narrow1.yaml b/spark-job/src/test/resources/cql-stress-narrow1.yaml
new file mode 100644
index 0000000..dae375b
--- /dev/null
+++ b/spark-job/src/test/resources/cql-stress-narrow1.yaml
@@ -0,0 +1,62 @@
+#
+# Keyspace info
+#
+keyspace: difftest
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+ CREATE KEYSPACE difftest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: narrow1
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+ CREATE TABLE narrow1 (
+ pk int,
+ v1 text,
+ v2 int,
+ PRIMARY KEY(pk)
+ )
+ WITH compaction = { 'class':'LeveledCompactionStrategy' }
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows. Supported types are
+#
+# EXP(min..max) An exponential distribution over the range [min..max]
+# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
+# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+# GAUSSIAN(min..max,mean,stdev) A gaussian/normal distribution, with explicitly defined mean and stdev
+# UNIFORM(min..max) A uniform distribution over the range [min, max]
+# FIXED(val) A fixed distribution, always returning the same value
+# Aliases: extr, gauss, normal, norm, weibull
+#
+# If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
+#
+columnspec:
+ - name: pk
+ population: uniform(1..1B)
+ - name: v1
+ size: fixed(20)
+ - name: v2
+ population: uniform(1..100)
+
+insert:
+ partitions: fixed(50) # number of unique partitions to update in a single operation
+ batchtype: UNLOGGED # type of batch to use
+
+queries:
+ simple1:
+ cql: select * from narrow1 where pk = ? LIMIT 100
+ fields: samerow # samerow or multirow (select arguments from the same row, or randomly from all rows in the partition)
diff --git a/spark-job/src/test/resources/cql-stress-wide1.yaml b/spark-job/src/test/resources/cql-stress-wide1.yaml
new file mode 100644
index 0000000..145ecce
--- /dev/null
+++ b/spark-job/src/test/resources/cql-stress-wide1.yaml
@@ -0,0 +1,69 @@
+#
+# Keyspace info
+#
+keyspace: difftest
+
+#
+# The CQL for creating a keyspace (optional if it already exists)
+#
+keyspace_definition: |
+ CREATE KEYSPACE difftest WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
+
+#
+# Table info
+#
+table: wide1
+
+#
+# The CQL for creating a table you wish to stress (optional if it already exists)
+#
+table_definition: |
+ CREATE TABLE wide1 (
+ pk int,
+ c1 int,
+ c2 int,
+ v1 text,
+ v2 int,
+ PRIMARY KEY(pk, c1, c2)
+ )
+ WITH compaction = { 'class':'LeveledCompactionStrategy' }
+
+#
+# Optional meta information on the generated columns in the above table
+# The min and max only apply to text and blob types
+# The distribution field represents the total unique population
+# distribution of that column across rows. Supported types are
+#
+# EXP(min..max) An exponential distribution over the range [min..max]
+# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
+# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
+# GAUSSIAN(min..max,mean,stdev) A gaussian/normal distribution, with explicitly defined mean and stdev
+# UNIFORM(min..max) A uniform distribution over the range [min, max]
+# FIXED(val) A fixed distribution, always returning the same value
+# Aliases: extr, gauss, normal, norm, weibull
+#
+# If preceded by ~, the distribution is inverted
+#
+# Defaults for all columns are size: uniform(4..8), population: uniform(1..100B), cluster: fixed(1)
+#
+columnspec:
+ - name: pk
+ population: uniform(1..1B)
+ - name: c1
+ cluster: uniform(1..100)
+ - name: c2
+ cluster: uniform(1..100)
+ - name: v1
+ size: fixed(20)
+ - name: v2
+ population: uniform(1..100)
+
+insert:
+ partitions: fixed(1) # number of unique partitions to update in a single operation
+ batchtype: UNLOGGED # type of batch to use
+
+queries:
+ simple1:
+ cql: select * from wide1 where pk = ? AND c1 = ? AND c2 = ? LIMIT 1
+ fields: samerow