| package org.apache.qpid.server.queue; |
| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| |
| import junit.framework.TestCase; |
| |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.BasicContentHeaderProperties; |
| import org.apache.qpid.framing.ContentHeaderBody; |
| import org.apache.qpid.framing.ContentHeaderProperties; |
| import org.apache.qpid.framing.FieldTable; |
| import org.apache.qpid.framing.abstraction.MessagePublishInfo; |
| import org.apache.qpid.server.exchange.DirectExchange; |
| import org.apache.qpid.server.exchange.Exchange; |
| import org.apache.qpid.server.registry.ApplicationRegistry; |
| import org.apache.qpid.server.store.MessageStore; |
| import org.apache.qpid.server.store.StoreContext; |
| import org.apache.qpid.server.store.TestableMemoryMessageStore; |
| import org.apache.qpid.server.subscription.MockSubscription; |
| import org.apache.qpid.server.subscription.Subscription; |
| import org.apache.qpid.server.subscription.SubscriptionImpl; |
| import org.apache.qpid.server.txn.NonTransactionalContext; |
| import org.apache.qpid.server.virtualhost.VirtualHost; |
| |
| public class SimpleAMQQueueTest extends TestCase |
| { |
| |
| protected SimpleAMQQueue _queue; |
| protected VirtualHost _virtualHost; |
| protected TestableMemoryMessageStore _store = new TestableMemoryMessageStore(); |
| protected AMQShortString _qname = new AMQShortString("qname"); |
| protected AMQShortString _owner = new AMQShortString("owner"); |
| protected AMQShortString _routingKey = new AMQShortString("routing key"); |
| protected DirectExchange _exchange = new DirectExchange(); |
| protected MockSubscription _subscription = new MockSubscription(); |
| protected FieldTable _arguments = null; |
| |
| MessagePublishInfo info = new MessagePublishInfo() |
| { |
| |
| public AMQShortString getExchange() |
| { |
| return null; |
| } |
| |
| public void setExchange(AMQShortString exchange) |
| { |
| //To change body of implemented methods use File | Settings | File Templates. |
| } |
| |
| public boolean isImmediate() |
| { |
| return false; |
| } |
| |
| public boolean isMandatory() |
| { |
| return false; |
| } |
| |
| public AMQShortString getRoutingKey() |
| { |
| return null; |
| } |
| }; |
| |
| @Override |
| protected void setUp() throws Exception |
| { |
| super.setUp(); |
| //Create Application Registry for test |
| ApplicationRegistry applicationRegistry = (ApplicationRegistry)ApplicationRegistry.getInstance(1); |
| |
| _virtualHost = new VirtualHost("vhost", _store); |
| applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost); |
| |
| _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, _virtualHost, _arguments); |
| } |
| |
| @Override |
| protected void tearDown() |
| { |
| _queue.stop(); |
| ApplicationRegistry.remove(1); |
| } |
| |
| public void testCreateQueue() throws AMQException |
| { |
| _queue.stop(); |
| try { |
| _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(null, false, _owner, false, _virtualHost, _arguments ); |
| assertNull("Queue was created", _queue); |
| } |
| catch (IllegalArgumentException e) |
| { |
| assertTrue("Exception was not about missing name", |
| e.getMessage().contains("name")); |
| } |
| |
| try { |
| _queue = new SimpleAMQQueue(_qname, false, _owner, false, null); |
| assertNull("Queue was created", _queue); |
| } |
| catch (IllegalArgumentException e) |
| { |
| assertTrue("Exception was not about missing vhost", |
| e.getMessage().contains("Host")); |
| } |
| |
| _queue = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_qname, false, _owner, false, |
| _virtualHost, _arguments); |
| assertNotNull("Queue was not created", _queue); |
| } |
| |
| public void testGetVirtualHost() |
| { |
| assertEquals("Virtual host was wrong", _virtualHost, _queue.getVirtualHost()); |
| } |
| |
| public void testBinding() |
| { |
| try |
| { |
| _queue.bind(_exchange, _routingKey, null); |
| assertTrue("Routing key was not bound", |
| _exchange.getBindings().containsKey(_routingKey)); |
| assertEquals("Queue was not bound to key", |
| _exchange.getBindings().get(_routingKey).get(0), |
| _queue); |
| assertEquals("Exchange binding count", 1, |
| _queue.getExchangeBindings().size()); |
| assertEquals("Wrong exchange bound", _routingKey, |
| _queue.getExchangeBindings().get(0).getRoutingKey()); |
| assertEquals("Wrong exchange bound", _exchange, |
| _queue.getExchangeBindings().get(0).getExchange()); |
| |
| _queue.unBind(_exchange, _routingKey, null); |
| assertFalse("Routing key was still bound", |
| _exchange.getBindings().containsKey(_routingKey)); |
| assertNull("Routing key was not empty", |
| _exchange.getBindings().get(_routingKey)); |
| } |
| catch (AMQException e) |
| { |
| assertNull("Unexpected exception", e); |
| } |
| } |
| |
| public void testSubscription() throws AMQException |
| { |
| // Check adding a subscription adds it to the queue |
| _queue.registerSubscription(_subscription, false); |
| assertEquals("Subscription did not get queue", _queue, |
| _subscription.getQueue()); |
| assertEquals("Queue does not have consumer", 1, |
| _queue.getConsumerCount()); |
| assertEquals("Queue does not have active consumer", 1, |
| _queue.getActiveConsumerCount()); |
| |
| // Check sending a message ends up with the subscriber |
| AMQMessage messageA = createMessage(new Long(24)); |
| _queue.enqueue(null, messageA); |
| assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); |
| |
| // Check removing the subscription removes it's information from the queue |
| _queue.unregisterSubscription(_subscription); |
| assertTrue("Subscription still had queue", _subscription.isClosed()); |
| assertFalse("Queue still has consumer", 1 == _queue.getConsumerCount()); |
| assertFalse("Queue still has active consumer", |
| 1 == _queue.getActiveConsumerCount()); |
| |
| AMQMessage messageB = createMessage(new Long (25)); |
| _queue.enqueue(null, messageB); |
| QueueEntry entry = _subscription.getLastSeenEntry(); |
| assertNull(entry); |
| } |
| |
| public void testQueueNoSubscriber() throws AMQException, InterruptedException |
| { |
| AMQMessage messageA = createMessage(new Long(24)); |
| _queue.enqueue(null, messageA); |
| _queue.registerSubscription(_subscription, false); |
| Thread.sleep(150); |
| assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); |
| } |
| |
| public void testExclusiveConsumer() throws AMQException |
| { |
| // Check adding an exclusive subscription adds it to the queue |
| _queue.registerSubscription(_subscription, true); |
| assertEquals("Subscription did not get queue", _queue, |
| _subscription.getQueue()); |
| assertEquals("Queue does not have consumer", 1, |
| _queue.getConsumerCount()); |
| assertEquals("Queue does not have active consumer", 1, |
| _queue.getActiveConsumerCount()); |
| |
| // Check sending a message ends up with the subscriber |
| AMQMessage messageA = createMessage(new Long(24)); |
| _queue.enqueue(null, messageA); |
| assertEquals(messageA, _subscription.getLastSeenEntry().getMessage()); |
| |
| // Check we cannot add a second subscriber to the queue |
| Subscription subB = new MockSubscription(); |
| Exception ex = null; |
| try |
| { |
| _queue.registerSubscription(subB, false); |
| } |
| catch (AMQException e) |
| { |
| ex = e; |
| } |
| assertNotNull(ex); |
| assertTrue(ex instanceof AMQException); |
| |
| // Check we cannot add an exclusive subscriber to a queue with an |
| // existing subscription |
| _queue.unregisterSubscription(_subscription); |
| _queue.registerSubscription(_subscription, false); |
| try |
| { |
| _queue.registerSubscription(subB, true); |
| } |
| catch (AMQException e) |
| { |
| ex = e; |
| } |
| assertNotNull(ex); |
| } |
| |
| public void testAutoDeleteQueue() throws Exception |
| { |
| _queue.stop(); |
| _queue = new SimpleAMQQueue(_qname, false, _owner, true, _virtualHost); |
| _queue.registerSubscription(_subscription, false); |
| AMQMessage message = createMessage(new Long(25)); |
| _queue.enqueue(null, message); |
| _queue.unregisterSubscription(_subscription); |
| assertTrue("Queue was not deleted when subscription was removed", |
| _queue.isDeleted()); |
| } |
| |
| public void testResend() throws Exception |
| { |
| _queue.registerSubscription(_subscription, false); |
| Long id = new Long(26); |
| AMQMessage message = createMessage(id); |
| _queue.enqueue(null, message); |
| QueueEntry entry = _subscription.getLastSeenEntry(); |
| entry.setRedelivered(true); |
| _queue.resend(entry, _subscription); |
| |
| } |
| |
| public void testGetFirstMessageId() throws Exception |
| { |
| // Create message |
| Long messageId = new Long(23); |
| AMQMessage message = createMessage(messageId); |
| |
| // Put message on queue |
| _queue.enqueue(null, message); |
| // Get message id |
| Long testmsgid = _queue.getMessagesOnTheQueue(1).get(0); |
| |
| // Check message id |
| assertEquals("Message ID was wrong", messageId, testmsgid); |
| } |
| |
| public void testGetFirstFiveMessageIds() throws Exception |
| { |
| for (int i = 0 ; i < 5; i++) |
| { |
| // Create message |
| Long messageId = new Long(i); |
| AMQMessage message = createMessage(messageId); |
| // Put message on queue |
| _queue.enqueue(null, message); |
| } |
| // Get message ids |
| List<Long> msgids = _queue.getMessagesOnTheQueue(5); |
| |
| // Check message id |
| for (int i = 0; i < 5; i++) |
| { |
| Long messageId = new Long(i); |
| assertEquals("Message ID was wrong", messageId, msgids.get(i)); |
| } |
| } |
| |
| public void testGetLastFiveMessageIds() throws Exception |
| { |
| for (int i = 0 ; i < 10; i++) |
| { |
| // Create message |
| Long messageId = new Long(i); |
| AMQMessage message = createMessage(messageId); |
| // Put message on queue |
| _queue.enqueue(null, message); |
| } |
| // Get message ids |
| List<Long> msgids = _queue.getMessagesOnTheQueue(5, 5); |
| |
| // Check message id |
| for (int i = 0; i < 5; i++) |
| { |
| Long messageId = new Long(i+5); |
| assertEquals("Message ID was wrong", messageId, msgids.get(i)); |
| } |
| } |
| |
| public void testEnqueueDequeueOfPersistentMessageToNonDurableQueue() throws AMQException |
| { |
| // Create IncomingMessage and nondurable queue |
| NonTransactionalContext txnContext = new NonTransactionalContext(_store, null, null, null); |
| IncomingMessage msg = new IncomingMessage(1L, info, txnContext, null); |
| ContentHeaderBody contentHeaderBody = new ContentHeaderBody(); |
| contentHeaderBody.properties = new BasicContentHeaderProperties(); |
| ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2); |
| msg.setContentHeaderBody(contentHeaderBody); |
| ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); |
| |
| // Send persistent message |
| qs.add(_queue); |
| msg.enqueue(qs); |
| msg.routingComplete(_store, new MessageHandleFactory()); |
| _store.storeMessageMetaData(null, new Long(1L), new MessageMetaData(info, contentHeaderBody, 1)); |
| |
| // Check that it is enqueued |
| AMQQueue data = _store.getMessages().get(1L); |
| assertNotNull(data); |
| |
| // Dequeue message |
| MockQueueEntry entry = new MockQueueEntry(); |
| AMQMessage amqmsg = new AMQMessage(1L, _store, new MessageHandleFactory(), txnContext); |
| |
| entry.setMessage(amqmsg); |
| _queue.dequeue(null, entry); |
| |
| // Check that it is dequeued |
| data = _store.getMessages().get(1L); |
| assertNull(data); |
| } |
| |
| |
| // FIXME: move this to somewhere useful |
| private static AMQMessageHandle createMessageHandle(final long messageId, final MessagePublishInfo publishBody) |
| { |
| final AMQMessageHandle amqMessageHandle = (new MessageHandleFactory()).createMessageHandle(messageId, |
| null, |
| false); |
| try |
| { |
| amqMessageHandle.setPublishAndContentHeaderBody(new StoreContext(), |
| publishBody, |
| new ContentHeaderBody() |
| { |
| public int getSize() |
| { |
| return 1; |
| } |
| }); |
| } |
| catch (AMQException e) |
| { |
| // won't happen |
| } |
| |
| |
| return amqMessageHandle; |
| } |
| |
| public class TestMessage extends AMQMessage |
| { |
| private final long _tag; |
| private int _count; |
| |
| TestMessage(long tag, long messageId, MessagePublishInfo publishBody, StoreContext storeContext) |
| throws AMQException |
| { |
| super(createMessageHandle(messageId, publishBody), storeContext, publishBody); |
| _tag = tag; |
| } |
| |
| |
| public boolean incrementReference() |
| { |
| _count++; |
| return true; |
| } |
| |
| public void decrementReference(StoreContext context) |
| { |
| _count--; |
| } |
| |
| void assertCountEquals(int expected) |
| { |
| assertEquals("Wrong count for message with tag " + _tag, expected, _count); |
| } |
| } |
| |
| protected AMQMessage createMessage(Long id) throws AMQException |
| { |
| AMQMessage messageA = new TestMessage(id, id, info, new StoreContext()); |
| return messageA; |
| } |
| } |