blob: fffdea6abb00b6dabaca5ff40369cb57c0b2e20f [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.contains;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.BaseMessageInstance;
import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.BrokerTestHelper;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.QueueNotificationListener;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.test.utils.QpidTestCase;
abstract class AbstractQueueTestBase extends QpidTestCase
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractQueueTestBase.class);
private long _queueRunnerWaitTime;
private Queue<?> _queue;
private VirtualHost<?> _virtualHost;
private String _qname = "qname";
private String _owner = "owner";
private String _routingKey = "routing key";
private DirectExchange _exchange;
private MockConsumer _consumerTarget = new MockConsumer();
private QueueConsumer<?> _consumer;
private Map<String,Object> _arguments = Collections.emptyMap();
@Override
public void setUp() throws Exception
{
super.setUp();
BrokerTestHelper.setUp();
_virtualHost = BrokerTestHelper.createVirtualHost(getClass().getName());
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
_queue = _virtualHost.createChild(Queue.class, attributes);
_exchange = (DirectExchange) _virtualHost.getChildByName(Exchange.class, ExchangeDefaults.DIRECT_EXCHANGE_NAME);
_queueRunnerWaitTime = Long.getLong("AbstractQueueTestBase.queueRunnerWaitTime", 150L);
_logger.debug("Using AbstractQueueTestBase.queueRunnerWaitTime {}", _queueRunnerWaitTime);
}
@Override
public void tearDown() throws Exception
{
try
{
_queue.close();
_virtualHost.close();
}
finally
{
BrokerTestHelper.tearDown();
super.tearDown();
}
}
public void testCreateQueue() throws Exception
{
_queue.close();
try
{
Map<String,Object> attributes = new HashMap<>(_arguments);
_queue = _virtualHost.createChild(Queue.class, attributes);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
{
assertTrue("Exception was not about missing name",
e.getMessage().contains("name"));
}
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, "differentName");
_queue = _virtualHost.createChild(Queue.class, attributes);
assertNotNull("Queue was not created", _queue);
}
public void testGetVirtualHost()
{
assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost());
}
public void testBinding()
{
_exchange.addBinding(_routingKey, _queue, Collections.EMPTY_MAP);
assertTrue("Routing key was not bound",
_exchange.isBound(_routingKey));
assertTrue("Queue was not bound to key",
_exchange.isBound(_routingKey,_queue));
assertEquals("Exchange binding count", 1,
_queue.getBindings().size());
final Binding<?> firstBinding = _queue.getBindings().iterator().next();
assertEquals("Wrong exchange bound", _routingKey,
firstBinding.getBindingKey());
assertEquals("Wrong exchange bound", _exchange,
firstBinding.getExchange());
_exchange.deleteBinding(_routingKey, _queue);
assertFalse("Routing key was still bound",
_exchange.isBound(_routingKey));
}
public void testRegisterConsumerThenEnqueueMessage() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
// Check adding a consumer adds it to the queue
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
_queue.getConsumerCountWithCredit());
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null, null);
Thread.sleep(_queueRunnerWaitTime);
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull(_consumer.getQueueContext().getReleasedEntry());
// Check removing the consumer removes it's information from the queue
_consumer.close();
assertTrue("Consumer still had queue", _consumerTarget.isClosed());
assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount());
assertFalse("Queue still has active consumer",
1 == _queue.getConsumerCountWithCredit());
ServerMessage messageB = createMessage(new Long (25));
_queue.enqueue(messageB, null, null);
assertNull(_consumer.getQueueContext());
}
public void testEnqueueMessageThenRegisterConsumer() throws Exception, InterruptedException
{
ServerMessage messageA = createMessage(new Long(24));
_queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
Thread.sleep(_queueRunnerWaitTime);
assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after an enqueue",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests enqueuing two messages.
*/
public void testEnqueueTwoMessagesThenRegisterConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageA, null, null);
_queue.enqueue(messageB, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
Thread.sleep(_queueRunnerWaitTime);
assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage());
assertNull("There should be no releasedEntry after enqueues",
_consumer.getQueueContext().getReleasedEntry());
}
public void testMessageHeldIfNotYetValidWhenConsumerAdded() throws Exception
{
_queue.close();
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
_queue = _virtualHost.createChild(Queue.class, attributes);
ServerMessage messageA = createMessage(new Long(24));
AMQMessageHeader messageHeader = messageA.getMessageHeader();
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
Thread.sleep(_queueRunnerWaitTime);
assertEquals("Message which was not yet valid was received", 0, _consumerTarget.getMessages().size());
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
_queue.checkMessageStatus();
Thread.sleep(_queueRunnerWaitTime);
assertEquals("Message which was valid was not received", 1, _consumerTarget.getMessages().size());
}
public void testMessageHoldingDependentOnQueueProperty() throws Exception
{
_queue.close();
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.FALSE);
_queue = _virtualHost.createChild(Queue.class, attributes);
ServerMessage messageA = createMessage(new Long(24));
AMQMessageHeader messageHeader = messageA.getMessageHeader();
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
Thread.sleep(_queueRunnerWaitTime);
assertEquals("Message was held despite queue not having holding enabled", 1, _consumerTarget.getMessages().size());
}
public void testUnheldMessageOvertakesHeld() throws Exception
{
_queue.close();
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME, _qname);
attributes.put(Queue.OWNER, _owner);
attributes.put(Queue.HOLD_ON_PUBLISH_ENABLED, Boolean.TRUE);
_queue = _virtualHost.createChild(Queue.class, attributes);
ServerMessage messageA = createMessage(new Long(24));
AMQMessageHeader messageHeader = messageA.getMessageHeader();
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()+20000L);
_queue.enqueue(messageA, null, null);
ServerMessage messageB = createMessage(new Long(25));
_queue.enqueue(messageB, null, null);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
Thread.sleep(_queueRunnerWaitTime);
assertEquals("Expect one message (message B)", 1, _consumerTarget.getMessages().size());
assertEquals("Wrong message received", messageB.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(0).getMessage().getMessageHeader().getMessageId());
when(messageHeader.getNotValidBefore()).thenReturn(System.currentTimeMillis()-100L);
_queue.checkMessageStatus();
Thread.sleep(_queueRunnerWaitTime);
assertEquals("Message which was valid was not received", 2, _consumerTarget.getMessages().size());
assertEquals("Wrong message received", messageA.getMessageHeader().getMessageId(), _consumerTarget.getMessages().get(1).getMessage().getMessageHeader().getMessageId());
}
/**
* Tests that a released queue entry is resent to the subscriber. Verifies also that the
* QueueContext._releasedEntry is reset to null after the entry has been reset.
*/
public void testReleasedMessageIsResentToSubscriber() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction, null);
_queue.enqueue(messageB, postEnqueueAction, null);
_queue.enqueue(messageC, postEnqueueAction, null);
Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
3,
_consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
4,
_consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should remain be unset",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that a released message that becomes expired is not resent to the subscriber.
* This tests ensures that SimpleAMQQueue<?>Entry.getNextAvailableEntry avoids expired entries.
* Verifies also that the QueueContext._releasedEntry is reset to null after the entry has been reset.
*/
public void testReleaseMessageThatBecomesExpiredIsNotRedelivered() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
final CountDownLatch sendIndicator = new CountDownLatch(1);
_consumerTarget = new MockConsumer()
{
@Override
public long send(MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
{
try
{
return super.send(consumer, entry, batch);
}
finally
{
sendIndicator.countDown();
}
}
};
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.SEES_REQUEUES,
ConsumerOption.ACQUIRES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue one message with expiration set for a short time in the future */
final long expiration = System.currentTimeMillis() + _queueRunnerWaitTime;
when(messageA.getExpiration()).thenReturn(expiration);
_queue.enqueue(messageA, postEnqueueAction, null);
assertTrue("Message was not sent during expected time interval", sendIndicator.await(5000, TimeUnit.MILLISECONDS));
assertEquals("Unexpected total number of messages sent to consumer", 1, _consumerTarget.getMessages().size());
QueueEntry queueEntry = queueEntries.get(0);
final CountDownLatch dequeueIndicator = new CountDownLatch(1);
queueEntry.addStateChangeListener(new StateChangeListener<MessageInstance, MessageInstance.EntryState>()
{
@Override
public void stateChanged(MessageInstance object, MessageInstance.EntryState oldState, MessageInstance.EntryState newState)
{
if (newState.equals(MessageInstance.DEQUEUED_STATE))
{
dequeueIndicator.countDown();
}
}
});
assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());
/* Wait a little more to be sure that message will have expired, then release the first message only, causing it to be requeued */
while(!queueEntry.expired() && System.currentTimeMillis() <= expiration )
{
Thread.sleep(10);
}
assertTrue("Expecting the queue entry to be now expired", queueEntry.expired());
queueEntry.release();
assertTrue("Message was not de-queued due to expiration", dequeueIndicator.await(5000, TimeUnit.MILLISECONDS));
assertEquals("Total number of messages sent should not have changed", 1, _consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntry.isRedelivered());
// QueueContext#_releasedEntry is updated after notification, thus, we need to make sure that it is updated
long waitLoopLimit = 10;
while(_consumer.getQueueContext().getReleasedEntry() != null && waitLoopLimit-- > 0 )
{
Thread.sleep(10);
}
assertNull("releasedEntry should be cleared after requeue processed:" + _consumer.getQueueContext().getReleasedEntry(),
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that if a client releases entries 'out of order' (the order
* used by QueueEntryImpl.compareTo) that messages are still resent
* successfully. Specifically this test ensures the {@see AbstractQueue#requeue()}
* can correctly move the _releasedEntry to an earlier position in the QueueEntry list.
*/
public void testReleasedOutOfComparableOrderAreRedelivered() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
ServerMessage messageC = createMessage(new Long(26));
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue three messages */
_queue.enqueue(messageA, postEnqueueAction, null);
_queue.enqueue(messageB, postEnqueueAction, null);
_queue.enqueue(messageC, postEnqueueAction, null);
Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
3,
_consumerTarget.getMessages().size());
assertFalse("Redelivery flag should not be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(1).isRedelivered());
assertFalse("Redelivery flag should not be set", queueEntries.get(2).isRedelivered());
/* Now release the third and first message only, causing it to be requeued */
queueEntries.get(2).release();
queueEntries.get(0).release();
Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to consumer",
5,
_consumerTarget.getMessages().size());
assertTrue("Redelivery flag should now be set", queueEntries.get(0).isRedelivered());
assertFalse("Redelivery flag should remain be unset", queueEntries.get(1).isRedelivered());
assertTrue("Redelivery flag should now be set",queueEntries.get(2).isRedelivered());
assertNull("releasedEntry should be cleared after requeue processed",
_consumer.getQueueContext().getReleasedEntry());
}
/**
* Tests that a release requeues an entry for a queue with multiple consumers. Verifies that a
* requeue resends a message to a <i>single</i> subscriber.
*/
public void testReleaseForQueueWithMultipleConsumers() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
ServerMessage messageB = createMessage(new Long(25));
MockConsumer target1 = new MockConsumer();
MockConsumer target2 = new MockConsumer();
QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>();
EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries);
/* Enqueue two messages */
_queue.enqueue(messageA, postEnqueueAction, null);
_queue.enqueue(messageB, postEnqueueAction, null);
Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to both after enqueue",
2,
target1.getMessages().size() + target2.getMessages().size());
/* Now release the first message only, causing it to be requeued */
queueEntries.get(0).release();
Thread.sleep(_queueRunnerWaitTime); // Work done by QueueRunner Thread
assertEquals("Unexpected total number of messages sent to both consumers after release",
3,
target1.getMessages().size() + target2.getMessages().size());
assertNull("releasedEntry should be cleared after requeue processed",
consumer1.getQueueContext().getReleasedEntry());
assertNull("releasedEntry should be cleared after requeue processed",
consumer2.getQueueContext().getReleasedEntry());
}
public void testExclusiveConsumer() throws Exception
{
ServerMessage messageA = createMessage(new Long(24));
// Check adding an exclusive consumer adds it to the queue
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.EXCLUSIVE, ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
assertEquals("Queue does not have consumer", 1,
_queue.getConsumerCount());
assertEquals("Queue does not have active consumer", 1,
_queue.getConsumerCountWithCredit());
// Check sending a message ends up with the subscriber
_queue.enqueue(messageA, null, null);
final long timeout = System.currentTimeMillis() + _queueRunnerWaitTime;
QueueEntry lastSeen = null;
while (timeout > System.currentTimeMillis() &&
((lastSeen = _consumer.getQueueContext().getLastSeenEntry()) == null || lastSeen.getMessage() == null))
{
Thread.sleep(10);
}
assertEquals("Queue context did not see expected message within timeout",
messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage());
// Check we cannot add a second subscriber to the queue
MockConsumer subB = new MockConsumer();
Exception ex = null;
try
{
_queue.addConsumer(subB, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
}
catch (MessageSource.ExistingExclusiveConsumer e)
{
ex = e;
}
assertNotNull(ex);
// Check we cannot add an exclusive subscriber to a queue with an
// existing consumer
_consumer.close();
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES,
ConsumerOption.SEES_REQUEUES), 0);
try
{
_consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test",
EnumSet.of(ConsumerOption.EXCLUSIVE), 0);
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
{
ex = e;
}
assertNotNull(ex);
}
public void testResend() throws Exception
{
Long id = new Long(26);
ServerMessage message = createMessage(id);
_consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test",
EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES),
0);
_queue.enqueue(message, new Action<BaseMessageInstance>()
{
@Override
public void performAction(final BaseMessageInstance object)
{
QueueEntryImpl entry = (QueueEntryImpl) object;
entry.setRedelivered();
_consumer.resend(entry);
}
}, null);
}
public void testGetFirstMessageId() throws Exception
{
// Create message
Long messageId = new Long(23);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null, null);
// Get message id
Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0);
// Check message id
assertEquals("Message ID was wrong", messageId, testmsgid);
}
public void testGetFirstFiveMessageIds() throws Exception
{
for (int i = 0 ; i < 5; i++)
{
// Create message
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5);
// Check message id
for (int i = 0; i < 5; i++)
{
Long messageId = new Long(i);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
public void testGetLastFiveMessageIds() throws Exception
{
for (int i = 0 ; i < 10; i++)
{
// Create message
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null, null);
}
// Get message ids
List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5);
// Check message id
for (int i = 0; i < 5; i++)
{
Long messageId = new Long(i+5);
assertEquals("Message ID was wrong", messageId, msgids.get(i));
}
}
public void testGetMessagesRangeOnTheQueue() throws Exception
{
for (int i = 1 ; i <= 10; i++)
{
// Create message
Long messageId = new Long(i);
ServerMessage message = createMessage(messageId);
// Put message on queue
_queue.enqueue(message, null, null);
}
// Get non-existent 0th QueueEntry & check returned list was empty
// (the position parameters in this method are indexed from 1)
List<? extends QueueEntry> entries = _queue.getMessagesRangeOnTheQueue(0, 0);
assertTrue(entries.size() == 0);
// Check that when 'from' is 0 it is ignored and the range continues from 1
entries = _queue.getMessagesRangeOnTheQueue(0, 2);
assertTrue(entries.size() == 2);
long msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 2L);
// Check that when 'from' is greater than 'to' the returned list is empty
entries = _queue.getMessagesRangeOnTheQueue(5, 4);
assertTrue(entries.size() == 0);
// Get first QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(1, 1);
assertTrue(entries.size() == 1);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 1L);
// Get 5th,6th,7th entries and check id's
entries = _queue.getMessagesRangeOnTheQueue(5, 7);
assertTrue(entries.size() == 3);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 5L);
msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 6L);
msgID = entries.get(2).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 7L);
// Get 10th QueueEntry & check id
entries = _queue.getMessagesRangeOnTheQueue(10, 10);
assertTrue(entries.size() == 1);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
// Get non-existent 11th QueueEntry & check returned set was empty
entries = _queue.getMessagesRangeOnTheQueue(11, 11);
assertTrue(entries.size() == 0);
// Get 9th,10th, and non-existent 11th entries & check result is of size 2 with correct IDs
entries = _queue.getMessagesRangeOnTheQueue(9, 11);
assertTrue(entries.size() == 2);
msgID = entries.get(0).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 9L);
msgID = entries.get(1).getMessage().getMessageNumber();
assertEquals("Message ID was wrong", msgID, 10L);
}
/**
* Tests that dequeued message is not present in the list returned form
* {@link AbstractQueue#getMessagesOnTheQueue()}
*/
public void testGetMessagesOnTheQueueWithDequeuedEntry()
{
int messageNumber = 4;
int dequeueMessageIndex = 1;
// send test messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue
List<? extends QueueEntry> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertEquals(messageNumber - 1, entries.size());
int expectedId = 0;
for (int i = 0; i < messageNumber - 1; i++)
{
Long id = ( entries.get(i).getMessage()).getMessageNumber();
if (i == dequeueMessageIndex)
{
assertFalse("Message with id " + dequeueMessageIndex
+ " was dequeued and should not be returned by method getMessagesOnTheQueue!",
new Long(expectedId).equals(id));
expectedId++;
}
assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
new Long(expectedId), id);
expectedId++;
}
}
/**
* Tests that dequeued message is not present in the list returned form
* {@link AbstractQueue#getMessagesOnTheQueue(QueueEntryFilter)}
*/
public void testGetMessagesOnTheQueueByQueueEntryFilterWithDequeuedEntry()
{
int messageNumber = 4;
int dequeueMessageIndex = 1;
// send test messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message
dequeueMessage(_queue, dequeueMessageIndex);
// get messages on the queue with filter accepting all available messages
List<? extends QueueEntry> entries = ((AbstractQueue)_queue).getMessagesOnTheQueue(new QueueEntryFilter()
{
public boolean accept(QueueEntry entry)
{
return true;
}
public boolean filterComplete()
{
return false;
}
});
// assert entries on the queue
assertEquals(messageNumber - 1, entries.size());
int expectedId = 0;
for (int i = 0; i < messageNumber - 1; i++)
{
Long id = (entries.get(i).getMessage()).getMessageNumber();
if (i == dequeueMessageIndex)
{
assertFalse("Message with id " + dequeueMessageIndex
+ " was dequeued and should not be returned by method getMessagesOnTheQueue!",
new Long(expectedId).equals(id));
expectedId++;
}
assertEquals("Expected message with id " + expectedId + " but got message with id " + id,
new Long(expectedId), id);
expectedId++;
}
}
/**
* Tests that all messages including dequeued one are deleted from the queue
* on invocation of {@link AbstractQueue#clearQueue()}
*/
public void testClearQueueWithDequeuedEntry() throws Exception
{
int messageNumber = 4;
int dequeueMessageIndex = 1;
// put messages into a test queue
enqueueGivenNumberOfMessages(_queue, messageNumber);
// dequeue message on a test queue
dequeueMessage(_queue, dequeueMessageIndex);
// clean queue
_queue.clearQueue();
// get queue entries
List<? extends QueueEntry> entries = _queue.getMessagesOnTheQueue();
// assert queue entries
assertNotNull(entries);
assertEquals(0, entries.size());
}
public void testNotificationFiredOnEnqueue() throws Exception
{
QueueNotificationListener listener = mock(QueueNotificationListener .class);
_queue.setNotificationListener(listener);
_queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
Integer.valueOf(2)));
_queue.enqueue(createMessage(new Long(24)), null, null);
verifyZeroInteractions(listener);
_queue.enqueue(createMessage(new Long(25)), null, null);
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
public void testNotificationFiredAsync() throws Exception
{
QueueNotificationListener listener = mock(QueueNotificationListener .class);
_queue.enqueue(createMessage(new Long(24)), null, null);
_queue.enqueue(createMessage(new Long(25)), null, null);
_queue.enqueue(createMessage(new Long(26)), null, null);
_queue.setNotificationListener(listener);
_queue.setAttributes(Collections.<String, Object>singletonMap(Queue.ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
Integer.valueOf(2)));
verifyZeroInteractions(listener);
_queue.checkMessageStatus();
verify(listener, atLeastOnce()).notifyClients(eq(NotificationCheck.MESSAGE_COUNT_ALERT), eq(_queue), contains("Maximum count on queue threshold"));
}
public void testMaximumMessageTtl() throws Exception
{
// Test scenarios where only the maximum TTL has been set
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME,"testTtlOverrideMaximumTTl");
attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 10000l);
Queue<?> queue = _virtualHost.createChild(Queue.class, attributes);
assertEquals("TTL has not been overridden", 60000l, getExpirationOnQueue(queue, 50000l, 0l));
assertEquals("TTL has not been overridden", 60000l, getExpirationOnQueue(queue, 50000l, 65000l));
assertEquals("TTL has been incorrectly overridden", 55000l, getExpirationOnQueue(queue, 50000l, 55000l));
long tooLateExpiration = System.currentTimeMillis() + 20000l;
assertTrue("TTL has not been overridden", tooLateExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
long acceptableExpiration = System.currentTimeMillis() + 5000l;
assertEquals("TTL has been incorrectly overriden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
// Test the scenarios where only the minimum TTL has been set
attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME,"testTtlOverrideMinimumTTl");
attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
queue = _virtualHost.createChild(Queue.class, attributes);
assertEquals("TTL has been overridden incorrectly", 0l, getExpirationOnQueue(queue, 50000l, 0l));
assertEquals("TTL has been overridden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
assertEquals("TTL has not been overriden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
long unacceptableExpiration = System.currentTimeMillis() + 5000l;
assertTrue("TTL has not been overridden", unacceptableExpiration != getExpirationOnQueue(queue, 0l, tooLateExpiration));
acceptableExpiration = System.currentTimeMillis() + 20000l;
assertEquals("TTL has been incorrectly overridden", acceptableExpiration, getExpirationOnQueue(queue, 0l, acceptableExpiration));
// Test the scenarios where both the minimum and maximum TTL have been set
attributes = new HashMap<>(_arguments);
attributes.put(Queue.NAME,"testTtlOverrideBothTTl");
attributes.put(Queue.MINIMUM_MESSAGE_TTL, 10000l);
attributes.put(Queue.MAXIMUM_MESSAGE_TTL, 20000l);
queue = _virtualHost.createChild(Queue.class, attributes);
assertEquals("TTL has not been overridden", 70000l, getExpirationOnQueue(queue, 50000l, 0l));
assertEquals("TTL has been overridden incorrectly", 65000l, getExpirationOnQueue(queue, 50000l, 65000l));
assertEquals("TTL has not been overridden", 60000l, getExpirationOnQueue(queue, 50000l, 55000l));
}
public void testOldestMessage()
{
Queue<?> queue = getQueue();
queue.enqueue(createMessage(1l, (byte)1, Collections.singletonMap("sortKey", (Object) "Z"), 10l), null, null);
queue.enqueue(createMessage(2l, (byte)4, Collections.singletonMap("sortKey", (Object) "M"), 100l), null, null);
queue.enqueue(createMessage(3l, (byte)9, Collections.singletonMap("sortKey", (Object) "A"), 1000l), null, null);
assertEquals(10l,queue.getOldestMessageArrivalTime());
}
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
{
final List<QueueEntry> entries = new ArrayList<>();
ServerMessage message = createMessage(1l);
when(message.getArrivalTime()).thenReturn(arrivalTime);
when(message.getExpiration()).thenReturn(expiration);
queue.enqueue(message,null, null);
queue.visit(new QueueEntryVisitor()
{
@Override
public boolean visit(final QueueEntry entry)
{
entries.add(entry);
return true;
}
});
assertEquals("Expected only one entry in the queue", 1, entries.size());
Long entryExpiration =
(Long) entries.get(0).getInstanceProperties().getProperty(InstanceProperties.Property.EXPIRATION);
queue.clearQueue();
entries.clear();
return entryExpiration;
}
/**
* A helper method to put given number of messages into queue
* <p>
* All messages are asserted that they are present on queue
*
* @param queue
* queue to put messages into
* @param messageNumber
* number of messages to put into queue
*/
protected List<? extends QueueEntry> enqueueGivenNumberOfMessages(Queue<?> queue, int messageNumber)
{
putGivenNumberOfMessages(queue, messageNumber);
// make sure that all enqueued messages are on the queue
List<? extends QueueEntry> entries = queue.getMessagesOnTheQueue();
assertEquals(messageNumber, entries.size());
for (int i = 0; i < messageNumber; i++)
{
assertEquals((long)i, (entries.get(i).getMessage()).getMessageNumber());
}
return entries;
}
/**
* A helper method to put given number of messages into queue
* <p>
* Queue is not checked if messages are added into queue
*
* @param queue
* queue to put messages into
* @param messageNumber
* number of messages to put into queue
* @param queue
* @param messageNumber
*/
protected void putGivenNumberOfMessages(Queue<?> queue, int messageNumber)
{
for (int i = 0; i < messageNumber; i++)
{
// Create message
ServerMessage message = null;
message = createMessage((long)i);
// Put message on queue
queue.enqueue(message,null, null);
}
try
{
Thread.sleep(2000L);
}
catch (InterruptedException e)
{
_logger.error("Thread interrupted", e);
}
}
/**
* A helper method to dequeue an entry on queue with given index
*
* @param queue
* queue to dequeue message on
* @param dequeueMessageIndex
* entry index to dequeue.
*/
protected QueueEntry dequeueMessage(Queue<?> queue, int dequeueMessageIndex)
{
List<? extends QueueEntry> entries = queue.getMessagesOnTheQueue();
QueueEntry entry = entries.get(dequeueMessageIndex);
entry.acquire();
entry.delete();
assertTrue(entry.isDeleted());
return entry;
}
protected void verifyReceivedMessages(List<MessageInstance> expected,
List<MessageInstance> delivered)
{
assertEquals("Consumer did not receive the expected number of messages",
expected.size(), delivered.size());
for (MessageInstance msg : expected)
{
assertTrue("Consumer did not receive msg: "
+ msg.getMessage().getMessageNumber(), delivered.contains(msg));
}
}
public Queue<?> getQueue()
{
return _queue;
}
protected void setQueue(Queue<?> queue)
{
_queue = queue;
}
public MockConsumer getConsumer()
{
return _consumerTarget;
}
public Map<String,Object> getArguments()
{
return _arguments;
}
public void setArguments(Map<String,Object> arguments)
{
_arguments = arguments;
}
protected ServerMessage createMessage(Long id, byte priority, final Map<String,Object> arguments, long arrivalTime)
{
ServerMessage message = createMessage(id);
AMQMessageHeader hdr = message.getMessageHeader();
when(hdr.getPriority()).thenReturn(priority);
when(message.getArrivalTime()).thenReturn(arrivalTime);
when(hdr.getHeaderNames()).thenReturn(arguments.keySet());
final ArgumentCaptor<String> nameCaptor = ArgumentCaptor.forClass(String.class);
when(hdr.containsHeader(nameCaptor.capture())).thenAnswer(new Answer<Boolean>()
{
@Override
public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable
{
return arguments.containsKey(nameCaptor.getValue());
}
});
final ArgumentCaptor<Set> namesCaptor = ArgumentCaptor.forClass(Set.class);
when(hdr.containsHeaders(namesCaptor.capture())).thenAnswer(new Answer<Boolean>()
{
@Override
public Boolean answer(final InvocationOnMock invocationOnMock) throws Throwable
{
return arguments.keySet().containsAll(namesCaptor.getValue());
}
});
final ArgumentCaptor<String> nameCaptor2 = ArgumentCaptor.forClass(String.class);
when(hdr.getHeader(nameCaptor2.capture())).thenAnswer(new Answer<Object>()
{
@Override
public Object answer(final InvocationOnMock invocationOnMock) throws Throwable
{
return arguments.get(nameCaptor2.getValue());
}
});
return message;
}
protected ServerMessage createMessage(Long id)
{
AMQMessageHeader header = mock(AMQMessageHeader.class);
when(header.getMessageId()).thenReturn(String.valueOf(id));
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn(id);
when(message.getMessageHeader()).thenReturn(header);
MessageReference ref = mock(MessageReference.class);
when(ref.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(ref);
when(message.newReference(any(TransactionLogResource.class))).thenReturn(ref);
return message;
}
private static class EntryListAddingAction implements Action<BaseMessageInstance>
{
private final ArrayList<QueueEntry> _queueEntries;
public EntryListAddingAction(final ArrayList<QueueEntry> queueEntries)
{
_queueEntries = queueEntries;
}
public void performAction(BaseMessageInstance entry)
{
_queueEntries.add((QueueEntry) entry);
}
}
public VirtualHost<?> getVirtualHost()
{
return _virtualHost;
}
public String getQname()
{
return _qname;
}
public String getOwner()
{
return _owner;
}
public String getRoutingKey()
{
return _routingKey;
}
public DirectExchange getExchange()
{
return _exchange;
}
public MockConsumer getConsumerTarget()
{
return _consumerTarget;
}
}