Merge pull request #3101 from frison/STORM-1515

STORM-1515: Fix LocalState Corruption
diff --git a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
index 6e1b44f..d5996d2 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/LocalState.java
@@ -32,6 +32,7 @@
 import org.apache.storm.thrift.TBase;
 import org.apache.storm.thrift.TDeserializer;
 import org.apache.storm.thrift.TSerializer;
+import org.apache.storm.thrift.protocol.TProtocolException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -123,6 +124,10 @@
             } catch (Exception e) {
                 attempts++;
                 if (attempts >= 10) {
+                    if (e.getCause() instanceof TProtocolException) {
+                        LOG.warn("LocalState file is corrupted, resetting state.", e);
+                        return new HashMap<>();
+                    }
                     throw new RuntimeException(e);
                 }
             }
diff --git a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
index 97765b2..2a14857 100644
--- a/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
+++ b/storm-server/src/test/java/org/apache/storm/LocalStateTest.java
@@ -57,15 +57,37 @@
 
     @Test
     public void testEmptyState() throws IOException {
-        TmpPath tmp_dir = new TmpPath();
-        String dir = tmp_dir.getPath();
-        LocalState ls = new LocalState(dir, true);
-        GlobalStreamId gs_a = new GlobalStreamId("a", "a");
-        FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"));
-        FileOutputStream version = FileUtils.openOutputStream(new File(dir, "12345.version"));
-        Assert.assertNull(ls.get("c"));
-        ls.put("a", gs_a);
-        Assert.assertEquals(gs_a, ls.get("a"));
+        try (TmpPath tmp_dir = new TmpPath()) {
+            GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
 
+            String dir = tmp_dir.getPath();
+            LocalState ls = new LocalState(dir, true);
+
+            FileUtils.touch(new File(dir, "12345"));
+            FileUtils.touch(new File(dir, "12345.version"));
+
+            Assert.assertNull(ls.get("c"));
+            ls.put("a", globalStreamId_a);
+            Assert.assertEquals(globalStreamId_a, ls.get("a"));
+        }
+    }
+
+    @Test
+    public void testAllNulState() throws IOException {
+        try (TmpPath tmp_dir = new TmpPath()) {
+            GlobalStreamId globalStreamId_a = new GlobalStreamId("a", "a");
+
+            String dir = tmp_dir.getPath();
+            LocalState ls = new LocalState(dir, true);
+
+            FileUtils.touch(new File(dir, "12345.version"));
+
+            try (FileOutputStream data = FileUtils.openOutputStream(new File(dir, "12345"))) {
+                Assert.assertNull(ls.get("c"));
+                data.write(new byte[100]);
+                ls.put("a", globalStreamId_a);
+                Assert.assertEquals(globalStreamId_a, ls.get("a"));
+            }
+        }
     }
 }