Merge branch 'APEXCORE-649' of github.com:tushargosavi/apex-core
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 0c389a4..c8275ea 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -147,7 +147,7 @@
   }
 
   @Override
-  public void finalize(int operatorId, long windowId) throws IOException
+  public void flush(int operatorId, long windowId) throws IOException
   {
     // Checkpoint already present in HDFS during save, when syncCheckpoint is true.
     if (isSyncCheckpoint()) {
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index b5a43fe..4b71761 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
@@ -157,27 +158,23 @@
     try {
       FileStatus status = fileContext.getFileStatus(lPath);
       if (!status.isDirectory()) {
-        throw new IOException("Checkpoint location is not a directory ");
+        throw new RuntimeException("Checkpoint location is not a directory");
       }
     } catch (FileNotFoundException ex) {
-      // During initialization this directory may not exists.
-      // return an empty array.
-      return new long[0];
+      // During initialization the checkpoint directory may not exist yet; create it.
+      fileContext.mkdir(lPath, FsPermission.getDirDefault(), true);
     }
 
     RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
-    if (!fileStatusRemoteIterator.hasNext()) {
-      throw new IOException("Storage Agent has not saved anything yet!");
-    }
     List<Long> lwindows = new ArrayList<>();
-    do {
+    while (fileStatusRemoteIterator.hasNext()) {
       FileStatus fileStatus = fileStatusRemoteIterator.next();
       String name = fileStatus.getPath().getName();
       if (name.equals(TMP_FILE)) {
         continue;
       }
       lwindows.add(STATELESS_CHECKPOINT_WINDOW_ID.equals(name) ? Stateless.WINDOW_ID : Long.parseLong(name, 16));
-    } while (fileStatusRemoteIterator.hasNext());
+    }
     long[] windowIds = new long[lwindows.size()];
     for (int i = 0; i < windowIds.length; i++) {
       windowIds[i] = lwindows.get(i);
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
index 337ccdd..f797b92 100644
--- a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -41,7 +41,7 @@
    * @param windowId
    * @throws IOException
    */
-  void finalize(int operatorId, long windowId) throws IOException;
+  void flush(int operatorId, long windowId) throws IOException;
 
   /**
    * Check if StorageAgent is configured to take synchronous checkpoints.
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6ec5267..d029b16 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,9 +65,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
+import org.apache.apex.engine.util.CascadeStorageAgent;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index c84a249..88b002f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -539,7 +539,7 @@
                 checkpointStats = null;
                 return;
               } else {
-                asyncStorageAgent.finalize(id, windowId);
+                asyncStorageAgent.flush(id, windowId);
               }
             }
           }
@@ -688,7 +688,7 @@
     @Override
     public Stats.CheckpointStats call() throws Exception
     {
-      agent.finalize(id, windowId);
+      agent.flush(id, windowId);
       stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
       return stats;
     }
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index f4e2100..ecc010c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1227,7 +1227,7 @@
       StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
       agent.save(oo, oper.id, windowId);
       if (agent instanceof AsyncStorageAgent) {
-        ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
+        ((AsyncStorageAgent)agent).flush(oper.id, windowId);
       }
     } catch (IOException e) {
       // inconsistent state, no recovery option, requires shutdown
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
similarity index 89%
rename from common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
rename to engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
index d6fec8e..9903010 100644
--- a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
+++ b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
@@ -16,10 +16,10 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.common.util;
+package org.apache.apex.engine.util;
 
 import java.io.IOException;
-import java.io.ObjectStreamException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Map;
@@ -27,6 +27,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.common.util.AsyncStorageAgent;
+import org.apache.hadoop.classification.InterfaceStability;
+
 import com.google.common.collect.Maps;
 
 import com.datatorrent.api.StorageAgent;
@@ -39,12 +42,14 @@
  * restart to avoiding copying checkpoints from old application directory to improve application
  * restart time.
  */
+@InterfaceStability.Evolving
 public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
 {
+  private static final long serialVersionUID = 985557590735264920L;
   private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
   private final StorageAgent parent;
   private final StorageAgent current;
-  private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+  private transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
 
   public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
   {
@@ -169,10 +174,10 @@
   }
 
   @Override
-  public void finalize(int operatorId, long windowId) throws IOException
+  public void flush(int operatorId, long windowId) throws IOException
   {
     if (current instanceof AsyncStorageAgent) {
-      ((AsyncStorageAgent)current).finalize(operatorId, windowId);
+      ((AsyncStorageAgent)current).flush(operatorId, windowId);
     }
   }
 
@@ -185,9 +190,10 @@
     return true;
   }
 
-  public Object readResolve() throws ObjectStreamException
+  private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
   {
-    return new CascadeStorageAgent(parent, current);
+    input.defaultReadObject();
+    oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
   }
 
   public StorageAgent getCurrentStorageAgent()
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 2f46049..177e3fa 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,7 +44,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.CascadeStorageAgent;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.lang.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index cb2d760..53f18f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -468,16 +468,6 @@
     long[] windowsIds = sa.getWindowIds(1);
     Arrays.sort(windowsIds);
     Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
-
-    for (long windowId : windowIds) {
-      sa.delete(1, windowId);
-    }
-    try {
-      sa.getWindowIds(1);
-      Assert.fail("There should not be any most recently saved windowId!");
-    } catch (IOException io) {
-      Assert.assertTrue("No State Saved", true);
-    }
   }
 
   @Test
@@ -495,16 +485,6 @@
     long[] windowsIds = sa.getWindowIds(1);
     Arrays.sort(windowsIds);
     Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
-
-    for (long windowId : windowIds) {
-      sa.delete(1, windowId);
-    }
-    try {
-      sa.getWindowIds(1);
-      Assert.fail("There should not be any most recently saved windowId!");
-    } catch (IOException io) {
-      Assert.assertTrue("No State Saved", true);
-    }
   }
 
   @Test
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
similarity index 97%
rename from common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
rename to engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
index 40f24f0..43b5636 100644
--- a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
+++ b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package com.datatorrent.common.util;
+package org.apache.apex.engine.util;
 
 import java.io.File;
 import java.io.IOException;
@@ -27,13 +27,13 @@
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
-import org.apache.apex.common.util.CascadeStorageAgent;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.DAG;
+import com.datatorrent.common.util.FSStorageAgent;
 
 public class CascadeStorageAgentTest
 {