Merge branch 'APEXCORE-649' of github.com:tushargosavi/apex-core
diff --git a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
index 0c389a4..c8275ea 100644
--- a/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/AsyncFSStorageAgent.java
@@ -147,7 +147,7 @@
}
@Override
- public void finalize(int operatorId, long windowId) throws IOException
+ public void flush(int operatorId, long windowId) throws IOException
{
// Checkpoint already present in HDFS during save, when syncCheckpoint is true.
if (isSyncCheckpoint()) {
diff --git a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
index b5a43fe..4b71761 100644
--- a/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
+++ b/common/src/main/java/com/datatorrent/common/util/FSStorageAgent.java
@@ -41,6 +41,7 @@
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
@@ -157,27 +158,23 @@
try {
FileStatus status = fileContext.getFileStatus(lPath);
if (!status.isDirectory()) {
- throw new IOException("Checkpoint location is not a directory ");
+ throw new RuntimeException("Checkpoint location is not a directory");
}
} catch (FileNotFoundException ex) {
- // During initialization this directory may not exists.
- // return an empty array.
- return new long[0];
+ // During initialization checkpoint directory may not exists.
+ fileContext.mkdir(lPath, FsPermission.getDirDefault(), true);
}
RemoteIterator<FileStatus> fileStatusRemoteIterator = fileContext.listStatus(lPath);
- if (!fileStatusRemoteIterator.hasNext()) {
- throw new IOException("Storage Agent has not saved anything yet!");
- }
List<Long> lwindows = new ArrayList<>();
- do {
+ while (fileStatusRemoteIterator.hasNext()) {
FileStatus fileStatus = fileStatusRemoteIterator.next();
String name = fileStatus.getPath().getName();
if (name.equals(TMP_FILE)) {
continue;
}
lwindows.add(STATELESS_CHECKPOINT_WINDOW_ID.equals(name) ? Stateless.WINDOW_ID : Long.parseLong(name, 16));
- } while (fileStatusRemoteIterator.hasNext());
+ }
long[] windowIds = new long[lwindows.size()];
for (int i = 0; i < windowIds.length; i++) {
windowIds[i] = lwindows.get(i);
diff --git a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
index 337ccdd..f797b92 100644
--- a/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
+++ b/common/src/main/java/org/apache/apex/common/util/AsyncStorageAgent.java
@@ -41,7 +41,7 @@
* @param windowId
* @throws IOException
*/
- void finalize(int operatorId, long windowId) throws IOException;
+ void flush(int operatorId, long windowId) throws IOException;
/**
* Check if StorageAgent is configured to take synchronous checkpoints.
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 6ec5267..d029b16 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -65,9 +65,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.common.util.CascadeStorageAgent;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
+import org.apache.apex.engine.util.CascadeStorageAgent;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index c84a249..88b002f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -539,7 +539,7 @@
checkpointStats = null;
return;
} else {
- asyncStorageAgent.finalize(id, windowId);
+ asyncStorageAgent.flush(id, windowId);
}
}
}
@@ -688,7 +688,7 @@
@Override
public Stats.CheckpointStats call() throws Exception
{
- agent.finalize(id, windowId);
+ agent.flush(id, windowId);
stats.checkpointTime = System.currentTimeMillis() - stats.checkpointStartTime;
return stats;
}
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index f4e2100..ecc010c 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1227,7 +1227,7 @@
StorageAgent agent = oper.operatorMeta.getValue(OperatorContext.STORAGE_AGENT);
agent.save(oo, oper.id, windowId);
if (agent instanceof AsyncStorageAgent) {
- ((AsyncStorageAgent)agent).finalize(oper.id, windowId);
+ ((AsyncStorageAgent)agent).flush(oper.id, windowId);
}
} catch (IOException e) {
// inconsistent state, no recovery option, requires shutdown
diff --git a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
similarity index 89%
rename from common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
rename to engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
index d6fec8e..9903010 100644
--- a/common/src/main/java/org/apache/apex/common/util/CascadeStorageAgent.java
+++ b/engine/src/main/java/org/apache/apex/engine/util/CascadeStorageAgent.java
@@ -16,10 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.common.util;
+package org.apache.apex.engine.util;
import java.io.IOException;
-import java.io.ObjectStreamException;
+import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
@@ -27,6 +27,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.common.util.AsyncStorageAgent;
+import org.apache.hadoop.classification.InterfaceStability;
+
import com.google.common.collect.Maps;
import com.datatorrent.api.StorageAgent;
@@ -39,12 +42,14 @@
* restart to avoiding copying checkpoints from old application directory to improve application
* restart time.
*/
+@InterfaceStability.Evolving
public class CascadeStorageAgent implements StorageAgent, AsyncStorageAgent, Serializable
{
+ private static final long serialVersionUID = 985557590735264920L;
private static final Logger logger = LoggerFactory.getLogger(CascadeStorageAgent.class);
private final StorageAgent parent;
private final StorageAgent current;
- private final transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
+ private transient Map<Integer, long[]> oldOperatorToWindowIdsMap;
public CascadeStorageAgent(StorageAgent parent, StorageAgent current)
{
@@ -169,10 +174,10 @@
}
@Override
- public void finalize(int operatorId, long windowId) throws IOException
+ public void flush(int operatorId, long windowId) throws IOException
{
if (current instanceof AsyncStorageAgent) {
- ((AsyncStorageAgent)current).finalize(operatorId, windowId);
+ ((AsyncStorageAgent)current).flush(operatorId, windowId);
}
}
@@ -185,9 +190,10 @@
return true;
}
- public Object readResolve() throws ObjectStreamException
+ private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException
{
- return new CascadeStorageAgent(parent, current);
+ input.defaultReadObject();
+ oldOperatorToWindowIdsMap = Maps.newConcurrentMap();
}
public StorageAgent getCurrentStorageAgent()
diff --git a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
index 2f46049..177e3fa 100644
--- a/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StramRecoveryTest.java
@@ -44,7 +44,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.common.util.CascadeStorageAgent;
+import org.apache.apex.engine.util.CascadeStorageAgent;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index cb2d760..53f18f9 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -468,16 +468,6 @@
long[] windowsIds = sa.getWindowIds(1);
Arrays.sort(windowsIds);
Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
-
- for (long windowId : windowIds) {
- sa.delete(1, windowId);
- }
- try {
- sa.getWindowIds(1);
- Assert.fail("There should not be any most recently saved windowId!");
- } catch (IOException io) {
- Assert.assertTrue("No State Saved", true);
- }
}
@Test
@@ -495,16 +485,6 @@
long[] windowsIds = sa.getWindowIds(1);
Arrays.sort(windowsIds);
Assert.assertArrayEquals("Saved windowIds", windowIds, windowsIds);
-
- for (long windowId : windowIds) {
- sa.delete(1, windowId);
- }
- try {
- sa.getWindowIds(1);
- Assert.fail("There should not be any most recently saved windowId!");
- } catch (IOException io) {
- Assert.assertTrue("No State Saved", true);
- }
}
@Test
diff --git a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
similarity index 97%
rename from common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
rename to engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
index 40f24f0..43b5636 100644
--- a/common/src/test/java/com/datatorrent/common/util/CascadeStorageAgentTest.java
+++ b/engine/src/test/java/org/apache/apex/engine/util/CascadeStorageAgentTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package com.datatorrent.common.util;
+package org.apache.apex.engine.util;
import java.io.File;
import java.io.IOException;
@@ -27,13 +27,13 @@
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
-import org.apache.apex.common.util.CascadeStorageAgent;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.DAG;
+import com.datatorrent.common.util.FSStorageAgent;
public class CascadeStorageAgentTest
{