APEXCORE-654 Fix recovery window update when a delay operator is part of the checkpoint group
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 51e85f7..18d6787 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -2082,6 +2082,8 @@
if (checkpointGroup.size() > 1) {
for (OperatorMeta om : checkpointGroup) {
Collection<PTOperator> operators = plan.getAllOperators(om);
+ Collection<PTOperator> unifiers = getUnifiersInCheckpointGroup(operators);
+ operators.addAll(unifiers);
for (PTOperator groupOper : operators) {
synchronized (groupOper.checkpoints) {
commonCheckpoints.retainAll(groupOper.checkpoints);
@@ -2175,6 +2177,22 @@
}
+  /**
+   * Collects the unifiers that directly consume the outputs of the given operators.
+   * <p>
+   * Every sink of every output of each operator is inspected, and the sink's target is
+   * kept when {@code isUnifier()} reports true. Only direct sinks are examined; the
+   * outputs of the unifiers found here are not followed any further.
+   *
+   * @param operators physical operators whose output sinks are scanned; not modified
+   * @return a new set holding each matching unifier once; empty when none is found
+   */
+  private static Collection<PTOperator> getUnifiersInCheckpointGroup(Collection<PTOperator> operators)
+  {
+    Set<PTOperator> found = Sets.newHashSet();
+    for (PTOperator source : operators) {
+      for (PTOperator.PTOutput output : source.getOutputs()) {
+        for (PTOperator.PTInput sink : output.sinks) {
+          if (!sink.target.isUnifier()) {
+            continue;
+          }
+          // a set is used, so a unifier fed by several operators is recorded only once
+          found.add(sink.target);
+        }
+      }
+    }
+    return found;
+  }
+
public long windowIdToMillis(long windowId)
{
int widthMillis = plan.getLogicalPlan().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
@@ -2186,7 +2204,7 @@
return this.vars.windowStartMillis;
}
- private Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
+ protected Map<OperatorMeta, Set<OperatorMeta>> getCheckpointGroups()
{
if (this.checkpointGroups == null) {
this.checkpointGroups = new HashMap<>();
diff --git a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
index 0c997ec..b2b4c0c 100644
--- a/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/CheckpointTest.java
@@ -39,6 +39,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultOutputPort;
@@ -46,8 +47,10 @@
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.MockContainer.MockOperatorStats;
import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
@@ -55,6 +58,7 @@
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
+import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTContainer;
@@ -282,6 +286,57 @@
}
@Test
+  public void testUpdateRecoveryCheckpointWithCycle() throws Exception
+  {
+    // Regression test for APEXCORE-654: the recovery checkpoint must be computed correctly
+    // when the checkpoint group formed by a delay-operator loop also involves a unifier.
+    Clock clock = new SystemClock();
+
+    dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
+
+    // Simulate a DAG with a loop which has a unifier operator:
+    // o1 -> o2 -> o3 -> o4 -> d -> o2 (second input port)
+    TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
+    GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
+    GenericTestOperator o3 = dag.addOperator("o3", GenericTestOperator.class);
+    GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
+    DefaultDelayOperator d = dag.addOperator("d", DefaultDelayOperator.class);
+
+    dag.addStream("o1.output1", o1.outport, o2.inport1);
+    dag.addStream("o2.output1", o2.outport1, o3.inport1);
+    dag.addStream("o3.output1", o3.outport1, o4.inport1);
+    dag.addStream("o4.output1", o4.outport1, d.input);
+    dag.addStream("d.output", d.output, o2.inport2);
+    // o3 gets two partitions; presumably this is what makes the physical plan contain a unifier
+    dag.setOperatorAttribute(o3, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<Operator>(2));
+
+    dag.validate();
+    StreamingContainerManager dnm = new StreamingContainerManager(dag);
+    PhysicalPlan plan = dnm.getPhysicalPlan();
+
+    // Precondition: every physical operator starts at the initial checkpoint with no checkpoints recorded.
+    for (PTOperator oper : plan.getAllOperators().values()) {
+      Assert.assertEquals("Initial activation windowId " + oper, Checkpoint.INITIAL_CHECKPOINT, oper.getRecoveryCheckpoint());
+      Assert.assertEquals("Checkpoints empty " + oper, Collections.emptyList(), oper.checkpoints);
+    }
+
+    Checkpoint cp1 = new Checkpoint(1L, 0, 0);
+    Checkpoint cp2 = new Checkpoint(2L, 0, 0);
+
+    Map<OperatorMeta, Set<OperatorMeta>> checkpointGroups = dnm.getCheckpointGroups();
+
+    // Activate every physical operator, record cp1 on it and recompute its recovery checkpoint.
+    Map<Integer, PTOperator> allOperators = plan.getAllOperators();
+    for (PTOperator operator : allOperators.values()) {
+      operator.setState(PTOperator.State.ACTIVE);
+      operator.checkpoints.add(cp1);
+      dnm.updateRecoveryCheckpoints(operator,
+          new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+    }
+
+    // Only o1 additionally records cp2; all other operators still have cp1 alone.
+    List<PTOperator> physicalO1 = plan.getOperators(dag.getOperatorMeta("o1"));
+    physicalO1.get(0).checkpoints.add(cp2);
+    dnm.updateRecoveryCheckpoints(physicalO1.get(0),
+        new UpdateCheckpointsContext(clock, false, checkpointGroups), false);
+
+    // JUnit's argument order is (message, expected, actual): o1 is expected to stay at cp1.
+    Assert.assertEquals("Recovery checkpoint updated ", cp1, physicalO1.get(0).getRecoveryCheckpoint());
+  }
+
+ @Test
public void testUpdateCheckpointsRecovery()
{
MockClock clock = new MockClock();