QPID-8378: [Broker-J] Make sure that message reference is released for deleted node
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 22e42d4..cc6608d 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -1714,15 +1714,11 @@
         {
             QueueEntry node = queueListIterator.getNode();
             MessageReference reference = node.newMessageReference();
-            if(reference != null && !node.isDeleted())
+            if(reference != null)
             {
                 try
                 {
-                    if (!reference.getMessage().checkValid())
-                    {
-                        malformedEntry(node);
-                    }
-                    else if (visitor.visit(node))
+                    if (!node.isDeleted() && reference.getMessage().checkValid() && visitor.visit(node))
                     {
                         break;
                     }
diff --git a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
index 4fcea19..0a4f987 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
@@ -34,6 +34,7 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.verifyZeroInteractions;
 import static org.mockito.Mockito.when;
 
@@ -1257,6 +1258,58 @@
                      _queue.getQueueDepthMessages());
     }
 
+    @Test
+    public void testVisit()
+    {
+        final ServerMessage message = createMessage(1L, 2, 3);
+        _queue.enqueue(message, null, null);
+
+        final QueueEntryVisitor visitor = mock(QueueEntryVisitor.class);
+        _queue.visit(visitor);
+
+        final ArgumentCaptor<QueueEntry> argument = ArgumentCaptor.forClass(QueueEntry.class);
+        verify(visitor).visit(argument.capture());
+
+        final QueueEntry queueEntry = argument.getValue();
+        assertEquals(message, queueEntry.getMessage());
+        verify(message.newReference()).release();
+    }
+
+    @Test
+    public void testVisitWhenNodeDeletedAfterAdvance()
+    {
+        final QueueEntryList list = mock(QueueEntryList.class);
+
+        final Map<String,Object> attributes = new HashMap<>();
+        attributes.put(Queue.NAME, _qname);
+        attributes.put(Queue.OWNER, _owner);
+
+        @SuppressWarnings("unchecked")
+        final Queue queue = new AbstractQueue(attributes, _virtualHost)
+        {
+            @Override
+            QueueEntryList getEntries()
+            {
+                return list;
+            }
+
+        };
+
+        final MessageReference reference = mock(MessageReference.class);
+        final QueueEntry entry = mock(QueueEntry.class);
+        when(entry.isDeleted()).thenReturn(true);
+        when(entry.newMessageReference()).thenReturn(reference);
+        final QueueEntryIterator iterator = mock(QueueEntryIterator.class);
+        when(iterator.advance()).thenReturn(true, false);
+        when(iterator.getNode()).thenReturn(entry);
+        when(list.iterator()).thenReturn(iterator);
+
+        final QueueEntryVisitor visitor = mock(QueueEntryVisitor.class);
+        queue.visit(visitor);
+        verifyNoMoreInteractions(visitor);
+        verify(reference).release();
+    }
+
     private void makeVirtualHostTargetSizeExceeded()
     {
         final InternalMessage message = InternalMessage.createMessage(_virtualHost.getMessageStore(),