blob: 85125c70d8975d699234e510d8c3d98e94c5c985 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.engine;
import java.io.File;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.support.ManualScheduledExecutorService;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
public class WindowGeneratorTest
{
@Test
public void test2ndResetWindow() throws InterruptedException
{
logger.info("Testing 2nd Reset Window");
ManualScheduledExecutorService msse = new ManualScheduledExecutorService(1);
WindowGenerator generator = new WindowGenerator(msse, (WindowGenerator.MAX_WINDOW_ID << 1) + 1024);
generator.setFirstWindow(0L);
generator.setResetWindow(0L);
generator.setWindowWidth(1);
SweepableReservoir reservoir = generator.acquireReservoir(Node.OUTPUT, (WindowGenerator.MAX_WINDOW_ID << 1) + 1024);
final AtomicBoolean loggingEnabled = new AtomicBoolean(true);
reservoir.setSink(new Sink<Object>()
{
@Override
public void put(Object payload)
{
if (loggingEnabled.get()) {
logger.debug(payload.toString());
}
}
@Override
public int getCount(boolean reset)
{
return 0;
}
});
generator.activate(null);
msse.tick(1); /* reset window and begin window */
msse.tick(1); /* end window and begin window */
loggingEnabled.set(false);
for (int i = 0; i < WindowGenerator.MAX_WINDOW_ID - 2; i++) {
msse.tick(1); /* end window and begin window */
}
loggingEnabled.set(true);
msse.tick(1); /* end window, reset window, begin window */
final AtomicInteger beginWindowCount = new AtomicInteger(0);
final AtomicInteger endWindowCount = new AtomicInteger(0);
final AtomicInteger resetWindowCount = new AtomicInteger(0);
Tuple t;
reservoir.sweep();
while ((t = reservoir.sweep()) != null) {
reservoir.remove();
switch (t.getType()) {
case BEGIN_WINDOW:
beginWindowCount.incrementAndGet();
break;
case END_WINDOW:
endWindowCount.incrementAndGet();
break;
case RESET_WINDOW:
resetWindowCount.incrementAndGet();
break;
default:
break;
}
}
Assert.assertEquals("begin windows", WindowGenerator.MAX_WINDOW_ID + 1 + 1, beginWindowCount.get());
Assert.assertEquals("end windows", WindowGenerator.MAX_WINDOW_ID + 1, endWindowCount.get());
Assert.assertEquals("reset windows", 2, resetWindowCount.get());
}
/**
* Test of resetWindow functionality of WindowGenerator.
*/
@Test
public void testResetWindow()
{
ManualScheduledExecutorService msse = new ManualScheduledExecutorService(1);
msse.setCurrentTimeMillis(0x0afebabe * 1000L);
WindowGenerator generator = new WindowGenerator(msse, WindowGenerator.MAX_WINDOW_ID + 1024);
final long currentTIme = msse.getCurrentTimeMillis();
final int windowWidth = 0x1234abcd;
generator.setFirstWindow(currentTIme);
generator.setResetWindow(currentTIme);
generator.setWindowWidth(windowWidth);
SweepableReservoir reservoir = generator.acquireReservoir(Node.OUTPUT, 1024);
reservoir.setSink(new Sink<Object>()
{
boolean firsttime = true;
@Override
public int getCount(boolean reset)
{
return 0;
}
@Override
public void put(Object payload)
{
assert (false);
if (firsttime) {
assert (payload instanceof ResetWindowTuple);
firsttime = false;
} else {
assert (payload instanceof Tuple);
}
}
});
generator.activate(null);
msse.tick(1);
Assert.assertNull(reservoir.sweep());
ResetWindowTuple rwt = (ResetWindowTuple)reservoir.sweep();
reservoir.remove();
assert (rwt.getWindowId() == 0x0afebabe00000000L);
assert (rwt.getBaseSeconds() * 1000L == currentTIme);
assert (rwt.getIntervalMillis() == windowWidth);
Tuple t = reservoir.sweep();
reservoir.remove();
assert (t.getType() == MessageType.BEGIN_WINDOW);
assert (t.getWindowId() == 0x0afebabe00000000L);
assert (reservoir.sweep() == null);
}
@Test
public void testWindowGen() throws Exception
{
final AtomicLong currentWindow = new AtomicLong();
final AtomicInteger beginWindowCount = new AtomicInteger();
final AtomicInteger endWindowCount = new AtomicInteger();
final AtomicLong windowXor = new AtomicLong();
Sink<Object> s = new Sink<Object>()
{
@Override
public int getCount(boolean reset)
{
return 0;
}
@Override
public void put(Object payload)
{
logger.debug("unexpected payload {}", payload);
}
};
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(1, "WindowGenerator");
int windowWidth = 200;
long firstWindowMillis = stpe.getCurrentTimeMillis();
firstWindowMillis -= firstWindowMillis % 1000L;
WindowGenerator wg = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), WindowGenerator.MAX_WINDOW_ID + 1024);
wg.setResetWindow(firstWindowMillis);
wg.setFirstWindow(firstWindowMillis);
wg.setWindowWidth(windowWidth);
SweepableReservoir reservoir = wg.acquireReservoir("GeneratorTester", windowWidth);
reservoir.setSink(s);
wg.activate(null);
Thread.sleep(200);
wg.deactivate();
reservoir.sweep(); /* just transfer over all the control tuples */
Tuple t;
while ((t = reservoir.sweep()) != null) {
reservoir.remove();
long windowId = t.getWindowId();
switch (t.getType()) {
case BEGIN_WINDOW:
currentWindow.set(windowId);
beginWindowCount.incrementAndGet();
windowXor.set(windowXor.get() ^ windowId);
break;
case END_WINDOW:
endWindowCount.incrementAndGet();
windowXor.set(windowXor.get() ^ windowId);
break;
case RESET_WINDOW:
break;
default:
currentWindow.set(0);
break;
}
}
long lastWindowMillis = System.currentTimeMillis();
Assert.assertEquals("only last window open", currentWindow.get(), windowXor.get());
long expectedCnt = (lastWindowMillis - firstWindowMillis) / windowWidth;
Assert.assertTrue("Minimum begin window count", expectedCnt + 1 <= beginWindowCount.get());
Assert.assertEquals("end window count", beginWindowCount.get() - 1, endWindowCount.get());
}
static class RandomNumberGenerator implements InputOperator
{
public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
@Override
public void emitTuples()
{
try {
Thread.sleep(500);
} catch (InterruptedException ex) {
logger.debug("interrupted!", ex);
}
output.emit(++count);
}
@Override
public void beginWindow(long windowId)
{
}
@Override
public void endWindow()
{
}
@Override
public void setup(OperatorContext context)
{
}
@Override
public void teardown()
{
}
int count;
}
static class MyLogger extends BaseOperator
{
public final transient DefaultInputPort<Integer> input = new DefaultInputPort<Integer>()
{
@Override
public void process(Integer tuple)
{
logger.debug("received {}", tuple);
}
};
}
@Test
public void testOutofSequenceError() throws Exception
{
logger.info("Testing Out of Sequence Error");
LogicalPlan dag = new LogicalPlan();
String workingDir = new File("target/testOutofSequenceError").getAbsolutePath();
dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
RandomNumberGenerator rng = dag.addOperator("random", new RandomNumberGenerator());
MyLogger ml = dag.addOperator("logger", new MyLogger());
dag.addStream("stream", rng.output, ml.input);
StramLocalCluster lc = new StramLocalCluster(dag);
lc.run(10000);
}
@Test
public void testWindowToTime()
{
long first = 1431714014000L;
for (int windowWidthMillis : new int[]{500, 123}) {
long time1 = WindowGenerator.getWindowMillis(6149164867354886271L, first, windowWidthMillis);
long time2 = WindowGenerator.getWindowMillis(6149164867354886272L, first, windowWidthMillis);
long window1 = WindowGenerator.getWindowId(time1, first, windowWidthMillis);
long window2 = WindowGenerator.getWindowId(time2, first, windowWidthMillis);
Assert.assertEquals("window 1", 6149164867354886271L, window1);
Assert.assertEquals("window 2", 6149164867354886272L, window2);
Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1);
}
}
@Test
public void testWindowToTimeBaseSecondRollover()
{
long first = 1431714014123L;
for (int windowWidthMillis : new int[]{500, 123}) {
long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
window1 |= WindowGenerator.MAX_WINDOW_ID;
long window2 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis);
Assert.assertTrue("base seconds should be greater during an rollover", (window2 >> 32) > (window1 >> 32));
long time1 = WindowGenerator.getWindowMillis(window1, first, windowWidthMillis);
long time2 = WindowGenerator.getWindowMillis(window2, first, windowWidthMillis);
Assert.assertEquals("max window id", WindowGenerator.MAX_WINDOW_ID, window1 & WindowGenerator.WINDOW_MASK);
Assert.assertEquals("rollover after max", 0, window2 & WindowGenerator.WINDOW_MASK);
Assert.assertEquals("window millis difference", windowWidthMillis, time2 - time1);
}
}
@Test
public void testWindowIdAhead()
{
long first = 1431714014123L;
int ahead = 678;
for (int windowWidthMillis : new int[]{500, 123}) {
long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
for (int i = 0; i < ahead; i++) {
window1 = WindowGenerator.getNextWindowId(window1, first, windowWidthMillis);
}
Assert.assertEquals(window2, window1);
}
}
@Test
public void testWindowIdCompare()
{
long first = 1431714014123L;
int ahead = 341;
for (int windowWidthMillis : new int[]{500, 123}) {
long window1 = WindowGenerator.getWindowId(first, first, windowWidthMillis);
long window2 = WindowGenerator.getAheadWindowId(window1, first, windowWidthMillis, ahead);
Assert.assertEquals(ahead, WindowGenerator.compareWindowId(window2, window1, windowWidthMillis));
}
}
public static final Logger logger = LoggerFactory.getLogger(WindowGeneratorTest.class);
}