// blob: e8bbf7b14ad63424646f4dc4e8760ae9e9702cc9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Tests for {@link BoundedPriorityBlockingQueue}: size and remaining-capacity accounting while
 * filling and draining, ordering by priority, insertion order among equal priorities, and timed
 * polling when the element is offered from another thread.
 */
@Category({MiscTests.class, SmallTests.class})
public class TestBoundedPriorityBlockingQueue {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestBoundedPriorityBlockingQueue.class);

  /** Capacity of the queue that {@link #setUp()} creates for every test. */
  private static final int CAPACITY = 16;

  /**
   * Queue element with a priority (the value the comparator orders by) and a sequence id that the
   * tests use to recognise which element came back out.
   */
  static class TestObject {
    private final int priority;
    private final int seqId;

    public TestObject(final int priority, final int seqId) {
      this.priority = priority;
      this.seqId = seqId;
    }

    public int getSeqId() {
      return this.seqId;
    }

    public int getPriority() {
      return this.priority;
    }
  }

  /** Orders {@link TestObject}s by ascending priority; the sequence id is not consulted. */
  static class TestObjectComparator implements Comparator<TestObject> {
    public TestObjectComparator() {}

    @Override
    public int compare(TestObject a, TestObject b) {
      // Integer.compare instead of "a - b": subtraction overflows for operands far apart.
      return Integer.compare(a.getPriority(), b.getPriority());
    }
  }

  /** Queue under test; recreated empty before each test. */
  private BoundedPriorityBlockingQueue<TestObject> queue;

  @Before
  public void setUp() throws Exception {
    this.queue = new BoundedPriorityBlockingQueue<>(CAPACITY, new TestObjectComparator());
  }

  @After
  public void tearDown() throws Exception {
    // Nothing to release: each test gets a fresh queue from setUp().
  }

  /**
   * Offers {@code obj} and asserts that it was accepted, that the queue now holds
   * {@code expectedSize} elements and that the remaining capacity shrank accordingly.
   */
  private void offerAndVerify(TestObject obj, int expectedSize) {
    assertTrue(queue.offer(obj));
    assertEquals(expectedSize, queue.size());
    assertEquals(CAPACITY - expectedSize, queue.remainingCapacity());
  }

  /**
   * Polls one element while draining a queue that started out full and asserts its sequence id
   * plus the resulting size and remaining capacity.
   * @param expectedSeqId sequence id the polled element must carry
   * @param polledCount number of elements polled so far, including this one
   */
  private void pollAndVerify(int expectedSeqId, int polledCount) {
    TestObject polled = queue.poll();
    assertEquals(expectedSeqId, polled.getSeqId());
    assertEquals(CAPACITY - polledCount, queue.size());
    assertEquals(polledCount, queue.remainingCapacity());
  }

  /** Asserts that a timed offer on the (full) queue gives up and returns {@code false}. */
  private void assertTimedOfferRejected() throws InterruptedException {
    assertFalse(queue.offer(new TestObject(0, -1), 5, TimeUnit.MILLISECONDS));
  }

  /**
   * Elements offered in ascending priority are polled back in the same order. (Method name kept
   * as-is so existing test filters continue to match.)
   */
  @Test
  public void tesAppend() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      offerAndVerify(new TestObject(i, i), i);
    }
    assertTimedOfferRejected();

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      pollAndVerify(i, i);
    }
    assertNull(queue.poll());
  }

  /**
   * Elements that all share one priority are polled back in insertion order. (Method name kept
   * as-is so existing test filters continue to match.)
   */
  @Test
  public void tesAppendSamePriority() throws Exception {
    // Push
    for (int i = 1; i <= CAPACITY; ++i) {
      offerAndVerify(new TestObject(0, i), i);
    }
    assertTimedOfferRejected();

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      pollAndVerify(i, i);
    }
    assertNull(queue.poll());
  }

  /** Elements offered in descending priority are polled back in reverse insertion order. */
  @Test
  public void testPrepend() throws Exception {
    // Push: each new element has a lower priority value than everything already queued.
    for (int i = 1; i <= CAPACITY; ++i) {
      offerAndVerify(new TestObject(CAPACITY - i, i), i);
    }

    // Pop: the last element offered comes out first.
    for (int i = 1; i <= CAPACITY; ++i) {
      pollAndVerify(CAPACITY - (i - 1), i);
    }
    assertNull(queue.poll());
  }

  /** Elements offered between already-queued priorities still come out in priority order. */
  @Test
  public void testInsert() throws Exception {
    // Push the odd priorities first...
    for (int i = 1; i <= CAPACITY; i += 2) {
      offerAndVerify(new TestObject(i, i), (1 + i) / 2);
    }
    // ...then the even ones, each of which lands between two queued elements or at the tail.
    for (int i = 2; i <= CAPACITY; i += 2) {
      offerAndVerify(new TestObject(i, i), CAPACITY / 2 + (i / 2));
    }
    assertTimedOfferRejected();

    // Pop
    for (int i = 1; i <= CAPACITY; ++i) {
      pollAndVerify(i, i);
    }
    assertNull(queue.poll());
  }

  /**
   * With two priorities interleaved on insert, all elements of the lower priority value are
   * polled first, and within each priority the insertion order is preserved.
   */
  @Test
  public void testFifoSamePriority() throws Exception {
    assertTrue(CAPACITY >= 6);
    // Even sequence ids get priority 10, odd ones priority 20.
    for (int i = 0; i < 6; ++i) {
      assertTrue(queue.offer(new TestObject((1 + (i % 2)) * 10, i)));
    }

    for (int i = 0; i < 6; i += 2) {
      TestObject obj = queue.poll();
      assertEquals(10, obj.getPriority());
      assertEquals(i, obj.getSeqId());
    }

    for (int i = 1; i < 6; i += 2) {
      TestObject obj = queue.poll();
      assertEquals(20, obj.getPriority());
      assertEquals(i, obj.getSeqId());
    }
    assertNull(queue.poll());
  }

  /**
   * Non-blocking poll returns {@code null} on an empty queue and otherwise yields the same
   * instances, in the same order, as a {@link PriorityQueue} fed the same elements.
   */
  @Test
  public void testPoll() {
    assertNull(queue.poll());
    PriorityQueue<TestObject> testList = new PriorityQueue<>(CAPACITY, new TestObjectComparator());

    for (int i = 0; i < CAPACITY; ++i) {
      TestObject obj = new TestObject(i, i);
      testList.add(obj);
      assertTrue(queue.offer(obj));
    }

    for (int i = 0; i < CAPACITY; ++i) {
      // TestObject does not override equals, so the comparison is by identity.
      assertSame(testList.poll(), queue.poll());
    }

    assertNull(queue.poll());
  }

  /**
   * A timed poll on an empty queue returns {@code null}; once a second thread offers an element,
   * a timed poll in the first thread receives that same instance.
   */
  @Test
  public void testPollInExecutor() throws Exception {
    final TestObject testObj = new TestObject(0, 0);
    // Lets the producer offer only after the consumer has seen the empty-queue timeout.
    final CyclicBarrier threadsStarted = new CyclicBarrier(2);

    ExecutorService executor = Executors.newFixedThreadPool(2);
    try {
      Future<?> consumer = executor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
            threadsStarted.await();
            assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
            assertTrue(queue.isEmpty());
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      });

      Future<?> producer = executor.submit(new Runnable() {
        @Override
        public void run() {
          try {
            threadsStarted.await();
            queue.offer(testObj);
          } catch (Exception e) {
            throw new RuntimeException(e);
          }
        }
      });

      // get() rethrows whatever a worker threw (wrapped in ExecutionException), so an assertion
      // failing on a pool thread fails this test instead of being lost with the thread.
      consumer.get(8000, TimeUnit.MILLISECONDS);
      producer.get(8000, TimeUnit.MILLISECONDS);

      executor.shutdown();
      assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
    } finally {
      // Interrupts a worker still parked on the barrier or in poll() if the test failed early.
      executor.shutdownNow();
    }
  }
}