blob: 2c92a392e09f35ac0a752ef37bb16d8836c247ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.util.BitSet;
import java.util.NoSuchElementException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.junit.Test;
import static org.junit.Assert.*;
@SuppressWarnings("ConstantConditions")
public class ManyToOneConcurrentLinkedQueueTest
{
private final ManyToOneConcurrentLinkedQueue<Integer> queue = new ManyToOneConcurrentLinkedQueue<>();
@Test
public void testRelaxedIsEmptyWhenEmpty()
{
assertTrue(queue.relaxedIsEmpty());
}
@Test
public void testRelaxedIsEmptyWhenNotEmpty()
{
queue.offer(0);
assertFalse(queue.relaxedIsEmpty());
}
@Test
public void testSizeWhenEmpty()
{
assertEquals(0, queue.size());
}
@Test
public void testSizeWhenNotEmpty()
{
queue.offer(0);
assertEquals(1, queue.size());
for (int i = 1; i < 100; i++)
queue.offer(i);
assertEquals(100, queue.size());
}
@Test
public void testEmptyPeek()
{
assertNull(queue.peek());
}
@Test
public void testNonEmptyPeek()
{
queue.offer(0);
assertEquals(0, (int) queue.peek());
}
@Test
public void testEmptyPoll()
{
assertNull(queue.poll());
}
@Test
public void testNonEmptyPoll()
{
queue.offer(0);
assertEquals(0, (int) queue.poll());
}
@Test(expected = NoSuchElementException.class)
public void testEmptyRemove()
{
queue.remove();
}
@Test
public void testNonEmptyRemove()
{
queue.offer(0);
assertEquals(0, (int) queue.remove());
}
@Test
public void testOtherRemoveWhenEmpty()
{
assertFalse(queue.remove(0));
}
@Test
public void testOtherRemoveSingleNode()
{
queue.offer(0);
assertTrue(queue.remove(0));
assertTrue(queue.isEmpty());
}
@Test
public void testOtherRemoveWhenFirst()
{
queue.offer(0);
queue.offer(1);
queue.offer(2);
assertTrue(queue.remove(0));
assertEquals(1, (int) queue.poll());
assertEquals(2, (int) queue.poll());
assertNull(queue.poll());
}
@Test
public void testOtherRemoveFromMiddle()
{
queue.offer(0);
queue.offer(1);
queue.offer(2);
assertTrue(queue.remove(1));
assertEquals(0, (int) queue.poll());
assertEquals(2, (int) queue.poll());
assertNull(queue.poll());
}
@Test
public void testOtherRemoveFromEnd()
{
queue.offer(0);
queue.offer(1);
queue.offer(2);
assertTrue(queue.remove(2));
assertEquals(0, (int) queue.poll());
assertEquals(1, (int) queue.poll());
assertNull(queue.poll());
}
@Test
public void testOtherRemoveWhenDoesnNotExist()
{
queue.offer(0);
queue.offer(1);
queue.offer(2);
assertFalse(queue.remove(3));
assertEquals(0, (int) queue.poll());
assertEquals(1, (int) queue.poll());
assertEquals(2, (int) queue.poll());
}
@Test
public void testTransfersInCorrectOrder()
{
for (int i = 0; i < 1024; i++)
queue.offer(i);
for (int i = 0; i < 1024; i++)
assertEquals(i, (int) queue.poll());
assertTrue(queue.relaxedIsEmpty());
}
@Test
public void testTransfersInCorrectOrderWhenInterleaved()
{
for (int i = 0; i < 1024; i++)
{
queue.offer(i);
assertEquals(i, (int) queue.poll());
}
assertTrue(queue.relaxedIsEmpty());
}
@Test
public void testDrain()
{
for (int i = 0; i < 1024; i++)
queue.offer(i);
class Consumer
{
private int previous = -1;
public void accept(int i)
{
assertEquals(++previous, i);
}
}
Consumer consumer = new Consumer();
queue.drain(consumer::accept);
assertEquals(1023, consumer.previous);
assertTrue(queue.relaxedIsEmpty());
}
@Test
public void testPeekLastAndOffer()
{
assertNull(queue.relaxedPeekLastAndOffer(0));
for (int i = 1; i < 1024; i++)
assertEquals(i - 1, (int) queue.relaxedPeekLastAndOffer(i));
for (int i = 0; i < 1024; i++)
assertEquals(i, (int) queue.poll());
assertTrue(queue.relaxedIsEmpty());
}
enum Strategy
{
PEEK_AND_REMOVE, POLL
}
@Test
public void testConcurrentlyWithPoll()
{
testConcurrently(Strategy.POLL);
}
@Test
public void testConcurrentlyWithPeekAndRemove()
{
testConcurrently(Strategy.PEEK_AND_REMOVE);
}
private void testConcurrently(Strategy strategy)
{
int numThreads = 4;
int numItems = 1_000_000 * numThreads;
class Producer implements Runnable
{
private final int start, step, limit;
private Producer(int start, int step, int limit)
{
this.start = start;
this.step = step;
this.limit = limit;
}
public void run()
{
for (int i = start; i < limit; i += step)
queue.offer(i);
}
}
Executor executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++)
executor.execute(new Producer(i, numThreads, numItems));
BitSet itemsPolled = new BitSet(numItems);
for (int i = 0; i < numItems; i++)
{
Integer item;
switch (strategy)
{
case PEEK_AND_REMOVE:
//noinspection StatementWithEmptyBody
while ((item = queue.peek()) == null) ;
assertFalse(queue.relaxedIsEmpty());
assertEquals(item, queue.remove());
itemsPolled.set(item);
break;
case POLL:
//noinspection StatementWithEmptyBody
while ((item = queue.poll()) == null) ;
itemsPolled.set(item);
break;
}
}
assertEquals(numItems, itemsPolled.cardinality());
assertTrue(queue.relaxedIsEmpty());
}
}