QPID-8514: [Broker-J] Delete next available entry if the least significant entry is acquired by the consumer
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
index 7c110f8..bbae28a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueueEntryList.java
@@ -81,6 +81,7 @@
         {
             case AVAILABLE:
                 queueStatistics.addToAvailable(sizeWithHeader);
+                _queue.checkCapacity();
                 break;
             case ACQUIRED:
                 if(isConsumerAcquired && !wasConsumerAcquired)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
index 7a43dab..c64f875 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
@@ -19,13 +19,20 @@
 
 package org.apache.qpid.server.queue;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.QueueMessages;
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
+import org.apache.qpid.server.txn.ServerTransaction;
 
 public class RingOverflowPolicyHandler implements OverflowPolicyHandler
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(RingOverflowPolicyHandler.class);
     private final Handler _handler;
 
     RingOverflowPolicyHandler(final Queue<?> queue,
@@ -38,7 +45,7 @@
     @Override
     public void checkOverflow(final QueueEntry newlyEnqueued)
     {
-        _handler.checkOverflow();
+        _handler.checkOverflow(newlyEnqueued);
     }
 
     private static class Handler extends OverflowPolicyMaximumQueueDepthChangeListener
@@ -57,10 +64,10 @@
         @Override
         void onMaximumQueueDepthChange(final Queue<?> queue)
         {
-            checkOverflow();
+            checkOverflow(null);
         }
 
-        private void checkOverflow()
+        private void checkOverflow(final QueueEntry newlyEnqueued)
         {
             // When this method causes an entry to be deleted, the size of the queue is changed, leading to
             // checkOverflow being called again (because for other policies this may trigger relaxation of flow control,
@@ -78,6 +85,7 @@
                     int counter = 0;
                     int queueDepthMessages;
                     long queueDepthBytes;
+                    QueueEntry lastSeenEntry = null;
                     do
                     {
                         queueDepthMessages = _queue.getQueueDepthMessages();
@@ -94,22 +102,26 @@
                                 overflow = true;
                             }
 
-                            QueueEntry entry = _queue.getLeastSignificantOldestEntry();
-
-                            if (entry != null)
+                            lastSeenEntry = lastSeenEntry == null
+                                    ? _queue.getLeastSignificantOldestEntry()
+                                    : lastSeenEntry.getNextValidEntry();
+                            if (lastSeenEntry != null)
                             {
-                                counter++;
-                                _queue.deleteEntry(entry);
-                            }
-                            else
-                            {
-                                queueDepthMessages = _queue.getQueueDepthMessages();
-                                queueDepthBytes = _queue.getQueueDepthBytes();
-                                break;
+                                // ensure that we are deleting only entries before the newly enqueued one
+                                if (newlyEnqueued != null && lastSeenEntry.compareTo(newlyEnqueued) >= 0)
+                                {
+                                    // stop at new entry
+                                    lastSeenEntry = null;
+                                }
+                                else if (lastSeenEntry.acquireOrSteal(null))
+                                {
+                                    counter++;
+                                    deleteAcquiredEntry(lastSeenEntry);
+                                }
                             }
                         }
                     }
-                    while (bytesOverflow || messagesOverflow);
+                    while ((bytesOverflow || messagesOverflow) && lastSeenEntry != null);
 
                     if (overflow)
                     {
@@ -126,6 +138,27 @@
                 }
             }
         }
-    }
 
+        private void deleteAcquiredEntry(final QueueEntry entry)
+        {
+            final MessageStore messageStore = _queue.getVirtualHost().getMessageStore();
+            final ServerTransaction txn =
+                    new AsyncAutoCommitTransaction(messageStore, (future, action) -> action.postCommit());
+            txn.dequeue(entry.getEnqueueRecord(),
+                        new ServerTransaction.Action()
+                        {
+                            @Override
+                            public void postCommit()
+                            {
+                                entry.delete();
+                            }
+
+                            @Override
+                            public void onRollback()
+                            {
+
+                            }
+                        });
+        }
+    }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
index a69af14..7143fcd 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.qpid.server.queue;
 
-import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.mock;
@@ -40,6 +39,9 @@
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.queue.ProducerFlowControlOverflowPolicyHandlerTest.LogMessageMatcher;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.Transaction;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
 import org.apache.qpid.test.utils.UnitTestBase;
 
 public class RingOverflowPolicyHandlerTest extends UnitTestBase
@@ -52,16 +54,24 @@
     @Before
     public void setUp() throws Exception
     {
-
         _eventLogger = mock(EventLogger.class);
         _subject = mock(LogSubject.class);
 
+        final Transaction tx = mock(Transaction.class);
+
+        final MessageStore messageStore = mock(MessageStore.class);
+        when(messageStore.newTransaction()).thenReturn(tx);
+
+        final QueueManagingVirtualHost virtualHost = mock(QueueManagingVirtualHost.class);
+        when(virtualHost.getMessageStore()).thenReturn(messageStore);
+
         _queue = mock(AbstractQueue.class);
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(-1L);
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(-1L);
         when(_queue.getOverflowPolicy()).thenReturn(OverflowPolicy.RING);
         when(_queue.getQueueDepthMessages()).thenReturn(0);
         when(_queue.getLogSubject()).thenReturn(_subject);
+        when(_queue.getVirtualHost()).thenReturn(virtualHost);
 
         _ringOverflowPolicyHandler = new RingOverflowPolicyHandler(_queue, _eventLogger);
     }
@@ -77,7 +87,7 @@
 
         _ringOverflowPolicyHandler.checkOverflow(null);
 
-        verify(_queue).deleteEntry(lastEntry);
+        verify(lastEntry).delete();
         LogMessage dropped = QueueMessages.DROPPED(1L, 4, 1, 5,-1);
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
         verifyNoMoreInteractions(_eventLogger);
@@ -94,7 +104,7 @@
 
         _ringOverflowPolicyHandler.checkOverflow(null);
 
-        verify((AbstractQueue<?>) _queue).deleteEntry(lastEntry);
+        verify(lastEntry).delete();
         LogMessage dropped = QueueMessages.DROPPED(1, 4, 5, -1,5);
         verify(_eventLogger).message(same(_subject), argThat(new LogMessageMatcher(dropped)));
         verifyNoMoreInteractions(_eventLogger);
@@ -103,26 +113,28 @@
     @Test
     public void testCheckOverflowWhenUnderfullBytes() throws Exception
     {
+        QueueEntry lastEntry = createLastEntry();
         when(_queue.getQueueDepthBytes()).thenReturn(5L);
         when(_queue.getMaximumQueueDepthBytes()).thenReturn(5L);
         when(_queue.getQueueDepthMessages()).thenReturn(3);
 
         _ringOverflowPolicyHandler.checkOverflow(null);
 
-        verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+        verify(lastEntry, never()).delete();
         verifyNoMoreInteractions(_eventLogger);
     }
 
     @Test
     public void testCheckOverflowWhenUnderfullMessages() throws Exception
     {
+        QueueEntry lastEntry = createLastEntry();
         when(_queue.getQueueDepthMessages()).thenReturn(5);
         when(_queue.getMaximumQueueDepthMessages()).thenReturn(5L);
         when(_queue.getQueueDepthBytes()).thenReturn(10L);
 
         _ringOverflowPolicyHandler.checkOverflow(null);
 
-        verify(_queue, never()).deleteEntry(any(QueueEntry.class));
+        verify(lastEntry, never()).delete();
         verifyNoMoreInteractions(_eventLogger);
     }
 
@@ -133,6 +145,7 @@
         when(oldestMessage.getMessageHeader()).thenReturn(oldestMessageHeader);
         QueueEntry oldestEntry = mock(QueueEntry.class);
         when(oldestEntry.getMessage()).thenReturn(oldestMessage);
+        when(oldestEntry.acquireOrSteal(null)).thenReturn(true);
         return oldestEntry;
     }
 }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java
new file mode 100644
index 0000000..584485f
--- /dev/null
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyTest.java
@@ -0,0 +1,213 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.Serializable;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.configuration.updater.TaskExecutorImpl;
+import org.apache.qpid.server.consumer.ConsumerOption;
+import org.apache.qpid.server.consumer.TestConsumerTarget;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.message.MessageInstance;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.internal.InternalMessage;
+import org.apache.qpid.server.model.BrokerTestHelper;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.MessageEnqueueRecord;
+import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
+import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
+import org.apache.qpid.server.virtualhost.TestMemoryVirtualHost;
+import org.apache.qpid.test.utils.UnitTestBase;
+
+public class RingOverflowPolicyTest extends UnitTestBase
+{
+    private TaskExecutor _taskExecutor;
+    private QueueManagingVirtualHost<?> _virtualHost;
+    private AtomicLong _messageId;
+
+    @Before
+    public void setUp() throws Exception
+    {
+        _taskExecutor = new TaskExecutorImpl();
+        _taskExecutor.start();
+        String name = getClass().getName();
+        final VirtualHostNode<?> virtualHostNode = BrokerTestHelper.createVirtualHostNodeMock(
+                name, true, BrokerTestHelper.createAccessControlMock(), BrokerTestHelper.createBrokerMock());
+        when(virtualHostNode.getChildExecutor()).thenReturn(_taskExecutor);
+        when(virtualHostNode.getTaskExecutor()).thenReturn(_taskExecutor);
+
+        final Map<String, Object> virtualHostAttributes = new HashMap<>();
+        virtualHostAttributes.put(VirtualHost.TYPE, TestMemoryVirtualHost.VIRTUAL_HOST_TYPE);
+        virtualHostAttributes.put(VirtualHost.NAME, name);
+        virtualHostAttributes.put(QueueManagingVirtualHost.CONNECTION_THREAD_POOL_SIZE, 2);
+        virtualHostAttributes.put(QueueManagingVirtualHost.NUMBER_OF_SELECTORS, 1);
+
+        final ConfiguredObjectFactory objectFactory = virtualHostNode.getObjectFactory();
+        final QueueManagingVirtualHost<?> host = (QueueManagingVirtualHost<?>)objectFactory.create(VirtualHost.class, virtualHostAttributes, virtualHostNode);
+        final AbstractVirtualHost abstractVirtualHost = (AbstractVirtualHost) host;
+        abstractVirtualHost.start();
+        when(virtualHostNode.getVirtualHost()).thenReturn(abstractVirtualHost);
+        _virtualHost = host;
+        _messageId = new AtomicLong();
+    }
+
+    @After
+    public void tearDown() throws Exception
+    {
+        _virtualHost.close();
+        _taskExecutor.stop();
+    }
+
+    @Test
+    public void testEnqueueWithOverflowWhenLeastSignificantEntryIsAcquiredByConsumer() throws Exception
+    {
+        final Queue<?> queue = createTestRingQueue(2);
+
+        final ServerMessage<?> message1 = enqueueTestMessage(queue);
+
+        final TestConsumerTarget consumerTarget = createTestConsumerTargetAndConsumer(queue);
+        final boolean received = consumerTarget.processPending();
+        assertThat(received, is(true));
+
+        final MessageInstance receivedMessage = consumerTarget.getMessages().remove(0);
+        assertThat(receivedMessage, is(notNullValue()));
+        assertThat(receivedMessage.isAcquired(), is(true));
+        assertThat(receivedMessage.getMessage(), is(message1));
+
+        final ServerMessage<?> message2 = enqueueTestMessage(queue);
+        assertThat(queue.getQueueDepthMessages(), is(equalTo(2)));
+
+        final ServerMessage<?> message3 = enqueueTestMessage(queue);
+        assertThat(queue.getQueueDepthMessages(), is(equalTo(2)));
+
+        assertThat(message2.isReferenced(queue), is(equalTo(false)));
+        assertThat(message3.isReferenced(queue), is(equalTo(true)));
+    }
+
+    @Test
+    public void testLeastSignificantEntryAcquiredByConsumerIsDeletedAfterRelease() throws Exception
+    {
+        final Queue<?> queue = createTestRingQueue(1);
+
+        final ServerMessage<?> message1 = enqueueTestMessage(queue);
+
+        final TestConsumerTarget consumerTarget = createTestConsumerTargetAndConsumer(queue);
+        final boolean received = consumerTarget.processPending();
+        assertThat(received, is(true));
+
+        final MessageInstance receivedMessage = consumerTarget.getMessages().remove(0);
+        assertThat(receivedMessage, is(notNullValue()));
+        assertThat(receivedMessage.isAcquired(), is(true));
+        assertThat(receivedMessage.getMessage(), is(message1));
+
+        final ServerMessage<?> message2 = enqueueTestMessage(queue);
+        assertThat(queue.getQueueDepthMessages(), is(equalTo(2)));
+
+        assertThat(message1.isReferenced(queue), is(equalTo(true)));
+        assertThat(message2.isReferenced(queue), is(equalTo(true)));
+
+        receivedMessage.release();
+        assertThat(queue.getQueueDepthMessages(), is(equalTo(1)));
+        assertThat(message1.isReferenced(queue), is(equalTo(false)));
+        assertThat(message2.isReferenced(queue), is(equalTo(true)));
+    }
+
+    private Queue<?> createTestRingQueue(final int messageLimit)
+    {
+        final Map<String, Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, getTestName());
+        attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING.name());
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, messageLimit);
+        return _virtualHost.createChild(Queue.class, attributes);
+    }
+
+    private TestConsumerTarget createTestConsumerTargetAndConsumer(final Queue<?> queue) throws Exception
+    {
+        final TestConsumerTarget consumerTarget = new TestConsumerTarget();
+        queue.addConsumer(consumerTarget,
+                          null,
+                          InternalMessage.class,
+                          getTestName(),
+                          EnumSet.of(ConsumerOption.ACQUIRES, ConsumerOption.SEES_REQUEUES),
+                          0);
+        return consumerTarget;
+    }
+
+    private ServerMessage<?> enqueueTestMessage(final Queue<?> queue)
+    {
+        final ServerMessage<?> message = createMessage(_messageId.incrementAndGet(), queue.getName());
+        final MessageEnqueueRecord record = createMessageEnqueueRecord(queue.getId(), message.getMessageNumber());
+        queue.enqueue(message, null, record);
+        return message;
+    }
+
+    private MessageEnqueueRecord createMessageEnqueueRecord(final UUID queueId, final long messageNumber)
+    {
+        return new MessageEnqueueRecord()
+        {
+            @Override
+            public UUID getQueueId()
+            {
+                return queueId;
+            }
+
+            @Override
+            public long getMessageNumber()
+            {
+                return messageNumber;
+            }
+        };
+    }
+
+    private ServerMessage<?> createMessage(final long messageNumber, final String queueName)
+    {
+        final AMQMessageHeader amqpHeader = mock(AMQMessageHeader.class);
+        when(amqpHeader.getMessageId()).thenReturn(String.valueOf(messageNumber));
+        when(amqpHeader.getExpiration()).thenReturn(0L);
+        final Serializable messageContent = String.format("test message %d", messageNumber);
+        return InternalMessage.createMessage(_virtualHost.getMessageStore(),
+                                             amqpHeader,
+                                             messageContent,
+                                             false,
+                                             queueName);
+    }
+}