blob: 6ca530b10d249dbde1d8de69dea57df97a055236 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram;
import java.util.concurrent.Callable;
import javax.validation.ConstraintViolationException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.api.ControlAwareDefaultInputPort;
import org.apache.apex.api.ControlAwareDefaultOutputPort;
import org.apache.apex.api.operator.ControlTuple;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
public class CustomControlTupleTest
{
public static final Logger LOG = LoggerFactory.getLogger(CustomControlTupleTest.class);
private static long controlIndex = 0;
private static int numControlTuples = 0;
private static boolean done = false;
private static boolean endApp = false;
private static long endingWindowId = 0;
private static boolean immediate = false;
@Before
public void starting()
{
controlIndex = 0;
numControlTuples = 0;
done = false;
endApp = false;
endingWindowId = 0;
}
public static class Generator extends BaseOperator implements InputOperator
{
private long currentWindowId;
public final transient ControlAwareDefaultOutputPort<Double> out = new ControlAwareDefaultOutputPort<>();
@Override
public void beginWindow(long windowId)
{
if (!done) {
currentWindowId = windowId;
out.emitControl(new TestControlTuple(controlIndex++, immediate));
}
}
@Override
public void emitTuples()
{
if (!done) {
out.emitControl(new TestControlTuple(controlIndex++, immediate));
}
}
@Override
public void endWindow()
{
if (!done) {
out.emitControl(new TestControlTuple(controlIndex++, immediate));
endingWindowId = currentWindowId;
done = true;
}
}
}
public static class DefaultProcessor extends BaseOperator
{
public final transient DefaultInputPort<Double> input = new DefaultInputPort<Double>()
{
@Override
public void process(Double tuple)
{
output.emit(tuple);
}
};
public final transient DefaultOutputPort<Double> output = new DefaultOutputPort<>();
}
public static class ControlAwareProcessor extends BaseOperator
{
public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>()
{
@Override
public void process(Double tuple)
{
output.emit(tuple);
}
@Override
public boolean processControl(ControlTuple tuple)
{
output.emitControl(tuple);
return true;
}
};
public final transient ControlAwareDefaultOutputPort<Double> output = new ControlAwareDefaultOutputPort<>();
}
public static class ControlAwareReceiver extends BaseOperator
{
private long currentWindowId;
@Override
public void beginWindow(long windowId)
{
currentWindowId = windowId;
}
public final transient ControlAwareDefaultInputPort<Double> input = new ControlAwareDefaultInputPort<Double>()
{
@Override
public boolean processControl(ControlTuple payload)
{
numControlTuples++;
return false;
}
@Override
public void process(Double tuple)
{
}
};
@Override
public void endWindow()
{
if (done && currentWindowId > endingWindowId) {
endApp = true;
}
}
}
@ApplicationAnnotation(name = "TestDefaultPropagation")
public static class Application1 implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
dag.addStream("genToProcessor", randomGenerator.out, processor.input);
dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
}
}
@ApplicationAnnotation(name = "TestExplicitPropagation")
public static class Application2 implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
ControlAwareProcessor processor = dag.addOperator("process", ControlAwareProcessor.class);
ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
dag.addStream("genToProcessor", randomGenerator.out, processor.input);
dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
}
}
@ApplicationAnnotation(name = "TestDuplicateControlTuples")
public static class Application3 implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
dag.addStream("genToProcessor", randomGenerator.out, processor.input);
dag.addStream("ProcessorToReceiver", processor.output, receiver.input);
dag.setOperatorAttribute(processor, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<>(2));
}
}
@ApplicationAnnotation(name = "TestThreadLocal")
public static class Application4 implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.THREAD_LOCAL);
dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.THREAD_LOCAL);
}
}
@ApplicationAnnotation(name = "TestContainerLocal")
public static class Application5 implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration conf)
{
Generator randomGenerator = dag.addOperator("randomGenerator", Generator.class);
DefaultProcessor processor = dag.addOperator("process", DefaultProcessor.class);
ControlAwareReceiver receiver = dag.addOperator("receiver", ControlAwareReceiver.class);
dag.addStream("genToProcessor", randomGenerator.out, processor.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
dag.addStream("ProcessorToReceiver", processor.output, receiver.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
}
}
public void testApp(StreamingApplication app) throws Exception
{
try {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(false);
lma.prepareDAG(app, conf);
LocalMode.Controller lc = lma.getController();
((StramLocalCluster)lc).setExitCondition(new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return endApp;
}
});
lc.run(200000); // runs for 20 seconds and quits if terminating condition not reached
LOG.info("Control Tuples received {} expected {}", numControlTuples, controlIndex);
Assert.assertTrue("Incorrect Control Tuples", numControlTuples == controlIndex);
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
@Test
public void testDefaultPropagation() throws Exception
{
immediate = false;
testApp(new Application1());
}
@Test
public void testExplicitPropagation() throws Exception
{
immediate = false;
testApp(new Application2());
}
@Test
public void testDuplicateControlTuples() throws Exception
{
immediate = false;
testApp(new Application3());
}
@Test
public void testThreadLocal() throws Exception
{
immediate = false;
testApp(new Application4());
}
@Test
public void testContainerLocal() throws Exception
{
immediate = false;
testApp(new Application5());
}
@Test
public void testDefaultPropagationImmediate() throws Exception
{
immediate = true;
testApp(new Application1());
}
@Test
public void testExplicitPropagationImmediate() throws Exception
{
immediate = true;
testApp(new Application2());
}
@Test
public void testDuplicateControlTuplesImmediate() throws Exception
{
immediate = true;
testApp(new Application3());
}
@Test
public void testThreadLocalImmediate() throws Exception
{
immediate = true;
testApp(new Application4());
}
@Test
public void testContainerLocalImmediate() throws Exception
{
immediate = true;
testApp(new Application5());
}
public static class TestControlTuple implements ControlTuple
{
public long data;
public boolean immediate;
public TestControlTuple()
{
data = 0;
}
public TestControlTuple(long data, boolean immediate)
{
this.data = data;
this.immediate = immediate;
}
@Override
public boolean equals(Object t)
{
if (t instanceof TestControlTuple && ((TestControlTuple)t).data == this.data) {
return true;
}
return false;
}
@Override
public String toString()
{
return data + "";
}
@Override
public DeliveryType getDeliveryType()
{
if (immediate) {
return DeliveryType.IMMEDIATE;
} else {
return DeliveryType.END_WINDOW;
}
}
}
}