Allow diff tool to run in-jvm multiple times, and resolve some conflicting shading

closes #4
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index bb14c25..26a74e6 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -149,7 +149,10 @@
             throw new RuntimeException("Diff job failed", e);
         } finally {
             if (sc.isLocal())
+            {
                 Differ.shutdown();
+                JobMetadataDb.ProgressTracker.resetStatements();
+            }
         }
     }
 
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index 49576a2..2272b44 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -307,14 +307,17 @@
             if (srcDiffCluster != null) {
                 srcDiffCluster.stop();
                 srcDiffCluster.close();
+                srcDiffCluster = null;
             }
             if (targetDiffCluster != null) {
                 targetDiffCluster.stop();
                 targetDiffCluster.close();
+                targetDiffCluster = null;
             }
             if (journalSession != null) {
                 journalSession.close();
                 journalSession.getCluster().close();
+                journalSession = null;
             }
             COMPARISON_EXECUTOR.shutdown();
         }
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index 0ac6521..1eb121c 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -129,6 +129,15 @@
 
         }
 
+        public static void resetStatements()
+        {
+            updateStmt = null;
+            mismatchStmt = null;
+            errorSummaryStmt = null;
+            errorDetailStmt = null;
+            updateCompleteStmt = null;
+        }
+
         /**
          *
          * @param table
@@ -400,15 +409,23 @@
 
 
         public void markNotRunning(UUID jobId) {
-            logger.info("Marking job {} as not running", jobId);
+            try
+            {
+                logger.info("Marking job {} as not running", jobId);
 
-            ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
-                                                         keyspace, Schema.RUNNING_JOBS),
-                                           jobId);
-            if (!rs.one().getBool("[applied]")) {
-                logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
-                            "during initialization as there may be no entry for this job in the {} table",
+                ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
+                        keyspace, Schema.RUNNING_JOBS),
+                        jobId);
+                if (!rs.one().getBool("[applied]"))
+                {
+                    logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
+                                    "during initialization as there may be no entry for this job in the {} table",
                             jobId, Schema.RUNNING_JOBS);
+                }
+            } catch (Exception e) {
+                // Because this is called from another exception handler, we don't want to lose the original exception
+                // just because we may not have been able to mark the job as not running. Just log here
+                logger.error("Could not mark job {} as not running.", e);
             }
         }
     }