Merge pull request #566 from jasonjckn/0.9.0-nonodeexists
Ignore ZooKeeper NoNodeException errors when deleting stale transactional state
diff --git a/src/jvm/backtype/storm/utils/Utils.java b/src/jvm/backtype/storm/utils/Utils.java
index 36d4d5c..6176918 100644
--- a/src/jvm/backtype/storm/utils/Utils.java
+++ b/src/jvm/backtype/storm/utils/Utils.java
@@ -357,4 +357,22 @@
buffer.get(ret, 0, ret.length);
return ret;
}
+
+    /**
+     * Checks whether the given throwable, or any throwable in its cause chain,
+     * is an instance of the given class.
+     *
+     * @param klass the type to look for
+     * @param throwable the head of the cause chain to search; may be null
+     * @return true if throwable or one of its transitive causes is an instance
+     *         of klass; false otherwise, including when throwable is null
+     */
+    public static boolean exceptionCauseIsInstanceOf(Class<?> klass, Throwable throwable) {
+        for(Throwable t = throwable; t != null; t = t.getCause()) {
+            if(klass.isInstance(t)) {
+                return true;
+            }
+        }
+        return false;
+    }
}
diff --git a/src/jvm/storm/trident/topology/state/RotatingTransactionalState.java b/src/jvm/storm/trident/topology/state/RotatingTransactionalState.java
index 8e34e3c..9f22cc7 100644
--- a/src/jvm/storm/trident/topology/state/RotatingTransactionalState.java
+++ b/src/jvm/storm/trident/topology/state/RotatingTransactionalState.java
@@ -1,5 +1,8 @@
package storm.trident.topology.state;
+import backtype.storm.utils.Utils;
+import org.apache.zookeeper.KeeperException;
+
import java.util.HashSet;
import java.util.List;
import java.util.SortedMap;
@@ -96,7 +99,15 @@
SortedMap<Long, Object> toDelete = _curr.headMap(txid);
for(long tx: new HashSet<Long>(toDelete.keySet())) {
_curr.remove(tx);
- _state.delete(txPath(tx));
+ try {
+ _state.delete(txPath(tx));
+ } catch(RuntimeException e) {
+ // Ignore NoNodeException here: sync() may have populated _curr with stale entries because
+ // zookeeper reads are eventually consistent, so the node being deleted may no longer exist.
+ if(!Utils.exceptionCauseIsInstanceOf(KeeperException.NoNodeException.class, e)) {
+ throw e;
+ }
+ }
}
}