blob: 3f659166b186e2e9d428bda993dfbacbe44ac912 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.engine;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.jctools.queues.SpscArrayQueue;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.api.Sink;
import com.datatorrent.netlet.util.CircularBuffer;
import com.datatorrent.stram.tuple.Tuple;
import junitparams.JUnitParamsRunner;
import junitparams.Parameters;
import static com.datatorrent.api.Context.PortContext.QUEUE_CAPACITY;
import static com.datatorrent.api.Context.PortContext.SPIN_MILLIS;
import static com.datatorrent.bufferserver.packet.MessageType.BEGIN_WINDOW;
import static java.lang.Thread.sleep;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(JUnitParamsRunner.class)
public class AbstractReservoirTest
{
private static final Logger logger = LoggerFactory.getLogger(AbstractReservoirTest.class);
private static final String countPropertyName = "com.datatorrent.stram.engine.AbstractReservoirTest.count";
private static final String capacityPropertyName = "com.datatorrent.stram.engine.AbstractReservoirTest.capacity";
private static final int COUNT = Integer.getInteger(countPropertyName, 10000000);
private static final int CAPACITY = Integer.getInteger(capacityPropertyName, QUEUE_CAPACITY.defaultValue);
@Rule
public ExpectedException exception = ExpectedException.none();
private static AbstractReservoir newReservoir(final String reservoirClassName, final int capacity)
{
if (reservoirClassName == null) {
System.clearProperty(AbstractReservoir.reservoirClassNameProperty);
} else {
System.setProperty(AbstractReservoir.reservoirClassNameProperty, reservoirClassName);
}
final String id = reservoirClassName == null ? "DefaultReservoir" : reservoirClassName;
final AbstractReservoir reservoir = AbstractReservoir.newReservoir(id, capacity);
return reservoir;
}
private static void setSink(final AbstractReservoir reservoir, final Sink<Object> sink)
{
assertNull(reservoir.setSink(sink));
assertEquals(sink, reservoir.getSink());
}
@SuppressWarnings("unused")
private Object defaultTestParameters()
{
Object[][] defaultTestParameters = new Object[][] {
{null, NoSuchElementException.class},
{"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", NoSuchElementException.class},
{"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir", NoSuchElementException.class},
{"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", NoSuchElementException.class},
{"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", IllegalStateException.class}
};
for (Object[] o: defaultTestParameters) {
o[0] = newReservoir((String)o[0], 2);
final Sink<Object> sink = new Sink<Object>()
{
private int count;
@Override
public void put(Object tuple)
{
count++;
}
@Override
public int getCount(boolean reset)
{
try {
return count;
} finally {
if (reset) {
count = 0;
}
}
}
};
setSink((AbstractReservoir)o[0], sink);
}
return defaultTestParameters;
}
@SuppressWarnings("unused")
private Object performanceTestParameters()
{
Object[][] performanceTestParameters = new Object[][] {
{null, 2500},
{"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayQueueReservoir", 10000},
{"com.datatorrent.stram.engine.AbstractReservoir$SpscArrayBlockingQueueReservoir", 2500},
{"com.datatorrent.stram.engine.AbstractReservoir$ArrayBlockingQueueReservoir", 10000},
{"com.datatorrent.stram.engine.AbstractReservoir$CircularBufferReservoir", 100000}
};
for (Object[] o : performanceTestParameters) {
o[0] = newReservoir((String)o[0], CAPACITY);
final Sink<Object> sink = new Sink<Object>()
{
private int count = 0;
@Override
public void put(Object tuple)
{
if (++count == COUNT) {
throw new RuntimeException();
}
}
@Override
public int getCount(boolean reset)
{
return count;
}
};
setSink((AbstractReservoir)o[0], sink);
}
return performanceTestParameters;
}
@Test
@Parameters(method = "defaultTestParameters")
public void testEmpty(final AbstractReservoir reservoir, final Class<? extends Throwable> type)
{
assertTrue(reservoir.isEmpty());
assertEquals(0, reservoir.size());
assertEquals(0, reservoir.size(true));
assertEquals(0, reservoir.size(false));
assertNull(reservoir.sweep());
exception.expect(type);
reservoir.remove();
}
@Test
@Parameters(method = "defaultTestParameters")
public void testAddAndSweepObject(final AbstractReservoir reservoir, final Class<? extends Throwable> type)
{
final Object o = new Integer(0);
assertTrue(reservoir.add(o));
assertFalse(reservoir.isEmpty());
assertEquals(1, reservoir.size());
assertEquals(1, reservoir.size(false));
assertEquals(0, reservoir.getCount(false));
assertEquals(o, reservoir.peek());
assertNull(reservoir.sweep());
assertEquals(1, reservoir.getCount(false));
assertEquals(1, reservoir.getSink().getCount(false));
assertTrue(reservoir.isEmpty());
assertEquals(0, reservoir.size());
assertEquals(0, reservoir.size(false));
exception.expect(type);
reservoir.remove();
}
@Test
@Parameters(method = "defaultTestParameters")
public void testAddAndSweepTuple(final AbstractReservoir reservoir, final Class<? extends Throwable> type)
{
final Tuple t = new Tuple(BEGIN_WINDOW, 0L);
assertTrue(reservoir.add(t));
assertFalse(reservoir.isEmpty());
assertEquals(1, reservoir.size());
assertEquals(1, reservoir.size(false));
assertEquals(t, reservoir.peek());
assertEquals(t, reservoir.sweep());
assertEquals(t, reservoir.sweep());
assertEquals(0, reservoir.getCount(false));
assertEquals(0, reservoir.getSink().getCount(false));
assertFalse(reservoir.isEmpty());
assertEquals(t, reservoir.remove());
assertNull(reservoir.peek());
assertNull(reservoir.poll());
assertNull(reservoir.sweep());
exception.expect(type);
reservoir.remove();
}
@Test
@Parameters(method = "defaultTestParameters")
public void testFullReservoir(final AbstractReservoir reservoir, final Class<? extends Throwable> type)
{
int capacity = reservoir.remainingCapacity();
assertTrue(capacity > 0);
final Object o = new Integer(0);
for (int i = 0; i < capacity; i++) {
assertTrue(reservoir.offer(o));
}
assertFalse(reservoir.offer(o));
exception.expect(IllegalStateException.class);
reservoir.add(o);
}
@Test
@Ignore
@Parameters(method = "performanceTestParameters")
public void performanceTest(final AbstractReservoir reservoir, final long expectedTime)
{
final long start = System.currentTimeMillis();
final int maxSleepTime = SPIN_MILLIS.defaultValue;
final Thread t = new Thread(
new Runnable()
{
@Override
public void run()
{
try {
int sleepMillis;
while (true) {
sleepMillis = 0;
reservoir.sweep();
while (reservoir.isEmpty()) {
sleep(Math.min(maxSleepTime, sleepMillis++));
}
}
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
} catch (RuntimeException e) {
assertEquals(COUNT, reservoir.getSink().getCount(false));
}
}
}
);
t.start();
final Object o = new Byte[128];
try {
for (int i = 0; i < COUNT; i++) {
reservoir.put(o);
}
t.join();
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
long time = System.currentTimeMillis() - start;
logger.debug("{}: time {}", reservoir.getId(), time);
assertTrue(reservoir.getId() + ": expected to complete within " + expectedTime + " millis. Actual time "
+ time + " millis", expectedTime > time);
}
@Test
@Ignore
public void testBlockingQueuePerformance()
{
final ArrayBlockingQueue<Object> blockingQueue = new ArrayBlockingQueue<>(CAPACITY);
final long start = System.currentTimeMillis();
final Thread t = new Thread(
new Runnable()
{
@Override
public void run()
{
try {
for (int i = 0; i < COUNT; i++) {
blockingQueue.take();
}
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
}
}
);
t.start();
final Object o = new Byte[128];
try {
for (int i = 0; i < COUNT; i++) {
blockingQueue.put(o);
}
t.join();
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
logger.debug("Time {}", System.currentTimeMillis() - start);
}
@Test
@Ignore
public void testSpscQueuePerformance()
{
final SpscArrayQueue<Object> spscArrayQueue = new SpscArrayQueue<>(CAPACITY);
final long start = System.currentTimeMillis();
final Thread t = new Thread(
new Runnable()
{
@Override
public void run()
{
try {
int sleepMillis;
for (int i = 0; i < COUNT; i++) {
sleepMillis = 0;
while (spscArrayQueue.poll() == null) {
sleep(sleepMillis);
sleepMillis = Math.min(10, sleepMillis + 1);
}
}
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
}
}
);
t.start();
final Object o = new Byte[128];
int sleepMillis;
try {
for (int i = 0; i < COUNT; i++) {
sleepMillis = 0;
while (!spscArrayQueue.offer(o)) {
sleep(sleepMillis);
sleepMillis = Math.min(10, sleepMillis + 1);
}
}
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
logger.debug("Time {}", System.currentTimeMillis() - start);
}
@Test
@Ignore
public void testSpscBlockingQueuePerformance()
{
final SpscArrayQueue<Object> spscArrayQueue = new SpscArrayQueue<>(CAPACITY);
final ReentrantLock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final long start = System.currentTimeMillis();
final Thread t = new Thread(
new Runnable()
{
@Override
public void run()
{
try {
int sleepMillis;
for (int i = 0; i < COUNT; i++) {
sleepMillis = 0;
while (spscArrayQueue.poll() == null) {
sleep(sleepMillis);
sleepMillis = Math.min(10, sleepMillis + 1);
}
lock.lock();
notFull.signal();
lock.unlock();
}
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
}
}
);
t.start();
final Object o = new Byte[128];
try {
for (int i = 0; i < COUNT; i++) {
if (!spscArrayQueue.offer(o)) {
lock.lockInterruptibly();
while (!spscArrayQueue.offer(o)) {
notFull.await();
}
lock.unlock();
}
}
t.join();
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
logger.debug("Time {}", System.currentTimeMillis() - start);
}
@Test
@Ignore
public void testCircularBufferPerformance()
{
final CircularBuffer<Object> circularBuffer = new CircularBuffer<>(CAPACITY);
final long start = System.currentTimeMillis();
final Thread t = new Thread(
new Runnable()
{
@Override
public void run()
{
try {
int sleepMillis;
for (int i = 0; i < COUNT; i++) {
sleepMillis = 0;
while (circularBuffer.poll() == null) {
sleep(sleepMillis);
sleepMillis = Math.min(10, sleepMillis + 1);
}
}
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
}
}
);
t.start();
final Object o = new Byte[128];
try {
for (int i = 0; i < COUNT; i++) {
circularBuffer.put(o);
}
t.join();
} catch (InterruptedException e) {
logger.error("Interrupted", e);
throw new RuntimeException(e);
}
logger.debug("Time {}", System.currentTimeMillis() - start);
}
}