blob: 66f1b84ae627ee94672fb981b868a9aced6cb7a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.engine;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Lists;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Operator.ProcessingMode;
import com.datatorrent.api.Sink;
import com.datatorrent.api.Stats.OperatorStats;
import com.datatorrent.api.StreamCodec;
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.bufferserver.server.Server;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.ScheduledExecutorService;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.stram.CustomControlTupleTest;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.codec.DefaultStatefulStreamCodec;
import com.datatorrent.stram.stream.BufferServerPublisher;
import com.datatorrent.stram.stream.BufferServerSubscriber;
import com.datatorrent.stram.stream.OiOStream;
import com.datatorrent.stram.tuple.CustomControlTuple;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.EndWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
/**
*
*/
public class GenericNodeTest
{
@Rule
public FSTestWatcher testMeta = new FSTestWatcher();
public static class FSTestWatcher extends TestWatcher
{
private String dir;
public String getDir()
{
return dir;
}
@Override
protected void starting(org.junit.runner.Description description)
{
dir = "target/" + description.getClassName() + "/" + description.getMethodName();
}
@Override
protected void finished(org.junit.runner.Description description)
{
super.finished(description);
FileUtils.deleteQuietly(new File(dir));
}
}
public static class DelayAsyncFSStorageAgent extends AsyncFSStorageAgent
{
private static final long serialVersionUID = 201511301205L;
public DelayAsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
{
super(localBasePath, path, conf);
}
private long delayMS = 2000L;
public DelayAsyncFSStorageAgent(String path, Configuration conf)
{
super(path, conf);
}
@Override
public void save(final Object object, final int operatorId, final long windowId) throws IOException
{
//Do nothing
}
@Override
public void copyToHDFS(int operatorId, long windowId) throws IOException
{
try {
Thread.sleep(delayMS);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
/**
* @return the delayMS
*/
public long getDelayMS()
{
return delayMS;
}
/**
* @param delayMS the delayMS to set
*/
public void setDelayMS(long delayMS)
{
this.delayMS = delayMS;
}
}
public static class TestStatsOperatorContext extends OperatorContext
{
private static final long serialVersionUID = 201511301206L;
public volatile List<Checkpoint> checkpoints = Lists.newArrayList();
public TestStatsOperatorContext(int id, String name, AttributeMap attributes, Context parentContext)
{
super(id, name, attributes, parentContext);
}
@Override
public void report(OperatorStats stats, long windowId)
{
super.report(stats, windowId);
if (stats.checkpoint != null) {
checkpoints.add((Checkpoint)stats.checkpoint);
}
}
}
public static class GenericOperator implements Operator
{
Context.OperatorContext context;
long beginWindowId;
long endWindowId;
public final transient DefaultInputPort<Object> ip1 = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
op.emit(tuple);
}
};
@InputPortFieldAnnotation( optional = true)
public final transient DefaultInputPort<Object> ip2 = new DefaultInputPort<Object>()
{
@Override
public void process(Object tuple)
{
op.emit(tuple);
}
};
@OutputPortFieldAnnotation( optional = true)
DefaultOutputPort<Object> op = new DefaultOutputPort<>();
@Override
public void beginWindow(long windowId)
{
beginWindowId = windowId;
}
@Override
public void endWindow()
{
endWindowId = beginWindowId;
}
@Override
public void setup(Context.OperatorContext context)
{
this.context = context;
}
@Override
public void teardown()
{
}
}
public static class CheckpointDistanceOperator extends GenericOperator
{
List<Integer> distances = new ArrayList<>();
int numWindows = 0;
int maxWindows = 0;
@Override
public void beginWindow(long windowId)
{
super.beginWindow(windowId);
if (numWindows++ < maxWindows) {
distances.add(context.getWindowsFromCheckpoint());
}
}
}
@Test
@SuppressWarnings("SleepWhileInLoop")
public void testSynchingLogic() throws InterruptedException
{
long sleeptime = 25L;
final ArrayList<Object> list = new ArrayList<>();
GenericOperator go = new GenericOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
gn.setId(1);
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
Sink<Object> output = new Sink<Object>()
{
@Override
public void put(Object tuple)
{
list.add(tuple);
}
@Override
public int getCount(boolean reset)
{
return 0;
}
};
gn.connectInputPort("ip1", reservoir1);
gn.connectInputPort("ip2", reservoir2);
gn.connectOutputPort("op", output);
gn.firstWindowMillis = 0;
gn.windowWidthMillis = 100;
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
{
@Override
public void run()
{
ab.set(true);
gn.activate();
gn.run();
gn.deactivate();
}
};
t.start();
do {
Thread.sleep(sleeptime);
} while (ab.get() == false);
Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
reservoir1.add(beginWindow1);
Thread.sleep(sleeptime);
Assert.assertEquals(1, list.size());
reservoir2.add(beginWindow1);
Thread.sleep(sleeptime);
Assert.assertEquals(1, list.size());
Tuple endWindow1 = new EndWindowTuple(0x1L);
reservoir1.add(endWindow1);
Thread.sleep(sleeptime);
Assert.assertEquals(1, list.size());
Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 0x2L);
reservoir1.add(beginWindow2);
Thread.sleep(sleeptime);
Assert.assertEquals(1, list.size());
reservoir2.add(endWindow1);
Thread.sleep(sleeptime);
Assert.assertEquals(3, list.size());
reservoir2.add(beginWindow2);
Thread.sleep(sleeptime);
Assert.assertEquals(3, list.size());
Tuple endWindow2 = new EndWindowTuple(0x2L);
reservoir2.add(endWindow2);
Thread.sleep(sleeptime);
Assert.assertEquals(3, list.size());
reservoir1.add(endWindow2);
Thread.sleep(sleeptime);
Assert.assertEquals(4, list.size());
EndStreamTuple est = new EndStreamTuple(0L);
reservoir1.add(est);
Thread.sleep(sleeptime);
Assert.assertEquals(4, list.size());
Tuple beginWindow3 = new Tuple(MessageType.BEGIN_WINDOW, 0x3L);
reservoir2.add(beginWindow3);
Thread.sleep(sleeptime);
Assert.assertEquals(5, list.size());
Tuple endWindow3 = new EndWindowTuple(0x3L);
reservoir2.add(endWindow3);
Thread.sleep(sleeptime);
Assert.assertEquals(6, list.size());
Assert.assertNotSame(Thread.State.TERMINATED, t.getState());
reservoir2.add(est);
Thread.sleep(sleeptime);
Assert.assertEquals(7, list.size());
Thread.sleep(sleeptime);
Assert.assertEquals(Thread.State.TERMINATED, t.getState());
}
@Test
public void testBufferServerSubscriberActivationBeforeOperator() throws InterruptedException, IOException
{
final String streamName = "streamName";
final String upstreamNodeId = "upstreamNodeId";
final String downstreamNodeId = "downStreamNodeId";
EventLoop eventloop = DefaultEventLoop.createEventLoop("StreamTestEventLoop");
((DefaultEventLoop)eventloop).start();
final Server bufferServer = new Server(eventloop, 0); // find random port
final int bufferServerPort = bufferServer.run().getPort();
final StreamCodec<Object> serde = new DefaultStatefulStreamCodec<>();
final BlockingQueue<Object> tuples = new ArrayBlockingQueue<>(10);
GenericTestOperator go = new GenericTestOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
gn.setId(1);
Sink<Object> output = new Sink<Object>()
{
@Override
public void put(Object tuple)
{
tuples.add(tuple);
}
@Override
public int getCount(boolean reset)
{
return 0;
}
};
InetSocketAddress socketAddress = new InetSocketAddress("localhost", bufferServerPort);
StreamContext issContext = new StreamContext(streamName);
issContext.setSourceId(upstreamNodeId);
issContext.setSinkId(downstreamNodeId);
issContext.setFinishedWindowId(-1);
issContext.setBufferServerAddress(socketAddress);
issContext.put(StreamContext.CODEC, serde);
issContext.put(StreamContext.EVENT_LOOP, eventloop);
StreamContext ossContext = new StreamContext(streamName);
ossContext.setSourceId(upstreamNodeId);
ossContext.setSinkId(downstreamNodeId);
ossContext.setBufferServerAddress(socketAddress);
ossContext.put(StreamContext.CODEC, serde);
ossContext.put(StreamContext.EVENT_LOOP, eventloop);
BufferServerPublisher oss = new BufferServerPublisher(upstreamNodeId, 1024);
oss.setup(ossContext);
oss.activate(ossContext);
oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x1L));
byte[] buff = PayloadTuple.getSerializedTuple(0, 1);
buff[buff.length - 1] = (byte)1;
oss.put(buff);
oss.put(new EndWindowTuple(0x1L));
oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x2L));
buff = PayloadTuple.getSerializedTuple(0, 1);
buff[buff.length - 1] = (byte)2;
oss.put(buff);
oss.put(new EndWindowTuple(0x2L));
oss.put(new Tuple(MessageType.BEGIN_WINDOW, 0x3L));
buff = PayloadTuple.getSerializedTuple(0, 1);
buff[buff.length - 1] = (byte)3;
oss.put(buff);
oss.put(new EndWindowTuple(0x3L));
oss.put(new EndStreamTuple(0L));
BufferServerSubscriber iss = new BufferServerSubscriber(downstreamNodeId, 1024);
iss.setup(issContext);
gn.connectInputPort(GenericTestOperator.IPORT1, iss.acquireReservoir("testReservoir", 10));
gn.connectOutputPort(GenericTestOperator.OPORT1, output);
SweepableReservoir tupleWait = iss.acquireReservoir("testReservoir2", 10);
iss.activate(issContext);
while (tupleWait.sweep() == null) {
Thread.sleep(100);
}
gn.firstWindowMillis = 0;
gn.windowWidthMillis = 100;
Thread t = new Thread()
{
@Override
public void run()
{
gn.activate();
gn.run();
gn.deactivate();
}
};
t.start();
t.join();
Assert.assertEquals(10, tuples.size());
List<Object> list = new ArrayList<>(tuples);
Assert.assertEquals("Payload Tuple 1", 1, ((byte[])list.get(1))[5]);
Assert.assertEquals("Payload Tuple 2", 2, ((byte[])list.get(4))[5]);
Assert.assertEquals("Payload Tuple 3", 3, ((byte[])list.get(7))[5]);
if (bufferServer != null) {
bufferServer.stop();
}
((DefaultEventLoop)eventloop).stop();
}
@Test
public void testPrematureTermination() throws InterruptedException
{
long maxSleep = 5000;
long sleeptime = 25L;
GenericOperator go = new GenericOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
gn.setId(1);
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
gn.connectInputPort("ip1", reservoir1);
gn.connectInputPort("ip2", reservoir2);
gn.connectOutputPort("op", Sink.BLACKHOLE);
gn.firstWindowMillis = 0;
gn.windowWidthMillis = 100;
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
{
@Override
public void run()
{
ab.set(true);
gn.activate();
gn.run();
gn.deactivate();
}
};
t.start();
long interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((ab.get() == false) && (interval < maxSleep));
int controlTupleCount = gn.controlTupleCount;
Tuple beginWindow1 = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
reservoir1.add(beginWindow1);
reservoir2.add(beginWindow1);
interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
Assert.assertTrue("Begin window called", go.endWindowId != go.beginWindowId);
controlTupleCount = gn.controlTupleCount;
Tuple endWindow1 = new EndWindowTuple(0x1L);
reservoir1.add(endWindow1);
reservoir2.add(endWindow1);
interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
Assert.assertTrue("End window called", go.endWindowId == go.beginWindowId);
controlTupleCount = gn.controlTupleCount;
Tuple beginWindow2 = new Tuple(MessageType.BEGIN_WINDOW, 0x2L);
reservoir1.add(beginWindow2);
reservoir2.add(beginWindow2);
interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
gn.shutdown();
t.join();
Assert.assertTrue("End window not called", go.endWindowId != go.beginWindowId);
}
@Test
public void testControlTuplesDeliveryGenericNode() throws InterruptedException
{
long maxSleep = 5000000;
long sleeptime = 25L;
GenericOperator go = new GenericOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
gn.setId(1);
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
gn.connectInputPort("ip1", reservoir1);
TestSink testSink = new TestSink();
gn.connectOutputPort("op", testSink);
gn.firstWindowMillis = 0;
gn.windowWidthMillis = 100;
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
{
@Override
public void run()
{
ab.set(true);
gn.activate();
gn.run();
gn.deactivate();
}
};
t.start();
long interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((ab.get() == false) && (interval < maxSleep));
int controlTupleCount = gn.controlTupleCount;
Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
reservoir1.add(beginWindow);
interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
controlTupleCount = gn.controlTupleCount;
CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false));
CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true));
CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false));
CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true));
reservoir1.add(t1);
reservoir1.add(t2);
reservoir1.add(t3);
reservoir1.add(t4);
interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);
controlTupleCount = gn.controlTupleCount;
Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L);
reservoir1.add(endWindow);
interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((gn.controlTupleCount == controlTupleCount) && (interval < maxSleep));
gn.shutdown();
t.join();
Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);
long expected = 0;
for (Object o: testSink.collectedTuples) {
if (o instanceof CustomControlTuple) {
expected++;
}
}
Assert.assertTrue("Number of Custom control tuples", expected == 4);
}
@Test
public void testControlTuplesDeliveryOiONode() throws InterruptedException
{
GenericOperator go = new GenericOperator();
final OiONode oioNode = new OiONode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
oioNode.setId(1);
OiOStream stream = new OiOStream();
SweepableReservoir reservoir = stream.getReservoir();
((OiOStream.OiOReservoir)reservoir).setControlSink((oioNode).getControlSink(reservoir));
oioNode.connectInputPort("ip1", reservoir);
Sink controlSink = oioNode.getControlSink(reservoir);
TestSink testSink = new TestSink();
oioNode.connectOutputPort("op", testSink);
oioNode.firstWindowMillis = 0;
oioNode.windowWidthMillis = 100;
oioNode.activate();
Tuple beginWindow = new Tuple(MessageType.BEGIN_WINDOW, 0x1L);
controlSink.put(beginWindow);
Assert.assertTrue("Begin window", testSink.getResultCount() == 1);
CustomControlTuple t1 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(1, false));
CustomControlTuple t2 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(2, true));
CustomControlTuple t3 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(3, false));
CustomControlTuple t4 = new CustomControlTuple(new CustomControlTupleTest.TestControlTuple(4, true));
controlSink.put(t1);
controlSink.put(t2);
controlSink.put(t3);
controlSink.put(t4);
Assert.assertTrue("Custom control tuples emitted immediately", testSink.getResultCount() == 3);
Tuple endWindow = new Tuple(MessageType.END_WINDOW, 0x1L);
controlSink.put(endWindow);
oioNode.deactivate();
oioNode.shutdown();
Assert.assertTrue("Total control tuples", testSink.getResultCount() == 6);
long expected = 0;
for (Object o: testSink.collectedTuples) {
if (o instanceof CustomControlTuple) {
expected++;
}
}
Assert.assertTrue("Number of Custom control tuples", expected == 4);
}
@Test
public void testReservoirPortMapping() throws InterruptedException
{
long maxSleep = 5000;
long sleeptime = 25L;
GenericOperator go = new GenericOperator();
final GenericNode gn = new GenericNode(go, new com.datatorrent.stram.engine.OperatorContext(0, "operator",
new DefaultAttributeMap(), null));
gn.setId(1);
AbstractReservoir reservoir1 = AbstractReservoir.newReservoir("ip1Res", 1024);
AbstractReservoir reservoir2 = AbstractReservoir.newReservoir("ip2Res", 1024);
gn.connectInputPort("ip1", reservoir1);
gn.connectInputPort("ip2", reservoir2);
gn.connectOutputPort("op", Sink.BLACKHOLE);
gn.firstWindowMillis = 0;
gn.windowWidthMillis = 100;
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
{
@Override
public void run()
{
ab.set(true);
gn.activate();
gn.run();
gn.deactivate();
}
};
t.start();
long interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((ab.get() == false) && (interval < maxSleep));
gn.populateReservoirInputPortMap();
gn.shutdown();
t.join();
Assert.assertTrue("Port Mapping Size", gn.reservoirPortMap.size() == 2);
Assert.assertTrue("Sink 1 is not a port", gn.reservoirPortMap.get(reservoir1) instanceof Operator.InputPort);
Assert.assertTrue("Sink 2 is not a port", gn.reservoirPortMap.get(reservoir2) instanceof Operator.InputPort);
}
@Test
public void testDoubleCheckpointAtleastOnce() throws Exception
{
NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_LEAST_ONCE, true, testMeta.getDir());
}
@Test
public void testDoubleCheckpointAtMostOnce() throws Exception
{
NodeTest.testDoubleCheckpointHandling(ProcessingMode.AT_MOST_ONCE, true, testMeta.getDir());
}
@Test
public void testDoubleCheckpointExactlyOnce() throws Exception
{
NodeTest.testDoubleCheckpointHandling(ProcessingMode.EXACTLY_ONCE, true, testMeta.getDir());
}
/**
* This tests to make sure that the race condition reported in APEX-83 is fixed.
*/
@Test
public void testCheckpointApplicationWindowCountAtleastOnce() throws Exception
{
testCheckpointApplicationWindowCount(ProcessingMode.AT_LEAST_ONCE);
}
/**
* This tests to make sure that the race condition reported in APEX-83 is fixed.
*/
@Test
public void testCheckpointApplicationWindowCountAtMostOnce() throws Exception
{
testCheckpointApplicationWindowCount(ProcessingMode.AT_MOST_ONCE);
}
private void testCheckpointApplicationWindowCount(ProcessingMode processingMode) throws Exception
{
final long timeoutMillis = 10000L;
final long sleepTime = 25L;
WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
long resetWindow = 0L;
long firstWindowMillis = 1448909287863L;
int windowWidth = 100;
windowGenerator.setResetWindow(resetWindow);
windowGenerator.setFirstWindow(firstWindowMillis);
windowGenerator.setWindowWidth(windowWidth);
windowGenerator.setCheckpointCount(1, 0);
GenericOperator go = new GenericOperator();
DefaultAttributeMap dam = new DefaultAttributeMap();
dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 5);
dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5);
dam.put(OperatorContext.PROCESSING_MODE, processingMode);
DelayAsyncFSStorageAgent storageAgent = new DelayAsyncFSStorageAgent(testMeta.getDir(), new Configuration());
storageAgent.setDelayMS(200L);
dam.put(OperatorContext.STORAGE_AGENT, storageAgent);
TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, "operator", dam, null);
final GenericNode gn = new GenericNode(go, operatorContext);
gn.setId(1);
TestSink testSink = new TestSink();
gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024));
gn.connectOutputPort("output", testSink);
gn.firstWindowMillis = firstWindowMillis;
gn.windowWidthMillis = windowWidth;
windowGenerator.activate(null);
Thread t = new Thread()
{
@Override
public void run()
{
gn.activate();
gn.run();
gn.deactivate();
}
};
t.start();
long startTime = System.currentTimeMillis();
long endTime = 0;
while (operatorContext.checkpoints.size() < 8 && ((endTime = System.currentTimeMillis()) - startTime) < timeoutMillis) {
Thread.sleep(sleepTime);
}
gn.shutdown();
t.join();
windowGenerator.deactivate();
Assert.assertTrue(!operatorContext.checkpoints.isEmpty());
for (int index = 0; index < operatorContext.checkpoints.size(); index++) {
if (operatorContext.checkpoints.get(index) == null) {
continue;
}
Assert.assertEquals(0, operatorContext.checkpoints.get(index).applicationWindowCount);
Assert.assertEquals(0, operatorContext.checkpoints.get(index).checkpointWindowCount);
}
}
@Test
public void testDefaultCheckPointDistance() throws InterruptedException
{
testCheckpointDistance(Context.DAGContext.CHECKPOINT_WINDOW_COUNT.defaultValue, Context.OperatorContext.CHECKPOINT_WINDOW_COUNT.defaultValue);
}
@Test
public void testDAGGreaterCheckPointDistance() throws InterruptedException
{
testCheckpointDistance(7, 5);
}
@Test
public void testOpGreaterCheckPointDistance() throws InterruptedException
{
testCheckpointDistance(3, 5);
}
private void testCheckpointDistance(int dagCheckPoint, int opCheckPoint) throws InterruptedException
{
int windowWidth = 50;
long sleeptime = 25L;
int maxWindows = 60;
// Adding some extra time for the windows to finish
long maxSleep = windowWidth * maxWindows + 5000;
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, "default");
final WindowGenerator windowGenerator = new WindowGenerator(executorService, 1024);
windowGenerator.setWindowWidth(windowWidth);
windowGenerator.setFirstWindow(executorService.getCurrentTimeMillis());
windowGenerator.setCheckpointCount(dagCheckPoint, 0);
//GenericOperator go = new GenericOperator();
CheckpointDistanceOperator go = new CheckpointDistanceOperator();
go.maxWindows = maxWindows;
List<Integer> checkpoints = new ArrayList<>();
int window = 0;
while (window < maxWindows) {
window = (int)Math.ceil((double)(window + 1) / dagCheckPoint) * dagCheckPoint;
window = (int)Math.ceil((double)window / opCheckPoint) * opCheckPoint;
checkpoints.add(window);
}
final StreamContext stcontext = new StreamContext("s1");
DefaultAttributeMap attrMap = new DefaultAttributeMap();
attrMap.put(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, dagCheckPoint);
attrMap.put(Context.OperatorContext.CHECKPOINT_WINDOW_COUNT, opCheckPoint);
final OperatorContext context = new com.datatorrent.stram.engine.OperatorContext(0, "operator", attrMap, null);
final GenericNode gn = new GenericNode(go, context);
gn.setId(1);
//DefaultReservoir reservoir1 = new DefaultReservoir("ip1Res", 1024);
//DefaultReservoir reservoir2 = new DefaultReservoir("ip2Res", 1024);
//gn.connectInputPort("ip1", reservoir1);
//gn.connectInputPort("ip2", reservoir2);
gn.connectInputPort("ip1", windowGenerator.acquireReservoir("ip1", 1024));
gn.connectInputPort("ip2", windowGenerator.acquireReservoir("ip2", 1024));
gn.connectOutputPort("op", Sink.BLACKHOLE);
final AtomicBoolean ab = new AtomicBoolean(false);
Thread t = new Thread()
{
@Override
public void run()
{
gn.setup(context);
windowGenerator.activate(stcontext);
gn.activate();
ab.set(true);
gn.run();
windowGenerator.deactivate();
gn.deactivate();
gn.teardown();
}
};
t.start();
long interval = 0;
do {
Thread.sleep(sleeptime);
interval += sleeptime;
} while ((go.numWindows < maxWindows) && (interval < maxSleep));
Assert.assertEquals("Number distances", maxWindows, go.numWindows);
int chkindex = 0;
int nextCheckpoint = checkpoints.get(chkindex++);
for (int i = 0; i < maxWindows; ++i) {
if ((i + 1) > nextCheckpoint) {
nextCheckpoint = checkpoints.get(chkindex++);
}
Assert.assertEquals("Windows from checkpoint for " + i, nextCheckpoint - i, (int)go.distances.get(i));
}
gn.shutdown();
t.join();
}
private static final Logger LOG = LoggerFactory.getLogger(GenericNodeTest.class);
}