blob: 44a65f11208178553ba6aa33858ad39695f78291 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.queue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.security.Principal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.qpid.server.configuration.updater.CurrentThreadTaskExecutor;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.BrokerModel;
import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.store.TransactionLogResource;
/**
 * Tests for {@link StandardQueueEntryList}.
 *
 * <p>Each test starts with a {@link StandardQueueImpl} whose entry list has been pre-populated with
 * {@link #INITIAL_MESSAGE_COUNT} mocked messages numbered from 1. The fixture also sets the
 * {@value #SCAVENGE_PROP} system property to {@value #TEST_SCAVENGE_COUNT} for the duration of each
 * test and restores the previous value afterwards.</p>
 *
 * <p>NOTE(review): neither {@link #setUp()} nor {@link #tearDown()} delegates to the base class; the
 * base class is not visible from this file, so confirm that this is intentional.</p>
 */
public class StandardQueueEntryListTest extends QueueEntryListTestBase
{
    private static final String SCAVENGE_PROP = "qpid.queue.scavenge_count";

    /** Value given to {@link #SCAVENGE_PROP} while a test runs. */
    private static final String TEST_SCAVENGE_COUNT = "9";

    /** Number of messages added to the shared entry list by {@link #setUp()}. */
    private static final int INITIAL_MESSAGE_COUNT = 100;

    private StandardQueueImpl _testQueue;
    private StandardQueueEntryList _sqel;

    /** Value of {@link #SCAVENGE_PROP} before the test ran; {@code null} if it was not set. */
    private String _oldScavengeValue = null;

    private ConfiguredObjectFactoryImpl _factory;

    /**
     * Overrides the scavenge property, builds the queue under test and fills its entry list with
     * {@link #INITIAL_MESSAGE_COUNT} messages numbered {@code 1..INITIAL_MESSAGE_COUNT}.
     */
    @Override
    protected void setUp()
    {
        // System.setProperty returns the previous value (or null), which tearDown() puts back.
        _oldScavengeValue = System.setProperty(SCAVENGE_PROP, TEST_SCAVENGE_COUNT);

        _factory = new ConfiguredObjectFactoryImpl(BrokerModel.getInstance());
        _testQueue = createTestQueue();
        _sqel = _testQueue.getEntries();

        for (int i = 1; i <= INITIAL_MESSAGE_COUNT; i++)
        {
            final QueueEntry bleh = _sqel.add(createMessage(i), null);
            assertNotNull("QE should not have been null", bleh);
        }
    }

    /**
     * Restores {@link #SCAVENGE_PROP} to the value it had before {@link #setUp()} ran, clearing it
     * if it was previously unset.
     */
    @Override
    protected void tearDown()
    {
        if (_oldScavengeValue == null)
        {
            System.clearProperty(SCAVENGE_PROP);
        }
        else
        {
            System.setProperty(SCAVENGE_PROP, _oldScavengeValue);
        }
    }

    /**
     * @return the shared, pre-populated entry list created by {@link #setUp()}
     */
    @Override
    public StandardQueueEntryList getTestList()
    {
        return getTestList(false);
    }

    /**
     * @param newList {@code true} to obtain the entry list of a freshly created queue (no messages
     *                are added to it here), {@code false} to obtain the shared pre-populated list
     * @return the requested entry list
     */
    @Override
    public StandardQueueEntryList getTestList(boolean newList)
    {
        if (!newList)
        {
            return _sqel;
        }
        return createTestQueue().getEntries();
    }

    /**
     * @return the message number of the first message added by {@link #setUp()}
     */
    @Override
    public long getExpectedFirstMsgId()
    {
        return 1;
    }

    /**
     * @return the number of messages added by {@link #setUp()}
     */
    @Override
    public int getExpectedListLength()
    {
        return INITIAL_MESSAGE_COUNT;
    }

    /**
     * @return a mocked message reporting message number 1; unlike the messages created for the
     *         fixture, no message references are stubbed on it
     */
    @Override
    public ServerMessage getTestMessageToAdd()
    {
        ServerMessage msg = mock(ServerMessage.class);
        when(msg.getMessageNumber()).thenReturn(1L);
        return msg;
    }

    /**
     * @return the queue created by {@link #setUp()}
     */
    @Override
    protected StandardQueueImpl getTestQueue()
    {
        return _testQueue;
    }

    /**
     * Deletes nine entries from the middle of a 100-entry list, checking after each deletion that
     * the deleted entry is still linked into the list, then deletes a tenth entry and checks that
     * no deleted entry remains reachable from the head.
     */
    public void testScavenge() throws Exception
    {
        OrderedBaseQueueEntryList sqel = new StandardQueueEntryList(mock(StandardQueueImpl.class));
        ConcurrentMap<Integer,QueueEntry> entriesMap = new ConcurrentHashMap<Integer,QueueEntry>();

        //Add messages to generate QueueEntry's
        for (int i = 1; i <= 100; i++)
        {
            QueueEntry bleh = sqel.add(createMessage(i), null);
            assertNotNull("QE should not have been null", bleh);
            entriesMap.put(i, bleh);
        }

        OrderedQueueEntry head = (OrderedQueueEntry) sqel.getHead();

        //We shall now delete some specific messages mid-queue that will lead to
        //requiring a scavenge once the requested threshold of 9 deletes is passed:
        //the 2nd message only, messages 12 to 14, message 20 only, and messages 81 to 84.
        //They are deleted in ascending order, one at a time.
        final int[] deletedBeforeScavenge = {2, 12, 13, 14, 20, 81, 82, 83, 84};
        for (int messageId : deletedBeforeScavenge)
        {
            deleteAndVerifyStillPresent(entriesMap, head, messageId);
        }

        //Delete message 99 - this is the 10th message deleted that is after the queue head
        //and so will invoke the scavenge() which is set to go after 9 previous deletions
        assertTrue("Failed to delete QueueEntry", remove(entriesMap, 99));
        verifyAllDeletedMessagedNotPresent(head, entriesMap);
    }

    /**
     * Checks {@code getNextValidEntry()} on a fresh five-entry list: first with nothing deleted,
     * then after the second and third entries have been acquired and deleted.
     */
    public void testGettingNextElement()
    {
        final int numberOfEntries = 5;
        final OrderedQueueEntry[] entries = new OrderedQueueEntry[numberOfEntries];
        final OrderedBaseQueueEntryList queueEntryList = getTestList(true);

        // create test entries, numbered from 0
        for (int i = 0; i < numberOfEntries; i++)
        {
            entries[i] = (OrderedQueueEntry) queueEntryList.add(createMessage(i), null);
        }

        // test getNext for not acquired entries
        for (int i = 0; i < numberOfEntries; i++)
        {
            final OrderedQueueEntry next = entries[i].getNextValidEntry();
            if (i < numberOfEntries - 1)
            {
                assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
            }
            else
            {
                assertNull("The next entry after the last should be null", next);
            }
        }

        // acquire and delete the second entry
        entries[1].acquire();
        entries[1].delete();

        // acquire and delete the third entry
        entries[2].acquire();
        entries[2].delete();

        // starting from the (deleted) third entry, only the fourth and fifth should follow
        OrderedQueueEntry next = entries[2].getNextValidEntry();
        assertEquals("expected forth entry", entries[3], next);
        next = next.getNextValidEntry();
        assertEquals("expected fifth entry", entries[4], next);
        next = next.getNextValidEntry();
        assertNull("The next entry after the last should be null", next);
    }

    /**
     * Creates and opens a {@link StandardQueueImpl} named after the running test, attached to a
     * mocked {@link VirtualHost} that shares {@link #_factory}.
     *
     * @return the opened queue
     */
    private StandardQueueImpl createTestQueue()
    {
        final Map<String,Object> queueAttributes = new HashMap<String, Object>();
        queueAttributes.put(Queue.ID, UUID.randomUUID());
        queueAttributes.put(Queue.NAME, getName());

        final TaskExecutor taskExecutor = CurrentThreadTaskExecutor.newStartedInstance();

        final VirtualHost virtualHost = mock(VirtualHost.class);
        when(virtualHost.getEventLogger()).thenReturn(new EventLogger());
        when(virtualHost.getObjectFactory()).thenReturn(_factory);
        when(virtualHost.getModel()).thenReturn(_factory.getModel());
        when(virtualHost.getTaskExecutor()).thenReturn(taskExecutor);
        when(virtualHost.getChildExecutor()).thenReturn(taskExecutor);
        when(virtualHost.getPrincipal()).thenReturn(mock(Principal.class));

        final StandardQueueImpl queue = new StandardQueueImpl(queueAttributes, virtualHost);
        queue.open();
        return queue;
    }

    /**
     * Creates a mocked message that reports the given message number and hands out a single mocked
     * {@link MessageReference} (pointing back at the message) from both {@code newReference}
     * overloads.
     *
     * @param messageNumber value returned by {@code getMessageNumber()}
     * @return the mocked message
     */
    private static ServerMessage createMessage(final long messageNumber)
    {
        final ServerMessage message = mock(ServerMessage.class);
        when(message.getMessageNumber()).thenReturn(messageNumber);

        final MessageReference reference = mock(MessageReference.class);
        when(reference.getMessage()).thenReturn(message);
        when(message.newReference()).thenReturn(reference);
        when(message.newReference(any(TransactionLogResource.class))).thenReturn(reference);
        return message;
    }

    /**
     * Deletes the entry for {@code messageId} and asserts that it is still reachable from
     * {@code head}, flagged as deleted.
     */
    private void deleteAndVerifyStillPresent(Map<Integer,QueueEntry> entriesMap, OrderedQueueEntry head, int messageId)
    {
        assertTrue("Failed to delete QueueEntry", remove(entriesMap, messageId));
        verifyDeletedButPresentBeforeScavenge(head, messageId);
    }

    /**
     * Stops tracking the entry stored under {@code pos}, then acquires and deletes it.
     *
     * @param entriesMap entries still expected to be live, keyed by message number
     * @param pos        message number of the entry to delete
     * @return {@code true} if the entry was not deleted beforehand and is deleted afterwards
     */
    private boolean remove(Map<Integer,QueueEntry> entriesMap, int pos)
    {
        final QueueEntry entry = entriesMap.remove(pos);
        // Fail with a readable message rather than a NullPointerException on an unknown id.
        assertNotNull("No QueueEntry is tracked for message " + pos, entry);

        final boolean wasDeleted = entry.isDeleted();
        entry.acquire();
        entry.delete();
        return entry.isDeleted() && !wasDeleted;
    }

    /**
     * Walks the list from {@code head}, asserting that the entries for message numbers
     * {@code 1..messageId-1} are all still linked in order, and that the entry which follows them
     * is flagged as deleted.
     *
     * @param head      head node of the list; its successor is taken as the first real entry
     * @param messageId message number of the entry expected to be deleted but still present
     */
    private void verifyDeletedButPresentBeforeScavenge(OrderedQueueEntry head, long messageId)
    {
        //Use the head to get the initial entry in the queue
        OrderedQueueEntry entry = head.getNextNode();

        for (long expectedId = 1; expectedId < messageId; expectedId++)
        {
            assertEquals("Expected QueueEntry was not found in the list",
                         expectedId,
                         (long) entry.getMessage().getMessageNumber());
            entry = entry.getNextNode();
        }

        assertTrue("Entry should have been deleted", entry.isDeleted());
    }

    /**
     * Walks the whole list from {@code head}, asserting that no linked entry is deleted, that every
     * linked entry is one of {@code remainingMessages}, and that the number of linked entries equals
     * the number of remaining messages.
     *
     * @param head              head node of the list; its successor is taken as the first real entry
     * @param remainingMessages entries that have not been deleted, keyed by message number
     */
    private void verifyAllDeletedMessagedNotPresent(OrderedQueueEntry head, Map<Integer,QueueEntry> remainingMessages)
    {
        //Use the head to get the initial entry in the queue
        OrderedQueueEntry entry = head.getNextNode();
        assertNotNull("Initial entry should not have been null", entry);

        int count = 0;
        while (entry != null)
        {
            final long messageNumber = entry.getMessage().getMessageNumber();
            assertFalse("Entry " + messageNumber + " should not have been deleted", entry.isDeleted());
            assertNotNull("QueueEntry "+messageNumber+" was not found in the list of remaining entries " + remainingMessages,
                          remainingMessages.get((int) messageNumber));
            count++;
            entry = entry.getNextNode();
        }

        // expected value first, so that a failure reports the two numbers the right way round
        assertEquals("Count should have been equal", remainingMessages.size(), count);
    }
}