blob: 285aba35ca24bb05664528554b3eb6311bb27cf5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.plan.logical;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.validation.ValidationException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.StreamingContainerManager.UpdateCheckpointsContext;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.TestMeta;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
/**
* Unit tests for topologies with delay operator
*/
public class DelayOperatorTest
{
@Rule
public TestMeta testMeta = new TestMeta();
private static Lock sequential = new ReentrantLock();
@Before
public void setup()
{
sequential.lock();
}
@After
public void teardown()
{
sequential.unlock();
}
@Test
public void testInvalidDelayDetection()
{
LogicalPlan dag = new LogicalPlan();
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
dag.addStream("CtoDelay", opC.outport2, opDelay.input);
dag.addStream("DelayToD", opDelay.output, opD.inport2);
List<List<String>> invalidDelays = new ArrayList<>();
dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
assertEquals("operator invalid delay", 1, invalidDelays.size());
try {
dag.validate();
fail("validation should fail");
} catch (ValidationException e) {
// expected
}
dag = new LogicalPlan();
opB = dag.addOperator("B", GenericTestOperator.class);
opC = dag.addOperator("C", GenericTestOperator.class);
opD = dag.addOperator("D", GenericTestOperator.class);
opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.setOperatorAttribute(opDelay, Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
dag.addStream("CtoDelay", opC.outport2, opDelay.input);
dag.addStream("DelayToC", opDelay.output, opC.inport2);
invalidDelays = new ArrayList<>();
dag.findInvalidDelays(dag.getMeta(opB), invalidDelays, new Stack<OperatorMeta>());
assertEquals("operator invalid delay", 1, invalidDelays.size());
try {
dag.validate();
fail("validation should fail");
} catch (ValidationException e) {
// expected
}
dag = new LogicalPlan();
opB = dag.addOperator("B", GenericTestOperator.class);
opC = dag.addOperator("C", GenericTestOperator.class);
opD = dag.addOperator("D", GenericTestOperator.class);
opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
dag.addStream("CtoDelay", opC.outport2, opDelay.input).setLocality(DAG.Locality.THREAD_LOCAL);
dag.addStream("DelayToC", opDelay.output, opC.inport2).setLocality(DAG.Locality.THREAD_LOCAL);
try {
dag.validate();
fail("validation should fail");
} catch (ValidationException e) {
// expected
}
}
@Test
public void testValidDelay()
{
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.addStream("AtoB", opA.outport, opB.inport1);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
dag.addStream("CtoDelay", opC.outport2, opDelay.input);
dag.addStream("DelayToB", opDelay.output, opB.inport2);
dag.validate();
}
public static final Long[] FIBONACCI_NUMBERS = new Long[]{
1L, 1L, 2L, 3L, 5L, 8L, 13L, 21L, 34L, 55L, 89L, 144L, 233L, 377L, 610L, 987L, 1597L, 2584L, 4181L, 6765L,
10946L, 17711L, 28657L, 46368L, 75025L, 121393L, 196418L, 317811L, 514229L, 832040L, 1346269L, 2178309L,
3524578L, 5702887L, 9227465L, 14930352L, 24157817L, 39088169L, 63245986L, 102334155L
};
public static class FibonacciOperator extends BaseOperator
{
public static List<Long> results = new ArrayList<>();
public long currentNumber = 1;
private transient long tempNum;
public transient DefaultInputPort<Object> dummyInputPort = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
}
};
public transient DefaultInputPort<Long> input = new DefaultInputPort<Long>()
{
@Override
public void process(Long tuple)
{
tempNum = tuple;
}
};
public transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();
@Override
public void endWindow()
{
output.emit(currentNumber);
results.add(currentNumber);
currentNumber += tempNum;
if (currentNumber <= 0) {
// overflow
currentNumber = 1;
}
}
}
public static class FailableFibonacciOperator extends FibonacciOperator implements Operator.CheckpointListener
{
private boolean committed = false;
private int simulateFailureWindows = 0;
private boolean simulateFailureAfterCommit = false;
private int windowCount = 0;
public static volatile boolean failureSimulated = false;
@Override
public void beginWindow(long windowId)
{
if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
!failureSimulated) {
LOG.debug("FailableFibonacciOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows);
if (windowCount++ == simulateFailureWindows) {
failureSimulated = true;
LOG.debug("FailableFibonacciOperator is simulating failure");
throw new RuntimeException("simulating failure");
}
}
}
@Override
public void checkpointed(long windowId)
{
LOG.debug("FailableFibonacciOperator is checkpointed {}", windowId);
}
@Override
public void committed(long windowId)
{
LOG.debug("FailableFibonacciOperator is committed {}", windowId);
committed = true;
}
public void setSimulateFailureWindows(int windows, boolean afterCommit)
{
this.simulateFailureAfterCommit = afterCommit;
this.simulateFailureWindows = windows;
}
}
public static class FailableDelayOperator extends DefaultDelayOperator implements Operator.CheckpointListener
{
private boolean committed = false;
private int simulateFailureWindows = 0;
private boolean simulateFailureAfterCommit = false;
private int windowCount = 0;
private static volatile boolean failureSimulated = false;
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
if (simulateFailureWindows > 0 && ((simulateFailureAfterCommit && committed) || !simulateFailureAfterCommit) &&
!failureSimulated) {
LOG.debug("FailableDelayOperator beginWindow {} {} {}", windowId, windowCount, simulateFailureWindows);
if (windowCount++ == simulateFailureWindows) {
failureSimulated = true;
LOG.debug("FailableDelayOperator is simulating failure {}", windowId);
throw new RuntimeException("simulating failure");
}
}
}
@Override
public void checkpointed(long windowId)
{
LOG.debug("FailableDelayOperator is checkpointed {}", windowId);
}
@Override
public void committed(long windowId)
{
LOG.debug("FailableDelayOperator is committed {}", windowId);
committed = true;
}
public void setSimulateFailureWindows(int windows, boolean afterCommit)
{
this.simulateFailureAfterCommit = afterCommit;
this.simulateFailureWindows = windows;
}
}
@Test
public void testFibonacci() throws Exception
{
LogicalPlan dag = new LogicalPlan();
TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
dag.addStream("operator_to_delay", fib.output, opDelay.input);
dag.addStream("delay_to_operator", opDelay.output, fib.input);
FibonacciOperator.results.clear();
final StramLocalCluster localCluster = new StramLocalCluster(dag);
localCluster.setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return FibonacciOperator.results.size() >= 10;
}
});
localCluster.run(10000);
Assert.assertArrayEquals(Arrays.copyOfRange(FIBONACCI_NUMBERS, 0, 10),
FibonacciOperator.results.subList(0, 10).toArray());
}
@Test
public void testFibonacciRecovery1() throws Exception
{
LogicalPlan dag = StramTestSupport.createDAG(testMeta);
TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
FailableFibonacciOperator fib = dag.addOperator("FIB", FailableFibonacciOperator.class);
DefaultDelayOperator opDelay = dag.addOperator("opDelay", DefaultDelayOperator.class);
fib.setSimulateFailureWindows(3, true);
dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
dag.addStream("operator_to_delay", fib.output, opDelay.input);
dag.addStream("delay_to_operator", opDelay.output, fib.input);
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
FailableFibonacciOperator.results.clear();
FailableFibonacciOperator.failureSimulated = false;
final StramLocalCluster localCluster = new StramLocalCluster(dag);
localCluster.setPerContainerBufferServer(true);
localCluster.setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return FailableFibonacciOperator.results.size() >= 30;
}
});
localCluster.run(60000);
Assert.assertTrue("failure should be invoked", FailableFibonacciOperator.failureSimulated);
Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
}
@Test
public void testFibonacciRecovery2() throws Exception
{
LogicalPlan dag = StramTestSupport.createDAG(testMeta);
TestGeneratorInputOperator dummyInput = dag.addOperator("DUMMY", TestGeneratorInputOperator.class);
FibonacciOperator fib = dag.addOperator("FIB", FibonacciOperator.class);
FailableDelayOperator opDelay = dag.addOperator("opDelay", FailableDelayOperator.class);
opDelay.setSimulateFailureWindows(5, true);
dag.addStream("dummy_to_operator", dummyInput.outport, fib.dummyInputPort);
dag.addStream("operator_to_delay", fib.output, opDelay.input);
dag.addStream("delay_to_operator", opDelay.output, fib.input);
dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
dag.getAttributes().put(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
FibonacciOperator.results.clear();
FailableDelayOperator.failureSimulated = false;
final StramLocalCluster localCluster = new StramLocalCluster(dag);
localCluster.setPerContainerBufferServer(true);
localCluster.setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return FibonacciOperator.results.size() >= 30;
}
});
localCluster.run(60000);
Assert.assertTrue("failure should be invoked", FailableDelayOperator.failureSimulated);
Assert.assertArrayEquals(Arrays.copyOfRange(new TreeSet<>(Arrays.asList(FIBONACCI_NUMBERS)).toArray(), 0, 20),
Arrays.copyOfRange(new TreeSet<>(FibonacciOperator.results).toArray(), 0, 20));
}
@Test
public void testCheckpointUpdate()
{
LogicalPlan dag = StramTestSupport.createDAG(testMeta);
TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
dag.addStream("AtoB", opA.outport, opB.inport1);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
dag.addStream("CtoDelay", opC.outport2, opDelay.input);
dag.addStream("DelayToB", opDelay.output, opB.inport2);
dag.validate();
dag.setAttribute(com.datatorrent.api.Context.OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
StreamingContainerManager scm = new StreamingContainerManager(dag);
PhysicalPlan plan = scm.getPhysicalPlan();
// set all operators as active to enable recovery window id update
for (PTOperator oper : plan.getAllOperators().values()) {
oper.setState(PTOperator.State.ACTIVE);
}
Clock clock = new SystemClock();
PTOperator opA1 = plan.getOperators(dag.getMeta(opA)).get(0);
PTOperator opB1 = plan.getOperators(dag.getMeta(opB)).get(0);
PTOperator opC1 = plan.getOperators(dag.getMeta(opC)).get(0);
PTOperator opDelay1 = plan.getOperators(dag.getMeta(opDelay)).get(0);
PTOperator opD1 = plan.getOperators(dag.getMeta(opD)).get(0);
Checkpoint cp3 = new Checkpoint(3L, 0, 0);
Checkpoint cp5 = new Checkpoint(5L, 0, 0);
Checkpoint cp4 = new Checkpoint(4L, 0, 0);
opB1.checkpoints.add(cp3);
opC1.checkpoints.add(cp3);
opC1.checkpoints.add(cp4);
opDelay1.checkpoints.add(cp3);
opDelay1.checkpoints.add(cp5);
opD1.checkpoints.add(cp5);
// construct grouping that would be supplied through LogicalPlan
Set<OperatorMeta> stronglyConnected = Sets.newHashSet(dag.getMeta(opB), dag.getMeta(opC), dag.getMeta(opDelay));
Map<OperatorMeta, Set<OperatorMeta>> groups = new HashMap<>();
for (OperatorMeta om : stronglyConnected) {
groups.put(om, stronglyConnected);
}
UpdateCheckpointsContext ctx = new UpdateCheckpointsContext(clock, false, groups);
scm.updateRecoveryCheckpoints(opB1, ctx, false);
Assert.assertEquals("checkpoint " + opA1, Checkpoint.INITIAL_CHECKPOINT, opA1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + opB1, cp3, opC1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + opC1, cp3, opC1.getRecoveryCheckpoint());
Assert.assertEquals("checkpoint " + opD1, cp5, opD1.getRecoveryCheckpoint());
}
@Test
public void testValidationWithMultipleStreamLoops()
{
LogicalPlan dag = StramTestSupport.createDAG(testMeta);
TestGeneratorInputOperator source = dag.addOperator("A", TestGeneratorInputOperator.class);
GenericTestOperator op1 = dag.addOperator("Op1", GenericTestOperator.class);
GenericTestOperator op2 = dag.addOperator("Op2", GenericTestOperator.class);
DefaultDelayOperator<Object> delay = dag.addOperator("Delay", DefaultDelayOperator.class);
dag.addStream("Source", source.outport, op1.inport1);
dag.addStream("Stream1", op1.outport1, op2.inport1);
dag.addStream("Stream2", op1.outport2, op2.inport2);
dag.addStream("Op to Delay", op2.outport1, delay.input);
dag.addStream("Delay to Op", delay.output, op1.inport2);
dag.validate();
}
private static final Logger LOG = LoggerFactory.getLogger(DelayOperatorTest.class);
}