Merge pull request #578 from jbonofre/AMQ-8073

[AMQ-8073] Upgrade to commons-pool2 2.9.0
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
index 0c7044f..1c17875 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -203,6 +203,7 @@
     private final AtomicBoolean preShutdownHooksInvoked = new AtomicBoolean(false);
     private BrokerPlugin[] plugins;
     private boolean keepDurableSubsActive = true;
+    private boolean enableMessageExpirationOnActiveDurableSubs = false;
     private boolean useVirtualTopics = true;
     private boolean useMirroredQueues = false;
     private boolean useTempMirroredQueues = true;
@@ -1729,6 +1730,14 @@
     public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
         this.keepDurableSubsActive = keepDurableSubsActive;
     }
+    
+    public boolean isEnableMessageExpirationOnActiveDurableSubs() {
+    	return enableMessageExpirationOnActiveDurableSubs;
+    }
+    
+    public void setEnableMessageExpirationOnActiveDurableSubs(boolean enableMessageExpirationOnActiveDurableSubs) {
+    	this.enableMessageExpirationOnActiveDurableSubs = enableMessageExpirationOnActiveDurableSubs;
+    }
 
     public boolean isUseVirtualTopics() {
         return useVirtualTopics;
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 6a5c599..e58da62 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -58,6 +58,7 @@
     private final ConcurrentMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
     private final SubscriptionKey subscriptionKey;
     private boolean keepDurableSubsActive;
+    private boolean enableMessageExpirationOnActiveDurableSubs;
     private final AtomicBoolean active = new AtomicBoolean();
     private final AtomicLong offlineTimestamp = new AtomicLong(-1);
     private final HashSet<MessageId> ackedAndPrepared = new HashSet<MessageId>();
@@ -69,6 +70,7 @@
         this.pending.setSystemUsage(usageManager);
         this.pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
         this.keepDurableSubsActive = keepDurableSubsActive;
+        this.enableMessageExpirationOnActiveDurableSubs = broker.getBrokerService().isEnableMessageExpirationOnActiveDurableSubs();
         subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
     }
 
@@ -429,4 +431,8 @@
     public boolean isKeepDurableSubsActive() {
         return keepDurableSubsActive;
     }
+
+    public boolean isEnableMessageExpirationOnActiveDurableSubs() {
+    	return enableMessageExpirationOnActiveDurableSubs;
+    }
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
index 7df0138..7210fa8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
@@ -686,7 +686,7 @@
                 final ConnectionContext connectionContext = createConnectionContext();
                 for (Message message : toExpire) {
                     for (DurableTopicSubscription sub : durableSubscribers.values()) {
-                        if (!sub.isActive()) {
+                        if (!sub.isActive() || sub.isEnableMessageExpirationOnActiveDurableSubs()) {
                             message.setRegionDestination(this);
                             messageExpired(connectionContext, sub, message);
                         }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index bb62541..fb46768 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -30,10 +30,11 @@
     private Collection forwardTo;
     private boolean forwardOnly = true;
     private boolean concurrentSend = false;
+    private boolean sendWhenNotMatched = false;
 
     @Override
     public Destination intercept(Destination destination) {
-        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
+        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),isSendWhenNotMatched(), isConcurrentSend());
     }
 
     @Override
@@ -192,4 +193,12 @@
 
         return true;
     }
+    
+    public boolean isSendWhenNotMatched() {
+		return sendWhenNotMatched;
+	}
+
+	public void setSendWhenNotMatched(boolean sendWhenNotMatched) {
+		this.sendWhenNotMatched = sendWhenNotMatched;
+	}
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 119257d..5bd5716 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -41,12 +41,14 @@
     private Collection forwardDestinations;
     private boolean forwardOnly;
     private boolean concurrentSend = false;
+    private boolean sendWhenNotMatched=false;
 
-    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
+    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly,boolean sendWhenNotMatched, boolean concurrentSend) {
         super(next);
         this.forwardDestinations = forwardDestinations;
         this.forwardOnly = forwardOnly;
         this.concurrentSend = concurrentSend;
+        this.sendWhenNotMatched = sendWhenNotMatched;
     }
 
     @Override
@@ -100,9 +102,17 @@
                 doForward(context, message, brokerService.getRegionBroker(), destination);
             }
         }
-        if (!forwardOnly) {
-            super.send(context, message);
+        if(sendWhenNotMatched)
+        {
+        	if(matchingDestinations.size() <=0) {        
+        		super.send(context, message);
+        	}
+        }else {
+	        if (!forwardOnly) {
+	            super.send(context, message);
+	        }
         }
+       
         concurrent.await();
         if (exceptionAtomicReference.get() != null) {
             throw exceptionAtomicReference.get();
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
index 8624a06..01f6fcc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
@@ -345,20 +345,22 @@
     private NetworkInterface findNetworkInterface() throws SocketException {
         Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
         List<NetworkInterface> possibles = new ArrayList<NetworkInterface>();
-        while (ifcs.hasMoreElements()) {
-            NetworkInterface ni = ifcs.nextElement();
-            try {
-                if (ni.supportsMulticast()
-                        && ni.isUp()) {
-                    for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
-                        if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
-                                && !ia.getAddress().isLoopbackAddress()
-                                && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
-                            possibles.add(ni);
+        if (ifcs != null) {
+            while (ifcs.hasMoreElements()) {
+                NetworkInterface ni = ifcs.nextElement();
+                try {
+                    if (ni.supportsMulticast()
+                            && ni.isUp()) {
+                        for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
+                            if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
+                                    && !ia.getAddress().isLoopbackAddress()
+                                    && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
+                                possibles.add(ni);
+                            }
                         }
                     }
-                }
-            } catch (SocketException ignored) {}
+                } catch (SocketException ignored) {}
+            }
         }
         return possibles.isEmpty() ? null : possibles.get(possibles.size() - 1);
     }
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
index d6a50a1..0a717f4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
@@ -98,18 +98,21 @@
     }
 
     private void checkSecurity(Class clazz) throws ClassNotFoundException {
-        if (!clazz.isPrimitive()) {
-            if (clazz.getPackage() != null && !trustAllPackages()) {
-               boolean found = false;
-               for (String packageName : getTrustedPackages()) {
-                   if (clazz.getPackage().getName().equals(packageName) || clazz.getPackage().getName().startsWith(packageName + ".")) {
-                       found = true;
-                       break;
-                   }
-               }
-               if (!found) {
-                   throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
-               }
+        if (trustAllPackages() || clazz.isPrimitive()) {
+            return;
+        }
+
+        boolean found = false;
+        Package thePackage = clazz.getPackage();
+        if (thePackage != null) {
+            for (String trustedPackage : getTrustedPackages()) {
+                if (thePackage.getName().equals(trustedPackage) || thePackage.getName().startsWith(trustedPackage + ".")) {
+                    found = true;
+                    break;
+                }
+            }
+            if (!found) {
+                throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
             }
         }
     }
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
index a7285da..b7766a2 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
@@ -161,21 +161,6 @@
         return true;
     }
 
-    protected String readRequestBody(HttpServletRequest request) throws IOException {
-        StringBuffer buffer = new StringBuffer();
-        BufferedReader reader = request.getReader();
-        while (true) {
-            String line = reader.readLine();
-            if (line == null) {
-                break;
-            } else {
-                buffer.append(line);
-                buffer.append("\n");
-            }
-        }
-        return buffer.toString();
-    }
-
     protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
         String clientID = request.getHeader("clientID");
         if (clientID == null) {
diff --git a/activemq-karaf/src/main/resources/features-core.xml b/activemq-karaf/src/main/resources/features-core.xml
index f04f522..79a1ec5 100644
--- a/activemq-karaf/src/main/resources/features-core.xml
+++ b/activemq-karaf/src/main/resources/features-core.xml
@@ -52,6 +52,7 @@
       <feature>http</feature>
       <feature version="${project.version}">activemq-client</feature>
       <bundle>mvn:org.apache.activemq/activemq-karaf/${project.version}</bundle>
+      <bundle dependency="true">mvn:commons-io/commons-io/${commons-io-version}</bundle>
       <bundle dependency="true">mvn:commons-collections/commons-collections/${commons-collections-version}</bundle>
       <bundle dependency='true'>mvn:commons-lang/commons-lang/${commons-lang-version}</bundle>
       <bundle dependency="true">mvn:commons-codec/commons-codec/1.9</bundle>
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 339d9b8..cee87ba 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -74,6 +74,7 @@
       javax.management*,
       javax.transaction*;version="[1,3)",
       javax.naming*;resolution:=optional,
+      org.apache.commons.io*;resolution:=optional,
       org.apache.commons.pool*;resolution:=optional,
       org.apache.commons.net*;resolution:=optional,
       com.sun*;resolution:=optional,
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
new file mode 100644
index 0000000..ef71d1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -0,0 +1,517 @@
+package org.apache.activemq.broker.virtual;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.FilteredDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.spring.ConsumerBean;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CompositeDestinationSendWhenNotMatchedTest.class);
+
+	protected int total = 10;
+	protected Connection connection;
+	
+
+	@Test
+	public void testSendWhenNotMatched() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.B");
+		Destination destination1 =new ActiveMQQueue("A.B");
+		Destination destination2 = new ActiveMQQueue("A.C");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));
+		
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(0);
+	}
+
+	@Test
+	public void testSendWhenMatched() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.B");
+		Destination destination1 =new ActiveMQQueue("A.B");
+		Destination destination2 = new ActiveMQQueue("A.C");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+		
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	}
+	@Test
+	public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.D");
+		Destination destination1 =new ActiveMQQueue("A.D");
+		Destination destination2 = new ActiveMQQueue("A.E");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tes13"));
+	
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(0);	
+
+	}
+	
+	public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.D");
+		Destination destination1 =new ActiveMQQueue("A.D");
+		Destination destination2 = new ActiveMQQueue("A.E");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+		Thread.sleep(1*1000);
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	} 
+	@Test
+	public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.X");
+		Destination destination1 =new ActiveMQQueue("A.X");
+		Destination destination2 = new ActiveMQQueue("A.Y");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+	
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(1);
+
+	}
+	@Test
+	public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.X");
+		Destination destination1 =new ActiveMQQueue("A.X");
+		Destination destination2 = new ActiveMQQueue("A.Y");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));
+	
+		
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(0);
+
+	}
+	
+	@Test
+	public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.W");
+		Destination destination1 =new ActiveMQQueue("A.W");
+		Destination destination2 = new ActiveMQQueue("A.V");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+	
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	}
+	@Test
+	public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.W");
+		Destination destination1 =new ActiveMQQueue("A.W");
+		Destination destination2 = new ActiveMQQueue("A.V");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));	
+		Thread.sleep(2*1000);
+		messageList1.assertMessagesArrived(0);
+		messageList2.assertMessagesArrived(0);
+
+	}
+	
+	@Test
+	public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("X.Y");
+		Destination destination1 =  new ActiveMQQueue("X.Y");
+		Destination destination2 =  new ActiveMQQueue("X.Z");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	}
+	@Test
+	public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("R.S");
+		Destination destination1 =  new ActiveMQQueue("R.S");
+		Destination destination2 =  new ActiveMQQueue("R.T");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create queue producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));		
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(1);
+	}
+
+	protected Destination getConsumer1Dsetination() {
+		return new ActiveMQQueue("A.B");
+	}
+
+	protected Destination getConsumer2Dsetination() {
+		return new ActiveMQQueue("A.C");
+	}
+
+	protected Destination getProducerDestination() {
+		return new ActiveMQQueue("A.B");
+	}
+
+	protected TextMessage createMessage(Session session, String testid) throws JMSException {
+		TextMessage textMessage = session.createTextMessage("testMessage");
+		textMessage.setStringProperty("testid", testid);
+		return textMessage;
+	}
+
+	protected BrokerService createBroker() throws Exception {
+		BrokerService answer = new BrokerService();
+		answer.setPersistent(isPersistent());
+		answer.getManagementContext().setCreateConnector(false);
+		answer.addConnector(bindAddress);
+		/*
+		 * <destinationInterceptors> <virtualDestinationInterceptor>
+		 * <virtualDestinations> <compositeQueue name="A.B" forwardOnly="true" sendWhenNotMatched="true">
+		 * <forwardTo> <filteredDestination selector="testid LIKE 'test%'" queue="A.C"/>
+		 * </forwardTo> </compositeQueue> </virtualDestinations>
+		 * </virtualDestinationInterceptor> </destinationInterceptors>
+		 */
+		/*
+		 * sendWhenNotMatched = true: a message is delivered to the composite queue itself only when it matches none of the filtered destinations.
+		 * In that mode the forwardOnly setting has no effect.
+		 */
+		
+		CompositeQueue compositeQueue = new CompositeQueue();
+		compositeQueue.setName("A.B");
+		compositeQueue.setForwardOnly(true); // By default it is true
+		compositeQueue.setSendWhenNotMatched(true);// By default it is false
+		FilteredDestination filteredQueue = new FilteredDestination();
+		filteredQueue.setQueue("A.C");
+		filteredQueue.setSelector("testid LIKE 'test%'");
+		final ArrayList<Object> forwardDestinations = new ArrayList<Object>();
+		forwardDestinations.add(filteredQueue);
+		compositeQueue.setForwardTo(forwardDestinations);
+		
+	
+		CompositeQueue compositeQueue0 = new CompositeQueue();
+		compositeQueue0.setName("A.D");
+		compositeQueue0.setForwardOnly(false); // By default it is true
+		compositeQueue0.setSendWhenNotMatched(true);// By default it is false
+		FilteredDestination filteredQueue0 = new FilteredDestination();
+		filteredQueue0.setQueue("A.E");
+		filteredQueue0.setSelector("testid LIKE 'test%'");
+		final ArrayList<Object> forwardDestinations0 = new ArrayList<Object>();
+		forwardDestinations0.add(filteredQueue0);
+		compositeQueue0.setForwardTo(forwardDestinations0);
+		
+		//Back compatibility test 1
+		CompositeQueue compositeQueue01 = new CompositeQueue();
+		compositeQueue01.setName("A.X");
+		compositeQueue01.setForwardOnly(false); // By default it is true
+		//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+		FilteredDestination filteredQueue01 = new FilteredDestination();
+		filteredQueue01.setQueue("A.Y");
+		filteredQueue01.setSelector("testid LIKE 'test%'");
+		final ArrayList<Object> forwardDestinations01 = new ArrayList<Object>();
+		forwardDestinations01.add(filteredQueue01);
+		compositeQueue01.setForwardTo(forwardDestinations01);
+		
+		//Back compatibility test 2
+				CompositeQueue compositeQueue02 = new CompositeQueue();
+				compositeQueue02.setName("A.W");
+				//compositeQueue02.setForwardOnly(true); // By default it is true
+				//compositeQueue02.setSendWhenNotMatched(false);// By default it is false
+				FilteredDestination filteredQueue02 = new FilteredDestination();
+				filteredQueue02.setQueue("A.V");
+				filteredQueue02.setSelector("testid LIKE 'test%'");
+				final ArrayList<Object> forwardDestinations02 = new ArrayList<Object>();
+				forwardDestinations02.add(filteredQueue02);
+				compositeQueue02.setForwardTo(forwardDestinations02);
+		
+		CompositeQueue compositeQueue1 = new CompositeQueue();
+		compositeQueue1.setName("X.Y");
+		compositeQueue1.setForwardOnly(true); // By default it is true
+		ActiveMQQueue forwardQueue1 =new ActiveMQQueue("X.Z");	
+		final ArrayList<Object> forwardDestinations1 = new ArrayList<Object>();		
+		forwardDestinations1.add(forwardQueue1);
+		compositeQueue1.setForwardTo(forwardDestinations1);
+		
+		
+		CompositeQueue compositeQueue2 = new CompositeQueue();
+		compositeQueue2.setName("R.S");
+		compositeQueue2.setForwardOnly(false);
+		ActiveMQQueue forwardQueue2 =new ActiveMQQueue("R.T");	
+		final ArrayList<Object> forwardDestinations2 = new ArrayList<Object>();		
+		forwardDestinations2.add(forwardQueue2);
+		compositeQueue2.setForwardTo(forwardDestinations2);
+
+		VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+		interceptor.setVirtualDestinations(new VirtualDestination[] { compositeQueue,compositeQueue0,compositeQueue01,compositeQueue02,compositeQueue1,compositeQueue2 });
+		answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
+		return answer;
+	}
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
new file mode 100644
index 0000000..69c734e
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.TestSupport.PersistenceAdapterChoice;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ * Verifies that browsing an active durable subscription expires messages on the
+ * destination only when the broker's {@code enableMessageExpirationOnActiveDurableSubs}
+ * option is switched on. The test is run once per value of that option.
+ */
+@RunWith(value = Parameterized.class)
+public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase {
+
+    /** Value applied to the broker's enableMessageExpirationOnActiveDurableSubs option. */
+    private final boolean enableExpiration;
+
+    public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) {
+        keepDurableSubsActive = true;
+        this.enableExpiration = enableExpiration;
+    }
+
+    /** Supplies the two parameter sets: expiration disabled, then enabled. */
+    @Parameterized.Parameters(name = "enableExpiration_{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                { false },
+                { true }
+            });
+    }
+
+    @Override
+    public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
+        return super.setPersistenceAdapter(broker, PersistenceAdapterChoice.MEM);
+    }
+
+    @Override
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://" + getName(true));
+        connectionFactory.setWatchTopicAdvisories(false);
+        return connectionFactory;
+    }
+
+    /**
+     * Sends messages to a topic with an active durable subscriber, back-dates the expiration
+     * of every third message directly in the topic store, browses the subscription and checks
+     * the destination's expired counter against the configured option.
+     */
+    @Test(timeout = 60 * 1000)
+    public void testBrowseExpireActiveSub() throws Exception {
+        final int numberOfMessages = 10;
+
+        broker.setEnableMessageExpirationOnActiveDurableSubs(enableExpiration);
+
+        // create durable subscription
+        Connection con = createConnection("consumer");
+        try {
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createDurableSubscriber(topic, "SubsId");
+
+            long timeStamp = System.currentTimeMillis();
+            sendMessages(numberOfMessages, timeStamp);
+
+            ObjectName[] subs = broker.getAdminView().getDurableTopicSubscribers();
+            assertEquals(1, subs.length);
+
+            ObjectName subName = subs[0];
+            DurableSubscriptionViewMBean sub = (DurableSubscriptionViewMBean)
+                    broker.getManagementContext().newProxyInstance(subName, DurableSubscriptionViewMBean.class, true);
+            assertTrue(sub.isActive());
+
+            // browse the durable sub
+            CompositeData[] data = sub.browse();
+            assertNotNull(data);
+            assertEquals(numberOfMessages, data.length);
+
+            Destination dest = broker.getDestination(topic);
+            assertEquals(0, dest.getDestinationStatistics().getExpired().getCount());
+
+            // add every 3rd message to the expiration list
+            TopicMessageStore topicStore = (TopicMessageStore) dest.getMessageStore();
+            final List<org.apache.activemq.command.Message> messagesToExpire = new ArrayList<>();
+            topicStore.recover(new MessageRecoveryListener() {
+                @Override
+                public boolean recoverMessage(org.apache.activemq.command.Message message) throws Exception {
+                    int index = (int) message.getProperty("index");
+                    if (index % 3 == 0) {
+                        messagesToExpire.add(message);
+                    }
+                    return true;
+                }
+
+                @Override
+                public boolean recoverMessageReference(MessageId messageReference) throws Exception {
+                    return true;
+                }
+
+                @Override
+                public boolean hasSpace() {
+                    return true;
+                }
+
+                @Override
+                public boolean isDuplicate(MessageId id) {
+                    return false;
+                }
+            });
+
+            // expire messages in the topic store
+            for (org.apache.activemq.command.Message message : messagesToExpire) {
+                message.setExpiration(timeStamp - 1);
+                topicStore.updateMessage(message);
+            }
+
+            // browse (should | should not) expire the messages on the destination if expiration is (enabled | not enabled)
+            data = sub.browse();
+            assertNotNull(data);
+            assertEquals(enableExpiration ? messagesToExpire.size() : 0, dest.getDestinationStatistics().getExpired().getCount());
+
+            session.close();
+        } finally {
+            // close the connection even when an assertion above fails, so it does not leak
+            con.close();
+        }
+    }
+
+    /**
+     * Publishes {@code numberOfMessages} messages to the test topic, each carrying an
+     * int property {@code index} and the given JMS timestamp set before sending.
+     */
+    private void sendMessages(int numberOfMessages, long timeStamp) throws Exception {
+        Connection con = createConnection("producer");
+        try {
+            Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(null);
+
+            for (int i = 0; i < numberOfMessages; i++) {
+                Message message = session.createMessage();
+                message.setIntProperty("index", i);
+                message.setJMSTimestamp(timeStamp);
+                producer.send(topic, message);
+            }
+
+            session.close();
+        } finally {
+            con.close();
+        }
+    }
+}
diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml
index 70fa0be..01cacab 100644
--- a/activemq-web/pom.xml
+++ b/activemq-web/pom.xml
@@ -55,6 +55,10 @@
       <artifactId>activemq-pool</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>activemq-unit-tests</artifactId>
       <scope>test</scope>
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
index 02e2b7a..5a2771b 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
@@ -19,6 +19,7 @@
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,6 +35,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,12 @@
 public abstract class MessageServletSupport extends HttpServlet {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(MessageServletSupport.class);
+    /**
+     * A configuration tag to specify the maximum message size (in bytes) for the servlet. The default
+     * is given by DEFAULT_MAX_MESSAGE_SIZE below.
+     */
+    private static final String MAX_MESSAGE_SIZE_TAG = "maxMessageSize";
+    private static final Long DEFAULT_MAX_MESSAGE_SIZE = 100000L;
 
     private boolean defaultTopicFlag = true;
     private Destination defaultDestination;
@@ -68,6 +76,7 @@
     private int defaultMessagePriority = 5;
     private long defaultMessageTimeToLive;
     private String destinationOptions;
+    private long maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
 
     public void init(ServletConfig servletConfig) throws ServletException {
         super.init(servletConfig);
@@ -91,6 +100,11 @@
             }
         }
 
+        String maxMessageSizeConfigured = servletConfig.getInitParameter(MAX_MESSAGE_SIZE_TAG);
+        if (maxMessageSizeConfigured != null) {
+            maxMessageSize = Long.parseLong(maxMessageSizeConfigured);
+        }
+
         // lets check to see if there's a connection factory set
         WebClient.initContext(getServletContext());
     }
@@ -344,7 +358,12 @@
         if (answer == null && contentType != null) {
             LOG.debug("Content-Type={}", contentType);
             // lets read the message body instead
-            BufferedReader reader = request.getReader();
+            // Cap the body at maxMessageSize bytes, but keep decoding it with the charset declared
+            // on the request (as request.getReader() did) instead of the JVM's platform default.
+            String requestCharset = request.getCharacterEncoding();
+            BoundedInputStream boundedInputStream = new BoundedInputStream(request.getInputStream(), maxMessageSize);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(boundedInputStream,
+                    requestCharset != null ? requestCharset : "ISO-8859-1"));
             StringBuilder buffer = new StringBuilder();
             while (true) {
                 String line = reader.readLine();
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index b1067c0..17ac1f9 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -182,6 +182,7 @@
         <include>${pom.groupId}:activeio-core</include>
         <include>commons-beanutils:commons-beanutils</include>
         <include>commons-collections:commons-collections</include>
+        <include>commons-io:commons-io</include>
         <include>org.apache.commons:commons-dbcp2</include>
         <include>org.apache.commons:commons-pool2</include>
         <include>commons-codec:commons-codec</include>
diff --git a/assembly/src/release/bin/env b/assembly/src/release/bin/env
index 947807b..a31d687 100644
--- a/assembly/src/release/bin/env
+++ b/assembly/src/release/bin/env
@@ -1,4 +1,3 @@
-#!/bin/sh
 # ------------------------------------------------------------------------
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
diff --git a/pom.xml b/pom.xml
index 9acd48f..db5d16d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -52,7 +52,7 @@
     <commons-beanutils-version>1.9.4</commons-beanutils-version>
     <commons-collections-version>3.2.2</commons-collections-version>
     <commons-daemon-version>1.2.3</commons-daemon-version>
-    <commons-dbcp2-version>2.7.0</commons-dbcp2-version>
+    <commons-dbcp2-version>2.8.0</commons-dbcp2-version>
     <commons-io-version>2.8.0</commons-io-version>
     <commons-lang-version>2.6</commons-lang-version>
     <commons-logging-version>1.2</commons-logging-version>