blob: 544e95cb266d5cbe6d438be4e4d674eea2172633 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.utils.concurrent;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class WeightedQueueTest
{
private static WeightedQueue<Object> queue()
{
return new WeightedQueue<>(10);
}
private WeightedQueue<Object> queue;
@Before
public void setUp()
{
queue = queue();
}
private static WeightedQueue.Weighable weighable(int weight)
{
return () -> weight;
}
@Test(expected = UnsupportedOperationException.class)
public void testAddUnsupported() throws Exception
{
queue.add(new Object());
}
@Test(expected = UnsupportedOperationException.class)
public void testRemoveUnsupported() throws Exception
{
queue.remove();
}
@Test(expected = UnsupportedOperationException.class)
public void testElementUnsupported() throws Exception
{
queue.element();
}
@Test(expected = UnsupportedOperationException.class)
public void testPeekUnsupported() throws Exception
{
queue.peek();
}
@Test(expected = UnsupportedOperationException.class)
public void testRemainingCapacityUnsupported() throws Exception
{
queue.remainingCapacity();
}
@Test(expected = UnsupportedOperationException.class)
public void testRemoveElementUnsupported() throws Exception
{
queue.remove(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testContainsAllUnsupported() throws Exception
{
queue.containsAll(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testAddAllUnsupported() throws Exception
{
queue.addAll(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testRemoveAllUnsupported() throws Exception
{
queue.removeAll(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testRetainAllUnsupported() throws Exception
{
queue.retainAll(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testClearUnsupported() throws Exception
{
queue.clear();
}
@Test(expected = UnsupportedOperationException.class)
public void testSizeUnsupported() throws Exception
{
queue.size();
}
@Test(expected = UnsupportedOperationException.class)
public void testIsEmptyUnsupported() throws Exception
{
queue.isEmpty();
}
@Test(expected = UnsupportedOperationException.class)
public void testContainsUnsupported() throws Exception
{
queue.contains(null);
}
@Test(expected = UnsupportedOperationException.class)
public void testIteratorUnsupported() throws Exception
{
queue.iterator();
}
@Test(expected = UnsupportedOperationException.class)
public void testToArrayUnsupported() throws Exception
{
queue.toArray();
}
@Test(expected = UnsupportedOperationException.class)
public void testToArray2Unsupported() throws Exception
{
queue.toArray( new Object[] {});
}
@Test(expected = UnsupportedOperationException.class)
public void testDrainToUnsupported() throws Exception
{
queue.drainTo(new ArrayList<>());
}
@Test(expected = UnsupportedOperationException.class)
public void testTimedPollUnsupported() throws Exception
{
queue.poll(1, TimeUnit.MICROSECONDS);
}
@Test
public void testDrainToWithLimit() throws Exception
{
queue.offer(new Object());
queue.offer(new Object());
queue.offer(new Object());
ArrayList<Object> list = new ArrayList<>();
queue.drainTo(list, 1);
assertEquals(1, list.size());
list.clear();
queue.drainTo(list, 10);
assertEquals(2, list.size());
}
@Test(expected = NullPointerException.class)
public void offerNullThrows() throws Exception
{
queue.offer(null);
}
/**
* This also tests that natural weight (weighable interface) is respected
*/
@Test
public void offerFullFails() throws Exception
{
assertTrue(queue.offer(weighable(10)));
assertFalse(queue.offer(weighable(1)));
}
/**
* Validate permits aren't leaked and return values are correct
*/
@Test
public void testOfferWrappedQueueRefuses() throws Exception
{
queue = new WeightedQueue<>(10, new BadQueue(true), WeightedQueue.NATURAL_WEIGHER);
assertEquals(10, queue.availableWeight.availablePermits());
assertFalse(queue.offer(new Object()));
assertEquals(10, queue.availableWeight.availablePermits());
}
/**
* Validate permits aren't leaked and return values are correct
*/
@Test
public void testOfferWrappedQueueThrows() throws Exception
{
queue = new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER);
assertEquals(10, queue.availableWeight.availablePermits());
try
{
assertFalse(queue.offer(new Object()));
fail();
}
catch (UnsupportedOperationException e)
{
//expected and desired
}
assertEquals(10, queue.availableWeight.availablePermits());
}
/**
* If not weighable and not custom weigher the default weight is 1
*/
@Test
public void defaultWeightRespected() throws Exception
{
for (int ii = 0; ii < 10; ii++)
{
assertTrue(queue.offer(new Object()));
}
assertFalse(queue.offer(new Object()));
}
@Test
public void testCustomWeigher() throws Exception
{
queue = new WeightedQueue<>(10, new LinkedBlockingQueue<>(), weighable -> 10 );
assertTrue(queue.offer(new Object()));
assertFalse(queue.offer(new Object()));
}
@Test(expected = UnsupportedOperationException.class)
public void testCustomQueue() throws Exception
{
new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER).offer(new Object());
}
@Test(expected = NullPointerException.class)
public void timedOfferNullValueThrows() throws Exception
{
queue.offer(null, 1, TimeUnit.SECONDS);
}
@Test(expected = NullPointerException.class)
public void timedOfferNullTimeThrows() throws Exception
{
queue.offer(null, 1, null);
}
/**
* This is how it seems to be handled in java.util.concurrent, it's the same as just try
*/
@Test
public void timedOfferNegativeTimeIgnored() throws Exception
{
queue.offer(weighable(10));
queue.offer(new Object(), -1, TimeUnit.SECONDS);
}
/**
* This also tests that natural weight (weighable interface) is respected
*/
@Test
public void timedOfferFullFails() throws Exception
{
assertTrue(queue.offer(weighable(10), 1, TimeUnit.MICROSECONDS));
assertFalse(queue.offer(weighable(1), 1, TimeUnit.MICROSECONDS));
}
@Test
public void timedOfferEventuallySucceeds() throws Exception
{
assertTrue(queue.offer(weighable(10), 1, TimeUnit.MICROSECONDS));
Thread t = new Thread(() ->
{
try
{
queue.offer(weighable(1), 1, TimeUnit.DAYS);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
});
t.start();
Thread.sleep(100);
assertTrue(t.getState() != Thread.State.TERMINATED);
queue.poll();
t.join(60000);
assertEquals(t.getState(), Thread.State.TERMINATED);
}
/**
* Validate permits aren't leaked and return values are correct
*/
@Test
public void testTimedOfferWrappedQueueRefuses() throws Exception
{
queue = new WeightedQueue<>(10, new BadQueue(true), WeightedQueue.NATURAL_WEIGHER);
assertEquals(10, queue.availableWeight.availablePermits());
assertFalse(queue.offer(new Object(), 1, TimeUnit.MICROSECONDS));
assertEquals(10, queue.availableWeight.availablePermits());
}
/**
* Validate permits aren't leaked and return values are correct
*/
@Test
public void testTimedOfferWrappedQueueThrows() throws Exception
{
queue = new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER);
assertEquals(10, queue.availableWeight.availablePermits());
try
{
assertFalse(queue.offer(new Object(), 1, TimeUnit.MICROSECONDS));
fail();
}
catch (UnsupportedOperationException e)
{
//expected and desired
}
assertEquals(10, queue.availableWeight.availablePermits());
}
@Test
public void testPoll() throws Exception
{
assertEquals(10, queue.availableWeight.availablePermits());
assertNull(queue.poll());
assertEquals(10, queue.availableWeight.availablePermits());
Object o = new Object();
assertTrue(queue.offer(o));
assertEquals(9, queue.availableWeight.availablePermits());
WeightedQueue.Weighable weighable = weighable(9);
assertTrue(queue.offer(weighable));
assertEquals(0, queue.availableWeight.availablePermits());
assertEquals(o, queue.poll());
assertEquals(1, queue.availableWeight.availablePermits());
assertEquals(weighable, queue.poll());
assertEquals(10, queue.availableWeight.availablePermits());
}
@Test(expected = NullPointerException.class)
public void testPutNullThrows() throws Exception
{
queue.put(null);
}
@Test
public void testPutFullBlocks() throws Exception
{
WeightedQueue.Weighable weighable = weighable(10);
assertEquals(10, queue.availableWeight.availablePermits());
queue.put(weighable);
assertEquals(0, queue.availableWeight.availablePermits());
Object o = new Object();
Thread t = new Thread(() -> {
try
{
queue.put(o);
} catch (InterruptedException e)
{
e.printStackTrace();
}
});
t.start();
Thread.sleep(100);
assertTrue(t.getState() != Thread.State.TERMINATED);
assertEquals(0, queue.availableWeight.availablePermits());
assertEquals(weighable, queue.poll());
assertTrue(queue.availableWeight.availablePermits() > 0);
t.join();
assertEquals(o, queue.poll());
assertEquals(10, queue.availableWeight.availablePermits());
}
@Test
public void testPutWrappedQueueThrows() throws Exception
{
queue = new WeightedQueue<>(10, new BadQueue(false), WeightedQueue.NATURAL_WEIGHER);
assertEquals(10, queue.availableWeight.availablePermits());
try
{
queue.put(new Object());
fail();
}
catch (UnsupportedOperationException e)
{
//expected and desired
}
assertEquals(10, queue.availableWeight.availablePermits());
}
@Test(expected = IllegalArgumentException.class)
public void testTryAcquireWeightIllegalWeight()
{
queue.tryAcquireWeight(weighable(-1));
}
@Test(expected = IllegalArgumentException.class)
public void testAcquireWeightIllegalWeight() throws Exception
{
queue.acquireWeight(weighable(-1), 1, TimeUnit.DAYS);
}
@Test(expected = IllegalArgumentException.class)
public void testReleaseWeightIllegalWeight()
{
queue.releaseWeight(weighable(-1));
}
@Test
public void testTake() throws Exception
{
Thread t = new Thread(() -> {
try
{
queue.take();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
});
t.start();
Thread.sleep(500);
assertTrue(t.getState() != Thread.State.TERMINATED);
assertEquals(10, queue.availableWeight.availablePermits());
queue.offer(new Object());
t.join(60 * 1000);
assertEquals(t.getState(), Thread.State.TERMINATED);
assertEquals(10, queue.availableWeight.availablePermits());
}
@Test(expected = IllegalArgumentException.class)
public void testConstructorLTZeroWeightThrows() throws Exception
{
new WeightedQueue(0);
}
@Test(expected = IllegalArgumentException.class)
public void testConstructor2LTZeroWeightThrows() throws Exception
{
new WeightedQueue(0, new LinkedBlockingQueue<>(), WeightedQueue.NATURAL_WEIGHER);
}
@Test(expected = NullPointerException.class)
public void testConstructorNullQueueThrows() throws Exception
{
new WeightedQueue(1, null, WeightedQueue.NATURAL_WEIGHER);
}
@Test(expected = NullPointerException.class)
public void testConstructorNullWeigherThrows() throws Exception
{
new WeightedQueue(1, new LinkedBlockingQueue<>(), null);
}
/**
* A blocking queue that throws or refuses on every method
*/
private static class BadQueue implements BlockingQueue<Object>
{
/**
* Refuse instead of throwing for some methods that have a boolean return value
*/
private boolean refuse = false;
private BadQueue(boolean refuse)
{
this.refuse = refuse;
}
@Override
public boolean add(Object o)
{
throw new UnsupportedOperationException();
}
@Override
public boolean offer(Object o)
{
if (refuse)
{
return false;
}
throw new UnsupportedOperationException();
}
@Override
public Object remove()
{
throw new UnsupportedOperationException();
}
@Override
public Object poll()
{
throw new UnsupportedOperationException();
}
@Override
public Object element()
{
throw new UnsupportedOperationException();
}
@Override
public Object peek()
{
throw new UnsupportedOperationException();
}
@Override
public void put(Object o) throws InterruptedException
{
throw new UnsupportedOperationException();
}
@Override
public boolean offer(Object o, long timeout, TimeUnit unit) throws InterruptedException
{
if (refuse)
{
return false;
}
throw new UnsupportedOperationException();
}
@Override
public Object take() throws InterruptedException
{
throw new UnsupportedOperationException();
}
@Override
public Object poll(long timeout, TimeUnit unit) throws InterruptedException
{
throw new UnsupportedOperationException();
}
@Override
public int remainingCapacity()
{
throw new UnsupportedOperationException();
}
@Override
public boolean remove(Object o)
{
throw new UnsupportedOperationException();
}
@Override
public boolean addAll(Collection c)
{
throw new UnsupportedOperationException();
}
@Override
public void clear()
{
throw new UnsupportedOperationException();
}
@Override
public boolean retainAll(Collection c)
{
throw new UnsupportedOperationException();
}
@Override
public boolean removeAll(Collection c)
{
throw new UnsupportedOperationException();
}
@Override
public boolean containsAll(Collection c)
{
throw new UnsupportedOperationException();
}
@Override
public int size()
{
throw new UnsupportedOperationException();
}
@Override
public boolean isEmpty()
{
throw new UnsupportedOperationException();
}
@Override
public boolean contains(Object o)
{
throw new UnsupportedOperationException();
}
@Override
public Iterator iterator()
{
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray()
{
throw new UnsupportedOperationException();
}
@Override
public Object[] toArray(Object[] a)
{
throw new UnsupportedOperationException();
}
@Override
public int drainTo(Collection c)
{
throw new UnsupportedOperationException();
}
@Override
public int drainTo(Collection c, int maxElements)
{
throw new UnsupportedOperationException();
}
};
}