Distinguish partition error source cluster

- Wraps client error with ClusterSourcedException to distinguish the error source.
- Stores `error_source text` in the error details table in metadata.
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java b/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java
new file mode 100644
index 0000000..5412c65
--- /dev/null
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/ClusterSourcedException.java
@@ -0,0 +1,31 @@
+package org.apache.cassandra.diff;
+
+import java.util.concurrent.Callable;
+
+import org.apache.cassandra.diff.DiffCluster.Type;
+
+/**
+ * Wraps the cause with the exception source indicator, {@param type} of the cluster.
+ * It is used to distinguish driver exceptions among testing clusters.
+ */
+public class ClusterSourcedException extends RuntimeException {
+    public final Type exceptionSource;
+
+    ClusterSourcedException(Type exceptionSource, Throwable cause) {
+        super(cause);
+        this.exceptionSource = exceptionSource;
+    }
+
+    public static <T> T catches(Type exceptionSource, Callable<T> callable) {
+        try {
+            return callable.call();
+        } catch (Exception ex) {
+            throw new ClusterSourcedException(exceptionSource, ex);
+        }
+    }
+
+    @Override
+    public String getMessage() {
+        return String.format("from %s - %s", exceptionSource.name(), super.getMessage());
+    }
+}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index f7545e3..1ec78f0 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -28,6 +28,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -41,6 +42,7 @@
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 
 public class Differ implements Serializable
@@ -208,9 +210,9 @@
         final Function<PartitionKey, PartitionComparator> partitionTaskProvider =
             (key) -> {
                 boolean reverse = context.shouldReverse();
-                return new PartitionComparator(context.table,
-                                               context.source.getPartition(context.table, key, reverse),
-                                               context.target.getPartition(context.table, key, reverse));
+                Iterator<Row> source = fetchRows(context, key, reverse, DiffCluster.Type.SOURCE);
+                Iterator<Row> target = fetchRows(context, key, reverse, DiffCluster.Type.TARGET);
+                return new PartitionComparator(context.table, source, target);
             };
 
         RangeComparator rangeComparator = new RangeComparator(context,
@@ -224,6 +226,13 @@
         return tableStats;
     }
 
+    private Iterator<Row> fetchRows(DiffContext context, PartitionKey key, boolean shouldReverse, DiffCluster.Type type) {
+        Callable<Iterator<Row>> rows = () -> type == DiffCluster.Type.SOURCE
+                                             ? context.source.getPartition(context.table, key, shouldReverse)
+                                             : context.target.getPartition(context.table, key, shouldReverse);
+        return ClusterSourcedException.catches(type, rows);
+    }
+
     @VisibleForTesting
     static Map<KeyspaceTablePair, DiffJob.TaskStatus> filterTables(Iterable<KeyspaceTablePair> keyspaceTables,
                                                                    DiffJob.Split split,
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index d9d4035..a7247dd 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -25,6 +25,7 @@
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -131,8 +132,9 @@
                                                                 " qualified_table_name," +
                                                                 " start_token," +
                                                                 " end_token," +
-                                                                " error_token)" +
-                                                                " VALUES (?, ?, ?, ?, ?, ?)",
+                                                                " error_token," +
+                                                                " error_source)" +
+                                                                " VALUES (?, ?, ?, ?, ?, ?, ?)",
                                                                 metadataKeyspace, Schema.ERROR_DETAIL));
             }
 
@@ -218,7 +220,15 @@
                                        "error for partition with token %s", table, token), error);
             BatchStatement batch = new BatchStatement();
             batch.add(bindErrorSummaryStatement(table));
-            batch.add(bindErrorDetailStatement(table, token));
+            DiffCluster.Type exceptionSource = null;
+            int maxRetrace = 10; // In case there is a loop, we do not want to loop forever or throw. So just limit the number of retracing.
+            for (Throwable t = error; t.getCause() != null && maxRetrace > 0; t = t.getCause(), maxRetrace--) {
+                if (t instanceof ClusterSourcedException) {
+                    exceptionSource = ((ClusterSourcedException) t).exceptionSource;
+                    break;
+                }
+            }
+            batch.add(bindErrorDetailStatement(table, token, exceptionSource));
             batch.setIdempotent(true);
             session.execute(batch);
         }
@@ -247,8 +257,9 @@
                                    .setIdempotent(true);
         }
 
-        private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken) {
-            return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString())
+        private Statement bindErrorDetailStatement(KeyspaceTablePair table, BigInteger errorToken, DiffCluster.Type exceptionSource) {
+            String errorSource = exceptionSource == null ? "" : exceptionSource.name();
+            return errorDetailStmt.bind(jobId, bucket, table.toCqlValueString(), startToken, endToken, errorToken.toString(), errorSource)
                                   .setIdempotent(true);
         }
 
@@ -526,6 +537,7 @@
                                                           " start_token varchar," +
                                                           " end_token varchar," +
                                                           " error_token varchar," +
+                                                          " error_source varchar," +
                                                           " PRIMARY KEY ((job_id, bucket, qualified_table_name, start_token, end_token), error_token))" +
                                                           " WITH default_time_to_live = %s";
 
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
index 4214f2b..8aefb49 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/PartitionComparator.java
@@ -27,6 +27,7 @@
 
 import com.datastax.driver.core.ColumnMetadata;
 import com.datastax.driver.core.Row;
+import org.apache.cassandra.diff.DiffCluster.Type;
 
 public class PartitionComparator implements Callable<PartitionStats> {
 
@@ -53,10 +54,10 @@
             return partitionStats;
         }
 
-        while (source.hasNext() && target.hasNext()) {
+        while (hasNextRow(Type.SOURCE) && hasNextRow(Type.TARGET)) {
 
-            Row sourceRow = source.next();
-            Row targetRow = target.next();
+            Row sourceRow = getNextRow(Type.SOURCE);
+            Row targetRow = getNextRow(Type.TARGET);
 
             // if primary keys don't match don't proceed any further, just mark the
             // partition as mismatched and be done
@@ -73,12 +74,26 @@
         }
 
         // if one of the iterators isn't exhausted, then there's a mismatch at the partition level
-        if (source.hasNext() || target.hasNext())
+        if (hasNextRow(Type.SOURCE) || hasNextRow(Type.TARGET))
             partitionStats.allClusteringsMatch = false;
 
         return partitionStats;
     }
 
+    private boolean hasNextRow(Type type) {
+        Callable<Boolean> hasNext = () -> type == Type.SOURCE
+                                          ? source.hasNext()
+                                          : target.hasNext();
+        return ClusterSourcedException.catches(type, hasNext);
+    }
+
+    private Row getNextRow(Type type) {
+        Callable<Row> next = () -> type == Type.SOURCE
+                                   ? source.next()
+                                   : target.next();
+        return ClusterSourcedException.catches(type, next);
+    }
+
     private boolean clusteringsEqual(Row source, Row target) {
         for (ColumnMetadata column : tableSpec.getClusteringColumns()) {
             Object fromSource = source.getObject(column.getName());
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
index bb5e937..5d6710e 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/RangeComparator.java
@@ -125,8 +125,7 @@
                         // unavailables occurring when performing the initial query to read the full partition.
                         // Errors thrown when paging through the partition in comparisonTask will be handled by
                         // the onError callback.
-                        rangeStats.partitionError();
-                        errorReporter.accept(t, token);
+                        recordError(rangeStats, token, errorReporter, t);
                     } finally {
                         // if the cluster has been shutdown because the task failed the underlying iterators
                         // of partition keys will return hasNext == false
@@ -224,10 +223,15 @@
     private Consumer<Throwable> onError(final RangeStats rangeStats,
                                         final BigInteger currentToken,
                                         final BiConsumer<Throwable, BigInteger> errorReporter) {
-        return (error) -> {
-            rangeStats.partitionError();
-            errorReporter.accept(error, currentToken);
-        };
+        return (error) -> recordError(rangeStats, currentToken, errorReporter, error);
+    }
+
+    private void recordError(final RangeStats rangeStats,
+                             final BigInteger currentToken,
+                             final BiConsumer<Throwable, BigInteger> errorReporter,
+                             final Throwable error) {
+        rangeStats.partitionError();
+        errorReporter.accept(error, currentToken);
     }
 }
 
diff --git a/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java b/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java
new file mode 100644
index 0000000..e96c96d
--- /dev/null
+++ b/spark-job/src/test/java/org/apache/cassandra/diff/ClusterSourcedExceptionTest.java
@@ -0,0 +1,33 @@
+package org.apache.cassandra.diff;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import org.hamcrest.CoreMatchers;
+import org.hamcrest.CustomMatcher;
+
+public class ClusterSourcedExceptionTest {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Test
+    public void testCatchesExceptionHasExceptionSourceInfo() {
+        expectedException.expect(ClusterSourcedException.class);
+        expectedException.expectCause(CoreMatchers.isA(RuntimeException.class));
+        expectedException.expectMessage("from SOURCE");
+        expectedException.expect(new CustomMatcher<ClusterSourcedException>("matches the expected exceptionSource: SOURCE") {
+            @Override
+            public boolean matches(Object item) {
+                if (item instanceof ClusterSourcedException) {
+                    ClusterSourcedException ex = (ClusterSourcedException) item;
+                    return ex.exceptionSource == DiffCluster.Type.SOURCE;
+                }
+                return false;
+            }
+        });
+        ClusterSourcedException.catches(DiffCluster.Type.SOURCE, () -> {
+            throw new RuntimeException();
+        });
+    }
+}