| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.server.protocol.v0_8; |
| |
| |
| import org.apache.qpid.AMQException; |
| import org.apache.qpid.framing.AMQShortString; |
| import org.apache.qpid.framing.BasicContentHeaderProperties; |
| import org.apache.qpid.framing.ContentHeaderBody; |
| import org.apache.qpid.framing.abstraction.MessagePublishInfo; |
| import org.apache.qpid.server.consumer.ConsumerImpl; |
| import org.apache.qpid.server.flow.LimitlessCreditManager; |
| import org.apache.qpid.server.flow.Pre0_10CreditManager; |
| import org.apache.qpid.server.message.MessageInstance; |
| import org.apache.qpid.server.queue.AMQQueue; |
| import org.apache.qpid.server.store.StoredMessage; |
| import org.apache.qpid.server.store.TestMemoryMessageStore; |
| import org.apache.qpid.server.txn.AutoCommitTransaction; |
| import org.apache.qpid.server.txn.ServerTransaction; |
| import org.apache.qpid.server.util.BrokerTestHelper; |
| import org.apache.qpid.server.virtualhost.VirtualHost; |
| import org.apache.qpid.test.utils.QpidTestCase; |
| |
| import java.util.ArrayList; |
| import java.util.EnumSet; |
| import java.util.Set; |
| |
| /** |
| * Tests that acknowledgements are handled correctly. |
| */ |
| public class AckTest extends QpidTestCase |
| { |
| private ConsumerTarget_0_8 _subscriptionTarget; |
| private ConsumerImpl _consumer; |
| |
| private AMQProtocolSession _protocolSession; |
| |
| private TestMemoryMessageStore _messageStore; |
| |
| private AMQChannel _channel; |
| |
| private AMQQueue _queue; |
| |
| private static final AMQShortString DEFAULT_CONSUMER_TAG = new AMQShortString("conTag"); |
| private VirtualHost _virtualHost; |
| |
| @Override |
| public void setUp() throws Exception |
| { |
| super.setUp(); |
| BrokerTestHelper.setUp(); |
| _channel = BrokerTestHelper_0_8.createChannel(5); |
| _protocolSession = _channel.getProtocolSession(); |
| _virtualHost = _protocolSession.getVirtualHost(); |
| _queue = BrokerTestHelper.createQueue(getTestName(), _virtualHost); |
| _messageStore = (TestMemoryMessageStore)_virtualHost.getMessageStore(); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception |
| { |
| BrokerTestHelper.tearDown(); |
| super.tearDown(); |
| } |
| |
| private void publishMessages(int count) throws AMQException |
| { |
| publishMessages(count, false); |
| } |
| |
| private void publishMessages(int count, boolean persistent) throws AMQException |
| { |
| for (int i = 1; i <= count; i++) |
| { |
| // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0) |
| // TODO: Establish some way to determine the version for the test. |
| MessagePublishInfo publishBody = new MessagePublishInfo() |
| { |
| |
| public AMQShortString getExchange() |
| { |
| return new AMQShortString("someExchange"); |
| } |
| |
| public void setExchange(AMQShortString exchange) |
| { |
| //To change body of implemented methods use File | Settings | File Templates. |
| } |
| |
| public boolean isImmediate() |
| { |
| return false; |
| } |
| |
| public boolean isMandatory() |
| { |
| return false; |
| } |
| |
| public AMQShortString getRoutingKey() |
| { |
| return new AMQShortString("rk"); |
| } |
| }; |
| BasicContentHeaderProperties b = new BasicContentHeaderProperties(); |
| ContentHeaderBody cb = new ContentHeaderBody(); |
| cb.setProperties(b); |
| |
| if (persistent) |
| { |
| //This is DeliveryMode.PERSISTENT |
| b.setDeliveryMode((byte) 2); |
| } |
| |
| // we increment the reference here since we are not delivering the messaging to any queues, which is where |
| // the reference is normally incremented. The test is easier to construct if we have direct access to the |
| // subscription |
| ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>(); |
| qs.add(_queue); |
| MessageMetaData mmd = new MessageMetaData(publishBody,cb, System.currentTimeMillis()); |
| final StoredMessage storedMessage = _messageStore.addMessage(mmd); |
| final AMQMessage message = new AMQMessage(storedMessage); |
| ServerTransaction txn = new AutoCommitTransaction(_messageStore); |
| txn.enqueue(_queue, message, |
| new ServerTransaction.Action() |
| { |
| public void postCommit() |
| { |
| _queue.enqueue(message,null); |
| } |
| |
| public void onRollback() |
| { |
| //To change body of implemented methods use File | Settings | File Templates. |
| } |
| }); |
| |
| } |
| try |
| { |
| Thread.sleep(2000L); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| } |
| |
| } |
| |
| /** |
| * Tests that the acknowledgements are correctly associated with a channel and |
| * order is preserved when acks are enabled |
| */ |
| public void testAckChannelAssociationTest() throws Exception |
| { |
| _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, |
| DEFAULT_CONSUMER_TAG, |
| null, |
| new LimitlessCreditManager()); |
| _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, |
| ConsumerImpl.Option.ACQUIRES)); |
| final int msgCount = 10; |
| publishMessages(msgCount, true); |
| UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); |
| assertEquals("Unexpected size for unacknowledged message map",msgCount,map.size()); |
| |
| Set<Long> deliveryTagSet = map.getDeliveryTags(); |
| int i = 1; |
| for (long deliveryTag : deliveryTagSet) |
| { |
| assertTrue(deliveryTag == i); |
| i++; |
| MessageInstance unackedMsg = map.get(deliveryTag); |
| assertTrue(unackedMsg.getOwningResource() == _queue); |
| } |
| |
| } |
| |
| /** |
| * Tests that in no-ack mode no messages are retained |
| */ |
| public void testNoAckMode() throws Exception |
| { |
| // false arg means no acks expected |
| _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, |
| DEFAULT_CONSUMER_TAG, |
| null, |
| new LimitlessCreditManager()); |
| _consumer = _queue.addConsumer(_subscriptionTarget, |
| null, |
| AMQMessage.class, |
| DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, |
| ConsumerImpl.Option.ACQUIRES)); |
| final int msgCount = 10; |
| publishMessages(msgCount); |
| UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); |
| assertTrue(map.size() == 0); |
| assertTrue(_messageStore.getMessageCount() == 0); |
| |
| |
| } |
| |
| /** |
| * Tests that in no-ack mode no messages are retained |
| */ |
| public void testPersistentNoAckMode() throws Exception |
| { |
| // false arg means no acks expected |
| |
| _subscriptionTarget = ConsumerTarget_0_8.createNoAckTarget(_channel, |
| DEFAULT_CONSUMER_TAG, |
| null, |
| new LimitlessCreditManager()); |
| _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); |
| final int msgCount = 10; |
| publishMessages(msgCount, true); |
| |
| UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); |
| assertTrue(map.size() == 0); |
| assertTrue(_messageStore.getMessageCount() == 0); |
| |
| |
| } |
| |
| /** |
| * Tests that a single acknowledgement is handled correctly (i.e multiple flag not |
| * set case) |
| */ |
| public void testSingleAckReceivedTest() throws Exception |
| { |
| |
| _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, |
| DEFAULT_CONSUMER_TAG, |
| null, |
| new LimitlessCreditManager()); |
| _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, |
| ConsumerImpl.Option.ACQUIRES)); |
| |
| final int msgCount = 10; |
| publishMessages(msgCount); |
| |
| _channel.acknowledgeMessage(5, false); |
| UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); |
| assertEquals("Map not expected size",msgCount - 1,map.size()); |
| |
| Set<Long> deliveryTagSet = map.getDeliveryTags(); |
| int i = 1; |
| for (long deliveryTag : deliveryTagSet) |
| { |
| assertTrue(deliveryTag == i); |
| MessageInstance unackedMsg = map.get(deliveryTag); |
| assertTrue(unackedMsg.getOwningResource() == _queue); |
| // 5 is the delivery tag of the message that *should* be removed |
| if (++i == 5) |
| { |
| ++i; |
| } |
| } |
| } |
| |
| /** |
| * Tests that a single acknowledgement is handled correctly (i.e multiple flag not |
| * set case) |
| */ |
| public void testMultiAckReceivedTest() throws Exception |
| { |
| |
| _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, |
| DEFAULT_CONSUMER_TAG, |
| null, |
| new LimitlessCreditManager()); |
| _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, |
| ConsumerImpl.Option.ACQUIRES)); |
| |
| final int msgCount = 10; |
| publishMessages(msgCount); |
| |
| |
| |
| _channel.acknowledgeMessage(5, true); |
| UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); |
| assertTrue(map.size() == 5); |
| |
| Set<Long> deliveryTagSet = map.getDeliveryTags(); |
| int i = 1; |
| for (long deliveryTag : deliveryTagSet) |
| { |
| assertTrue(deliveryTag == i + 5); |
| MessageInstance unackedMsg = map.get(deliveryTag); |
| assertTrue(unackedMsg.getOwningResource() == _queue); |
| ++i; |
| } |
| } |
| |
| /** |
| * Tests that a multiple acknowledgement is handled correctly. When ack'ing all pending msgs. |
| */ |
| public void testMultiAckAllReceivedTest() throws Exception |
| { |
| |
| _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, |
| DEFAULT_CONSUMER_TAG, |
| null, |
| new LimitlessCreditManager()); |
| _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, |
| ConsumerImpl.Option.ACQUIRES)); |
| |
| final int msgCount = 10; |
| publishMessages(msgCount); |
| |
| _channel.acknowledgeMessage(0, true); |
| UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); |
| assertTrue(map.size() == 0); |
| |
| Set<Long> deliveryTagSet = map.getDeliveryTags(); |
| int i = 1; |
| for (long deliveryTag : deliveryTagSet) |
| { |
| assertTrue(deliveryTag == i + 5); |
| MessageInstance unackedMsg = map.get(deliveryTag); |
| assertTrue(unackedMsg.getOwningResource() == _queue); |
| ++i; |
| } |
| } |
| |
| /** |
| * A regression fixing QPID-1136 showed this up |
| * |
| * @throws Exception |
| */ |
| public void testMessageDequeueRestoresCreditTest() throws Exception |
| { |
| // Send 10 messages |
| Pre0_10CreditManager creditManager = new Pre0_10CreditManager(0l, 1); |
| |
| |
| _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); |
| _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), |
| EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, |
| ConsumerImpl.Option.ACQUIRES)); |
| |
| final int msgCount = 1; |
| publishMessages(msgCount); |
| |
| _consumer.externalStateChange(); |
| |
| _channel.acknowledgeMessage(1, false); |
| |
| // Check credit available |
| assertTrue("No credit available", creditManager.hasCredit()); |
| |
| } |
| |
| public static junit.framework.Test suite() |
| { |
| return new junit.framework.TestSuite(AckTest.class); |
| } |
| } |