Improvements to BulkImportCacheCleaner (#1890)
* Check if the ZK parent node exists before performing a sync and getting
the children in TransactionWatcher ZooArbitrator, preventing NoNodeExceptions.
* Modify exception handling in BulkImportCacheCleaner. Handle InterruptedException
properly by propagating the interrupt status.
* Also add transaction ids to debugging logs.
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
index 6b7dd0d..b439459 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/TransactionWatcher.java
@@ -105,12 +105,14 @@
final Set<Long> result = new HashSet<>();
final String parent = context.getZooKeeperRoot() + "/" + type;
reader.sync(parent);
- List<String> children = reader.getChildren(parent);
- for (String child : children) {
- if (child.endsWith("-running")) {
- continue;
+ if (reader.exists(parent)) {
+ List<String> children = reader.getChildren(parent);
+ for (String child : children) {
+ if (child.endsWith("-running")) {
+ continue;
+ }
+ result.add(Long.parseLong(child));
}
- result.add(Long.parseLong(child));
}
return result;
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
index 6f96203..dc56908 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/BulkImportCacheCleaner.java
@@ -54,9 +54,13 @@
for (Tablet tablet : server.getOnlineTablets().values()) {
tablet.cleanupBulkLoadedFiles(tids);
}
- } catch (KeeperException | InterruptedException e) {
+ } catch (KeeperException e) {
// we'll just clean it up again later
- log.debug("Error reading bulk import live transactions {}", e);
+ log.debug("Error reading bulk import live transactions {}", tids, e);
+ } catch (InterruptedException e) {
+ // propagate the interrupt status.
+ Thread.currentThread().interrupt();
+ log.warn("Interrupted while reading bulk import live transactions {}", tids, e);
}
}