Distinguish the source cluster of partition errors
- Wraps client errors in ClusterSourcedException to identify which cluster (source or target) raised them.
- Stores the originating cluster in a new `error_source` text column of the error detail table in the metadata keyspace.
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java b/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java
new file mode 100644
index 0000000..5412c65
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java
@@ -0,0 +1,43 @@
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.Callable;
+
+import org.apache.cassandra.diff.DiffCluster.Type;
+
+/**
+ * Unchecked wrapper that tags a failure with the {@link Type} of the cluster it came from.
+ * It is used to distinguish driver exceptions among the clusters under test.
+ */
+public class ClusterSourcedException extends RuntimeException {
+    /** Cluster type this exception was tagged with when it was created. */
+    public final Type exceptionSource;
+
+    ClusterSourcedException(Type exceptionSource, Throwable cause) {
+        super(cause);
+        this.exceptionSource = exceptionSource;
+    }
+
+    /**
+     * Runs {@code callable} and returns its result; any {@link Exception} it throws is
+     * rethrown wrapped in a {@link ClusterSourcedException} tagged with {@code exceptionSource}.
+     */
+    public static <T> T catches(Type exceptionSource, Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                // The wrapper hides the interruption from callers, so restore the interrupt flag.
+                Thread.currentThread().interrupt();
+            }
+            throw new ClusterSourcedException(exceptionSource, ex);
+        }
+    }
+
+    /** Prefixes the inherited message with the name of the tagged cluster type. */
+    @Override
+    public String getMessage() {
+        // A null source must not make getMessage() itself throw while the error is being reported.
+        String source = exceptionSource == null ? "UNKNOWN" : exceptionSource.name();
+        return String.format("from %s - %s", source, super.getMessage());
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index f7545e3..1ec78f0 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -28,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -41,6 +42,7 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
public class Differ implements Serializable
@@ -208,9 +210,9 @@
final Function<PartitionKey, PartitionComparator> partitionTaskProvider =
(key) -> {
boolean reverse = context.shouldReverse();
- return new PartitionComparator(context.table,
- context.source.getPartition(context.table, key, reverse),
- context.target.getPartition(context.table, key, reverse));
+ Iterator<Row> source = fetchRows(context, key, reverse, DiffCluster.Type.SOURCE);
+ Iterator<Row> target = fetchRows(context, key, reverse, DiffCluster.Type.TARGET);
+ return new PartitionComparator(context.table, source, target);
};
RangeComparator rangeComparator = new RangeComparator(context,
@@ -224,6 +226,13 @@
return tableStats;
}
+ /**
+  * Calls {@code getPartition} on {@code context.source} or {@code context.target}, as selected
+  * by {@code type}, through {@code ClusterSourcedException.catches} so that any exception the
+  * call throws is rethrown tagged with that cluster type.
+  */
+ private Iterator<Row> fetchRows(DiffContext context, PartitionKey key, boolean shouldReverse, DiffCluster.Type type) {
+     final Callable<Iterator<Row>> fetch = () -> {
+         if (type == DiffCluster.Type.SOURCE) {
+             return context.source.getPartition(context.table, key, shouldReverse);
+         }
+         return context.target.getPartition(context.table, key, shouldReverse);
+     };
+     return ClusterSourcedException.catches(type, fetch);
+ }
+
@VisibleForTesting
static Map<KeyspaceTablePair, DiffJob.TaskStatus> filterTables(Iterable<KeyspaceTablePair> keyspaceTables,
DiffJob.Split split,
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index d9d4035..a7247dd 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -25,6 +25,7 @@
import java.util.UUID;
import java.util.stream.Collectors;
+import com.google.common.base.Throwables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,8 +132,9 @@
" qualified_table_name," +
" start_token," +
" end_token," +
- " error_token)" +
- " VALUES (?, ?, ?, ?, ?, ?)",
+ " error_token," +
+ " error_source)" +
+ " VALUES (?, ?, ?, ?, ?, ?, ?)",
metadataKeyspace, Schema.ERROR_DETAIL));
}
@@ -218,7 +220,15 @@
"error for partition with token %s", table, token), error);
BatchStatement batch = new BatchStatement();
batch.add(bindErrorSummaryStatement(table));
- batch.add(bindErrorDetailStatement(table, token));
+ DiffCluster.Type exceptionSource = null;
+ int maxRetrace = 10; // Cause chains can be cyclic: cap the walk so it neither loops forever nor throws.
+ for (Throwable t = error; t != null && maxRetrace > 0; t = t.getCause(), maxRetrace--) { // t != null (not t.getCause()) so the last throwable is inspected too
+     if (t instanceof ClusterSourcedException) {
+         exceptionSource = ((ClusterSourcedException) t).exceptionSource;
+         break;
+     }
+ }
+ batch.add(bindErrorDetailStatement(table, token, exceptionSource));
batch.setIdempotent(true);
session.execute(batch);
}
@@ -247,8 +257,9 @@
.setIdempotent(true);
}
- private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken) {
- return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString())
+ /**
+  * Binds {@code errorDetailStmt} for the given table and {@code errorToken}, adding the name of
+  * {@code exceptionSource} as the last bound value.
+  * A null {@code exceptionSource} is bound as an empty string.
+  */
+ private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken, DiffCluster.Type exceptionSource) {
+ String errorSource = exceptionSource == null ? "" : exceptionSource.name();
+ return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString(), errorSource)
.setIdempotent(true);
}
@@ -526,6 +537,7 @@
" start_token varchar," +
" end_token varchar," +
" error_token varchar," +
+ " error_source varchar," +
" PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" +
" WITH default_time_to_live = %s";
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
index 4214f2b..8aefb49 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -27,6 +27,7 @@
import com.datastax.driver.core.ColumnMetadata;
import com.datastax.driver.core.Row;
+import org.apache.cassandra.diff.DiffCluster.Type;
public class PartitionComparator implements Callable<PartitionStats> {
@@ -53,10 +54,10 @@
return partitionStats;
}
- while (source.hasNext() && target.hasNext()) {
+ while (hasNextRow(Type.SOURCE) && hasNextRow(Type.TARGET)) {
- Row sourceRow = source.next();
- Row targetRow = target.next();
+ Row sourceRow = getNextRow(Type.SOURCE);
+ Row targetRow = getNextRow(Type.TARGET);
// if primary keys don't match don't proceed any further, just mark the
// partition as mismatched and be done
@@ -73,12 +74,26 @@
}
// if one of the iterators isn't exhausted, then there's a mismatch at the partition level
- if (source.hasNext() || target.hasNext())
+ if (hasNextRow(Type.SOURCE) || hasNextRow(Type.TARGET))
partitionStats.allClusteringsMatch = false;
return partitionStats;
}
+ /**
+  * Returns {@code hasNext()} of the source or target iterator, as selected by {@code type};
+  * an exception thrown by that call is rethrown tagged with {@code type}.
+  */
+ private boolean hasNextRow(Type type) {
+     final Callable<Boolean> probe = () -> {
+         if (type == Type.SOURCE) {
+             return source.hasNext();
+         }
+         return target.hasNext();
+     };
+     return ClusterSourcedException.catches(type, probe);
+ }
+
+ /**
+  * Returns {@code next()} of the source or target iterator, as selected by {@code type};
+  * an exception thrown by that call is rethrown tagged with {@code type}.
+  */
+ private Row getNextRow(Type type) {
+     final Callable<Row> advance = () -> {
+         if (type == Type.SOURCE) {
+             return source.next();
+         }
+         return target.next();
+     };
+     return ClusterSourcedException.catches(type, advance);
+ }
+
private boolean clusteringsEqual(Row source, Row target) {
for (ColumnMetadata column : tableSpec.getClusteringColumns()) {
Object fromSource = source.getObject(column.getName());
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
index bb5e937..5d6710e 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -125,8 +125,7 @@
// unavailables occurring when performing the initial query to read the full partition.
// Errors thrown when paging through the partition in comparisonTask will be handled by
// the onError callback.
- rangeStats.partitionError();
- errorReporter.accept(t, token);
+ recordError(rangeStats, token, errorReporter, t);
} finally {
// if the cluster has been shutdown because the task failed the underlying iterators
// of partition keys will return hasNext == false
@@ -224,10 +223,15 @@
+ /**
+  * Builds an error callback: the returned consumer passes the throwable it receives to
+  * {@code recordError}, together with the given stats, token and reporter.
+  */
private Consumer<Throwable> onError(final RangeStats rangeStats,
final BigInteger currentToken,
final BiConsumer<Throwable, BigInteger> errorReporter) {
- return (error) -> {
- rangeStats.partitionError();
- errorReporter.accept(error, currentToken);
- };
+ return (error) -> recordError(rangeStats, currentToken, errorReporter, error);
}
+
+ /**
+  * Registers a partition error on {@code rangeStats}, then hands {@code error} and
+  * {@code currentToken} to {@code errorReporter}.
+  */
+ private void recordError(RangeStats rangeStats, BigInteger currentToken,
+                          BiConsumer<Throwable, BigInteger> errorReporter, Throwable error) {
+     rangeStats.partitionError();
+     errorReporter.accept(error, currentToken);
+ }
}
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java
new file mode 100644
index 0000000..e96c96d
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java
@@ -0,0 +1,36 @@
+package org.apache.cassandra.diff;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.CustomMatcher;
+
+/** Verifies that {@link ClusterSourcedException#catches} tags failures with the given cluster type. */
+public class ClusterSourcedExceptionTest {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testCatchesExceptionHasExceptionSourceInfo() {
+        expectedException.expect(ClusterSourcedException.class);
+        expectedException.expectCause(CoreMatchers.isA(RuntimeException.class));
+        expectedException.expectMessage("from SOURCE");
+        expectedException.expect(taggedWithSourceCluster());
+        ClusterSourcedException.catches(DiffCluster.Type.SOURCE, () -> {
+            throw new RuntimeException();
+        });
+    }
+
+    /** Matches only a ClusterSourcedException whose exceptionSource is SOURCE. */
+    private static CustomMatcher<ClusterSourcedException> taggedWithSourceCluster() {
+        return new CustomMatcher<ClusterSourcedException>("matches the expected exceptionSource: SOURCE") {
+            @Override
+            public boolean matches(Object item) {
+                return item instanceof ClusterSourcedException
+                       && ((ClusterSourcedException) item).exceptionSource == DiffCluster.Type.SOURCE;
+            }
+        };
+    }
+}