// Source blob: aae05f48dc99af7d3d51234af114e8c097001fd8
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import java.security.AccessController;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.consumer.MockConsumer;
import org.apache.qpid.server.message.ConsumerOption;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageInstanceConsumer;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.MessageDurability;
import org.apache.qpid.server.store.MessageEnqueueRecord;
public class StandardQueueTest extends AbstractQueueTestBase
{
/**
 * Verifies that a queue created with the
 * {@link LifetimePolicy#DELETE_ON_NO_OUTBOUND_LINKS} lifetime policy reports
 * itself as deleted once its consumer has been closed.
 */
public void testAutoDeleteQueue() throws Exception
{
    // Close and delete the queue currently returned by getQueue(); it is
    // replaced by the queue built below.
    getQueue().close();
    getQueue().delete();

    final Map<String,Object> attrs = new HashMap<>();
    attrs.put(Queue.NAME, getQname());
    attrs.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);

    final StandardQueueImpl autoDeleteQueue = new StandardQueueImpl(attrs, getVirtualHost());
    autoDeleteQueue.open();
    setQueue(autoDeleteQueue);

    final ServerMessage msg = createMessage(25L);
    final EnumSet<ConsumerOption> options =
            EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES);

    // Attach a consumer, enqueue one message, then close the consumer again.
    final QueueConsumer onlyConsumer =
            (QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, msg.getClass(), "test", options, 0);
    getQueue().enqueue(msg, null, null);
    onlyConsumer.close();

    final boolean deleted = getQueue().isDeleted();
    assertTrue("Queue was not deleted when consumer was removed", deleted);
}
/**
 * Checks the value returned by {@code getConsumerCountWithCredit()} while
 * consumers are added to a queue and moved between the
 * {@code ACTIVE}, {@code SUSPENDED} and {@code CLOSED} target states.
 */
public void testActiveConsumerCount() throws Exception
{
    final String countMessage = "Unexpected active consumer count";

    final Map<String,Object> attrs = new HashMap<>();
    attrs.put(Queue.NAME, "testActiveConsumerCount");
    attrs.put(Queue.OWNER, "testOwner");

    final StandardQueueImpl queue = new StandardQueueImpl(attrs, getVirtualHost());
    queue.open();

    // --- adding an active consumer increases the count ---
    final MockConsumer activeConsumer = new MockConsumer();
    activeConsumer.setActive(true);
    activeConsumer.setState(ConsumerTarget.State.ACTIVE);
    assertEquals(countMessage, 0, queue.getConsumerCountWithCredit());
    queue.addConsumer(activeConsumer, null, createMessage(-1L).getClass(), "test",
                      EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES), 0);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());

    // --- adding an inactive consumer leaves the count untouched ---
    final MockConsumer suspendedConsumer = new MockConsumer();
    suspendedConsumer.setActive(false);
    suspendedConsumer.setState(ConsumerTarget.State.SUSPENDED);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());
    queue.addConsumer(suspendedConsumer, null, createMessage(-1L).getClass(), "test",
                      EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES), 0);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());

    // --- expected state transitions ---

    // suspended -> active: count goes up
    suspendedConsumer.setState(ConsumerTarget.State.ACTIVE);
    assertEquals(countMessage, 2, queue.getConsumerCountWithCredit());

    // active -> suspended: count goes down
    suspendedConsumer.setState(ConsumerTarget.State.SUSPENDED);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());

    // suspended -> closed: count unchanged
    suspendedConsumer.setState(ConsumerTarget.State.CLOSED);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());

    // active -> active: count unchanged
    activeConsumer.setState(ConsumerTarget.State.ACTIVE);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());

    activeConsumer.setState(ConsumerTarget.State.SUSPENDED);
    assertEquals(countMessage, 0, queue.getConsumerCountWithCredit());

    // suspended -> suspended: count unchanged
    activeConsumer.setState(ConsumerTarget.State.SUSPENDED);
    assertEquals(countMessage, 0, queue.getConsumerCountWithCredit());

    activeConsumer.setState(ConsumerTarget.State.ACTIVE);
    assertEquals(countMessage, 1, queue.getConsumerCountWithCredit());

    // active -> closed: count goes down
    activeConsumer.setState(ConsumerTarget.State.CLOSED);
    assertEquals(countMessage, 0, queue.getConsumerCountWithCredit());
}
/**
 * Tests that entries in the dequeued state are not delivered to a consumer:
 * of four messages put on a {@link DequeuedQueue}, only the two with message
 * numbers 1 and 3 are expected to arrive.
 */
public void testEnqueueDequeuedEntry() throws Exception
{
    // queue whose entries with an even message number count as dequeued
    final AbstractQueue dequeuedQueue = new DequeuedQueue(getVirtualHost());
    dequeuedQueue.create();

    // create the consumer and register it on the queue
    final MockConsumer target = new MockConsumer();
    final EnumSet<ConsumerOption> options =
            EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES);
    dequeuedQueue.addConsumer(target, null, createMessage(-1L).getClass(), "test", options, 0);

    // put the test messages onto the queue
    putGivenNumberOfMessages(dequeuedQueue, 4);

    // check what the consumer received
    final List<MessageInstance> received = target.getMessages();
    assertEquals("Only 2 messages should be returned", 2, received.size());

    final MessageInstance first = received.get(0);
    assertEquals("ID of first message should be 1", 1L, first.getMessage().getMessageNumber());

    final MessageInstance second = received.get(1);
    assertEquals("ID of second message should be 3", 3L, second.getMessage().getMessageNumber());
}
/**
 * Tests that an entry dequeued before a consumer subscribes is not sent to
 * that consumer when {@link AbstractQueue#processQueue(QueueRunner)} is
 * invoked: with four messages enqueued and the one at index 1 dequeued, the
 * consumer is expected to receive the entries at indexes 0, 2 and 3.
 *
 * @throws Exception if the test is interrupted while waiting for delivery,
 *                   or if any queue operation fails
 */
public void testProcessQueueWithDequeuedEntry() throws Exception
{
    // total number of messages to send, and the index of the one to dequeue
    final int messageCount = 4;
    final int dequeueMessageIndex = 1;

    Map<String,Object> queueAttributes = new HashMap<>();
    queueAttributes.put(Queue.NAME, "test");

    // create the queue under test
    StandardQueueImpl testQueue = new StandardQueueImpl(queueAttributes, getVirtualHost());
    testQueue.create();

    // put messages
    // (the helper's declared return type is not visible here; the cast is kept from the original test)
    @SuppressWarnings("unchecked")
    List<StandardQueueEntry> entries =
            (List<StandardQueueEntry>) enqueueGivenNumberOfMessages(testQueue, messageCount);

    // dequeue message
    dequeueMessage(testQueue, dequeueMessageIndex);

    // latch to wait for message receipt: one count per message that is still expected
    final int expectedDeliveries = messageCount - 1;
    final CountDownLatch latch = new CountDownLatch(expectedDeliveries);

    // create a consumer
    MockConsumer consumer = new MockConsumer()
    {
        /**
         * Delegates to the parent implementation, then counts the latch down.
         *
         * @param consumer the consumer passed through to the parent implementation
         * @param entry the message instance being sent
         * @param batch batch flag passed through to the parent implementation
         * @return the value returned by the parent implementation
         */
        public long send(final MessageInstanceConsumer consumer, MessageInstance entry, boolean batch)
        {
            long size = super.send(consumer, entry, batch);
            latch.countDown();
            return size;
        }
    };

    // subscribe
    testQueue.addConsumer(consumer,
                          null,
                          entries.get(0).getMessage().getClass(),
                          "test",
                          EnumSet.of(ConsumerOption.ACQUIRES,
                                     ConsumerOption.SEES_REQUEUES), 0);

    // process queue
    testQueue.processQueue(new QueueRunner(testQueue, AccessController.getContext())
    {
        public void run()
        {
            // do nothing
        }
    });

    // Wait up to 1 minute for message receipt. await() returns false on
    // timeout, so assert on it rather than discarding the result; an
    // interrupt simply propagates and fails the test.
    assertTrue("Timed out waiting for " + expectedDeliveries + " messages to be delivered",
               latch.await(1, TimeUnit.MINUTES));

    List<MessageInstance> expected =
            Arrays.asList((MessageInstance) entries.get(0), entries.get(2), entries.get(3));
    verifyReceivedMessages(expected, consumer.getMessages());
}
/**
 * Creates a non-durable queue while requesting
 * {@link MessageDurability#ALWAYS} and checks that the created queue
 * reports {@link MessageDurability#NEVER} instead.
 */
public void testNonDurableImpliesMessageDurabilityNever() throws Exception
{
    // Close and delete the queue currently returned by getQueue() before
    // creating another one under the same name.
    getQueue().close();
    getQueue().delete();

    final Map<String,Object> queueAttributes = new HashMap<>();
    queueAttributes.put(Queue.NAME, getQname());
    queueAttributes.put(Queue.DURABLE, Boolean.FALSE);
    queueAttributes.put(Queue.MESSAGE_DURABILITY, MessageDurability.ALWAYS);

    final Queue created = getVirtualHost().createChild(Queue.class, queueAttributes);
    assertNotNull("Queue was not created", created);
    setQueue(created);

    assertEquals(MessageDurability.NEVER, created.getMessageDurability());
}
/**
 * {@link AbstractQueue} test double whose entries are kept in a
 * {@link DequeuedQueueEntryList}. It is always created with the name
 * {@code "test"}, non-durable and with a permanent lifetime policy.
 */
private static class DequeuedQueue extends AbstractQueue
{
    /** Entry list backing this queue; returned by {@link #getEntries()}. */
    private final QueueEntryList _entries = new DequeuedQueueEntryList(this);

    /**
     * @param virtualHost the virtual host handed to the parent constructor
     */
    public DequeuedQueue(VirtualHost<?> virtualHost)
    {
        super(attributes(), virtualHost);
    }

    /** Builds the fixed attribute map passed to the parent constructor. */
    private static Map<String,Object> attributes()
    {
        final Map<String,Object> queueAttributes = new HashMap<>();
        queueAttributes.put(Queue.NAME, "test");
        queueAttributes.put(Queue.DURABLE, false);
        queueAttributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.PERMANENT);
        return queueAttributes;
    }

    @Override
    QueueEntryList getEntries()
    {
        return _entries;
    }
}
/**
 * {@link OrderedQueueEntryList} whose head and regular entries are all
 * {@link DequeuedQueueEntry} instances.
 */
private static class DequeuedQueueEntryList extends OrderedQueueEntryList
{
    /** Creates the head entry for a list of this type. */
    private static final HeadCreator HEAD_CREATOR = new HeadCreator()
    {
        @Override
        public DequeuedQueueEntry createHead(final QueueEntryList entryList)
        {
            final DequeuedQueueEntryList owner = (DequeuedQueueEntryList) entryList;
            return new DequeuedQueueEntry(owner);
        }
    };

    /**
     * @param queue the queue this entry list belongs to
     */
    public DequeuedQueueEntryList(final DequeuedQueue queue)
    {
        super(queue, HEAD_CREATOR);
    }

    /**
     * Wraps the message in a {@link DequeuedQueueEntry}; such entries report
     * themselves as dequeued when the message number is even. The enqueue
     * record is not used.
     */
    protected DequeuedQueueEntry createQueueEntry(final ServerMessage message,
                                                  final MessageEnqueueRecord enqueueRecord)
    {
        final DequeuedQueueEntry entry = new DequeuedQueueEntry(this, message);
        return entry;
    }
}
/**
 * {@link OrderedQueueEntry} that treats itself as dequeued whenever the
 * number of the message it carries is even: such an entry reports itself
 * deleted, is not available and refuses acquisition.
 */
private static class DequeuedQueueEntry extends OrderedQueueEntry
{
    /** Message carried by this entry; {@code null} for the head entry. */
    private final ServerMessage _message;

    /** Creates the head entry of the given list; it carries no message. */
    private DequeuedQueueEntry(final DequeuedQueueEntryList queueEntryList)
    {
        super(queueEntryList);
        _message = null;
    }

    /** Creates a regular entry holding {@code message}. */
    public DequeuedQueueEntry(DequeuedQueueEntryList list, final ServerMessage message)
    {
        super(list, message, null);
        _message = message;
    }

    /** Returns {@code true} when the carried message has an even message number. */
    private boolean hasEvenMessageNumber()
    {
        return _message.getMessageNumber() % 2 == 0;
    }

    /** Even-numbered entries count as deleted. */
    public boolean isDeleted()
    {
        return hasEvenMessageNumber();
    }

    /** Only odd-numbered entries are available. */
    public boolean isAvailable()
    {
        return !hasEvenMessageNumber();
    }

    /** Refuses acquisition of even-numbered entries, otherwise defers to the parent. */
    @Override
    public boolean acquire(MessageInstanceConsumer sub)
    {
        return !hasEvenMessageNumber() && super.acquire(sub);
    }

    /** Always reports success without doing anything. */
    @Override
    public boolean makeAcquisitionUnstealable(final MessageInstanceConsumer consumer)
    {
        return true;
    }

    /** Always reports success without doing anything. */
    @Override
    public boolean makeAcquisitionStealable()
    {
        return true;
    }
}
}