blob: 9f68a4f08591e4c38daa0fd5d96be689010f87c8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.plan.logical;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.ConstraintViolationException;
import javax.validation.Valid;
import javax.validation.Validation;
import javax.validation.ValidationException;
import javax.validation.Validator;
import javax.validation.ValidatorFactory;
import javax.validation.constraints.AssertTrue;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.validation.constraints.Pattern;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.esotericsoftware.kryo.DefaultSerializer;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.api.AffinityRule;
import com.datatorrent.api.AffinityRule.Type;
import com.datatorrent.api.AffinityRulesSet;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Context.PortContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Module;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Sink;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.StringCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.DefaultDelayOperator;
import com.datatorrent.netlet.util.Slice;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.TestGeneratorInputOperator;
import com.datatorrent.stram.engine.TestNonOptionalOutportInputOperator;
import com.datatorrent.stram.engine.TestOutputOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
import com.datatorrent.stram.support.StramTestSupport.RegexMatcher;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
public class LogicalPlanTest
{
private LogicalPlan dag;
@Before
public void setUp()
{
dag = new LogicalPlan();
}
@Test
public void testCycleDetection()
{
//NodeConf operator1 = b.getOrAddNode("operator1");
GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
GenericTestOperator operator3 = dag.addOperator("operator3", GenericTestOperator.class);
GenericTestOperator operator4 = dag.addOperator("operator4", GenericTestOperator.class);
//NodeConf operator5 = b.getOrAddNode("operator5");
//NodeConf operator6 = b.getOrAddNode("operator6");
GenericTestOperator operator7 = dag.addOperator("operator7", GenericTestOperator.class);
// strongly connect n2-n3-n4-n2
dag.addStream("n2n3", operator2.outport1, operator3.inport1);
dag.addStream("n3n4", operator3.outport1, operator4.inport1);
dag.addStream("n4n2", operator4.outport1, operator2.inport1);
// self referencing operator cycle
StreamMeta n7n7 = dag.addStream("n7n7", operator7.outport1, operator7.inport1);
try {
n7n7.addSink(operator7.inport1);
fail("cannot add to stream again");
} catch (Exception e) {
// expected, stream can have single input/output only
}
LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
dag.findStronglyConnected(dag.getMeta(operator7), vc);
assertEquals("operator self reference", 1, vc.invalidCycles.size());
assertEquals("operator self reference", 1, vc.invalidCycles.get(0).size());
assertEquals("operator self reference", dag.getMeta(operator7), vc.invalidCycles.get(0).iterator().next());
// 3 operator cycle
vc = new LogicalPlan.ValidationContext();
dag.findStronglyConnected(dag.getMeta(operator4), vc);
assertEquals("3 operator cycle", 1, vc.invalidCycles.size());
assertEquals("3 operator cycle", 3, vc.invalidCycles.get(0).size());
assertTrue("operator2", vc.invalidCycles.get(0).contains(dag.getMeta(operator2)));
assertTrue("operator3", vc.invalidCycles.get(0).contains(dag.getMeta(operator3)));
assertTrue("operator4", vc.invalidCycles.get(0).contains(dag.getMeta(operator4)));
try {
dag.validate();
fail("validation should fail");
} catch (ValidationException e) {
// expected
}
}
@Test
public void testCycleDetectionWithDelay()
{
TestGeneratorInputOperator opA = dag.addOperator("A", TestGeneratorInputOperator.class);
GenericTestOperator opB = dag.addOperator("B", GenericTestOperator.class);
GenericTestOperator opC = dag.addOperator("C", GenericTestOperator.class);
GenericTestOperator opD = dag.addOperator("D", GenericTestOperator.class);
DefaultDelayOperator<Object> opDelay = dag.addOperator("opDelay", new DefaultDelayOperator<>());
DefaultDelayOperator<Object> opDelay2 = dag.addOperator("opDelay2", new DefaultDelayOperator<>());
dag.addStream("AtoB", opA.outport, opB.inport1);
dag.addStream("BtoC", opB.outport1, opC.inport1);
dag.addStream("CtoD", opC.outport1, opD.inport1);
dag.addStream("CtoDelay", opC.outport2, opDelay.input);
dag.addStream("DtoDelay", opD.outport1, opDelay2.input);
dag.addStream("DelayToB", opDelay.output, opB.inport2);
dag.addStream("Delay2ToC", opDelay2.output, opC.inport2);
LogicalPlan.ValidationContext vc = new LogicalPlan.ValidationContext();
dag.findStronglyConnected(dag.getMeta(opA), vc);
Assert.assertEquals("No invalid cycle", Collections.emptyList(), vc.invalidCycles);
Set<OperatorMeta> exp = Sets.newHashSet(dag.getMeta(opDelay2), dag.getMeta(opDelay), dag.getMeta(opC), dag.getMeta(opB), dag.getMeta(opD));
Assert.assertEquals("cycle", exp, vc.stronglyConnected.get(0));
}
public static class ValidationOperator extends BaseOperator
{
public final transient DefaultOutputPort<Object> goodOutputPort = new DefaultOutputPort<>();
public final transient DefaultOutputPort<Object> badOutputPort = new DefaultOutputPort<>();
}
public static class CounterOperator extends BaseOperator
{
public final transient InputPort<Object> countInputPort = new DefaultInputPort<Object>()
{
@Override
public final void process(Object payload)
{
}
};
}
@Test
public void testLogicalPlanSerialization() throws Exception
{
dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
ValidationOperator validationNode = dag.addOperator("validationNode", ValidationOperator.class);
CounterOperator countGoodNode = dag.addOperator("countGoodNode", CounterOperator.class);
CounterOperator countBadNode = dag.addOperator("countBadNode", CounterOperator.class);
//ConsoleOutputOperator echoBadNode = dag.addOperator("echoBadNode", ConsoleOutputOperator.class);
// good tuples to counter operator
dag.addStream("goodTuplesStream", validationNode.goodOutputPort, countGoodNode.countInputPort);
// bad tuples to separate stream and echo operator
// (stream with 2 outputs)
dag.addStream("badTuplesStream", validationNode.badOutputPort, countBadNode.countInputPort);
Assert.assertEquals("number root operators", 1, dag.getRootOperators().size());
Assert.assertEquals("root operator id", "validationNode", dag.getRootOperators().get(0).getName());
dag.getContextAttributes(countGoodNode).put(OperatorContext.SPIN_MILLIS, 10);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
LogicalPlan.write(dag, bos);
ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
LogicalPlan dagClone = LogicalPlan.read(bis);
Assert.assertNotNull(dagClone);
Assert.assertEquals("number operators in clone", dag.getAllOperators().size(), dagClone.getAllOperators().size());
Assert.assertEquals("number root operators in clone", 1, dagClone.getRootOperators().size());
Assert.assertTrue("root operator in operators", dagClone.getAllOperators().contains(dagClone.getRootOperators().get(0)));
Operator countGoodNodeClone = dagClone.getOperatorMeta("countGoodNode").getOperator();
Assert.assertEquals("", new Integer(10), dagClone.getContextAttributes(countGoodNodeClone).get(OperatorContext.SPIN_MILLIS));
}
@Test
public void testDeleteOperator()
{
TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
dag.addStream("s0", input.outport, o1.inport1);
StreamMeta s1 = dag.addStream("s1", o1.outport1, o2.inport1);
dag.validate();
Assert.assertEquals("", 3, dag.getAllOperators().size());
dag.removeOperator(o2);
s1.remove();
dag.validate();
Assert.assertEquals("", 2, dag.getAllOperators().size());
}
public static class ValidationTestOperator extends BaseOperator implements InputOperator
{
@NotNull
@Pattern(regexp = ".*malhar.*", message = "Value has to contain 'malhar'!")
private String stringField1;
@Min(2)
private int intField1;
@AssertTrue(message = "stringField1 should end with intField1")
private boolean isValidConfiguration()
{
return stringField1.endsWith(String.valueOf(intField1));
}
private String getterProperty2 = "";
@NotNull
public String getProperty2()
{
return getterProperty2;
}
public void setProperty2(String s)
{
// annotations need to be on the getter
getterProperty2 = s;
}
private String[] stringArrayField;
public String[] getStringArrayField()
{
return stringArrayField;
}
public void setStringArrayField(String[] stringArrayField)
{
this.stringArrayField = stringArrayField;
}
public class Nested
{
@NotNull
private String property = "";
public String getProperty()
{
return property;
}
public void setProperty(String property)
{
this.property = property;
}
}
@Valid
private final Nested nestedBean = new Nested();
private String stringProperty2;
public String getStringProperty2()
{
return stringProperty2;
}
public void setStringProperty2(String stringProperty2)
{
this.stringProperty2 = stringProperty2;
}
private Map<String, String> mapProperty = Maps.newHashMap();
public Map<String, String> getMapProperty()
{
return mapProperty;
}
public void setMapProperty(Map<String, String> mapProperty)
{
this.mapProperty = mapProperty;
}
@Override
public void emitTuples()
{
// Emit no tuples
}
}
@Test
public void testOperatorValidation()
{
ValidationTestOperator bean = new ValidationTestOperator();
bean.stringField1 = "malhar1";
bean.intField1 = 1;
// ensure validation standalone produces expected results
ValidatorFactory factory =
Validation.buildDefaultValidatorFactory();
Validator validator = factory.getValidator();
Set<ConstraintViolation<ValidationTestOperator>> constraintViolations =
validator.validate(bean);
Assert.assertEquals("" + constraintViolations,1, constraintViolations.size());
ConstraintViolation<ValidationTestOperator> cv = constraintViolations.iterator().next();
Assert.assertEquals("", bean.intField1, cv.getInvalidValue());
Assert.assertEquals("", "intField1", cv.getPropertyPath().toString());
// ensure DAG validation produces matching results
bean = dag.addOperator("testOperator", bean);
try {
dag.validate();
Assert.fail("should throw ConstraintViolationException");
} catch (ConstraintViolationException e) {
Assert.assertEquals("violation details", constraintViolations, e.getConstraintViolations());
String expRegex = ".*ValidationTestOperator\\{name=null}, propertyPath='intField1', message='must be greater than or equal to 2',.*value=1}]";
Assert.assertThat("exception message", e.getMessage(), RegexMatcher.matches(expRegex));
}
try {
bean.intField1 = 3;
dag.validate();
Assert.fail("should throw ConstraintViolationException");
} catch (ConstraintViolationException e) {
ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
Assert.assertEquals("", false, cv2.getInvalidValue());
Assert.assertEquals("", "validConfiguration", cv2.getPropertyPath().toString());
}
bean.stringField1 = "malhar3";
// annotated getter
try {
bean.getterProperty2 = null;
dag.validate();
Assert.fail("should throw ConstraintViolationException");
} catch (ConstraintViolationException e) {
ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
Assert.assertEquals("", null, cv2.getInvalidValue());
Assert.assertEquals("", "property2", cv2.getPropertyPath().toString());
}
bean.getterProperty2 = "";
// nested property
try {
bean.nestedBean.property = null;
dag.validate();
Assert.fail("should throw ConstraintViolationException");
} catch (ConstraintViolationException e) {
ConstraintViolation<?> cv2 = e.getConstraintViolations().iterator().next();
Assert.assertEquals("" + e.getConstraintViolations(), 1, constraintViolations.size());
Assert.assertEquals("", null, cv2.getInvalidValue());
Assert.assertEquals("", "nestedBean.property", cv2.getPropertyPath().toString());
}
bean.nestedBean.property = "";
// all valid
dag.validate();
}
@OperatorAnnotation(partitionable = false)
public static class TestOperatorAnnotationOperator extends BaseOperator
{
@InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<Object> input1 = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
}
};
}
class NoInputPortOperator extends BaseOperator
{
}
@Test
public void testValidationForNonInputRootOperator()
{
NoInputPortOperator x = dag.addOperator("x", new NoInputPortOperator());
try {
dag.validate();
Assert.fail("should fail because root operator is not input operator");
} catch (ValidationException e) {
// expected
}
}
@OperatorAnnotation(partitionable = false)
public static class TestOperatorAnnotationOperator2 extends BaseOperator implements Partitioner<TestOperatorAnnotationOperator2>
{
@Override
public Collection<Partition<TestOperatorAnnotationOperator2>> definePartitions(Collection<Partition<TestOperatorAnnotationOperator2>> partitions, PartitioningContext context)
{
return null;
}
@Override
public void partitioned(Map<Integer, Partition<TestOperatorAnnotationOperator2>> partitions)
{
}
}
@Test
public void testOperatorAnnotation()
{
TestGeneratorInputOperator input = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestOperatorAnnotationOperator operator = dag.addOperator("operator1", TestOperatorAnnotationOperator.class);
dag.addStream("Connection", input.outport, operator.input1);
dag.setOperatorAttribute(operator, OperatorContext.PARTITIONER, new StatelessPartitioner<TestOperatorAnnotationOperator>(2));
try {
dag.validate();
Assert.fail("should raise operator is not partitionable for operator1");
} catch (ValidationException e) {
Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
}
dag.setOperatorAttribute(operator, OperatorContext.PARTITIONER, null);
dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, true);
try {
dag.validate();
Assert.fail("should raise operator is not partitionable for operator1");
} catch (ValidationException e) {
Assert.assertEquals("", "Operator " + dag.getMeta(operator).getName() + " is not partitionable but PARTITION_PARALLEL attribute is set", e.getMessage());
}
dag.setInputPortAttribute(operator.input1, PortContext.PARTITION_PARALLEL, false);
dag.validate();
dag.removeOperator(operator);
TestOperatorAnnotationOperator2 operator2 = dag.addOperator("operator2", TestOperatorAnnotationOperator2.class);
try {
dag.validate();
Assert.fail("should raise operator is not partitionable for operator2");
} catch (ValidationException e) {
Assert.assertEquals("Operator " + dag.getMeta(operator2).getName() + " provides partitioning capabilities but the annotation on the operator class declares it non partitionable!", e.getMessage());
}
}
@Test(expected = IllegalArgumentException.class)
public void testNullOperatorName()
{
dag.addOperator(null, BaseOperator.class);
}
@Test(expected = IllegalArgumentException.class)
public void testEmptyOperatorName()
{
dag.addOperator("", BaseOperator.class);
}
@Test(expected = IllegalArgumentException.class)
public void testNullStreamId()
{
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
dag.addStream(null, o1.outport1, o1.inport1, o1.inport2 );
}
@Test(expected = IllegalArgumentException.class)
public void testEmptyStreamId()
{
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
dag.addStream("", o1.outport1, o1.inport1 );
}
@Test(expected = IllegalArgumentException.class)
public void testEmptyModuleName()
{
Module testModule = mock(Module.class);
dag.addModule("", testModule);
}
@Test(expected = IllegalArgumentException.class)
public void testNullModuleName()
{
Module testModule = mock(Module.class);
dag.addModule(null, testModule);
}
@Test
public void testPortConnectionValidation()
{
TestNonOptionalOutportInputOperator input = dag.addOperator("input1", TestNonOptionalOutportInputOperator.class);
try {
dag.validate();
Assert.fail("should raise port not connected for input1.outputPort1");
} catch (ValidationException e) {
Assert.assertEquals("", "Output port connection required: input1.outport1", e.getMessage());
}
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
dag.addStream("stream1", input.outport1, o1.inport1);
dag.validate();
// required input
dag.addOperator("counter", CounterOperator.class);
try {
dag.validate();
} catch (ValidationException e) {
Assert.assertEquals("", "Input port connection required: counter.countInputPort", e.getMessage());
}
}
@Test
public void testAtMostOnceProcessingModeValidation()
{
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class);
dag.setOperatorAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
dag.addStream("input1.outport", input1.outport, amoOper.inport1);
dag.addStream("input2.outport", input2.outport, amoOper.inport2);
GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class);
dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
try {
dag.validate();
Assert.fail("Exception expected for " + outputOper);
} catch (ValidationException ve) {
Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/AT_MOST_ONCE");
}
dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, null);
dag.validate();
OperatorMeta outputOperOm = dag.getMeta(outputOper);
Assert.assertEquals("" + outputOperOm.getAttributes(), Operator.ProcessingMode.AT_MOST_ONCE, outputOperOm.getValue(OperatorContext.PROCESSING_MODE));
}
@Test
public void testExactlyOnceProcessingModeValidation()
{
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
GenericTestOperator amoOper = dag.addOperator("amoOper", GenericTestOperator.class);
dag.setOperatorAttribute(amoOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.EXACTLY_ONCE);
dag.addStream("input1.outport", input1.outport, amoOper.inport1);
dag.addStream("input2.outport", input2.outport, amoOper.inport2);
GenericTestOperator outputOper = dag.addOperator("outputOper", GenericTestOperator.class);
dag.addStream("aloOper.outport1", amoOper.outport1, outputOper.inport1);
try {
dag.validate();
Assert.fail("Exception expected for " + outputOper);
} catch (ValidationException ve) {
Assert.assertEquals("", ve.getMessage(), "Processing mode for outputOper should be AT_MOST_ONCE for source amoOper/EXACTLY_ONCE");
}
dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_LEAST_ONCE);
try {
dag.validate();
Assert.fail("Exception expected for " + outputOper);
} catch (ValidationException ve) {
Assert.assertEquals("", ve.getMessage(), "Processing mode outputOper/AT_LEAST_ONCE not valid for source amoOper/EXACTLY_ONCE");
}
// AT_MOST_ONCE is valid
dag.setOperatorAttribute(outputOper, OperatorContext.PROCESSING_MODE, Operator.ProcessingMode.AT_MOST_ONCE);
dag.validate();
}
@Test
public void testLocalityValidation()
{
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
dag.validate();
TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
dag.addStream("input2.outport", input2.outport, o1.inport2);
try {
dag.validate();
Assert.fail("Exception expected for " + o1);
} catch (ValidationException ve) {
Assert.assertThat("", ve.getMessage(), RegexMatcher.matches("Locality THREAD_LOCAL invalid for operator .* with multiple input streams .*"));
}
s1.setLocality(null);
dag.validate();
}
private class TestAnnotationsOperator extends BaseOperator implements InputOperator
{
//final public transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<Object>();
@OutputPortFieldAnnotation(optional = false)
public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
@Override
public void emitTuples()
{
// Emit Nothing
}
}
private class TestAnnotationsOperator2 extends BaseOperator implements InputOperator
{
public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
@Override
public void emitTuples()
{
// Emit Nothing
}
}
private class TestAnnotationsOperator3 extends BaseOperator implements InputOperator
{
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Object> outport1 = new DefaultOutputPort<>();
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Object> outport2 = new DefaultOutputPort<>();
@Override
public void emitTuples()
{
// Emit Nothing
}
}
@Test
public void testOutputPortAnnotation()
{
TestAnnotationsOperator ta1 = dag.addOperator("testAnnotationsOperator", new TestAnnotationsOperator());
try {
dag.validate();
Assert.fail("should raise: port connection required");
} catch (ValidationException e) {
Assert.assertEquals("", "Output port connection required: testAnnotationsOperator.outport2", e.getMessage());
}
TestOutputOperator o2 = dag.addOperator("sink", new TestOutputOperator());
dag.addStream("s1", ta1.outport2, o2.inport);
dag.validate();
TestAnnotationsOperator2 ta2 = dag.addOperator("multiOutputPorts1", new TestAnnotationsOperator2());
dag.validate();
TestOutputOperator o3 = dag.addOperator("o3", new TestOutputOperator());
dag.addStream("s2", ta2.outport1, o3.inport);
dag.addOperator("multiOutputPorts3", new TestAnnotationsOperator3());
dag.validate();
}
/**
* Operator that can be used with default Java serialization instead of Kryo
*/
@DefaultSerializer(JavaSerializer.class)
public static class JdkSerializableOperator extends BaseOperator implements Serializable
{
private static final long serialVersionUID = -4024202339520027097L;
public abstract class SerializableInputPort<T> implements InputPort<T>, Sink<T>, java.io.Serializable
{
private static final long serialVersionUID = 1L;
@Override
public Sink<T> getSink()
{
return this;
}
@Override
public void setConnected(boolean connected)
{
}
@Override
public void setup(PortContext context)
{
}
@Override
public void teardown()
{
}
@Override
public StreamCodec<T> getStreamCodec()
{
return null;
}
}
@InputPortFieldAnnotation(optional = true)
public final InputPort<Object> inport1 = new SerializableInputPort<Object>()
{
private static final long serialVersionUID = 1L;
@Override
public final void put(Object payload)
{
}
@Override
public int getCount(boolean reset)
{
return 0;
}
};
}
@Test
public void testJdkSerializableOperator() throws Exception
{
dag.addOperator("o1", new JdkSerializableOperator());
ByteArrayOutputStream outStream = new ByteArrayOutputStream();
LogicalPlan.write(dag, outStream);
outStream.close();
LogicalPlan clonedDag = LogicalPlan.read(new ByteArrayInputStream(outStream.toByteArray()));
JdkSerializableOperator o1Clone = (JdkSerializableOperator)clonedDag.getOperatorMeta("o1").getOperator();
Assert.assertNotNull("port object null", o1Clone.inport1);
}
@Test
public void testAttributeValuesSerializableCheck() throws NoSuchFieldException, SecurityException, IllegalArgumentException, IllegalAccessException
{
Attribute<Object> attr = new Attribute<>(new TestAttributeValue(), new Object2String());
Field nameField = Attribute.class.getDeclaredField("name");
nameField.setAccessible(true);
nameField.set(attr, "Test_Attribute");
nameField.setAccessible(false);
assertNotNull(attr);
// Dag attribute not serializable test
dag.setAttribute(attr, new TestAttributeValue());
try {
dag.validate();
Assert.fail("Setting not serializable attribute should throw exception");
} catch (ValidationException e) {
assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in com.datatorrent.api.DAG are not serializable", e.getMessage());
}
// Operator attribute not serializable test
dag = new LogicalPlan();
TestGeneratorInputOperator operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class);
dag.setOperatorAttribute(operator, attr, new TestAttributeValue());
try {
dag.validate();
Assert.fail("Setting not serializable attribute should throw exception");
} catch (ValidationException e) {
assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator are not serializable", e.getMessage());
}
// Output Port attribute not serializable test
dag = new LogicalPlan();
operator = dag.addOperator("TestOperator", TestGeneratorInputOperator.class);
dag.setOutputPortAttribute(operator.outport, attr, new TestAttributeValue());
try {
dag.validate();
Assert.fail("Setting not serializable attribute should throw exception");
} catch (ValidationException e) {
assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.outport are not serializable", e.getMessage());
}
// Input Port attribute not serializable test
dag = new LogicalPlan();
GenericTestOperator operator1 = dag.addOperator("TestOperator", GenericTestOperator.class);
dag.setInputPortAttribute(operator1.inport1, attr, new TestAttributeValue());
try {
dag.validate();
Assert.fail("Setting non serializable attribute should throw exception");
} catch (ValidationException e) {
assertEquals("Validation Exception should match ", "Attribute value(s) for Test_Attribute in TestOperator.inport1 are not serializable", e.getMessage());
}
}
private static class Object2String implements StringCodec<Object>
{
@Override
public Object fromString(String string)
{
// Stub method for testing - do nothing
return null;
}
@Override
public String toString(Object pojo)
{
// Stub method for testing - do nothing
return null;
}
}
private static class TestAttributeValue
{
}
private static class TestStreamCodec implements StreamCodec<Object>
{
@Override
public Object fromByteArray(Slice fragment)
{
return fragment.stringValue();
}
@Override
public Slice toByteArray(Object o)
{
byte[] b = o.toString().getBytes();
return new Slice(b, 0, b.length);
}
@Override
public int getPartition(Object o)
{
return o.hashCode() / 2;
}
}
public static class TestPortCodecOperator extends BaseOperator
{
public final transient DefaultInputPort<Object> inport1 = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
}
@Override
public StreamCodec<Object> getStreamCodec()
{
return new TestStreamCodec();
}
};
@OutputPortFieldAnnotation(optional = true)
public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();
}
/*
@Test
public void testStreamCodec() throws Exception {
TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
GenericTestOperator gto1 = dag.addOperator("gto1", GenericTestOperator.class);
StreamMeta stream1 = dag.addStream("s1", input.outport, gto1.inport1);
StreamCodec<?> codec1 = new TestStreamCodec();
dag.setInputPortAttribute(gto1.inport1, PortContext.STREAM_CODEC, codec1);
dag.validate();
//Assert.assertEquals("Stream codec not set", stream1.getStreamCodec(), codec1);
GenericTestOperator gto2 = dag.addOperator("gto2", GenericTestOperator.class);
GenericTestOperator gto3 = dag.addOperator("gto3", GenericTestOperator.class);
StreamMeta stream2 = dag.addStream("s2", gto1.outport1, gto2.inport1, gto3.inport1);
dag.setInputPortAttribute(gto2.inport1, PortContext.STREAM_CODEC, codec1);
try {
dag.validate();
} catch (ValidationException e) {
String msg = e.getMessage();
if (!msg.startsWith("Stream codec not set on input port") || !msg.contains("gto3")
|| !msg.contains(codec1.toString()) || !msg.endsWith("was specified on another port")) {
Assert.fail(String.format("LogicalPlan validation error msg: %s", msg));
}
}
dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec1);
dag.validate();
//Assert.assertEquals("Stream codec not set", stream2.getStreamCodec(), codec1);
StreamCodec<?> codec2 = new TestStreamCodec();
dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec2);
try {
dag.validate();
} catch (ValidationException e) {
String msg = e.getMessage();
if (!msg.startsWith("Conflicting stream codec set on input port") || !msg.contains("gto3")
|| !msg.contains(codec2.toString()) || !msg.endsWith("was specified on another port")) {
Assert.fail(String.format("LogicalPlan validation error msg: %s", msg));
}
}
dag.setInputPortAttribute(gto3.inport1, PortContext.STREAM_CODEC, codec1);
TestPortCodecOperator pco = dag.addOperator("pco", TestPortCodecOperator.class);
StreamMeta stream3 = dag.addStream("s3", gto2.outport1, pco.inport1);
dag.validate();
//Assert.assertEquals("Stream codec class not set", stream3.getCodecClass(), TestStreamCodec.class);
dag.setInputPortAttribute(pco.inport1, PortContext.STREAM_CODEC, codec2);
dag.validate();
//Assert.assertEquals("Stream codec not set", stream3.getStreamCodec(), codec2);
}
*/
@Test
public void testCheckpointableWithinAppWindowAnnotation()
{
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
GenericTestOperator x = dag.addOperator("x", new GenericTestOperator());
dag.addStream("Stream1", input1.outport, x.inport1);
dag.setOperatorAttribute(x, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
dag.setOperatorAttribute(x, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
dag.validate();
TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
CheckpointableWithinAppWindowOperator y = dag.addOperator("y", new CheckpointableWithinAppWindowOperator());
dag.addStream("Stream2", input2.outport, y.inport1);
dag.setOperatorAttribute(y, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
dag.setOperatorAttribute(y, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
dag.validate();
TestGeneratorInputOperator input3 = dag.addOperator("input3", TestGeneratorInputOperator.class);
NotCheckpointableWithinAppWindowOperator z = dag.addOperator("z", new NotCheckpointableWithinAppWindowOperator());
dag.addStream("Stream3", input3.outport, z.inport1);
dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 15);
dag.setOperatorAttribute(z, OperatorContext.APPLICATION_WINDOW_COUNT, 30);
try {
dag.validate();
Assert.fail("should fail because chekpoint window count is not a factor of application window count");
} catch (ValidationException e) {
// expected
}
dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 30);
dag.validate();
dag.setOperatorAttribute(z, OperatorContext.CHECKPOINT_WINDOW_COUNT, 45);
try {
dag.validate();
Assert.fail("should fail because chekpoint window count is not a factor of application window count");
} catch (ValidationException e) {
// expected
}
}
@OperatorAnnotation(checkpointableWithinAppWindow = true)
class CheckpointableWithinAppWindowOperator extends GenericTestOperator
{
}
@OperatorAnnotation(checkpointableWithinAppWindow = false)
class NotCheckpointableWithinAppWindowOperator extends GenericTestOperator
{
}
@Test
public void testInputPortHiding()
{
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
Operator2 operator2 = dag.addOperator("operator2", new Operator2());
dag.addStream("Stream1", input1.outport, operator2.input);
dag.validate();
}
@Test
public void testInvalidInputPortConnection()
{
TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
Operator1 operator1 = dag.addOperator("operator3", new Operator3());
dag.addStream("Stream1", input1.outport, operator1.input);
try {
dag.validate();
} catch (ValidationException ex) {
Assert.assertTrue("validation message", ex.getMessage().startsWith("Invalid port connected"));
return;
}
Assert.fail();
}
@Test
public void testAffinityRulesDagValidation()
{
TestGeneratorInputOperator o1 = dag.addOperator("O1", new TestGeneratorInputOperator());
GenericTestOperator o2 = dag.addOperator("O2", new GenericTestOperator());
GenericTestOperator o3 = dag.addOperator("O3", new GenericTestOperator());
dag.addStream("stream1", o1.outport, o2.inport1).setLocality(Locality.THREAD_LOCAL);
StreamMeta stream2 = dag.addStream("stream2", o2.outport1, o3.inport1).setLocality(Locality.CONTAINER_LOCAL);
AffinityRulesSet ruleSet = new AffinityRulesSet();
// Valid case:
List<AffinityRule> rules = new ArrayList<>();
ruleSet.setAffinityRules(rules);
AffinityRule rule1 = new AffinityRule(Type.AFFINITY, Locality.CONTAINER_LOCAL, false, "O1", "O3");
rules.add(rule1);
dag.setAttribute(DAGContext.AFFINITY_RULES_SET, ruleSet);
dag.validate();
// Locality conflicts with affinity rules case:
AffinityRule rule2 = new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, false, "O2", "O3");
rules.add(rule2);
try {
dag.validate();
Assert.fail("DAG validation should fail due to conflicting rules");
} catch (ValidationException e) {
Assert.assertEquals("Anti Affinity rule for operators O2 & O3 conflicts with affinity rules or Stream locality", e.getMessage());
}
// Change Stream2 locality to Node to check if validation passes
stream2.setLocality(Locality.RACK_LOCAL);
dag.validate();
// Add anti-affinity rule conflicting with rule1
AffinityRule rule3 = new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, false, "O1", "O3");
rules.add(rule3);
try {
dag.validate();
Assert.fail("DAG validation should fail due to conflicting rules");
} catch (ValidationException e) {
Assert.assertEquals("Anti Affinity rule for operators O1 & O3 conflicts with affinity rules or Stream locality", e.getMessage());
}
// Change rule1 to Rack local to see if dag validation passes
rules.clear();
rule1.setLocality(Locality.RACK_LOCAL);
rules.add(rule1);
rules.add(rule2);
rules.add(rule3);
dag.validate();
// Add conflicting rules and set relaxLocality for one rule
AffinityRule rule4 = new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, true, "O1", "O2");
rules.add(rule4);
dag.validate();
// Set conflicting host locality and check if it fails validation
rules.clear();
AffinityRule rule = new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, false, "O2", "O3");
rules.add(rule);
dag.getMeta(o2).getAttributes().put(OperatorContext.LOCALITY_HOST, "host1");
dag.getMeta(o3).getAttributes().put(OperatorContext.LOCALITY_HOST, "host1");
try {
dag.validate();
Assert.fail("DAG validation should fail due to conflicting host locality");
} catch (ValidationException e) {
Assert.assertEquals("Host Locality for operators: O2(host: host1) & O3(host: host1) conflict with anti-affinity rules", e.getMessage());
}
// Set conflicting affinity and different host locality for node-local
// operators
rules.clear();
rule = new AffinityRule(Type.AFFINITY, Locality.NODE_LOCAL, false, "O2", "O3");
rules.add(rule);
dag.getMeta(o2).getAttributes().put(OperatorContext.LOCALITY_HOST, "host1");
dag.getMeta(o3).getAttributes().put(OperatorContext.LOCALITY_HOST, "host2");
try {
dag.validate();
Assert.fail("DAG validation should fail due to conflicting host locality");
} catch (ValidationException e) {
Assert.assertEquals("Host Locality for operators: O2(host: host1) & O3(host: host2) conflicts with affinity rules", e.getMessage());
}
// Check affinity Thread local validation for non-connected operators
dag.getAttributes().get(DAGContext.AFFINITY_RULES_SET).getAffinityRules().clear();
rule = new AffinityRule(Type.AFFINITY, Locality.THREAD_LOCAL, false, "O1", "O3");
rules.add(rule);
try {
dag.validate();
Assert.fail("DAG validation should fail due to conflicting host locality");
} catch (ValidationException e) {
Assert.assertEquals("Affinity rule specified THREAD_LOCAL affinity for operators O1 & O3 which are not connected by stream", e.getMessage());
}
// Check indirect conflict
dag = new LogicalPlan();
o1 = dag.addOperator("O1", new TestGeneratorInputOperator());
o2 = dag.addOperator("O2", new GenericTestOperator());
o3 = dag.addOperator("O3", new GenericTestOperator());
GenericTestOperator o4 = dag.addOperator("O4", new GenericTestOperator());
GenericTestOperator o5 = dag.addOperator("O5", new GenericTestOperator());
dag.addStream("stream1", o1.outport, o2.inport1, o3.inport1).setLocality(Locality.NODE_LOCAL);
dag.addStream("stream2", o3.outport1, o4.inport1);
dag.addStream("stream3", o2.outport1, o5.inport1);
rules.clear();
// O3 and O5 cannot have NODE_LOCAL anti-affinity now, since they already have NODE_LOCAL affinity
rules.add(new AffinityRule(Type.AFFINITY, Locality.CONTAINER_LOCAL, false, "O1", "O5"));
rules.add(new AffinityRule(Type.ANTI_AFFINITY, Locality.NODE_LOCAL, false, "O3", "O5"));
ruleSet = new AffinityRulesSet();
ruleSet.setAffinityRules(rules);
dag.setAttribute(DAGContext.AFFINITY_RULES_SET, ruleSet);
try {
dag.validate();
Assert.fail("dag validation should fail due to conflicting affinity rules");
} catch (ValidationException e) {
Assert.assertEquals("Anti Affinity rule for operators O3 & O5 conflicts with affinity rules or Stream locality", e.getMessage());
}
}
class Operator1 extends BaseOperator
{
public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
}
};
}
class Operator2 extends Operator1
{
public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
}
};
}
class Operator3 extends Operator1
{
@InputPortFieldAnnotation(optional = true)
public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
}
};
}
@Test
public void testOutputPortHiding()
{
Operator5 operator5 = dag.addOperator("input", new Operator5());
Operator2 operator2 = dag.addOperator("operator2", new Operator2());
dag.addStream("Stream1", operator5.output, operator2.input);
dag.validate();
}
@Test(expected = ValidationException.class)
public void testInvalidOutputPortConnection()
{
Operator4 operator4 = dag.addOperator("input", new Operator5());
Operator3 operator3 = dag.addOperator("operator3", new Operator3());
dag.addStream("Stream1", operator4.output, operator3.input);
dag.validate();
}
class Operator4 extends BaseOperator implements InputOperator
{
public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
@Override
public void emitTuples()
{
}
}
class Operator5 extends Operator4
{
public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<>();
}
/*
These were tests for operator semantics that verified if an operator class implements InputOperator then the same class should not declare input ports.
This would be done later when we are able to verify user code at compile-time.
validation()
{
if (n.getOperator() instanceof InputOperator) {
try {
for (Class<?> clazz : n.getOperator().getClass().getInterfaces()) {
if (clazz.getName().equals(InputOperator.class.getName())) {
for (Field field : n.getOperator().getClass().getDeclaredFields()) {
field.setAccessible(true);
Object declaredObject = field.get(n.getOperator());
if (declaredObject instanceof InputPort) {
throw new ValidationException("Operator class implements InputOperator and also declares input ports: " + n.name);
}
}
break;
}
}
}
catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testInvalidInputOperatorDeclaration()
{
TestGeneratorInputOperator.InvalidInputOperator inputOperator = dag.addOperator("input", new TestGeneratorInputOperator.InvalidInputOperator());
GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
dag.addStream("stream1", inputOperator.outport, operator2.inport1);
try {
dag.validate();
fail("validation should fail");
}
catch (ValidationException e) {
// expected
}
}
@Test
public void testValidInputOperatorDeclaration()
{
TestGeneratorInputOperator.ValidGenericOperator operator1 = dag.addOperator("input", new TestGeneratorInputOperator.ValidGenericOperator());
GenericTestOperator operator2 = dag.addOperator("operator2", GenericTestOperator.class);
dag.addStream("stream1", operator1.outport, operator2.inport1);
dag.validate();
}
*/
}