blob: 42a55d90ff44eb98f3d4948aec6da5c6582c9171 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;
import org.apache.ratis.util.function.TriConsumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
 * Unit tests for {@link DataQueue} using {@link Long} elements.
 *
 * The queue under test is constructed with {@code Long::longValue} as its last argument;
 * as the size assertions below show, the tests expect the value of an element
 * to be counted as the number of bytes of that element.
 *
 * The static {@code runTest...} methods take the queue as a parameter
 * so that the same checks can be applied to any {@code DataQueue<Long>}.
 * Each of them expects the given queue to be empty on entry and leaves it empty on return.
 */
public class TestDataQueue {
  /**
   * Creates a timeout handler to be passed to {@code pollList(..)}.
   *
   * @param expectedTimeout true if the calling test expects a timeout to happen.
   * @param <T> the element type.
   * @return a handler which does nothing when a timeout is expected;
   *         otherwise, it throws an {@link AssertionError}
   *         reporting the element, the time and the timeout exception.
   */
  static <T> TriConsumer<T, TimeDuration, TimeoutException> getTimeoutHandler(boolean expectedTimeout) {
    return (element, time, exception) -> {
      if (!expectedTimeout) {
        throw new AssertionError("Unexpected timeout to get element " + element + " in " + time, exception);
      }
    };
  }

  /**
   * Asserts both the number of elements and the number of bytes reported by the given queue.
   *
   * @param expectedNumElements the expected value of {@code q.getNumElements()}.
   * @param expectedNumBytes the expected value of {@code q.getNumBytes()}.
   * @param q the queue to be checked.
   */
  static void assertSizes(long expectedNumElements, long expectedNumBytes, DataQueue<?> q) {
    Assertions.assertEquals(expectedNumElements, q.getNumElements());
    Assertions.assertEquals(expectedNumBytes, q.getNumBytes());
  }

  /**
   * Offers the values 0, 1, ..., numElements-1 to the given queue in this order,
   * asserting that each offer is accepted and that the sizes are as expected before and after it.
   * The queue is expected to be empty on entry.
   *
   * @param q the queue to be filled.
   * @param numElements the number of elements to offer.
   * @return the sum of the offered values, i.e. the number of bytes expected in the queue.
   */
  private static long offerSequence(DataQueue<Long> q, int numElements) {
    long numBytes = 0;
    for (long i = 0; i < numElements; i++) {
      assertSizes(i, numBytes, q);
      Assertions.assertTrue(q.offer(i));
      numBytes += i;
      assertSizes(i + 1, numBytes, q);
    }
    return numBytes;
  }

  final SizeInBytes byteLimit = SizeInBytes.valueOf(100);
  final int elementLimit = 5;
  final DataQueue<Long> q = new DataQueue<>(null, byteLimit, elementLimit, Long::longValue);

  @Test
  @Timeout(value = 1)
  public void testElementLimit() {
    runTestElementLimit(q);
  }

  /**
   * Fills the given queue up to its element limit, asserts that one more offer is rejected
   * without changing the sizes, and then polls everything back in insertion order.
   *
   * @param q an empty queue.
   */
  static void runTestElementLimit(DataQueue<Long> q) {
    assertSizes(0, 0, q);

    final int elementLimit = q.getElementLimit();
    final long numBytes = offerSequence(q, elementLimit);

    { // the queue is full: even a zero-valued element must be rejected
      Assertions.assertFalse(q.offer(0L));
      assertSizes(elementLimit, numBytes, q);
    }

    { // poll all elements
      final List<Long> polled = q.pollList(100, (e, timeout) -> e, getTimeoutHandler(false));
      Assertions.assertEquals(elementLimit, polled.size());
      for (int i = 0; i < polled.size(); i++) {
        Assertions.assertEquals(i, polled.get(i).longValue());
      }
    }
    assertSizes(0, 0, q);
  }

  @Test
  @Timeout(value = 1)
  public void testByteLimit() {
    runTestByteLimit(q);
  }

  /**
   * Exercises the byte limit of the given queue:
   * an element larger than the limit is expected to cause an {@link IllegalStateException},
   * an element which would push the total over the limit is expected to be rejected,
   * and a zero-valued element is expected to be accepted even when the total is at the limit.
   *
   * @param q an empty queue.
   */
  static void runTestByteLimit(DataQueue<Long> q) {
    assertSizes(0, 0, q);

    final long byteLimit = q.getByteLimit();
    // a single element exceeding the limit is expected to throw instead of returning false
    Assertions.assertThrows(IllegalStateException.class, () -> q.offer(byteLimit + 1));

    // the assertions below on the total being exactly byteLimit assume an even byte limit
    final long halfBytes = byteLimit / 2;
    { // first half fits
      Assertions.assertTrue(q.offer(halfBytes));
      assertSizes(1, halfBytes, q);
    }

    { // half + 1 would exceed the limit
      Assertions.assertFalse(q.offer(halfBytes + 1));
      assertSizes(1, halfBytes, q);
    }

    { // second half reaches the limit exactly
      Assertions.assertTrue(q.offer(halfBytes));
      assertSizes(2, byteLimit, q);
    }

    { // one more byte would exceed the limit
      Assertions.assertFalse(q.offer(1L));
      assertSizes(2, byteLimit, q);
    }

    { // a zero-valued element adds no bytes
      Assertions.assertTrue(q.offer(0L));
      assertSizes(3, byteLimit, q);
    }

    { // poll all elements
      final List<Long> polled = q.pollList(100, (e, timeout) -> e, getTimeoutHandler(false));
      Assertions.assertEquals(3, polled.size());
      // compare as long: halfBytes is a long and must not be checked against a narrowed int
      Assertions.assertEquals(halfBytes, polled.get(0).longValue());
      Assertions.assertEquals(halfBytes, polled.get(1).longValue());
      Assertions.assertEquals(0, polled.get(2).longValue());
    }

    assertSizes(0, 0, q);
  }

  @Test
  @Timeout(value = 1)
  public void testIteratorAndRemove() {
    runTestIteratorAndRemove(q);
  }

  /**
   * Fills the given queue up to its element limit, asserts that the iterator
   * returns the elements in insertion order and nothing more,
   * and then removes the elements one by one in a random order, checking the sizes after each removal.
   *
   * @param q an empty queue.
   */
  static void runTestIteratorAndRemove(DataQueue<Long> q) {
    assertSizes(0, 0, q);

    final int elementLimit = q.getElementLimit();
    long numBytes = offerSequence(q, elementLimit);
    int numElements = elementLimit;

    { // test iterator()
      final Iterator<Long> i = q.iterator();
      for (long expected = 0; expected < elementLimit; expected++) {
        Assertions.assertTrue(i.hasNext());
        Assertions.assertEquals(expected, i.next().longValue());
      }
      // the iterator must be exhausted after the offered elements
      Assertions.assertFalse(i.hasNext());
    }

    { // test remove(..)
      final List<Long> toRemoves = new ArrayList<>(elementLimit);
      for (long i = 0; i < elementLimit; i++) {
        toRemoves.add(i);
      }
      // remove in a random order so that removal is not tied to the queue order
      Collections.shuffle(toRemoves);

      for (Long r : toRemoves) {
        q.remove(r);
        numElements--;
        numBytes -= r;
        assertSizes(numElements, numBytes, q);
      }
    }

    assertSizes(0, 0, q);
  }

  /**
   * Tests {@code pollList(..)} with a zero timeout, with a simulated timeout in the middle of the queue,
   * and finally without any timeout for the remaining elements.
   */
  @Test
  @Timeout(value = 1)
  public void testTimeout() {
    assertSizes(0, 0, q);

    long numBytes = offerSequence(q, elementLimit);

    { // poll with zero time
      final List<Long> polled = q.pollList(0, (e, timeout) -> e, getTimeoutHandler(false));
      Assertions.assertTrue(polled.isEmpty());
      assertSizes(elementLimit, numBytes, q);
    }

    final int halfElements = elementLimit / 2;
    { // poll with timeout
      final List<Long> polled = q.pollList(100, (e, timeout) -> {
        if (e == halfElements) {
          // simulate timeout
          throw new TimeoutException("i=" + e);
        }
        return e;
      }, getTimeoutHandler(true));
      // only the elements before the simulated timeout are expected to be returned and removed
      Assertions.assertEquals(halfElements, polled.size());
      for (int i = 0; i < polled.size(); i++) {
        Assertions.assertEquals(i, polled.get(i).longValue());
        numBytes -= i;
      }
      assertSizes(elementLimit - halfElements, numBytes, q);
    }

    { // poll the remaining elements
      final List<Long> polled = q.pollList(100, (e, timeout) -> e, getTimeoutHandler(false));
      Assertions.assertEquals(elementLimit - halfElements, polled.size());
      for (int i = 0; i < polled.size(); i++) {
        Assertions.assertEquals(halfElements + i, polled.get(i).longValue());
      }
    }
    assertSizes(0, 0, q);
  }
}