blob: 6472a613fa5cb62a1e8ec8692f6a49897f021427 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import jakarta.jms.JMSException;
import jakarta.jms.MessageNotReadableException;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.facade.test.JmsTestMessageFacade;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test for the priority based message Queue
*/
public class PriorityMessageQueueTest {
private PriorityMessageQueue queue;
private final IdGenerator messageId = new IdGenerator();
private long sequence;
@Before
public void setUp() {
queue = new PriorityMessageQueue();
queue.start();
}
@Test
public void testCreate() {
PriorityMessageQueue queue = new PriorityMessageQueue();
assertFalse(queue.isClosed());
assertTrue(queue.isEmpty());
assertFalse(queue.isRunning());
assertEquals(0, queue.size());
}
@Test
public void testClose() {
assertFalse(queue.isClosed());
assertTrue(queue.isRunning());
queue.close();
assertTrue(queue.isClosed());
assertFalse(queue.isRunning());
queue.close();
}
@Test
public void testDequeueNoWaitWhenQueueIsClosed() {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertFalse(queue.isEmpty());
queue.close();
assertSame(null, queue.dequeueNoWait());
}
@Test
public void testDequeueWhenQueueIsClosed() throws InterruptedException {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertFalse(queue.isEmpty());
queue.close();
assertSame(null, queue.dequeue(1L));
}
@Test
public void testDequeueWhenQueueIsNotEmpty() throws InterruptedException {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertFalse(queue.isEmpty());
assertSame(message, queue.dequeue(1L));
}
@Test
public void testDequeueZeroWhenQueueIsNotEmpty() throws InterruptedException {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertFalse(queue.isEmpty());
assertSame(message, queue.dequeue(0));
}
@Test
public void testDequeueZeroWhenQueueIsEmpty() throws InterruptedException {
assertTrue(queue.isEmpty());
assertSame(null, queue.dequeue(0));
}
@Test
public void testDequeueWhenQueueIsStopped() throws InterruptedException {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertFalse(queue.isEmpty());
queue.stop();
assertFalse(queue.isRunning());
assertSame(null, queue.dequeue(1L));
queue.start();
assertTrue(queue.isRunning());
assertSame(message, queue.dequeue(1L));
}
@Test
public void testDequeueNoWaitWhenQueueIsStopped() {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertFalse(queue.isEmpty());
queue.stop();
assertFalse(queue.isRunning());
assertSame(null, queue.dequeueNoWait());
queue.start();
assertTrue(queue.isRunning());
assertSame(message, queue.dequeueNoWait());
}
@Test
public void testEnqueueFirstOverridesPriority() {
// Add a higher priority message
JmsInboundMessageDispatch message1 = createEnvelope(7);
queue.enqueue(message1);
// Add other lower priority messages 'first'.
JmsInboundMessageDispatch message2 = createEnvelope(4);
JmsInboundMessageDispatch message3 = createEnvelope(3);
JmsInboundMessageDispatch message4 = createEnvelope(2);
queue.enqueueFirst(message2);
queue.enqueueFirst(message3);
queue.enqueueFirst(message4);
// Verify they dequeue in the reverse of the order
// they were added, and not priority order.
assertSame(message4, queue.dequeueNoWait());
assertSame(message3, queue.dequeueNoWait());
assertSame(message2, queue.dequeueNoWait());
assertSame(message1, queue.dequeueNoWait());
}
@Test
public void testClear() {
List<JmsInboundMessageDispatch> messages = createFullRangePrioritySet();
for (JmsInboundMessageDispatch envelope: messages) {
queue.enqueue(envelope);
}
assertFalse(queue.isEmpty());
queue.clear();
assertTrue(queue.isEmpty());
}
@Test
public void testRemoveFirstOnEmptyQueue() {
assertNull(queue.dequeueNoWait());
}
@Test
public void testRemoveFirst() throws JMSException {
List<JmsInboundMessageDispatch> messages = createFullRangePrioritySet();
for (JmsInboundMessageDispatch envelope: messages) {
queue.enqueue(envelope);
}
for (byte i = 9; i >= 0; --i) {
JmsInboundMessageDispatch first = queue.dequeueNoWait();
assertEquals(i, first.getMessage().getJMSPriority());
}
assertTrue(queue.isEmpty());
}
@Test
public void testRemoveFirstSparse() throws JMSException {
queue.enqueue(createEnvelope(9));
queue.enqueue(createEnvelope(4));
queue.enqueue(createEnvelope(1));
JmsInboundMessageDispatch envelope = queue.dequeueNoWait();
assertEquals(9, envelope.getMessage().getJMSPriority());
envelope = queue.dequeueNoWait();
assertEquals(4, envelope.getMessage().getJMSPriority());
envelope = queue.dequeueNoWait();
assertEquals(1, envelope.getMessage().getJMSPriority());
assertTrue(queue.isEmpty());
}
@Test(timeout = 10000)
public void testDequeueWaitsUntilMessageArrives() throws InterruptedException {
doDequeueWaitsUntilMessageArrivesTestImpl(-1);
}
@Test(timeout = 10000)
public void testDequeueTimedWaitsUntilMessageArrives() throws InterruptedException {
doDequeueWaitsUntilMessageArrivesTestImpl(5000);
}
private void doDequeueWaitsUntilMessageArrivesTestImpl(int timeout) throws InterruptedException {
final JmsInboundMessageDispatch message = createEnvelope();
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
queue.enqueueFirst(message);
}
});
runner.start();
assertSame(message, queue.dequeue(timeout));
}
@Test(timeout = 10000)
public void testDequeueWaitsUntilMessageArrivesWhenLockNotified() throws InterruptedException {
doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(-1);
}
@Test(timeout = 10000)
public void testTimedDequeueWaitsUntilMessageArrivesWhenLockNotified() throws InterruptedException {
doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(100000);
}
private void doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(int timeout) throws InterruptedException {
final JmsInboundMessageDispatch message = createEnvelope();
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
try {
singalQueue(queue);
} catch (Exception e1) {
return;
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
queue.enqueueFirst(message);
}
});
runner.start();
assertSame(message, queue.dequeue(timeout));
}
@Test(timeout = 10000)
public void testDequeueReturnsWhenQueueIsStopped() throws InterruptedException {
Thread runner = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
}
queue.stop();
}
});
runner.start();
assertNull(queue.dequeue(-1));
}
@Test
public void testRestartingClosedQueueHasNoEffect() throws InterruptedException {
JmsInboundMessageDispatch message = createEnvelope();
queue.enqueueFirst(message);
assertTrue(queue.isRunning());
assertFalse(queue.isClosed());
queue.stop();
assertFalse(queue.isRunning());
assertFalse(queue.isClosed());
assertNull(queue.dequeue(1L));
queue.close();
assertTrue(queue.isClosed());
assertFalse(queue.isRunning());
queue.start();
assertTrue(queue.isClosed());
assertFalse(queue.isRunning());
assertNull(queue.dequeue(1L));
}
@Test
public void testUnreadablePrioirtyIsStillEnqueued() throws JMSException {
JmsInboundMessageDispatch message = createEnvelopeWithMessageThatCannotReadPriority();
queue.enqueue(createEnvelope(9));
queue.enqueue(message);
queue.enqueue(createEnvelope(1));
JmsInboundMessageDispatch envelope = queue.dequeueNoWait();
assertEquals(9, envelope.getMessage().getJMSPriority());
envelope = queue.dequeueNoWait();
try {
envelope.getMessage().getJMSPriority();
fail("Unreadable priority message should sit at default level");
} catch (MessageNotReadableException mnre) {}
envelope = queue.dequeueNoWait();
assertEquals(1, envelope.getMessage().getJMSPriority());
assertTrue(queue.isEmpty());
}
private List<JmsInboundMessageDispatch> createFullRangePrioritySet() {
List<JmsInboundMessageDispatch> messages = new ArrayList<JmsInboundMessageDispatch>();
for (int i = 0; i < 10; ++i) {
messages.add(createEnvelope(i));
}
return messages;
}
private JmsInboundMessageDispatch createEnvelope() {
JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(sequence++);
envelope.setMessage(createMessage());
return envelope;
}
private JmsInboundMessageDispatch createEnvelopeWithMessageThatCannotReadPriority() throws JMSException {
JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(sequence++);
JmsMessage message = Mockito.mock(JmsMessage.class);
Mockito.when(message.getJMSPriority()).thenThrow(new MessageNotReadableException("Message is not readable"));
envelope.setMessage(message);
return envelope;
}
private JmsInboundMessageDispatch createEnvelope(int priority) {
JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(sequence++);
envelope.setMessage(createMessage(priority));
return envelope;
}
private JmsMessage createMessage() {
return createMessage(4);
}
private JmsMessage createMessage(int priority) {
JmsTestMessageFacade facade = new JmsTestMessageFacade();
facade.setMessageId(messageId.generateId());
facade.setPriority((byte) priority);
JmsMessage message = new JmsMessage(facade);
return message;
}
private void singalQueue(PriorityMessageQueue queue) throws Exception {
Field lock = null;
Class<?> queueType = queue.getClass();
while (queueType != null && lock == null) {
try {
lock = queueType.getDeclaredField("lock");
} catch (NoSuchFieldException error) {
queueType = queueType.getSuperclass();
if (Object.class.equals(queueType)) {
queueType = null;
}
}
}
assertNotNull("MessageQueue implementation unknown", lock);
lock.setAccessible(true);
Object lockView = lock.get(queue);
synchronized (lockView) {
lockView.notify();
}
}
}