blob: 956a9fc4244b567214d67a26f7f0f157e0360d75 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import java.util.*;
import org.apache.log4j.Logger;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.security.QpidSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
abstract class SimpleAMQQueueTestBase<E extends QueueEntryImpl<E,Q,L>, Q extends SimpleAMQQueue<E,Q,L>, L extends SimpleQueueEntryList<E,Q,L>> extends QpidTestCase
{
private static final Logger _logger = Logger.getLogger(SimpleAMQQueueTestBase.class);
private Q _queue;
private VirtualHost _virtualHost;
private String _qname = "qname";
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
private MockConsumer _consumerTarget = new MockConsumer();
private QueueConsumer _consumer;
private Map<String,Object> _arguments = Collections.emptyMap();
@Override
public void setUp() throws Exception
{
super.setUp();
BrokerTestHelper.setUp();
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
_queue = (Q) _virtualHost.createQueue(null, attributes);
_exchange = (DirectExchange) _virtualHost.getExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME);
}
@Override
public void tearDown() throws Exception
{
try
{
_queue.stop();
_virtualHost.close();
}
finally
{
BrokerTestHelper.tearDown();
super.tearDown();
}
}
public void testCreateQueue() throws Exception
{
_queue.stop();
try
{
Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
_queue = (Q) _virtualHost.createQueue(null, attributes);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
assertTrue("Exception was not about missing name",
e.getMessage().contains("name"));
}
Map<String,Object> attributes = new HashMap<String, Object>(_arguments);
attributes.put(Queue.ID, UUIDGenerator.generateRandomUUID());
attributes.put(Queue.NAME, "differentName");
_queue = (Q) _virtualHost.createQueue(null, attributes);
assertNotNull("Queue was not created", _queue);
}
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
public void testBinding() throws QpidSecurityException
{
_exchange.addBinding(_routingKey, _queue, Collections.EMPTY_MAP);
assertTrue("Routing key was not bound",
_exchange.isBound(_routingKey));
assertTrue("Queue was not bound to key",
_exchange.isBound(_routingKey,_queue));
assertEquals("Exchange binding count", 1,
_queue.getBindings().size());
assertEquals("Wrong exchange bound", _routingKey,
_queue.getBindings().get(0).getBindingKey());
assertEquals("Wrong exchange bound", _exchange,
_queue.getBindings().get(0).getExchange());
_exchange.removeBinding(_routingKey, _queue, Collections.EMPTY_MAP);
assertFalse("Routing key was still bound",
_exchange.isBound(_routingKey));
}
public void testRegisterConsumerThenEnqueueMessage() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
// Check adding a consumer adds it to the queue
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
}
catch(InterruptedException e)
{
}
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull(_consumer.getQueueContext().getReleasedEntry());
// Check removing the consumer removes it's information from the queue
_consumer.close();
assertTrue("Consumer still had queue", _consumerTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
1 == _queue.getActiveConsumerCount());
ServerMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB, null);
assertNull(_consumer.getQueueContext());
}
public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA, null);
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests enqueuing two messages.
*/
public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA, null);
_queue.enqueue(messageB, null);
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
Thread.sleep(150);
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that a released queue entry is resent to the subscriber. Verifies also that the
* QueueContext._releasedEntry is reset to null after the entry has been reset.
*/
public void testReleasedMessageIsResentToSubscriber() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction);
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to consumer",
3,
_consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to consumer",
4,
_consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that a released message that becomes expired is not resent to the subscriber.
* This tests ensures that SimpleAMQQueueEntry.getNextAvailableEntry avoids expired entries.
* Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset.
*/
public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.SEES_REQUEUES,
Consumer.Option.ACQUIRES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue one message with expiration set for a short time in the future */
int messageExpirationOffset = 200;
final long expiration = System.currentTimeMillis() + messageExpirationOffset;
when(messageA.getExpiration()).thenReturn(expiration);
_queue.enqueue(messageA, postEnqueueAction);
int subFlushWaitTime = 150;
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to consumer",
1,
_consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
Thread.sleep(messageExpirationOffset - subFlushWaitTime + 10);
queueEntries.get(0).release();
Thread.sleep(subFlushWaitTime); // Work done by SubFlushRunner/QueueRunner Threads
assertTrue("Expecting the queue entry to be now expired", queueEntries.get(0).expired());
assertEquals("Total number of messages sent should not have changed",
1,
_consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that if a client releases entries 'out of order' (the order
* used by QueueEntryImpl.compareTo) that messages are still resent
* successfully. Specifically this test ensures the {@see SimpleAMQQueue#requeue()}
* can correctly move the _releasedEntry to an earlier position in the QueueEntry list.
*/
public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction);
_queue.enqueue(messageB, postEnqueueAction);
_queue.enqueue(messageC, postEnqueueAction);
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to consumer",
3,
_consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
/* Now release the third and first message only, causing it to be requeued */
queueEntries.get(2).release();
queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to consumer",
5,
_consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
public void testReleaseForQueueWithMultipleConsumers() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
MockConsumer target1 = new MockConsumer();
MockConsumer target2 = new MockConsumer();
QueueConsumer consumer1 = _queue.addConsumer(target1, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
QueueConsumer consumer2 = _queue.addConsumer(target2, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue two messages */
_queue.enqueue(messageA, postEnqueueAction);
_queue.enqueue(messageB, postEnqueueAction);
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to both after enqueue",
2,
target1.getMessages().size() + target2.getMessages().size());
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
Thread.sleep(150); // Work done by SubFlushRunner/QueueRunner Threads
assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
target1.getMessages().size() + target2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed",
consumer1.getQueueContext().getReleasedEntry());
assertNull("releasedEntry should be cleared after requeue processed",
consumer2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
// Check adding an exclusive consumer adds it to the queue
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
_queue.getActiveConsumerCount());
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null);
try
{
Thread.sleep(2000L);
}
catch (InterruptedException e)
{
}
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
MockConsumer subB = new MockConsumer();
Exception ex = null;
try
{
_queue.addConsumer(subB, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
}
catch (MessageSource.ExistingExclusiveConsumer e)
{
ex = e;
}
assertNotNull(ex);
// Check we cannot add an exclusive subscriber to a queue with an
// existing consumer
_consumer.close();
_consumer = _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES,
Consumer.Option.SEES_REQUEUES));
try
{
_consumer = _queue.addConsumer(subB, null, messageA.getClass(), "test",
EnumSet.of(Consumer.Option.EXCLUSIVE));
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
{
ex = e;
}
assertNotNull(ex);
}
public void testResend() throws Exception
{
Long id = new Long(26);
ServerMessage message = createMessage(id);
_consumer = _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
_queue.enqueue(message, new Action<MessageInstance<?,? extends Consumer>>()
{
@Override
public void performAction(final MessageInstance<?,? extends Consumer> object)
{
QueueEntryImpl entry = (QueueEntryImpl) object;
entry.setRedelivered();
_consumer.resend(entry);
}
});
}
public void testGetFirstMessageId() throws Exception
{
// Create message
Long messageId = new Long(23);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
// Check message id
assertEquals("Message ID was wrong", messageId, testmsgid);
}
public void testGetFirstFiveMessageIds() throws Exception
{
for (int i = 0 ; i < 5; i++)
{
// Create message
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
// Check message id
for (int i = 0; i < 5; i++)
{
Long messageId = new Long(i);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
public void testGetLastFiveMessageIds() throws Exception
{
for (int i = 0 ; i < 10; i++)
{
// Create message
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
// Check message id
for (int i = 0; i < 5; i++)
{
Long messageId = new Long(i+5);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
public void testGetMessagesRangeOnTheQueue() throws Exception
{
for (int i = 1 ; i <= 10; i++)
{
// Create message
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null);
}
// Get non-existent 0th QueueEntry & check returned list was empty
// (the position parameters in this method are indexed from 1)
List<E> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
assertTrue(entries.size() == 0);
// Check that when 'from' is 0 it is ignored and the range continues from 1
entries = _queue.getMessagesRangeOnTheQueue(0, 2);
assertTrue(entries.size() == 2);
long msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 2L);
// Check that when 'from' is greater than 'to' the returned list is empty
entries = _queue.getMessagesRangeOnTheQueue(5, 4);
assertTrue(entries.size() == 0);
// Get first QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(1, 1);
assertTrue(entries.size() == 1);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
// Get 5th,6th,7th entries and check id's
entries = _queue.getMessagesRangeOnTheQueue(5, 7);
assertTrue(entries.size() == 3);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 5L);
msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 6L);
msgID = entries.get(2).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 7L);
// Get 10th QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(10, 10);
assertTrue(entries.size() == 1);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
// Get non-existent 11th QueueEntry & check returned set was empty
entries = _queue.getMessagesRangeOnTheQueue(11, 11);
assertTrue(entries.size() == 0);
// Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs
entries = _queue.getMessagesRangeOnTheQueue(9, 11);
assertTrue(entries.size() == 2);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 9L);
msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
}
/**
* processQueue() is used when asynchronously delivering messages to
* consumers which could not be delivered immediately during the
* enqueue() operation.
*
* A defect within the method would mean that delivery of these messages may
* not occur should the Runner stop before all messages have been processed.
* Such a defect was discovered when Selectors were used such that one and
* only one consumer can/will accept any given messages, but multiple
* consumers are present, and one of the earlier consumers receives
* more messages than the others.
*
* This test is to validate that the processQueue() method is able to
* correctly deliver all of the messages present for asynchronous delivery
* to consumers in such a scenario.
*/
public void testProcessQueueWithUniqueSelectors() throws Exception
{
SimpleAMQQueue testQueue = createNonAsyncDeliverQueue();
// retrieve the QueueEntryList the queue creates and insert the test
// messages, thus avoiding straight-through delivery attempts during
//enqueue() process.
QueueEntryList list = testQueue.getEntries();
assertNotNull("QueueEntryList should have been created", list);
QueueEntry msg1 = list.add(createMessage(1L));
QueueEntry msg2 = list.add(createMessage(2L));
QueueEntry msg3 = list.add(createMessage(3L));
QueueEntry msg4 = list.add(createMessage(4L));
QueueEntry msg5 = list.add(createMessage(5L));
// Create lists of the entries each consumer should be interested
// in.Bias over 50% of the messages to the first consumer so that
// the later consumers reject them and report being done before
// the first consumer as the processQueue method proceeds.
List<String> msgListSub1 = createEntriesList(msg1, msg2, msg3);
List<String> msgListSub2 = createEntriesList(msg4);
List<String> msgListSub3 = createEntriesList(msg5);
MockConsumer sub1 = new MockConsumer(msgListSub1);
MockConsumer sub2 = new MockConsumer(msgListSub2);
MockConsumer sub3 = new MockConsumer(msgListSub3);
// register the consumers
testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test",
EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES));
//check that no messages have been delivered to the
//consumers during registration
assertEquals("No messages should have been delivered yet", 0, sub1.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub2.getMessages().size());
assertEquals("No messages should have been delivered yet", 0, sub3.getMessages().size());
// call processQueue to deliver the messages
testQueue.processQueue(new QueueRunner(testQueue)
{
@Override
public void run()
{
// we don't actually want/need this runner to do any work
// because we we are already doing it!
}
});
// check expected messages delivered to correct consumers
verifyReceivedMessages(Arrays.asList((MessageInstance)msg1,msg2,msg3), sub1.getMessages());
verifyReceivedMessages(Collections.singletonList((MessageInstance)msg4), sub2.getMessages());
verifyReceivedMessages(Collections.singletonList((MessageInstance)msg5), sub3.getMessages());
}
private SimpleAMQQueue createNonAsyncDeliverQueue()
{
TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
return new NonAsyncDeliverQueue(factory, getVirtualHost());
}
/**
* Tests that dequeued message is not present in the list returned form
* {@link SimpleAMQQueue#getMessagesOnTheQueue()}
*/
public void testGetMessagesOnTheQueueWithDequeuedEntry()
{
int messageNumber = 4;
int dequeueMessageIndex = 1;
// send test messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue
List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertEquals(messageNumber - 1, entries.size());
int expectedId = 0;
for (int i = 0; i < messageNumber - 1; i++)
{
Long id = ( entries.get(i).getMessage()).getMessageNumber();
if (i == dequeueMessageIndex)
{
assertFalse("Message with id " + dequeueMessageIndex
+ " was dequeued and should not be returned by method getMessagesOnTheQueue!",
new Long(expectedId).equals(id));
expectedId++;
}
assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
new Long(expectedId), id);
expectedId++;
}
}
/**
* Tests that dequeued message is not present in the list returned form
* {@link SimpleAMQQueue#getMessagesOnTheQueue(QueueEntryFilter)}
*/
public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry()
{
int messageNumber = 4;
int dequeueMessageIndex = 1;
// send test messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue with filter accepting all available messages
List<E> entries = _queue.getMessagesOnTheQueue(new QueueEntryFilter<E>()
{
public boolean accept(E entry)
{
return true;
}
public boolean filterComplete()
{
return false;
}
});
// assert entries on the queue
assertEquals(messageNumber - 1, entries.size());
int expectedId = 0;
for (int i = 0; i < messageNumber - 1; i++)
{
Long id = (entries.get(i).getMessage()).getMessageNumber();
if (i == dequeueMessageIndex)
{
assertFalse("Message with id " + dequeueMessageIndex
+ " was dequeued and should not be returned by method getMessagesOnTheQueue!",
new Long(expectedId).equals(id));
expectedId++;
}
assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
new Long(expectedId), id);
expectedId++;
}
}
/**
* Tests that dequeued message on the top is not accounted and next message
* is deleted from the queue on invocation of
* {@link SimpleAMQQueue#deleteMessageFromTop()}
*/
public void testDeleteMessageFromTopWithDequeuedEntryOnTop()
{
int messageNumber = 4;
int dequeueMessageIndex = 0;
// put messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message on top
dequeueMessage(_queue, dequeueMessageIndex);
//delete message from top
_queue.deleteMessageFromTop();
//get queue entries
List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull("Null is returned from getMessagesOnTheQueue", entries);
assertEquals("Expected " + (messageNumber - 2) + " number of messages but recieved " + entries.size(),
messageNumber - 2, entries.size());
assertEquals("Expected first entry with id 2", 2l,
(entries.get(0).getMessage()).getMessageNumber());
}
/**
* Tests that all messages including dequeued one are deleted from the queue
* on invocation of {@link SimpleAMQQueue#clearQueue()}
*/
public void testClearQueueWithDequeuedEntry() throws Exception
{
int messageNumber = 4;
int dequeueMessageIndex = 1;
// put messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message on a test queue
dequeueMessage(_queue, dequeueMessageIndex);
// clean queue
_queue.clearQueue();
// get queue entries
List<E> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull(entries);
assertEquals(0, entries.size());
}
public void testNotificationFiredOnEnqueue() throws Exception
{
AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
_queue.enqueue(createMessage(new Long(24)), null);
verifyZeroInteractions(listener);
_queue.enqueue(createMessage(new Long(25)), null);
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
public void testNotificationFiredAsync() throws Exception
{
AMQQueue.NotificationListener listener = mock(AMQQueue.NotificationListener.class);
_queue.enqueue(createMessage(new Long(24)), null);
_queue.enqueue(createMessage(new Long(25)), null);
_queue.enqueue(createMessage(new Long(26)), null);
_queue.setNotificationListener(listener);
_queue.setMaximumMessageCount(2);
verifyZeroInteractions(listener);
_queue.checkMessageStatus();
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
/**
* A helper method to put given number of messages into queue
* <p>
* All messages are asserted that they are present on queue
*
* @param queue
* queue to put messages into
* @param messageNumber
* number of messages to put into queue
*/
protected List<E> enqueueGivenNumberOfMessages(Q queue, int messageNumber)
{
putGivenNumberOfMessages(queue, messageNumber);
// make sure that all enqueued messages are on the queue
List<E> entries = queue.getMessagesOnTheQueue();
assertEquals(messageNumber, entries.size());
for (int i = 0; i < messageNumber; i++)
{
assertEquals((long)i, (entries.get(i).getMessage()).getMessageNumber());
}
return entries;
}
/**
* A helper method to put given number of messages into queue
* <p>
* Queue is not checked if messages are added into queue
*
* @param queue
* queue to put messages into
* @param messageNumber
* number of messages to put into queue
* @param queue
* @param messageNumber
*/
protected <T extends SimpleAMQQueue> void putGivenNumberOfMessages(T queue, int messageNumber)
{
for (int i = 0; i < messageNumber; i++)
{
// Create message
ServerMessage message = null;
message = createMessage((long)i);
// Put message on queue
queue.enqueue(message,null);
}
try
{
Thread.sleep(2000L);
}
catch (InterruptedException e)
{
_logger.error("Thread interrupted", e);
}
}
/**
* A helper method to dequeue an entry on queue with given index
*
* @param queue
* queue to dequeue message on
* @param dequeueMessageIndex
* entry index to dequeue.
*/
protected QueueEntry dequeueMessage(AMQQueue queue, int dequeueMessageIndex)
{
List<QueueEntry> entries = queue.getMessagesOnTheQueue();
QueueEntry entry = entries.get(dequeueMessageIndex);
entry.acquire();
entry.delete();
assertTrue(entry.isDeleted());
return entry;
}
private List<String> createEntriesList(QueueEntry... entries)
{
ArrayList<String> entriesList = new ArrayList<String>();
for (QueueEntry entry : entries)
{
entriesList.add(entry.getMessage().getMessageHeader().getMessageId());
}
return entriesList;
}
protected void verifyReceivedMessages(List<MessageInstance> expected,
List<MessageInstance> delivered)
{
assertEquals("Consumer did not receive the expected number of messages",
expected.size(), delivered.size());
for (MessageInstance msg : expected)
{
assertTrue("Consumer did not receive msg: "
+ msg.getMessage().getMessageNumber(), delivered.contains(msg));
}
}
public Q getQueue()
{
return _queue;
}
protected void setQueue(Q queue)
{
_queue = queue;
}
public MockConsumer getConsumer()
{
return _consumerTarget;
}
public Map<String,Object> getArguments()
{
return _arguments;
}
public void setArguments(Map<String,Object> arguments)
{
_arguments = arguments;
}
protected ServerMessage createMessage(Long id)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
when(header.getMessageId()).thenReturn(String.valueOf(id));
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
when(message.getMessageHeader()).thenReturn(header);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
return message;
}
private static class EntryListAddingAction implements Action<MessageInstance<?,? extends Consumer>>
{
private final ArrayList<QueueEntry> _queueEntries;
public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries)
{
_queueEntries = queueEntries;
}
public void performAction(MessageInstance<?,? extends Consumer> entry)
{
_queueEntries.add((QueueEntry) entry);
}
}
public VirtualHost getVirtualHost()
{
return _virtualHost;
}
public String getQname()
{
return _qname;
}
public String getOwner()
{
return _owner;
}
public String getRoutingKey()
{
return _routingKey;
}
public DirectExchange getExchange()
{
return _exchange;
}
public MockConsumer getConsumerTarget()
{
return _consumerTarget;
}
static class TestSimpleQueueEntryListFactory implements QueueEntryListFactory<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
{
@Override
public NonAsyncDeliverList createQueueEntryList(final NonAsyncDeliverQueue queue)
{
return new NonAsyncDeliverList(queue);
}
}
private static class NonAsyncDeliverEntry extends OrderedQueueEntry<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
{
public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList)
{
super(queueEntryList);
}
public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList,
final ServerMessage message,
final long entryId)
{
super(queueEntryList, message, entryId);
}
public NonAsyncDeliverEntry(final NonAsyncDeliverList queueEntryList, final ServerMessage message)
{
super(queueEntryList, message);
}
}
private static class NonAsyncDeliverList extends OrderedQueueEntryList<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
{
private static final HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList> HEAD_CREATOR =
new HeadCreator<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>()
{
@Override
public NonAsyncDeliverEntry createHead(final NonAsyncDeliverList list)
{
return new NonAsyncDeliverEntry(list);
}
};
public NonAsyncDeliverList(final NonAsyncDeliverQueue queue)
{
super(queue, HEAD_CREATOR);
}
@Override
protected NonAsyncDeliverEntry createQueueEntry(final ServerMessage<?> message)
{
return new NonAsyncDeliverEntry(this,message);
}
}
private static class NonAsyncDeliverQueue extends SimpleAMQQueue<NonAsyncDeliverEntry, NonAsyncDeliverQueue, NonAsyncDeliverList>
{
public NonAsyncDeliverQueue(final TestSimpleQueueEntryListFactory factory, VirtualHost vhost)
{
super(vhost, null, attributes(), factory);
}
private static Map<String,Object> attributes()
{
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, "test");
attributes.put(Queue.DURABLE, false);
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
return attributes;
}
@Override
public void deliverAsync(QueueConsumer sub)
{
// do nothing, i.e prevent deliveries by the SubFlushRunner
// when registering the new consumers
}
}
}