APEXCORE-703 Window processing timeout for finished/undeployed container.
During an operator shutdown, mark the operator as INACTIVE so that it is excluded from the blocked-operators check.
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index d029b16..8d99dc1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1378,6 +1378,7 @@
}
deactivatedOpers.add(oper);
}
+ oper.setState(State.INACTIVE);
sca.undeployOpers.add(oper.getId());
slowestUpstreamOp.remove(oper);
// record operator stop event
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index 471dca2..84f6a5a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -497,7 +497,7 @@
@Override
public String toString()
{
- return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).toString();
+ return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).append("state", state).toString();
}
}
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ecc010c..ab2a3ae 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1440,7 +1440,7 @@
void removePTOperator(PTOperator oper)
{
- LOG.debug("Removing operator " + oper);
+ LOG.debug("Removing operator {}", oper);
// per partition merge operators
if (!oper.upstreamMerge.isEmpty()) {
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 53f18f9..3f2c20b 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -369,6 +369,103 @@
Assert.assertEquals("sourcePortName " + node3DI, mergeNodeDI.outputs.get(0).portName, node3In.sourcePortName);
}
+ private static void shutdownOperator(StreamingContainerManager scm, PTOperator p1, PTOperator p2)
+ { // test helper: heartbeat p1 and p2 into ACTIVE, then report p1 as SHUTDOWN and expect it to become INACTIVE
+ assignContainer(scm, "c1"); // helper defined outside this hunk; presumably allocates a container with external id "c1" - confirm
+ assignContainer(scm, "c2");
+
+ ContainerHeartbeat c1hb = new ContainerHeartbeat();
+ c1hb.setContainerStats(new ContainerStats(p1.getContainer().getExternalId()));
+ scm.processHeartbeat(c1hb); // first heartbeat for p1's container: container stats only, no operator heartbeat attached yet
+
+ ContainerHeartbeat c2hb = new ContainerHeartbeat();
+ c2hb.setContainerStats(new ContainerStats(p2.getContainer().getExternalId()));
+ scm.processHeartbeat(c2hb); // same initial container-only heartbeat for p2's container
+
+ OperatorHeartbeat o1hb = new OperatorHeartbeat();
+ c1hb.getContainerStats().addNodeStats(o1hb); // from here on c1hb carries p1's operator heartbeat
+ o1hb.setNodeId(p1.getId());
+ o1hb.setState(DeployState.ACTIVE);
+ OperatorStats o1stats = new OperatorStats();
+ o1hb.getOperatorStatsContainer().add(o1stats); // o1stats is mutated below after being attached; the same object is reused for the SHUTDOWN heartbeat
+ o1stats.checkpoint = new Checkpoint(2, 0, 0);
+ o1stats.windowId = 3;
+ scm.processHeartbeat(c1hb); // second c1 heartbeat, now reporting p1 as ACTIVE
+ Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState());
+
+ OperatorHeartbeat o2hb = new OperatorHeartbeat();
+ c2hb.getContainerStats().addNodeStats(o2hb); // c2hb now carries p2's operator heartbeat
+ o2hb.setNodeId(p2.getId());
+ o2hb.setState(DeployState.ACTIVE);
+ OperatorStats o2stats = new OperatorStats(); // NOTE(review): unlike o1stats, o2stats is never added to o2hb.getOperatorStatsContainer() - confirm this is intentional
+ o2stats.checkpoint = new Checkpoint(2, 0, 0);
+ o2stats.windowId = 3;
+ scm.processHeartbeat(c2hb); // reports p2 as ACTIVE; the o2stats values above are not part of this heartbeat
+ Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState()); // p1 must be unaffected by p2's heartbeat
+ Assert.assertEquals(PTOperator.State.ACTIVE, p2.getState());
+
+ o1hb.setState(DeployState.SHUTDOWN); // flip the already-attached p1 heartbeat to SHUTDOWN
+ o1stats.checkpoint = new Checkpoint(4, 0,0);
+ o1stats.windowId = 5;
+ scm.processHeartbeat(c1hb);
+ Assert.assertEquals(PTOperator.State.INACTIVE, p1.getState()); // behavior under test: a SHUTDOWN heartbeat leaves the operator INACTIVE
+ }
+
+ @Test
+ public void testShutdownOperatorTimeout() throws Exception // checks that only the still-running operator's container gets a stop request after the wait
+ {
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+
+ dag.addStream("s1", o1.outport1, o2.inport1); // o1 feeds o2
+
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+ dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 50);
+ dag.setAttribute(OperatorContext.TIMEOUT_WINDOW_COUNT, 1); // presumably yields a ~50 ms processing timeout (1 window x 50 ms) - confirm against the timeout computation
+
+ StreamingContainerManager scm = new StreamingContainerManager(dag);
+
+ PhysicalPlan plan = scm.getPhysicalPlan();
+
+ PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+ PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0);
+
+ shutdownOperator(scm, p1, p2); // leaves p1 INACTIVE (shut down) and p2 ACTIVE
+
+ scm.monitorHeartbeat(false);
+ Assert.assertTrue(scm.containerStopRequests.isEmpty()); // first monitoring pass: no container is expected to be stopped yet
+ Thread.sleep(100); // wait 100 ms, i.e. twice the configured 50 ms window size, before monitoring again
+ scm.monitorHeartbeat(false);
+ Assert.assertFalse(scm.containerStopRequests.containsKey(p1.getContainer().getExternalId())); // container of the shut-down operator must not be requested to stop
+ Assert.assertTrue(scm.containerStopRequests.containsKey(p2.getContainer().getExternalId())); // p2's container is expected to get a stop request, presumably because p2 reported no progress during the wait
+ }
+
+ @Test
+ public void testShutdownOperatorRecovery() throws Exception // checks that restarting the container of a shut-down operator still includes that operator
+ {
+ GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+ GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+
+ dag.addStream("s1", o1.outport1, o2.inport1); // o1 feeds o2
+
+ dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+ StreamingContainerManager scm = new StreamingContainerManager(dag);
+ scm.containerStartRequests.poll(); // discard two queued start requests - presumably the initial ones for the two containers - so the poll() below sees the restart request
+ scm.containerStartRequests.poll();
+
+ PhysicalPlan plan = scm.getPhysicalPlan();
+
+ PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+ PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0);
+
+ shutdownOperator(scm, p1, p2); // leaves p1 INACTIVE (shut down) and p2 ACTIVE
+
+ scm.scheduleContainerRestart(p1.getContainer().getExternalId()); // request a restart of the container that hosted the shut-down operator
+ ContainerStartRequest dr = scm.containerStartRequests.poll();
+ Assert.assertTrue(dr.container.getOperators().contains(p1)); // the container in the polled start request must still list p1
+ }
+
@Test
public void testRecoveryOrder() throws Exception
{
@@ -717,7 +814,9 @@
ce.getKey().bufferServerAddress = null;
}
- PTOperator o1p1 = physicalPlan.getOperators(dag.getMeta(o1)).get(0);
+ List<PTOperator> o1p = physicalPlan.getOperators(dag.getMeta(o1));
+ Assert.assertEquals("o1 partitions", 1, o1p.size());
+ PTOperator o1p1 = o1p.get(0);
MockContainer mc1 = mockContainers.get(o1p1.getContainer());
MockOperatorStats o1p1mos = mc1.stats(o1p1.getId());
o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
@@ -729,7 +828,7 @@
o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc2.sendHeartbeat();
- Assert.assertEquals("2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
+ Assert.assertEquals("o2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1);
MockContainer mc3 = mockContainers.get(o2p2.getContainer());
@@ -737,6 +836,7 @@
o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
mc3.sendHeartbeat();
+ Assert.assertEquals("o3 partitions", 1, physicalPlan.getOperators(dag.getMeta(o3)).size());
PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
MockContainer mc4 = mockContainers.get(o3p1.getContainer());
MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
@@ -749,6 +849,10 @@
o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
mc1.sendHeartbeat();
+ o1p = physicalPlan.getOperators(dag.getMeta(o1));
+ Assert.assertEquals("o1 partitions", 1, o1p.size());
+ Assert.assertEquals("o1p1 present", o1p1, o1p.get(0));
+ Assert.assertEquals("input operator state", PTOperator.State.INACTIVE, o1p1.getState());
scm.monitorHeartbeat(false);
Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
scm.monitorHeartbeat(false); // committedWindowId updated in next cycle