[AMQ-8068] Fix topic memory leak on message eviction using UniquePropertyMessageEvictionStrategy
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/UniquePropertyMessageEvictionStrategy.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/UniquePropertyMessageEvictionStrategy.java
index 39d54d3..a6962a5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/UniquePropertyMessageEvictionStrategy.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/UniquePropertyMessageEvictionStrategy.java
@@ -22,7 +22,7 @@
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
-
+import java.util.List;
 
 /**
  * An eviction strategy which evicts the oldest message within messages with the same property value
@@ -45,12 +45,10 @@
 
     @Override
     public MessageReference[] evictMessages(LinkedList messages) throws IOException {
-        MessageReference oldest = (MessageReference)messages.getFirst();
-        HashMap<Object, MessageReference> pivots = new HashMap<Object, MessageReference>();
-        Iterator iter = messages.iterator();
+        List<MessageReference> messageReferences = new LinkedList<>(messages);
+        HashMap<Object, MessageReference> pivots = new HashMap<>();
 
-        for (int i = 0; iter.hasNext(); i++) {
-            MessageReference reference = (MessageReference) iter.next();
+        for (MessageReference reference : messageReferences) {
             if (propertyName != null && reference.getMessage().getProperty(propertyName) != null) {
                 Object key = reference.getMessage().getProperty(propertyName);
                 if (pivots.containsKey(key)) {
@@ -64,16 +62,13 @@
             }
         }
 
-        if (!pivots.isEmpty()) {
-            for (MessageReference ref : pivots.values()) {
-                messages.remove(ref);
-            }
-
-            if (messages.size() != 0) {
-                return (MessageReference[])messages.toArray(new MessageReference[messages.size()]);
-            }
+        if (pivots.isEmpty() || pivots.values().size() == messageReferences.size()) {
+            return new MessageReference[]{(MessageReference) messages.removeFirst()};
+        } else {
+            messages.removeIf(message -> !pivots.containsValue(message));
+            messageReferences.removeAll(pivots.values());
+            return messageReferences.toArray(new MessageReference[0]);
         }
-        return new MessageReference[] {oldest};
-
     }
+
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java
index 5657e5c..7a51fb8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/UniquePropertyMessageEvictionStrategyTest.java
@@ -18,11 +18,13 @@
 package org.apache.activemq.broker.region;
 
 import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.broker.region.policy.UniquePropertyMessageEvictionStrategy;
+import org.apache.activemq.command.ActiveMQDestination;
 
 import javax.jms.*;
 import java.util.ArrayList;
@@ -30,8 +32,6 @@
 
 public class UniquePropertyMessageEvictionStrategyTest extends EmbeddedBrokerTestSupport {
 
-
-
     @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService broker =  super.createBroker();
@@ -80,8 +80,7 @@
                 Thread.sleep(100);
             }
         }
-
-
+        
         for (int i = 0; i < 11; i++) {
             javax.jms.Message msg = consumer.receive(1000);
             assertNotNull(msg);
@@ -100,6 +99,7 @@
         javax.jms.Message msg = consumer.receive(1000);
         assertNull(msg);
 
+        assertEquals("usage goes to 0", 0, TestSupport.getDestination(broker, ActiveMQDestination.transform(destination)).getMemoryUsage().getUsage());
     }
 
 }