Allow diff tool to run in-jvm multiple times, and resolve some conflicting shading
closes #4
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
index bb14c25..26a74e6 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/DiffJob.java
@@ -149,7 +149,10 @@
throw new RuntimeException("Diff job failed", e);
} finally {
if (sc.isLocal())
+ {
Differ.shutdown();
+ JobMetadataDb.ProgressTracker.resetStatements();
+ }
}
}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
index 49576a2..2272b44 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/Differ.java
@@ -307,14 +307,17 @@
if (srcDiffCluster != null) {
srcDiffCluster.stop();
srcDiffCluster.close();
+ srcDiffCluster = null;
}
if (targetDiffCluster != null) {
targetDiffCluster.stop();
targetDiffCluster.close();
+ targetDiffCluster = null;
}
if (journalSession != null) {
journalSession.close();
journalSession.getCluster().close();
+ journalSession = null;
}
COMPARISON_EXECUTOR.shutdown();
}
diff --git a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
index 0ac6521..1eb121c 100644
--- a/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
+++ b/spark-job/src/main/java/org/apache/cassandra/diff/JobMetadataDb.java
@@ -129,6 +129,15 @@
}
+ public static void resetStatements()
+ {
+ updateStmt = null;
+ mismatchStmt = null;
+ errorSummaryStmt = null;
+ errorDetailStmt = null;
+ updateCompleteStmt = null;
+ }
+
/**
*
* @param table
@@ -400,15 +409,23 @@
public void markNotRunning(UUID jobId) {
- logger.info("Marking job {} as not running", jobId);
+ try
+ {
+ logger.info("Marking job {} as not running", jobId);
- ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
- keyspace, Schema.RUNNING_JOBS),
- jobId);
- if (!rs.one().getBool("[applied]")) {
- logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
- "during initialization as there may be no entry for this job in the {} table",
+ ResultSet rs = session.execute(String.format("DELETE FROM %s.%s WHERE job_id = ? IF EXISTS",
+ keyspace, Schema.RUNNING_JOBS),
+ jobId);
+ if (!rs.one().getBool("[applied]"))
+ {
+ logger.warn("Non-fatal: Unable to mark job %s as not running, check logs for errors " +
+ "during initialization as there may be no entry for this job in the {} table",
jobId, Schema.RUNNING_JOBS);
+ }
+ } catch (Exception e) {
+ // Because this is called from another exception handler, we don't want to lose the original exception
+ // just because we may not have been able to mark the job as not running. Just log here
+ logger.error("Could not mark job {} as not running.", e);
}
}
}