blob: 26e913bfd7c31edd347b02992d3637e56054e98f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.stream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import javax.validation.ConstraintViolationException;
import javax.validation.ValidationException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.engine.GenericNodeTest.GenericOperator;
import com.datatorrent.stram.engine.ProcessingModeTests.CollectorOperator;
import com.datatorrent.stram.engine.RecoverableInputOperator;
import com.datatorrent.stram.plan.TestPlanContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.datatorrent.stram.support.StramTestSupport.MemoryStorageAgent;
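/**
 * Tests for OiO streams, i.e. streams whose locality is set to
 * {@code Locality.THREAD_LOCAL}. The {@code validatePositive*} and
 * {@code validateNegative*} tests exercise {@code LogicalPlan.validate()} on
 * various topologies; the {@code validate*Implementation} tests run the DAG in
 * a {@code StramLocalCluster} and compare the thread ids recorded by the
 * nested thread-id validating operators.
 */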
public class OiOStreamTest
{
/**
 * Creates the test instance. The class declares no instance fields, so there
 * is nothing to initialise here.
 */
public OiOStreamTest()
{
  // intentionally empty
}
/**
 * Checks that a plan consisting of an input operator connected to a collector
 * operator through a single THREAD_LOCAL stream passes logical plan validation.
 */
@Test
public void validatePositiveOiO()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  RecoverableInputOperator inputOperator = plan.addOperator("IntegerGenerator", new RecoverableInputOperator());
  CollectorOperator outputOperator = plan.addOperator("IntegerCollector", new CollectorOperator());
  plan.addStream("PossibleOiO", inputOperator.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);

  try {
    // Returning normally is the success condition; no extra assertion is needed.
    plan.validate();
  } catch (ConstraintViolationException ex) {
    // Report what was violated instead of discarding the exception details.
    Assert.fail("OIO Single InputPort: " + ex.getMessage());
  }
}
/**
 * Checks that a chain of three operators (input, intermediate, output) whose
 * two connecting streams are both THREAD_LOCAL passes logical plan validation.
 */
@Test
public void validatePositiveOiOiO()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator = plan.addOperator("intermediateOperator", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingOutputOperator outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingOutputOperator());
  plan.addStream("OiO1", inputOperator.output, intermediateOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiO2", intermediateOperator.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);

  try {
    // Returning normally is the success condition; no extra assertion is needed.
    plan.validate();
  } catch (ConstraintViolationException ex) {
    // Report what was violated instead of discarding the exception details.
    Assert.fail("OiOiO validation: " + ex.getMessage());
  }
}
/**
 * Checks that validation accepts a GenericOperator when only its {@code ip1}
 * port is connected, through a THREAD_LOCAL stream, and its other input port
 * is left unconnected.
 */
@Test
public void validatePositiveOiOOptionalInput()
{
  LogicalPlan plan = new LogicalPlan();
  RecoverableInputOperator inputOp1 = plan.addOperator("InputOperator1", new RecoverableInputOperator());
  GenericOperator genOp = plan.addOperator("GenericOperator", new GenericOperator());
  plan.addStream("OiO1", inputOp1.output, genOp.ip1).setLocality(Locality.THREAD_LOCAL);

  try {
    // Returning normally is the success condition; no extra assertion is needed.
    plan.validate();
  } catch (ConstraintViolationException ex) {
    // Report what was violated instead of discarding the exception details.
    Assert.fail("OiO Single Connected InputPort: " + ex.getMessage());
  }
}
/**
 * Checks that validation rejects a GenericOperator whose two input ports are
 * fed by two different input operators when at least one of the two streams is
 * THREAD_LOCAL. Three combinations are exercised in turn: both streams
 * THREAD_LOCAL, only the second one, and only the first one.
 */
@Test
public void validateNegativeOiO()
{
  LogicalPlan plan = new LogicalPlan();
  RecoverableInputOperator inputOp1 = plan.addOperator("InputOperator1", new RecoverableInputOperator());
  RecoverableInputOperator inputOp2 = plan.addOperator("InputOperator2", new RecoverableInputOperator());
  GenericOperator genOp = plan.addOperator("GenericOperator", new GenericOperator());
  StreamMeta oio1 = plan.addStream("OiO1", inputOp1.output, genOp.ip1).setLocality(Locality.THREAD_LOCAL);
  StreamMeta oio2 = plan.addStream("OiO2", inputOp2.output, genOp.ip2).setLocality(Locality.THREAD_LOCAL);

  // Both input streams are THREAD_LOCAL.
  assertOiOValidationRejected(plan, "OIO Both InputPorts");

  // Locality removed from the first stream; the second is still THREAD_LOCAL.
  oio1.setLocality(null);
  assertOiOValidationRejected(plan, "OIO First InputPort");

  // First stream THREAD_LOCAL again; locality removed from the second.
  oio1.setLocality(Locality.THREAD_LOCAL);
  oio2.setLocality(null);
  assertOiOValidationRejected(plan, "OIO Second InputPort");
}

/**
 * Fails the running test with {@code failureMessage} unless
 * {@code plan.validate()} throws a {@link ValidationException}.
 * {@link ConstraintViolationException} is a subtype of it, so a single catch
 * covers both exception types.
 *
 * @param plan the logical plan expected to be invalid
 * @param failureMessage message reported when validation unexpectedly succeeds
 */
private static void assertOiOValidationRejected(LogicalPlan plan, String failureMessage)
{
  try {
    plan.validate();
  } catch (ValidationException expected) {
    // Rejection is the expected outcome.
    return;
  }
  Assert.fail(failureMessage);
}
/**
 * Checks that a diamond topology (one input fanning out to two intermediate
 * operators that both feed a two-input-port operator) passes validation when
 * every stream in it is THREAD_LOCAL.
 */
@Test
public void validatePositiveOiOiOdiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());
  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator1.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator2.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);

  try {
    // Returning normally is the success condition; no extra assertion is needed.
    plan.validate();
  } catch (ConstraintViolationException ex) {
    // Report what was violated instead of discarding the exception details.
    Assert.fail("OIOIO diamond validation: " + ex.getMessage());
  }
}
/**
 * Builds an extended diamond in which every stream is THREAD_LOCAL, assigns
 * VCORES attributes (1, 1, 2, 3 and 5) to five of the operators, and then
 * checks that the plan validates and that the resulting physical plan has
 * exactly one container whose required vcores value is 5.
 */
@Test
public void validatePositiveOiOiOdiamondWithCores()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator3 = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator4 = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator3.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate1", intermediateOperator1.output, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate2", intermediateOperator3.output, intermediateOperator4.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator2.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator4.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);

  plan.setOperatorAttribute(inputOperator, OperatorContext.VCORES, 1);
  plan.setOperatorAttribute(intermediateOperator1, OperatorContext.VCORES, 1);
  plan.setOperatorAttribute(intermediateOperator2, OperatorContext.VCORES, 2);
  plan.setOperatorAttribute(intermediateOperator3, OperatorContext.VCORES, 3);
  plan.setOperatorAttribute(intermediateOperator4, OperatorContext.VCORES, 5);
  plan.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

  try {
    // Returning normally is the success condition; no extra assertion is needed.
    plan.validate();
  } catch (ConstraintViolationException ex) {
    // Report what was violated instead of discarding the exception details.
    Assert.fail("OIOIO extended diamond validation: " + ex.getMessage());
  }

  PhysicalPlan physicalPlan = new PhysicalPlan(plan, new TestPlanContext());

  // assertEquals reports expected and actual values on failure, unlike assertTrue(a == b).
  int containerCount = physicalPlan.getContainers().size();
  Assert.assertEquals("number of containers", 1, containerCount);
  int requiredVCores = physicalPlan.getContainers().get(0).getRequiredVCores();
  Assert.assertEquals("number of vcores", 5, requiredVCores);
}
/**
 * Checks that validation rejects a diamond topology in which one of the two
 * streams into the two-input-port operator is THREAD_LOCAL while the other
 * ("nonOiOout2") has no locality set.
 */
@Test
public void validateNegativeOiOiOdiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());
  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator1.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("nonOiOout2", intermediateOperator2.output, outputOperator.input2).setLocality(null);

  try {
    plan.validate();
    // Reached only when validation wrongly accepted the plan.
    Assert.fail("OIOIO negative diamond");
  } catch (ValidationException expected) {
    // Expected. ConstraintViolationException is a subtype of ValidationException,
    // so this single handler covers both exception types.
  }
}
/**
 * Checks that an extended diamond (two parallel branches of two intermediate
 * operators each, joined at a two-input-port operator) passes validation when
 * every stream in it is THREAD_LOCAL.
 */
@Test
public void validatePositiveOiOiOExtendeddiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator3 = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator4 = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator3.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate1", intermediateOperator1.output, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate2", intermediateOperator3.output, intermediateOperator4.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator2.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator4.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);

  try {
    // Returning normally is the success condition; no extra assertion is needed.
    plan.validate();
  } catch (ConstraintViolationException ex) {
    // Report what was violated instead of discarding the exception details.
    Assert.fail("OIOIO extended diamond validation: " + ex.getMessage());
  }
}
/**
 * Checks that validation rejects an extended diamond in which one stream in
 * the middle of a branch ("nonOiOIntermediate2") has no locality set while all
 * other streams are THREAD_LOCAL.
 */
@Test
public void validateNegativeOiOiOExtendeddiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator3 = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator4 = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator3.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate1", intermediateOperator1.output, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("nonOiOIntermediate2", intermediateOperator3.output, intermediateOperator4.input).setLocality(null);
  plan.addStream("OiOout1", intermediateOperator2.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator4.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);

  try {
    plan.validate();
    // Reached only when validation wrongly accepted the plan.
    Assert.fail("OiOiO extended diamond validation");
  } catch (ValidationException expected) {
    // Expected. ConstraintViolationException is a subtype of ValidationException,
    // so this single handler covers both exception types.
  }
}
/**
 * Input operator that stores, in a static field, the id of the thread that ran
 * {@link #setup(OperatorContext)} and asserts in every other lifecycle callback
 * that the calling thread is that same thread. {@link #endWindow()} also calls
 * {@code BaseOperator.shutdown()}.
 */
public static class ThreadIdValidatingInputOperator implements InputOperator
{
  /** Id of the thread that most recently executed setup on an instance of this class. */
  public static long threadId;

  public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();

  @Override
  public void setup(OperatorContext context)
  {
    threadId = Thread.currentThread().getId();
  }

  @Override
  public void beginWindow(long windowId)
  {
    assertCalledOnSetupThread();
  }

  @Override
  public void emitTuples()
  {
    assertCalledOnSetupThread();
  }

  @Override
  public void endWindow()
  {
    assertCalledOnSetupThread();
    BaseOperator.shutdown();
  }

  @Override
  public void teardown()
  {
    assertCalledOnSetupThread();
  }

  /** Asserts (when assertions are enabled) that the caller runs on the thread recorded by setup. */
  private static void assertCalledOnSetupThread()
  {
    assert (threadId == Thread.currentThread().getId());
  }
}
/**
 * Operator with a single input port. {@link #setup(OperatorContext)} records
 * the calling thread's id in the static {@link #threadId} field and appends it
 * to the static {@link #threadList}; every other callback asserts that the
 * calling thread's id is present in that list.
 */
public static class ThreadIdValidatingOutputOperator implements Operator
{
  /** Id of the thread that most recently executed setup on an instance of this class. */
  public static long threadId;

  /** Thread ids appended by setup, one entry per setup call; tests clear it between runs. */
  public static List<Long> threadList = Collections.synchronizedList(new ArrayList<Long>());

  public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
  {
    @Override
    public void process(Number tuple)
    {
      assertCalledOnRecordedThread();
    }
  };

  @Override
  public void setup(OperatorContext context)
  {
    final long current = Thread.currentThread().getId();
    threadId = current;
    threadList.add(current);
  }

  @Override
  public void beginWindow(long windowId)
  {
    assertCalledOnRecordedThread();
  }

  @Override
  public void endWindow()
  {
    assertCalledOnRecordedThread();
  }

  @Override
  public void teardown()
  {
    assertCalledOnRecordedThread();
  }

  /** Asserts (when assertions are enabled) that the caller's thread id is in threadList. */
  private static void assertCalledOnRecordedThread()
  {
    assert (threadList.contains(Thread.currentThread().getId()));
  }
}
/**
 * Operator with one input and one output port. {@link #setup(OperatorContext)}
 * records the calling thread's id in the static {@link #threadId} field and
 * appends it to the static {@link #threadList}; every other callback asserts
 * that the calling thread's id is present in that list.
 */
public static class ThreadIdValidatingGenericIntermediateOperator implements Operator
{
  /** Id of the thread that most recently executed setup on an instance of this class. */
  public static long threadId;

  /** Thread ids appended by setup, one entry per setup call; tests clear it between runs. */
  public static List<Long> threadList = Collections.synchronizedList(new ArrayList<Long>());

  public final transient DefaultOutputPort<Long> output = new DefaultOutputPort<>();

  public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
  {
    @Override
    public void process(Number tuple)
    {
      assertCalledOnRecordedThread();
    }
  };

  @Override
  public void setup(OperatorContext context)
  {
    final long current = Thread.currentThread().getId();
    threadId = current;
    threadList.add(current);
  }

  @Override
  public void beginWindow(long windowId)
  {
    assertCalledOnRecordedThread();
  }

  @Override
  public void endWindow()
  {
    assertCalledOnRecordedThread();
  }

  @Override
  public void teardown()
  {
    assertCalledOnRecordedThread();
  }

  /** Asserts (when assertions are enabled) that the caller's thread id is in threadList. */
  private static void assertCalledOnRecordedThread()
  {
    assert (threadList.contains(Thread.currentThread().getId()));
  }
}
/**
 * Variant of {@link ThreadIdValidatingGenericIntermediateOperator} that adds a
 * second output port. It declares no static fields of its own, so
 * {@code threadId} and {@code threadList} accessed through this class are the
 * parent's fields.
 */
public static class ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts
    extends ThreadIdValidatingGenericIntermediateOperator
{
  /** Second output port, in addition to the inherited {@code output}. */
  public final transient DefaultOutputPort<Long> output2 = new DefaultOutputPort<Long>();
}
/**
 * Operator with two input ports. {@link #setup(OperatorContext)} stores the
 * calling thread's id in the static {@link #threadId} field; both ports and
 * all other lifecycle callbacks assert that they are invoked on that thread.
 */
public static class ThreadIdValidatingGenericOperatorWithTwoInputPorts implements Operator
{
  /** Id of the thread that most recently executed setup on an instance of this class. */
  public static long threadId;

  public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>()
  {
    @Override
    public void process(Number tuple)
    {
      assertCalledOnSetupThread();
    }
  };

  public final transient DefaultInputPort<Number> input2 = new DefaultInputPort<Number>()
  {
    @Override
    public void process(Number tuple)
    {
      assertCalledOnSetupThread();
    }
  };

  @Override
  public void setup(OperatorContext context)
  {
    threadId = Thread.currentThread().getId();
  }

  @Override
  public void beginWindow(long windowId)
  {
    assertCalledOnSetupThread();
  }

  @Override
  public void endWindow()
  {
    assertCalledOnSetupThread();
  }

  @Override
  public void teardown()
  {
    assertCalledOnSetupThread();
  }

  /** Asserts (when assertions are enabled) that the caller runs on the thread recorded by setup. */
  private static void assertCalledOnSetupThread()
  {
    assert (threadId == Thread.currentThread().getId());
  }
}
/**
 * Runs an input operator connected to an output operator in a
 * StramLocalCluster twice. With the stream's locality left unset, the two
 * operators are expected to record different setup thread ids; after the
 * stream is switched to THREAD_LOCAL they are expected to record the same one.
 *
 * @throws Exception if building or running the local cluster fails
 */
@Test
public void validateOiOImplementation() throws Exception
{
  LogicalPlan lp = new LogicalPlan();
  lp.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
  ThreadIdValidatingInputOperator io = lp.addOperator("Input Operator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingOutputOperator go = lp.addOperator("Output Operator", new ThreadIdValidatingOutputOperator());
  StreamMeta stream = lp.addStream("Stream", io.output, go.input);

  /* The first test makes sure that when they are not ThreadLocal they use different threads */
  ThreadIdValidatingOutputOperator.threadList.clear();
  lp.validate();
  StramLocalCluster slc = new StramLocalCluster(lp);
  slc.run();
  // assertNotEquals reports the offending value on failure, unlike assertFalse(a == b).
  Assert.assertNotEquals("Thread Id", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingOutputOperator.threadId);

  /* This test makes sure that since they are ThreadLocal, they indeed share a thread */
  ThreadIdValidatingOutputOperator.threadList.clear();
  stream.setLocality(Locality.THREAD_LOCAL);
  lp.validate();
  slc = new StramLocalCluster(lp);
  slc.run();
  Assert.assertEquals("Thread Id", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingOutputOperator.threadId);
}
/**
 * Runs a chain of input, intermediate and output operators in a
 * StramLocalCluster twice. With no stream locality set, neighbouring operators
 * are expected to record different setup thread ids; with both streams set to
 * THREAD_LOCAL, all three are expected to record the same thread id.
 *
 * @throws Exception if building or running the local cluster fails
 */
@Test
public void validateOiOiOImplementation() throws Exception
{
  LogicalPlan lp = new LogicalPlan();
  lp.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
  ThreadIdValidatingInputOperator inputOperator = lp.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator = lp.addOperator("intermediateOperator", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingOutputOperator outputOperator = lp.addOperator("outputOperator", new ThreadIdValidatingOutputOperator());
  StreamMeta stream1 = lp.addStream("OiO1", inputOperator.output, intermediateOperator.input);
  StreamMeta stream2 = lp.addStream("OiO2", intermediateOperator.output, outputOperator.input);
  StramLocalCluster slc;

  /* The first test makes sure that when they are not ThreadLocal they use different threads */
  ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
  // The output operator asserts against its own static list as well; reset it so that
  // thread ids left over from earlier runs cannot satisfy those assertions.
  ThreadIdValidatingOutputOperator.threadList.clear();
  lp.validate();
  slc = new StramLocalCluster(lp);
  slc.run();
  // assertNotEquals reports the offending value on failure, unlike assertFalse(a == b).
  Assert.assertNotEquals("Thread Id 1", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadId);
  Assert.assertNotEquals("Thread Id 2", ThreadIdValidatingGenericIntermediateOperator.threadId, ThreadIdValidatingOutputOperator.threadId);

  /* This test makes sure that since they are ThreadLocal, they indeed share a thread */
  ThreadIdValidatingGenericIntermediateOperator.threadList.clear();
  ThreadIdValidatingOutputOperator.threadList.clear();
  stream1.setLocality(Locality.THREAD_LOCAL);
  stream2.setLocality(Locality.THREAD_LOCAL);
  lp.validate();
  slc = new StramLocalCluster(lp);
  slc.run();
  Assert.assertEquals("Thread Id 3", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericIntermediateOperator.threadId);
  Assert.assertEquals("Thread Id 4", ThreadIdValidatingGenericIntermediateOperator.threadId, ThreadIdValidatingOutputOperator.threadId);
}
/**
 * Runs a diamond DAG (input fanning out to two intermediate operators that
 * both feed a two-input-port operator) in a StramLocalCluster twice. With no
 * stream locality set, the recorded setup thread ids are expected to differ;
 * with all three streams THREAD_LOCAL, they are expected to be identical.
 *
 * @throws Exception if building or running the local cluster fails
 */
@Test
public void validateOiOiODiamondImplementation() throws Exception
{
  LogicalPlan lp = new LogicalPlan();
  lp.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());
  ThreadIdValidatingInputOperator inputOperator = lp.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = lp.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = lp.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = lp.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());
  StreamMeta stream1 = lp.addStream("OiOinput", inputOperator.output, intermediateOperator1.input, intermediateOperator2.input);
  StreamMeta stream2 = lp.addStream("OiOintermediateToOutput1", intermediateOperator1.output, outputOperator.input);
  StreamMeta stream3 = lp.addStream("OiOintermediateToOutput2", intermediateOperator2.output, outputOperator.input2);
  StramLocalCluster slc;

  // Alias for the static list that the intermediate operators append to during setup.
  List<Long> intermediateThreads = ThreadIdValidatingGenericIntermediateOperator.threadList;

  /*
   * The first test makes sure that when they are not ThreadLocal they use different threads
   */
  intermediateThreads.clear();
  ThreadIdValidatingOutputOperator.threadList.clear();
  lp.validate();
  slc = new StramLocalCluster(lp);
  slc.run();
  Assert.assertEquals("nonOIO: Number of threads", 2, intermediateThreads.size());
  // The (long) casts select the primitive assertNotEquals overload, which also
  // reports the offending value on failure, unlike assertFalse(a == b).
  Assert.assertNotEquals("nonOIO: Thread Ids of input operator and intermediate operator1",
      ThreadIdValidatingInputOperator.threadId, (long)intermediateThreads.get(0));
  Assert.assertNotEquals("nonOIO: Thread Ids of input operator and intermediate operator2",
      ThreadIdValidatingInputOperator.threadId, (long)intermediateThreads.get(1));
  Assert.assertNotEquals("nonOIO: Thread Ids of two intermediate operators", intermediateThreads.get(0), intermediateThreads.get(1));
  Assert.assertNotEquals("nonOIO: Thread Ids of input and output operators", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);

  /*
   * This test makes sure that since all operators in diamond are ThreadLocal, they indeed share a thread
   */
  intermediateThreads.clear();
  ThreadIdValidatingOutputOperator.threadList.clear();
  stream1.setLocality(Locality.THREAD_LOCAL);
  stream2.setLocality(Locality.THREAD_LOCAL);
  stream3.setLocality(Locality.THREAD_LOCAL);
  lp.validate();
  slc = new StramLocalCluster(lp);
  slc.run();
  // Two entries are still expected: one per intermediate operator's setup call.
  Assert.assertEquals("OIO: Number of threads", 2, intermediateThreads.size());
  Assert.assertEquals("OIO: Thread Ids of input operator and intermediate operator1",
      ThreadIdValidatingInputOperator.threadId, (long)intermediateThreads.get(0));
  Assert.assertEquals("OIO: Thread Ids of input operator and intermediate operator2",
      ThreadIdValidatingInputOperator.threadId, (long)intermediateThreads.get(1));
  Assert.assertEquals("OIO: Thread Ids of two intermediate operators", intermediateThreads.get(0), intermediateThreads.get(1));
  Assert.assertEquals("OIO: Thread Ids of input and output operators", ThreadIdValidatingInputOperator.threadId, ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId);
}
/**
 * Runs a tree-shaped DAG (one input operator, three intermediate operators and
 * four output operators) in a StramLocalCluster twice and inspects the thread
 * ids recorded during setup. In the first run no stream has a locality set; in
 * the second run streams "OiO1", "OiO2" and "OiO3" are THREAD_LOCAL while
 * "nonOiO1" is left unchanged.
 *
 * @throws Exception if building or running the local cluster fails
 */
@Test
public void validateOiOiOTreeImplementation() throws Exception
{
  LogicalPlan dag = new LogicalPlan();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  ThreadIdValidatingInputOperator source = dag.addOperator("inputOperator1", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator midRoot = dag.addOperator("intermediateOperatorfromInputOper1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator midLeft = dag.addOperator("intermediateOperatorfromInterOper11", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator midRight = dag.addOperator("intermediateOperatorfromInterOper12", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingOutputOperator sinkOfSource = dag.addOperator("outputOperatorFromInputOper", new ThreadIdValidatingOutputOperator());
  ThreadIdValidatingOutputOperator sinkOfLeft = dag.addOperator("outputOperatorFromInterOper11", new ThreadIdValidatingOutputOperator());
  ThreadIdValidatingOutputOperator sinkOfRightA = dag.addOperator("outputOperatorFromInterOper21", new ThreadIdValidatingOutputOperator());
  ThreadIdValidatingOutputOperator sinkOfRightB = dag.addOperator("outputOperatorFromInterOper22", new ThreadIdValidatingOutputOperator());

  StreamMeta oio1 = dag.addStream("OiO1", source.output, sinkOfSource.input, midRoot.input);
  StreamMeta oio2 = dag.addStream("OiO2", midRoot.output, midLeft.input, midRight.input);
  StreamMeta oio3 = dag.addStream("OiO3", midLeft.output, sinkOfLeft.input);
  dag.addStream("nonOiO1", midRight.output, sinkOfRightA.input, sinkOfRightB.input);

  // Aliases for the static lists that the operators append to during setup.
  List<Long> midThreads = ThreadIdValidatingGenericIntermediateOperator.threadList;
  List<Long> sinkThreads = ThreadIdValidatingOutputOperator.threadList;

  /*
   * This test makes sure that since no operators in dag tree are ThreadLocal, they dont share threads
   */
  midThreads.clear();
  sinkThreads.clear();
  dag.validate();
  StramLocalCluster cluster = new StramLocalCluster(dag);
  cluster.run();
  Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, midThreads.size());
  Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 3, distinctThreadCount(midThreads));
  Assert.assertEquals("nonOIO: Number of threads ThreadIdValidatingOutputOperator", 4, sinkThreads.size());
  Assert.assertEquals("nonOIO: Number of unique threads ThreadIdValidatingOutputOperator", 4, distinctThreadCount(sinkThreads));
  Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingOutputOperator", sinkThreads.contains(ThreadIdValidatingInputOperator.threadId));
  Assert.assertFalse("nonOIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", midThreads.contains(ThreadIdValidatingInputOperator.threadId));

  /*
   * This test makes sure that since some operators in the dag tree are ThreadLocal, they indeed share a thread
   */
  midThreads.clear();
  sinkThreads.clear();
  oio1.setLocality(Locality.THREAD_LOCAL);
  oio2.setLocality(Locality.THREAD_LOCAL);
  oio3.setLocality(Locality.THREAD_LOCAL);
  dag.validate();
  cluster = new StramLocalCluster(dag);
  cluster.run();
  Assert.assertEquals("OIO: Number of threads ThreadIdValidatingGenericIntermediateOperator", 3, midThreads.size());
  Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingGenericIntermediateOperator", 1, distinctThreadCount(midThreads));
  Assert.assertEquals("OIO: Number of threads ThreadIdValidatingOutputOperator", 4, sinkThreads.size());
  Assert.assertEquals("OIO: Number of unique threads ThreadIdValidatingOutputOperator", 3, distinctThreadCount(sinkThreads));
  Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingOutputOperator", sinkThreads.contains(ThreadIdValidatingInputOperator.threadId));
  Assert.assertTrue("OIO:: inputOperator1 : ThreadIdValidatingGenericIntermediateOperator", midThreads.contains(ThreadIdValidatingInputOperator.threadId));
}

/** Returns the number of distinct values contained in {@code threadIds}. */
private static int distinctThreadCount(List<Long> threadIds)
{
  return new HashSet<>(threadIds).size();
}
/**
 * Runs a DAG in which an intermediate operator is connected to the downstream
 * operator through two separate streams, twice in a StramLocalCluster. In the
 * first run no locality is set; in the second run the two streams between the
 * intermediate and the downstream operator are THREAD_LOCAL while the stream
 * from the input operator keeps its default locality.
 *
 * @throws Exception if building or running the local cluster fails
 */
@Test
public void validateOiOTwoPortBetweenOperatorsImplementation() throws Exception
{
  LogicalPlan dag = new LogicalPlan();
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new MemoryStorageAgent());

  ThreadIdValidatingInputOperator source = dag.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts middle = dag.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts sink = dag.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

  // The stream from the input operator is never switched to THREAD_LOCAL, so no handle is kept.
  dag.addStream("OiOinput", source.output, middle.input);
  StreamMeta firstLink = dag.addStream("OiOintermediateOutput1", middle.output, sink.input);
  StreamMeta secondLink = dag.addStream("OiOintermediateOutput2", middle.output2, sink.input2);

  /*
   * The first test makes sure that when they are not ThreadLocal they use different threads
   */
  ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.clear();
  ThreadIdValidatingOutputOperator.threadList.clear();
  dag.validate();
  StramLocalCluster cluster = new StramLocalCluster(dag);
  cluster.run();

  Assert.assertEquals("nonOIO: Number of threads", 1, ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.size());
  long sourceThread = ThreadIdValidatingInputOperator.threadId;
  long middleThread = ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadId;
  long sinkThread = ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId;
  Assert.assertNotEquals("nonOIO: Thread Ids of input operator and intermediate operator", sourceThread, middleThread);
  Assert.assertNotEquals("nonOIO: Thread Ids of intermediate and output operators", middleThread, sinkThread);
  Assert.assertNotEquals("nonOIO: Thread Ids of input and output operators", sourceThread, sinkThread);

  /*
   * This test makes sure that since all streams between two operators are thread local, they indeed share a thread
   */
  ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.clear();
  ThreadIdValidatingOutputOperator.threadList.clear();
  firstLink.setLocality(Locality.THREAD_LOCAL);
  secondLink.setLocality(Locality.THREAD_LOCAL);
  dag.validate();
  cluster = new StramLocalCluster(dag);
  cluster.run();

  Assert.assertEquals("OIO: Number of threads", 1, ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadList.size());
  sourceThread = ThreadIdValidatingInputOperator.threadId;
  middleThread = ThreadIdValidatingGenericIntermediateOperatorWithTwoOutputPorts.threadId;
  sinkThread = ThreadIdValidatingGenericOperatorWithTwoInputPorts.threadId;
  Assert.assertNotEquals("OIO: Thread Ids of input operator and intermediate operator", sourceThread, middleThread);
  Assert.assertEquals("OIO: Thread Ids of intermediate and output operators", middleThread, sinkThread);
  Assert.assertNotEquals("OIO: Thread Ids of input and output operators", sourceThread, sinkThread);
}
// Class-wide SLF4J logger; the plan validation tests use it to log an info message when they start.
private static final Logger logger = LoggerFactory.getLogger(OiOStreamTest.class);
}