QPID-8514: [Broker-J] Delete next available entry if the least significant entry is acquired by the consumer
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
index 7c110f8..bbae28a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
@@ -81,6 +81,7 @@
{
case AVAILABLE:
queueStatistics.addToAvailable(sizeWithHeader);
+ _queue.checkCapacity();
break;
case ACQUIRED:
if(isConsumerAcquired && !wasConsumerAcquired)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
index 7a43dab..c64f875 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
@@ -19,13 +19,20 @@
package org.apache.qpid.server.queue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.QueueMessages;
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
public class RingOverflowPolicyHandler implements OverflowPolicyHandler
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(RingOverflowPolicyHandler.class);
private final Handler _handler;
RingOverflowPolicyHandler(final Queue<?> queue,
@@ -38,7 +45,7 @@
@Override
public void checkOverflow(final QueueEntry newlyEnqueued)
{
- _handler.checkOverflow();
+ _handler.checkOverflow(newlyEnqueued);
}
private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
@@ -57,10 +64,10 @@
@Override
void onMaximumQueueDepthChange(final Queue<?> queue)
{
- checkOverflow();
+ checkOverflow(null);
}
- private void checkOverflow()
+ private void checkOverflow(final QueueEntry newlyEnqueued)
{
// When this method causes an entry to be deleted, the size of the queue is changed, leading to
// checkOverflow being called again (because for other policies this may trigger relaxation of flow control,
@@ -78,6 +85,7 @@
int counter = 0;
int queueDepthMessages;
long queueDepthBytes;
+ QueueEntry lastSeenEntry = null;
do
{
queueDepthMessages = _queue.getQueueDepthMessages();
@@ -94,22 +102,26 @@
overflow = true;
}
- QueueEntry entry = _queue.getLeastSignificantOldestEntry();
-
- if (entry != null)
+ lastSeenEntry = lastSeenEntry == null
+ ? _queue.getLeastSignificantOldestEntry()
+ : lastSeenEntry.getNextValidEntry();
+ if (lastSeenEntry != null)
{
- counter++;
- _queue.deleteEntry(entry);
- }
- else
- {
- queueDepthMessages = _queue.getQueueDepthMessages();
- queueDepthBytes = _queue.getQueueDepthBytes();
- break;
+ // ensure that we are deleting only entries before the newly enqueued one
+ if (newlyEnqueued != null && lastSeenEntry.compareTo(newlyEnqueued) >= 0)
+ {
+ // stop at new entry
+ lastSeenEntry = null;
+ }
+ else if (lastSeenEntry.acquireOrSteal(null))
+ {
+ counter++;
+ deleteAcquiredEntry(lastSeenEntry);
+ }
}
}
}
- while (bytesOverflow || messagesOverflow);
+ while ((bytesOverflow || messagesOverflow) && lastSeenEntry != null);
if (overflow)
{
@@ -126,6 +138,27 @@
}
}
}
- }
+ private void deleteAcquiredEntry(final QueueEntry entry)
+ {
+ final MessageStore messageStore = _queue.getVirtualHost().getMessageStore();
+ final ServerTransaction txn =
+ new AsyncAutoCommitTransaction(messageStore, (future, action) -> action.postCommit());
+ txn.dequeue(entry.getEnqueueRecord(),
+ new ServerTransaction.Action()
+ {
+ @Override
+ public void postCommit()
+ {
+ entry.delete();
+ }
+
+ @Override
+ public void onRollback()
+ {
+
+ }
+ });
+ }
+ }
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
index a69af14..7143fcd 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
@@ -19,7 +19,6 @@
package org.apache.qpid.server.queue;
-import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
@@ -40,6 +39,9 @@
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.ProducerFlowControlOverflowPolicyHandlerTest.LogMessageMatcher;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.apache.qpid.test.utils.UnitTestBase;
public class RingOverflowPolicyHandlerTest extends UnitTestBase
@@ -52,16 +54,24 @@
@Before
public void setUp() throws Exception
{
-
_eventLogger = mock(EventLogger.class);
_subject = mock(LogSubject.class);
+ final Transaction tx = mock(Transaction.class);
+
+ final MessageStore messageStore = mock(MessageStore.class);
+ when(messageStore.newTransaction()).thenReturn(tx);
+
+ final QueueManagingVirtualHost virtualHost = mock(QueueManagingVirtualHost.class);
+ when(virtualHost.getMessageStore()).thenReturn(messageStore);
+
_queue = mock(AbstractQueue.class);
when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L);
when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.RING);
when(_queue.getQueueDepthMessages()).thenReturn(0);
when(_queue.getLogSubject()).thenReturn(_subject);
+ when(_queue.getVirtualHost()).thenReturn(virtualHost);
_ringOverflowPolicyHandler = new RingOverflowPolicyHandler(_queue, _eventLogger);
}
@@ -77,7 +87,7 @@
_ringOverflowPolicyHandler.checkOverflow(null);
- verify(_queue).deleteEntry(lastEntry);
+ verify(lastEntry).delete();
LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1);
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
verifyNoMoreInteractions(_eventLogger);
@@ -94,7 +104,7 @@
_ringOverflowPolicyHandler.checkOverflow(null);
- verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry);
+ verify(lastEntry).delete();
LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5);
verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
verifyNoMoreInteractions(_eventLogger);
@@ -103,26 +113,28 @@
@Test
public void testCheckOverflowWhenUnderfullBytes() throws Exception
{
+ QueueEntry lastEntry = createLastEntry();
when(_queue.getQueueDepthBytes()).thenReturn(5L);
when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
when(_queue.getQueueDepthMessages()).thenReturn(3);
_ringOverflowPolicyHandler.checkOverflow(null);
- verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+ verify(lastEntry, never()).delete();
verifyNoMoreInteractions(_eventLogger);
}
@Test
public void testCheckOverflowWhenUnderfullMessages() throws Exception
{
+ QueueEntry lastEntry = createLastEntry();
when(_queue.getQueueDepthMessages()).thenReturn(5);
when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
when(_queue.getQueueDepthBytes()).thenReturn(10L);
_ringOverflowPolicyHandler.checkOverflow(null);
- verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+ verify(lastEntry, never()).delete();
verifyNoMoreInteractions(_eventLogger);
}
@@ -133,6 +145,7 @@
when(oldestMessage.getMessageHeader()).thenReturn(oldestMessageHeader);
QueueEntry oldestEntry = mock(QueueEntry.class);
when(oldestEntry.getMessage()).thenReturn(oldestMessage);
+ when(oldestEntry.acquireOrSteal(null)).thenReturn(true);
return oldestEntry;
}
}
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java
new file mode 100644
index 0000000..584485f
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class RingOverflowPolicyTest extends UnitTestBase
+{
+ private TaskExecutor _taskExecutor;
+ private QueueManagingVirtualHost<?> _virtualHost;
+ private AtomicLong _messageId;
+
+ @Before
+ public void setUp() throws Exception
+ {
+ _taskExecutor = new TaskExecutorImpl();
+ _taskExecutor.start();
+ String name = getClass().getName();
+ final VirtualHostNode<?> virtualHostNode = BrokerTestHelper.createVirtualHostNodeMock(
+ name, true, BrokerTestHelper.createAccessControlMock(), BrokerTestHelper.createBrokerMock());
+ when(virtualHostNode.getChildExecutor()).thenReturn(_taskExecutor);
+ when(virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor);
+
+ final Map<String, Object> virtualHostAttributes = new HashMap<>();
+ virtualHostAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+ virtualHostAttributes.put(VirtualHost.NAME, name);
+ virtualHostAttributes.put(QueueManagingVirtualHost.CONNECTION_THREAD_POOL_SIZE, 2);
+ virtualHostAttributes.put(QueueManagingVirtualHost.NUMBER_OF_SELECTORS, 1);
+
+ final ConfiguredObjectFactory objectFactory = virtualHostNode.getObjectFactory();
+ final QueueManagingVirtualHost<?> host = (QueueManagingVirtualHost<?>)objectFactory.create(VirtualHost.class, virtualHostAttributes, virtualHostNode);
+ final AbstractVirtualHost abstractVirtualHost = (AbstractVirtualHost) host;
+ abstractVirtualHost.start();
+ when(virtualHostNode.getVirtualHost()).thenReturn(abstractVirtualHost);
+ _virtualHost = host;
+ _messageId = new AtomicLong();
+ }
+
+ @After
+ public void tearDown() throws Exception
+ {
+ _virtualHost.close();
+ _taskExecutor.stop();
+ }
+
+ @Test
+ public void testEnqueueWithOverflowWhenLeastSignificantEntryIsAcquiredByConsumer() throws Exception
+ {
+ final Queue<?> queue = createTestRingQueue(2);
+
+ final ServerMessage<?> message1 = enqueueTestMessage(queue);
+
+ final TestConsumerTarget consumerTarget = createTestConsumerTargetAndConsumer(queue);
+ final boolean received = consumerTarget.processPending();
+ assertThat(received, is(true));
+
+ final MessageInstance receivedMessage = consumerTarget.getMessages().remove(0);
+ assertThat(receivedMessage, is(notNullValue()));
+ assertThat(receivedMessage.isAcquired(), is(true));
+ assertThat(receivedMessage.getMessage(), is(message1));
+
+ final ServerMessage<?> message2 = enqueueTestMessage(queue);
+ assertThat(queue.getQueueDepthMessages(), is(equalTo(2)));
+
+ final ServerMessage<?> message3 = enqueueTestMessage(queue);
+ assertThat(queue.getQueueDepthMessages(), is(equalTo(2)));
+
+ assertThat(message2.isReferenced(queue), is(equalTo(false)));
+ assertThat(message3.isReferenced(queue), is(equalTo(true)));
+ }
+
+ @Test
+ public void testLeastSignificantEntryAcquiredByConsumerIsDeletedAfterRelease() throws Exception
+ {
+ final Queue<?> queue = createTestRingQueue(1);
+
+ final ServerMessage<?> message1 = enqueueTestMessage(queue);
+
+ final TestConsumerTarget consumerTarget = createTestConsumerTargetAndConsumer(queue);
+ final boolean received = consumerTarget.processPending();
+ assertThat(received, is(true));
+
+ final MessageInstance receivedMessage = consumerTarget.getMessages().remove(0);
+ assertThat(receivedMessage, is(notNullValue()));
+ assertThat(receivedMessage.isAcquired(), is(true));
+ assertThat(receivedMessage.getMessage(), is(message1));
+
+ final ServerMessage<?> message2 = enqueueTestMessage(queue);
+ assertThat(queue.getQueueDepthMessages(), is(equalTo(2)));
+
+ assertThat(message1.isReferenced(queue), is(equalTo(true)));
+ assertThat(message2.isReferenced(queue), is(equalTo(true)));
+
+ receivedMessage.release();
+ assertThat(queue.getQueueDepthMessages(), is(equalTo(1)));
+ assertThat(message1.isReferenced(queue), is(equalTo(false)));
+ assertThat(message2.isReferenced(queue), is(equalTo(true)));
+ }
+
+ private Queue<?> createTestRingQueue(final int messageLimit)
+ {
+ final Map<String, Object> attributes = new HashMap<>();
+ attributes.put(Queue.NAME, getTestName());
+ attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING.name());
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, messageLimit);
+ return _virtualHost.createChild(Queue.class, attributes);
+ }
+
+ private TestConsumerTarget createTestConsumerTargetAndConsumer(final Queue<?> queue) throws Exception
+ {
+ final TestConsumerTarget consumerTarget = new TestConsumerTarget();
+ queue.addConsumer(consumerTarget,
+ null,
+ InternalMessage.class,
+ getTestName(),
+ EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES),
+ 0);
+ return consumerTarget;
+ }
+
+ private ServerMessage<?> enqueueTestMessage(final Queue<?> queue)
+ {
+ final ServerMessage<?> message = createMessage(_messageId.incrementAndGet(), queue.getName());
+ final MessageEnqueueRecord record = createMessageEnqueueRecord(queue.getId(), message.getMessageNumber());
+ queue.enqueue(message, null, record);
+ return message;
+ }
+
+ private MessageEnqueueRecord createMessageEnqueueRecord(final UUID queueId, final long messageNumber)
+ {
+ return new MessageEnqueueRecord()
+ {
+ @Override
+ public UUID getQueueId()
+ {
+ return queueId;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return messageNumber;
+ }
+ };
+ }
+
+ private ServerMessage<?> createMessage(final long messageNumber, final String queueName)
+ {
+ final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class);
+ when(amqpHeader.getMessageId()).thenReturn(String.valueOf(messageNumber));
+ when(amqpHeader.getExpiration()).thenReturn(0L);
+ final Serializable messageContent = String.format("test message %d", messageNumber);
+ return InternalMessage.createMessage(_virtualHost.getMessageStore(),
+ amqpHeader,
+ messageContent,
+ false,
+ queueName);
+ }
+}