APEXCORE-654 fix update recovery window when delay part of group
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 51e85f7..18d6787 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2082,6 +2082,8 @@
     if (checkpointGroup.size() > 1) {
       for (OperatorMeta om : checkpointGroup) {
         Collection<PTOperator> operators = plan.getAllOperators(om);
+        Collection<PTOperator> unifiers = getUnifiersInCheckpointGroup(operators);
+        operators.addAll(unifiers);
         for (PTOperator groupOper : operators) {
           synchronized (groupOper.checkpoints) {
             commonCheckpoints.retainAll(groupOper.checkpoints);
@@ -2175,6 +2177,22 @@
 
   }
 
+  private static Collection<PTOperator> getUnifiersInCheckpointGroup(Collection<PTOperator> operators)
+  {
+    Set<PTOperator> unifiers = Sets.newHashSet();
+    for (PTOperator op : operators) {
+      for (PTOperator.PTOutput out : op.getOutputs()) {
+        for (PTOperator.PTInput in : out.sinks) {
+          PTOperator target = in.target;
+          if (target.isUnifier()) {
+            unifiers.add(target);
+          }
+        }
+      }
+    }
+    return unifiers;
+  }
+
   public long windowIdToMillis(long windowId)
   {
     int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
@@ -2186,7 +2204,7 @@
     return this.vars.windowStartMillis;
   }
 
-  private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+  protected Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
   {
     if (this.checkpointGroups == null) {
       this.checkpointGroups = new HashMap<>();
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 0c997ec..b2b4c0c 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -39,6 +39,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultOutputPort;
@@ -46,8 +47,10 @@
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
 import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
 import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.stram.MockContainer.MockOperatorStats;
 import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
@@ -55,6 +58,7 @@
 import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
 import com.datatorrent.stram.engine.GenericTestOperator;
 import com.datatorrent.stram.engine.OperatorContext;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
 import com.datatorrent.stram.plan.logical.LogicalPlan;
 import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
 import com.datatorrent.stram.plan.physical.PTContainer;
@@ -282,6 +286,57 @@
   }
 
   @Test
+  public void testUpdateRecoveryCheckpointWithCycle() throws Exception
+  {
+    Clock clock = new SystemClock();
+
+    dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+    // Simulate a DAG with a loop which has a unifier operator
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+    GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
+    DefaultDelayOperator d = dag.addOperator("d", DefaultDelayOperator.class);
+
+    dag.addStream("o1.output1", o1.outport, o2.inport1);
+    dag.addStream("o2.output1", o2.outport1, o3.inport1);
+    dag.addStream("o3.output1", o3.outport1, o4.inport1);
+    dag.addStream("o4.output1", o4.outport1, d.input);
+    dag.addStream("d.output", d.output, o2.inport2);
+    dag.setOperatorAttribute(o3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+
+    dag.validate();
+    StreamingContainerManager dnm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = dnm.getPhysicalPlan();
+
+    for (PTOperator oper : plan.getAllOperators().values()) {
+      Assert.assertEquals("Initial activation windowId" + oper, Checkpoint.INITIAL_CHECKPOINT, oper.getRecoveryCheckpoint());
+      Assert.assertEquals("Checkpoints empty" + oper, Collections.emptyList(), oper.checkpoints);
+    }
+
+    Checkpoint cp1 = new Checkpoint(1L, 0, 0);
+    Checkpoint cp2 = new Checkpoint(2L, 0, 0);
+
+    Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups = dnm.getCheckpointGroups();
+
+    Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+    for (PTOperator operator: allOperators.values()) {
+      operator.setState(PTOperator.State.ACTIVE);
+      operator.checkpoints.add(cp1);
+      dnm.updateRecoveryCheckpoints(operator,
+          new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+    }
+
+    List<PTOperator> physicalO1 = plan.getOperators(dag.getOperatorMeta("o1"));
+    physicalO1.get(0).checkpoints.add(cp2);
+    dnm.updateRecoveryCheckpoints(physicalO1.get(0),
+        new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+
+    Assert.assertEquals("Recovery checkpoint updated ", physicalO1.get(0).getRecoveryCheckpoint(), cp1);
+  }
+
+  @Test
   public void testUpdateCheckpointsRecovery()
   {
     MockClock clock = new MockClock();