/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package com.datatorrent.stram.engine;

import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;

import static java.lang.Thread.sleep;

/**
 * Tests operator recovery behavior under {@link ProcessingMode#AT_MOST_ONCE}.
 */
public class AtMostOnceTest extends ProcessingModeTests
{
public AtMostOnceTest()
{
super(ProcessingMode.AT_MOST_ONCE);
  }

@Test
@Override
public void testLinearInputOperatorRecovery() throws Exception
{
super.testLinearInputOperatorRecovery();
Assert.assertTrue("Generated Outputs", maxTuples <= CollectorOperator.collection.size());
Assert.assertTrue("No Duplicates", CollectorOperator.duplicates.isEmpty());
  }

@Test
@Override
public void testLinearOperatorRecovery() throws Exception
{
super.testLinearOperatorRecovery();
Assert.assertTrue("Generated Outputs", maxTuples >= CollectorOperator.collection.size());
Assert.assertTrue("No Duplicates", CollectorOperator.duplicates.isEmpty());
  }

@Test
@Override
public void testLinearInlineOperatorsRecovery() throws Exception
{
super.testLinearInlineOperatorsRecovery();
Assert.assertTrue("Generated Outputs", maxTuples >= CollectorOperator.collection.size());
Assert.assertTrue("No Duplicates", CollectorOperator.duplicates.isEmpty());
}
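
  /**
   * Feeds a two-input operator from two reservoirs whose windows arrive out of sync and
   * verifies that no tuple is ever emitted twice, and that data belonging to a window
   * which shows up after a later window has already been processed is not emitted.
   */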
@Test
@SuppressWarnings("SleepWhileInLoop")
@Override
public void testNonLinearOperatorRecovery() throws InterruptedException
{
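    // every tuple the operator emits is recorded here; the sink below fails fast on duplicates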
final HashSet<Object> collection = new HashSet<>();
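    // run the MultiInputOperator under the processing mode being tested (AT_MOST_ONCE)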
com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap map = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
map.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 0);
map.put(OperatorContext.PROCESSING_MODE, processingMode);
final GenericNode node = new GenericNode(new MultiInputOperator(),
new com.datatorrent.stram.engine.OperatorContext(1, "operator", map, null));
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("input1", 1024);
    AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("input2", 1024);
node.connectInputPort("input1", reservoir1);
node.connectInputPort("input2", reservoir2);
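    // output sink: record every tuple and fail immediately if one is delivered twice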
node.connectOutputPort("output", new Sink<Object>()
{
@Override
public void put(Object t)
{
if (collection.contains(t)) {
throw new RuntimeException("Duplicate Found!");
}
collection.add(t);
}
@Override
public int getCount(boolean bln)
{
return 0;
}
});
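    // run the node on its own thread; 'active' tells the test when the thread has started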
final AtomicBoolean active = new AtomicBoolean(false);
Thread thread = new Thread()
{
@Override
public void run()
{
active.set(true);
node.activate();
node.run();
node.deactivate();
}
};
thread.start();
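    // wait (up to roughly 500 ms) for the node thread to become active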
for (int i = 0; i < 100 && !active.get(); i++) {
sleep(5);
}
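    // window 1: data arrives on both inputs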
reservoir1.add(new Tuple(MessageType.BEGIN_WINDOW, 1));
reservoir1.add(1);
reservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 1));
reservoir1.add(new EndWindowTuple(1));
reservoir2.add(1);
reservoir2.add(new EndWindowTuple(1));
for (int i = 0; i < 100 && collection.size() < 4; i++) {
sleep(5);
}
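    // window 2: data arrives on input1 only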
reservoir1.add(new Tuple(MessageType.BEGIN_WINDOW, 2));
reservoir1.add(2);
reservoir1.add(new EndWindowTuple(2));
for (int i = 0; i < 100 && collection.size() < 6; i++) {
sleep(5);
}
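    // window 4: data arrives on input2 only; window 3 has not been delivered yet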
reservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 4));
reservoir2.add(4);
    reservoir2.add(new EndWindowTuple(4));
for (int i = 0; i < 100 && collection.size() < 9; i++) {
sleep(5);
}
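    // window 3 arrives late on input1, after window 4 has already been processed;
    // under AT_MOST_ONCE its data is expected to be dropped, as checked below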
reservoir1.add(new Tuple(MessageType.BEGIN_WINDOW, 3));
reservoir1.add(3);
reservoir1.add(new EndWindowTuple(3));
sleep(500);
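    // after giving the node time to deal with the stale window, window 5 arrives on both inputs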
reservoir1.add(new Tuple(MessageType.BEGIN_WINDOW, 5));
reservoir1.add(5);
reservoir2.add(new Tuple(MessageType.BEGIN_WINDOW, 5));
reservoir1.add(new EndWindowTuple(5));
reservoir2.add(5);
reservoir2.add(new EndWindowTuple(5));
for (int i = 0; i < 100 && collection.size() < 14; i++) {
sleep(5);
}
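    // shut down the node thread and wait for it to finish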
thread.interrupt();
thread.join();
    /* let's make sure that we have all the expected tuples and nothing more */
for (Object o : collection) {
if (o instanceof Tuple) {
Tuple t = (Tuple)o;
long windowId = t.getWindowId();
Assert.assertTrue("Valid Window Id", windowId == 1 || windowId == 2 || windowId == 4 || windowId == 5);
Assert.assertTrue("Valid Tuple Type", t.getType() == MessageType.BEGIN_WINDOW || t.getType() == MessageType.END_WINDOW || t.getType() == MessageType.END_STREAM);
} else {
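        // only data from windows 1, 2, 4 and 5 may show up; the concrete values below
        // follow MultiInputOperator's output encoding (presumably 100 + value for input1
        // and 200 + value for input2)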
switch (((Integer)o).intValue()) {
case 101:
case 201:
case 102:
case 204:
case 105:
case 205:
break;
default:
Assert.fail("Unexpected Data Tuple: " + o);
}
}
}
  }

@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(AtMostOnceTest.class);
}