Merge pull request #3101 from frison/STORM-1515
STORM-1515: Fix LocalState Corruption
diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 6e1b44f..d5996d2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -32,6 +32,7 @@
import org.apache.storm.thrift.TBase;
import org.apache.storm.thrift.TDeserializer;
import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.protocol.TProtocolException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -123,6 +124,10 @@
} catch (Exception e) {
attempts++;
if (attempts >= 10) {
+ if (e.getCause() instanceof TProtocolException) {
+ LOG.warn("LocalState file is corrupted, resetting state.", e);
+ return new HashMap<>();
+ }
throw new RuntimeException(e);
}
}
diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
index 97765b2..2a14857 100644
--- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
+++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
@@ -57,15 +57,37 @@
@Test
public void testEmptyState() throws IOException {
- TmpPath tmp_dir = new TmpPath();
- String dir = tmp_dir.getPath();
- LocalState ls = new LocalState(dir, true);
- GlobalStreamId gs_a = new GlobalStreamId("a", "a");
- FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"));
- FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version"));
- Assert.assertNull(ls.get("c"));
- ls.put("a", gs_a);
- Assert.assertEquals(gs_a, ls.get("a"));
+ try (TmpPath tmp_dir = new TmpPath()) {
+ GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+ String dir = tmp_dir.getPath();
+ LocalState ls = new LocalState(dir, true);
+
+ FileUtils.touch(new File(dir, "12345"));
+ FileUtils.touch(new File(dir, "12345.version"));
+
+ Assert.assertNull(ls.get("c"));
+ ls.put("a", globalStreamId_a);
+ Assert.assertEquals(globalStreamId_a, ls.get("a"));
+ }
+ }
+
+ @Test
+ public void testAllNulState() throws IOException {
+ try (TmpPath tmp_dir = new TmpPath()) {
+ GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+ String dir = tmp_dir.getPath();
+ LocalState ls = new LocalState(dir, true);
+
+ FileUtils.touch(new File(dir, "12345.version"));
+
+ try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) {
+ Assert.assertNull(ls.get("c"));
+ data.write(new byte[100]);
+ ls.put("a", globalStreamId_a);
+ Assert.assertEquals(globalStreamId_a, ls.get("a"));
+ }
+ }
}
}