// blob: 139e32420fb24d2611cfee015296682d2872a5d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.qpid.server.queue;
import junit.framework.TestCase;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageInstance.EntryState;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for {@link QueueEntryImpl}
*/
public abstract class QueueEntryImplTestBase extends TestCase
{
// tested entry
protected QueueEntryImpl _queueEntry;
protected QueueEntryImpl _queueEntry2;
protected QueueEntryImpl _queueEntry3;
public abstract QueueEntryImpl getQueueEntryImpl(int msgId);
public abstract void testCompareTo();
public abstract void testTraverseWithNoDeletedEntries();
public abstract void testTraverseWithDeletedEntries();
public void setUp() throws Exception
{
_queueEntry = getQueueEntryImpl(1);
_queueEntry2 = getQueueEntryImpl(2);
_queueEntry3 = getQueueEntryImpl(3);
}
protected void mockLogging()
{
final LogActor logActor = mock(LogActor.class);
when(logActor.getRootMessageLogger()).thenReturn(mock(RootMessageLogger.class));
CurrentActor.setDefault(logActor);
}
public void testAcquire()
{
assertTrue("Queue entry should be in AVAILABLE state before invoking of acquire method",
_queueEntry.isAvailable());
acquire();
}
public void testDelete()
{
delete();
}
/**
* Tests release method for entry in acquired state.
* <p>
* Entry in state ACQUIRED should be released and its status should be
* changed to AVAILABLE.
*/
public void testReleaseAcquired()
{
acquire();
_queueEntry.release();
assertTrue("Queue entry should be in AVAILABLE state after invoking of release method",
_queueEntry.isAvailable());
}
/**
* Tests release method for entry in deleted state.
* <p>
* Invoking release on deleted entry should not have any effect on its
* state.
*/
public void testReleaseDeleted()
{
delete();
_queueEntry.release();
assertTrue("Invoking of release on entry in DELETED state should not have any effect",
_queueEntry.isDeleted());
}
/**
* A helper method to put tested object into deleted state and assert the state
*/
private void delete()
{
_queueEntry.delete();
assertTrue("Queue entry should be in DELETED state after invoking of delete method",
_queueEntry.isDeleted());
}
/**
* A helper method to put tested entry into acquired state and assert the sate
*/
private void acquire()
{
_queueEntry.acquire(newConsumer());
assertTrue("Queue entry should be in ACQUIRED state after invoking of acquire method",
_queueEntry.isAcquired());
}
private QueueConsumer newConsumer()
{
final ConsumerTarget target = mock(ConsumerTarget.class);
when(target.getSessionModel()).thenReturn(mock(AMQSessionModel.class));
final QueueConsumer consumer = new QueueConsumer(null,null,true,true,"mock",false,target);
return consumer;
}
/**
* A helper method to get entry state
*
* @return entry state
*/
private EntryState getState()
{
EntryState state = null;
try
{
Field f = QueueEntryImpl.class.getDeclaredField("_state");
f.setAccessible(true);
state = (EntryState) f.get(_queueEntry);
}
catch (Exception e)
{
fail("Failure to get a state field: " + e.getMessage());
}
return state;
}
/**
* Tests rejecting a queue entry records the Consumer ID
* for later verification by isRejectedBy(consumerId).
*/
public void testRejectAndRejectedBy()
{
QueueConsumer sub = newConsumer();
assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
assertFalse("Queue entry should not yet have been acquired by a consumer", _queueEntry.isAcquired());
//acquire, reject, and release the message using the consumer
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub));
_queueEntry.reject();
_queueEntry.release();
//verify the rejection is recorded
assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
//repeat rejection using a second consumer
QueueConsumer sub2 = newConsumer();
assertFalse("Queue entry should not yet have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
assertTrue("Queue entry should have been able to be acquired", _queueEntry.acquire(sub2));
_queueEntry.reject();
//verify it still records being rejected by both consumers
assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub));
assertTrue("Queue entry should have been rejected by the consumer", _queueEntry.isRejectedBy(sub2));
}
/**
* Tests if entries in DEQUEUED or DELETED state are not returned by getNext method.
*/
public void testGetNext()
{
int numberOfEntries = 5;
QueueEntryImpl[] entries = new QueueEntryImpl[numberOfEntries];
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getName());
StandardQueue queue = new StandardQueue(mock(VirtualHost.class), null, queueAttributes);
OrderedQueueEntryList queueEntryList = queue.getEntries();
// create test entries
for(int i = 0; i < numberOfEntries ; i++)
{
ServerMessage message = mock(ServerMessage.class);
when(message.getMessageNumber()).thenReturn((long)i);
final MessageReference reference = mock(MessageReference.class);
when(reference.getMessage()).thenReturn(message);
when(message.newReference()).thenReturn(reference);
QueueEntryImpl entry = queueEntryList.add(message);
entries[i] = entry;
}
// test getNext for not acquired entries
for(int i = 0; i < numberOfEntries ; i++)
{
QueueEntryImpl queueEntry = entries[i];
QueueEntry next = queueEntry.getNextValidEntry();
if (i < numberOfEntries - 1)
{
assertEquals("Unexpected entry from QueueEntryImpl#getNext()", entries[i + 1], next);
}
else
{
assertNull("The next entry after the last should be null", next);
}
}
// discard second
entries[1].acquire();
entries[1].delete();
// discard third
entries[2].acquire();
entries[2].delete();
QueueEntry next = entries[0].getNextValidEntry();
assertEquals("expected forth entry",entries[3], next);
next = next.getNextValidEntry();
assertEquals("expected fifth entry", entries[4], next);
next = next.getNextValidEntry();
assertNull("The next entry after the last should be null", next);
}
}