blob: 616ee74b2d7dc13b92072a6e2b549f69845f2879 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQMessage;
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueEntryIterator;
import org.apache.qpid.server.queue.SimpleQueueEntryList;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.subscription.MockSubscription;
import org.apache.qpid.server.subscription.Subscription;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
/**
* QPID-1385 : Race condition between added to unacked map and resending due to a rollback.
*
* In AMQChannel _unackedMap.clear() was done after the visit. This meant that the clear was not in the same
* synchronized block as the preparation to resend.
*
* This clearing/prep for resend was done as a result of the rollback call. HOWEVER, the delivery thread was still
* in the process of sending messages to the client. It is therefore possible that a message could block on the
* _unackedMap lock waiting for the visit to complete so that it can add the new message to the unackedMap....
* which is then cleared by the resend/rollback thread.
*
* This problem was encountered by the testSend2ThenRollback test.
*
* To try and increase the chance of the race condition occurring this test will send multiple messages so that the
* delivery thread will be in progress while the rollback method is called. Hopefully this will cause the
* deliveryTag to be lost.
*/
public class ExtractResendAndRequeueTest extends TestCase
{
    /** Number of messages that {@link #setUp()} places in the unacknowledged message map. */
    private static final int INITIAL_MSG_COUNT = 10;

    private final MessageStore _messageStore = new MemoryMessageStore();

    /** Every entry added to the unacknowledged message map, kept so tests can inspect them after the visit. */
    private final LinkedList<QueueEntry> _referenceList = new LinkedList<QueueEntry>();

    private UnacknowledgedMessageMapImpl _unacknowledgedMessageMap;

    private AMQQueue _queue;

    /**
     * Creates the queue, fills a queue entry list with {@link #INITIAL_MSG_COUNT} messages and registers every
     * entry in both the unacknowledged message map and the reference list.
     *
     * @throws AMQException if the test fixtures cannot be created
     */
    @Override
    public void setUp() throws AMQException
    {
        // The queue is created here rather than in a field initialiser: this class only has the implicit
        // no-argument constructor, and JUnit 3 assigns the test name after construction, so getName()
        // would still return null while the fields are being initialised.
        _queue = new MockAMQQueue(getName());

        _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(100);

        SimpleQueueEntryList list = new SimpleQueueEntryList(_queue);

        // Add initial messages to QueueEntryList, using the loop index as the message id
        for (long id = 0; id < INITIAL_MSG_COUNT; id++)
        {
            AMQMessage msg = new MockAMQMessage(id);
            list.add(msg);
        }

        // Iterate through the QueueEntryList and add entries to unacknowledgedMessageMap and referenceList
        QueueEntryIterator queueEntries = list.iterator();
        while (queueEntries.advance())
        {
            QueueEntry entry = queueEntries.getNode();
            _unacknowledgedMessageMap.add(entry.getMessage().getMessageNumber(), entry);

            // Store the entry for future inspection
            _referenceList.add(entry);
        }

        assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
    }

    /**
     * Helper method to create a new subscription and acquire the given messages.
     *
     * @param messageList The messages to acquire
     *
     * @return Subscription that performed the acquire
     */
    private Subscription createSubscriptionAndAquireMessages(LinkedList<QueueEntry> messageList)
    {
        Subscription subscription = new MockSubscription();

        // Acquire messages in subscription
        for (QueueEntry entry : messageList)
        {
            entry.acquire(subscription);
        }

        return subscription;
    }

    /**
     * Runs an {@link ExtractResendAndRequeue} visitor over the unacknowledged message map and checks how many
     * messages ended up in each destination map, and that the unacknowledged message map was emptied.
     *
     * @param requeueIfUnableToResend value passed through to the visitor
     * @param expectedResendCount     number of entries expected in the resend map after the visit
     * @param expectedRequeueCount    number of entries expected in the requeue map after the visit
     *
     * @throws AMQException the visit interface throws this
     */
    private void visitAndAssertCounts(boolean requeueIfUnableToResend, int expectedResendCount,
                                      int expectedRequeueCount) throws AMQException
    {
        final Map<Long, QueueEntry> msgToRequeue = new LinkedHashMap<Long, QueueEntry>();
        final Map<Long, QueueEntry> msgToResend = new LinkedHashMap<Long, QueueEntry>();

        _unacknowledgedMessageMap.visit(new ExtractResendAndRequeue(_unacknowledgedMessageMap, msgToRequeue,
                                                                    msgToResend, requeueIfUnableToResend,
                                                                    _messageStore));

        assertEquals("Message count for resend not correct.", expectedResendCount, msgToResend.size());
        assertEquals("Message count for requeue not correct.", expectedRequeueCount, msgToRequeue.size());
        assertEquals("Map was not emptied", 0, _unacknowledgedMessageMap.size());
    }

    /**
     * Asserts that every entry recorded in the reference list has been marked as deleted.
     */
    private void assertAllEntriesDeleted()
    {
        for (QueueEntry entry : _referenceList)
        {
            assertTrue("Message was not discarded", entry.isDeleted());
        }
    }

    /**
     * This is the normal consumer rollback method.
     *
     * An active consumer that has acquired messages expects those messages to be reset when rollback is requested.
     *
     * This test validates that the msgToResend map includes all the messages and none are left behind.
     *
     * @throws AMQException the visit interface throws this
     */
    public void testResend() throws AMQException
    {
        //We don't need the subscription object here.
        createSubscriptionAndAquireMessages(_referenceList);

        // requeueIfUnableToResend doesn't matter here.
        visitAndAssertCounts(true, INITIAL_MSG_COUNT, 0);
    }

    /**
     * This is the normal consumer close method.
     *
     * When a consumer that has acquired messages closes, the messages that it has acquired should be removed from
     * the unacknowledgedMessageMap and placed in msgToRequeue.
     *
     * This test validates that the msgToRequeue map includes all the messages and none are left behind.
     *
     * @throws AMQException the visit interface throws this
     */
    public void testRequeueDueToSubscriptionClosure() throws AMQException
    {
        Subscription subscription = createSubscriptionAndAquireMessages(_referenceList);

        // Close subscription
        subscription.close();

        // requeueIfUnableToResend doesn't matter here.
        visitAndAssertCounts(true, 0, INITIAL_MSG_COUNT);
    }

    /**
     * If the subscription is null, due to the message being retrieved via a GET, and we request that messages are
     * requeued (requeueIfUnableToResend set to true) then all messages should be sent to the msgToRequeue map.
     *
     * @throws AMQException the visit interface throws this
     */
    public void testRequeueDueToMessageHavingNoConsumerTag() throws AMQException
    {
        // requeueIfUnableToResend = true so all messages should go to msgToRequeue
        visitAndAssertCounts(true, 0, INITIAL_MSG_COUNT);
    }

    /**
     * If the subscription is null, due to the message being retrieved via a GET, and we request that messages are
     * not requeued (requeueIfUnableToResend set to false) then all messages should be dropped as we do not have a
     * dead letter queue.
     *
     * @throws AMQException the visit interface throws this
     */
    public void testDrop() throws AMQException
    {
        // requeueIfUnableToResend = false so all messages should be dropped and all maps should be empty
        visitAndAssertCounts(false, 0, 0);

        assertAllEntriesDeleted();
    }

    /**
     * If the subscription is null, due to the message being retrieved via a GET, AND the queue upon which the
     * message was delivered has been deleted then it is not possible to requeue. Currently we simply discard the
     * message but in the future we may wish to dead letter the message.
     *
     * Validate that at the end of the visit all Maps are empty and all messages are marked as deleted
     *
     * @throws AMQException the visit interface throws this
     */
    public void testDiscard() throws AMQException
    {
        _queue.delete();

        // requeueIfUnableToResend : value doesn't matter here as queue has been deleted
        visitAndAssertCounts(false, 0, 0);

        assertAllEntriesDeleted();
    }
}