APEXCORE-703 Window processing timeout for finished/undeployed container.
During an operator shutdown mark it as INACTIVE to exclude it from the blocked operators check.
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index d029b16..8d99dc1 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -1378,6 +1378,7 @@
                 }
                 deactivatedOpers.add(oper);
               }
+              oper.setState(State.INACTIVE);
               sca.undeployOpers.add(oper.getId());
               slowestUpstreamOp.remove(oper);
               // record operator stop event
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
index 471dca2..84f6a5a 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PTOperator.java
@@ -497,7 +497,7 @@
   @Override
   public String toString()
   {
-    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).toString();
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).append("state", state).toString();
   }
 
 }
diff --git a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
index ecc010c..ab2a3ae 100644
--- a/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
+++ b/engine/src/main/java/com/datatorrent/stram/plan/physical/PhysicalPlan.java
@@ -1440,7 +1440,7 @@
 
   void removePTOperator(PTOperator oper)
   {
-    LOG.debug("Removing operator " + oper);
+    LOG.debug("Removing operator {}", oper);
 
     // per partition merge operators
     if (!oper.upstreamMerge.isEmpty()) {
diff --git a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
index 53f18f9..3f2c20b 100644
--- a/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/StreamingContainerManagerTest.java
@@ -369,6 +369,103 @@
     Assert.assertEquals("sourcePortName " + node3DI, mergeNodeDI.outputs.get(0).portName, node3In.sourcePortName);
   }
 
+  private static void shutdownOperator(StreamingContainerManager scm, PTOperator p1, PTOperator p2)
+  {
+    assignContainer(scm, "c1");
+    assignContainer(scm, "c2");
+
+    ContainerHeartbeat c1hb = new ContainerHeartbeat();
+    c1hb.setContainerStats(new ContainerStats(p1.getContainer().getExternalId()));
+    scm.processHeartbeat(c1hb);
+
+    ContainerHeartbeat c2hb = new ContainerHeartbeat();
+    c2hb.setContainerStats(new ContainerStats(p2.getContainer().getExternalId()));
+    scm.processHeartbeat(c2hb);
+
+    OperatorHeartbeat o1hb = new OperatorHeartbeat();
+    c1hb.getContainerStats().addNodeStats(o1hb);
+    o1hb.setNodeId(p1.getId());
+    o1hb.setState(DeployState.ACTIVE);
+    OperatorStats o1stats = new OperatorStats();
+    o1hb.getOperatorStatsContainer().add(o1stats);
+    o1stats.checkpoint = new Checkpoint(2, 0, 0);
+    o1stats.windowId = 3;
+    scm.processHeartbeat(c1hb);
+    Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState());
+
+    OperatorHeartbeat o2hb = new OperatorHeartbeat();
+    c2hb.getContainerStats().addNodeStats(o2hb);
+    o2hb.setNodeId(p2.getId());
+    o2hb.setState(DeployState.ACTIVE);
+    OperatorStats o2stats = new OperatorStats();
+    o2stats.checkpoint = new Checkpoint(2, 0, 0);
+    o2stats.windowId = 3;
+    scm.processHeartbeat(c2hb);
+    Assert.assertEquals(PTOperator.State.ACTIVE, p1.getState());
+    Assert.assertEquals(PTOperator.State.ACTIVE, p2.getState());
+
+    o1hb.setState(DeployState.SHUTDOWN);
+    o1stats.checkpoint = new Checkpoint(4, 0,0);
+    o1stats.windowId = 5;
+    scm.processHeartbeat(c1hb);
+    Assert.assertEquals(PTOperator.State.INACTIVE, p1.getState());
+  }
+
+  @Test
+  public void testShutdownOperatorTimeout() throws Exception
+  {
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+
+    dag.addStream("s1", o1.outport1, o2.inport1);
+
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+    dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, 50);
+    dag.setAttribute(OperatorContext.TIMEOUT_WINDOW_COUNT, 1);
+
+    StreamingContainerManager scm = new StreamingContainerManager(dag);
+
+    PhysicalPlan plan = scm.getPhysicalPlan();
+
+    PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+    PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0);
+
+    shutdownOperator(scm, p1, p2);
+
+    scm.monitorHeartbeat(false);
+    Assert.assertTrue(scm.containerStopRequests.isEmpty());
+    Thread.sleep(100);
+    scm.monitorHeartbeat(false);
+    Assert.assertFalse(scm.containerStopRequests.containsKey(p1.getContainer().getExternalId()));
+    Assert.assertTrue(scm.containerStopRequests.containsKey(p2.getContainer().getExternalId()));
+  }
+
+  @Test
+  public void testShutdownOperatorRecovery() throws Exception
+  {
+    GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+
+    dag.addStream("s1", o1.outport1, o2.inport1);
+
+    dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+    StreamingContainerManager scm = new StreamingContainerManager(dag);
+    scm.containerStartRequests.poll();
+    scm.containerStartRequests.poll();
+
+    PhysicalPlan plan = scm.getPhysicalPlan();
+
+    PTOperator p1 = plan.getOperators(dag.getMeta(o1)).get(0);
+    PTOperator p2 = plan.getOperators(dag.getMeta(o2)).get(0);
+
+    shutdownOperator(scm, p1, p2);
+
+    scm.scheduleContainerRestart(p1.getContainer().getExternalId());
+    ContainerStartRequest dr = scm.containerStartRequests.poll();
+    Assert.assertTrue(dr.container.getOperators().contains(p1));
+  }
+
   @Test
   public void testRecoveryOrder() throws Exception
   {
@@ -717,7 +814,9 @@
       ce.getKey().bufferServerAddress = null;
     }
 
-    PTOperator o1p1 = physicalPlan.getOperators(dag.getMeta(o1)).get(0);
+    List<PTOperator> o1p = physicalPlan.getOperators(dag.getMeta(o1));
+    Assert.assertEquals("o1 partitions", 1,  o1p.size());
+    PTOperator o1p1 = o1p.get(0);
     MockContainer mc1 = mockContainers.get(o1p1.getContainer());
     MockOperatorStats o1p1mos = mc1.stats(o1p1.getId());
     o1p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
@@ -729,7 +828,7 @@
     o2p1mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
     mc2.sendHeartbeat();
 
-    Assert.assertEquals("2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
+    Assert.assertEquals("o2 partitions", 2, physicalPlan.getOperators(dag.getMeta(o2)).size());
 
     PTOperator o2p2 = physicalPlan.getOperators(dag.getMeta(o2)).get(1);
     MockContainer mc3 = mockContainers.get(o2p2.getContainer());
@@ -737,6 +836,7 @@
     o2p2mos.currentWindowId(1).checkpointWindowId(1).deployState(DeployState.ACTIVE);
     mc3.sendHeartbeat();
 
+    Assert.assertEquals("o3 partitions", 1, physicalPlan.getOperators(dag.getMeta(o3)).size());
     PTOperator o3p1 = physicalPlan.getOperators(dag.getMeta(o3)).get(0);
     MockContainer mc4 = mockContainers.get(o3p1.getContainer());
     MockOperatorStats o3p1mos = mc4.stats(o3p1.getId());
@@ -749,6 +849,10 @@
 
     o1p1mos.currentWindowId(2).deployState(DeployState.SHUTDOWN);
     mc1.sendHeartbeat();
+    o1p = physicalPlan.getOperators(dag.getMeta(o1));
+    Assert.assertEquals("o1 partitions", 1,  o1p.size());
+    Assert.assertEquals("o1p1 present", o1p1, o1p.get(0));
+    Assert.assertEquals("input operator state", PTOperator.State.INACTIVE, o1p1.getState());
     scm.monitorHeartbeat(false);
     Assert.assertEquals("committedWindowId", -1, scm.getCommittedWindowId());
     scm.monitorHeartbeat(false); // committedWindowId updated in next cycle