Merge pull request #571 from dhmgit/durable-sub-message-expiration

Adding an option which allows messages to expire on active durable subscriptions
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..835ee5b
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,164 @@
+#!groovy
+
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  See the NOTICE file distributed with
+ *  this work for additional information regarding copyright ownership.
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *  (the "License"); you may not use this file except in compliance with
+ *  the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+pipeline {
+
+    agent {
+        node {
+            label 'ubuntu'
+        }
+    }
+
+    environment {
+        // ... setup any environment variables ...
+        MVN_LOCAL_REPO_OPT = '-Dmaven.repo.local=.repository'
+        MVN_TEST_FAIL_IGNORE = '-Dmaven.test.failure.ignore=true'
+    }
+
+    tools {
+        // ... tell Jenkins what java version, maven version or other tools are required ...
+        maven 'maven_3_latest'
+        jdk 'jdk_1.8_latest'
+    }
+
+    options {
+        // Configure an overall timeout for the build of one hour.
+        timeout(time: 10, unit: 'HOURS')
+        // When we have test-fails e.g. we don't need to run the remaining steps
+        skipStagesAfterUnstable()
+        buildDiscarder(logRotator(numToKeepStr: '5', artifactNumToKeepStr: '5'))
+    }
+
+    stages {
+        stage('Initialization') {
+            steps {
+                echo 'Building branch ' + env.BRANCH_NAME
+                echo 'Using PATH ' + env.PATH
+            }
+        }
+
+        stage('Cleanup') {
+            steps {
+                echo 'Cleaning up the workspace'
+                deleteDir()
+            }
+        }
+
+        stage('Checkout') {
+            steps {
+                echo 'Checking out branch ' + env.BRANCH_NAME
+                checkout scm
+            }
+        }
+
+        stage('Build') {
+            steps {
+                echo 'Building'
+                sh 'mvn -U -B -e clean install -DskipTests'
+            }
+        }
+
+        stage('Tests') {
+            steps {
+                echo 'Running tests'
+                // all tests is very very long (10 hours on Apache Jenkins)
+                // sh 'mvn -B -e test -pl activemq-unit-tests -Dactivemq.tests=all'
+                sh 'mvn -B -e test'
+            }
+            post {
+                always {
+                    junit(testResults: '**/surefire-reports/*.xml', allowEmptyResults: true)
+                    junit(testResults: '**/failsafe-reports/*.xml', allowEmptyResults: true)
+                }
+            }
+        }
+
+        stage('Deploy') {
+            when {
+                expression {
+                    env.BRANCH_NAME ==~ /(activemq-5.16.x|activemq-5.15.x|master)/
+                }
+            }
+            steps {
+                echo 'Deploying'
+                sh 'mvn -B -e deploy -Pdeploy -DskipTests'
+            }
+        }
+    }
+
+    // Do any post build stuff ... such as sending emails depending on the overall build result.
+    post {
+        // If this build failed, send an email to the list.
+        failure {
+            script {
+                if(env.BRANCH_NAME == "activemq-5.15.x" || env.BRANCH_NAME == "activemq-5.16.x" || env.BRANCH_NAME == "master") {
+                    emailext(
+                            subject: "[BUILD-FAILURE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
+                            body: """
+BUILD-FAILURE: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]':
+Check console output at "<a href="${env.BUILD_URL}">${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]</a>"
+""",
+                            to: "commits@activemq.apache.org",
+                            recipientProviders: [[$class: 'DevelopersRecipientProvider']]
+                    )
+                }
+            }
+        }
+
+        // If this build didn't fail, but there were failing tests, send an email to the list.
+        unstable {
+            script {
+                if(env.BRANCH_NAME == "activemq-5.15.x" || env.BRANCH_NAME == "activemq-5.16.x" || env.BRANCH_NAME == "master") {
+                    emailext(
+                            subject: "[BUILD-UNSTABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
+                            body: """
+BUILD-UNSTABLE: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]':
+Check console output at "<a href="${env.BUILD_URL}">${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]</a>"
+""",
+                            to: "commits@activemq.apache.org",
+                            recipientProviders: [[$class: 'DevelopersRecipientProvider']]
+                    )
+                }
+            }
+        }
+
+        // Send an email, if the last build was not successful and this one is.
+        success {
+            // Cleanup the build directory if the build was successful
+            // (in this cae we probably don't have to do any post-build analysis)
+            deleteDir()
+            script {
+                if ((env.BRANCH_NAME == "activemq-5.15.x" || env.BRANCH_NAME == "activemq-5.16.x" || env.BRANCH_NAME == "master") && (currentBuild.previousBuild != null) && (currentBuild.previousBuild.result != 'SUCCESS')) {
+                    emailext (
+                            subject: "[BUILD-STABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
+                            body: """
+BUILD-STABLE: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]':
+Is back to normal.
+""",
+                            to: "commits@activemq.apache.org",
+                            recipientProviders: [[$class: 'DevelopersRecipientProvider']]
+                    )
+                }
+            }
+        }
+    }
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index ab968b8..e3586e2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -280,7 +280,7 @@
                 // the DLQ.  If a custom redelivery policy is used on the broker the message
                 // can still be redelivered based on the configation of that policy.
                 LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state);
-                settle(delivery, MessageAck.POSION_ACK_TYPE);
+                settle(delivery, MessageAck.POISON_ACK_TYPE);
             } else if (state instanceof Released) {
                 LOG.trace("onDelivery: Released state = {}", state);
                 // re-deliver && don't increment the counter.
@@ -297,7 +297,7 @@
                 if (undeliverableHere != null && undeliverableHere) {
                     // receiver does not want the message..
                     // perhaps we should DLQ it?
-                    ackType = MessageAck.POSION_ACK_TYPE;
+                    ackType = MessageAck.POISON_ACK_TYPE;
                 }
                 settle(delivery, ackType);
             }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 805ef6f..443a8f2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -894,7 +894,7 @@
         Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
         message.setRegionDestination(this);
         broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
-        MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
+        MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
         messageAck.setPoisonCause(cause);
         try {
             acknowledge(connectionContext, subscription, messageAck, message);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index b7761ed..7385e4f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2093,7 +2093,7 @@
                                 dropMessage(ref);
                                 if (gotToTheStore(ref.getMessage())) {
                                     LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
-                                    store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
+                                    store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
                                 }
                                 broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
                             }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index bb62541..fb46768 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -30,10 +30,11 @@
     private Collection forwardTo;
     private boolean forwardOnly = true;
     private boolean concurrentSend = false;
+    private boolean sendWhenNotMatched = false;
 
     @Override
     public Destination intercept(Destination destination) {
-        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
+        return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),isSendWhenNotMatched(), isConcurrentSend());
     }
 
     @Override
@@ -192,4 +193,12 @@
 
         return true;
     }
+    
+    public boolean isSendWhenNotMatched() {
+		return sendWhenNotMatched;
+	}
+
+	public void setSendWhenNotMatched(boolean sendWhenNotMatched) {
+		this.sendWhenNotMatched = sendWhenNotMatched;
+	}
 }
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 119257d..5bd5716 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -41,12 +41,14 @@
     private Collection forwardDestinations;
     private boolean forwardOnly;
     private boolean concurrentSend = false;
+    private boolean sendWhenNotMatched=false;
 
-    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
+    public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly,boolean sendWhenNotMatched, boolean concurrentSend) {
         super(next);
         this.forwardDestinations = forwardDestinations;
         this.forwardOnly = forwardOnly;
         this.concurrentSend = concurrentSend;
+        this.sendWhenNotMatched = sendWhenNotMatched;
     }
 
     @Override
@@ -100,9 +102,17 @@
                 doForward(context, message, brokerService.getRegionBroker(), destination);
             }
         }
-        if (!forwardOnly) {
-            super.send(context, message);
+        if(sendWhenNotMatched)
+        {
+        	if(matchingDestinations.size() <=0) {        
+        		super.send(context, message);
+        	}
+        }else {
+	        if (!forwardOnly) {
+	            super.send(context, message);
+	        }
         }
+       
         concurrent.await();
         if (exceptionAtomicReference.get() != null) {
             throw exceptionAtomicReference.get();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 602e6eb..a9cbd8f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1093,11 +1093,11 @@
                 if (messageDispatch != null) {
                     LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
                     try {
-                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
+                        MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POISON_ACK_TYPE, 1);
                         poisonAck.setPoisonCause(error);
                         localBroker.oneway(poisonAck);
                     } catch (IOException ioe) {
-                        LOG.error("Failed to posion ack message following forward failure: ", ioe);
+                        LOG.error("Failed to poison ack message following forward failure: ", ioe);
                     }
                     fireFailedForwardAdvisory(messageDispatch, error);
                 } else {
diff --git a/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java b/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
index 5b51656..b40a669 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
@@ -278,7 +278,7 @@
         executor.execute(new B());
 
         executor.shutdown();
-        boolean finishedInTime = executor.awaitTermination(5, TimeUnit.MINUTES);
+        boolean finishedInTime = executor.awaitTermination(10, TimeUnit.MINUTES);
         LOG.info("Tested completion finished in time? -> {}", finishedInTime ? "YES" : "NO");
 
         assertTrue("no exceptions", exceptions.isEmpty());
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a1e7d0a..764f8c9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -519,7 +519,7 @@
                     sendPullCommand(timeout);
                 } else if (redeliveryExceeded(md)) {
                     LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
-                    posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+                    poisonAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                     if (timeout > 0) {
                         timeout = Math.max(deadline - System.currentTimeMillis(), 0);
                     }
@@ -541,11 +541,11 @@
         return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
     }
 
-    private void posionAck(MessageDispatch md, String cause) throws JMSException {
-        MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
-        posionAck.setFirstMessageId(md.getMessage().getMessageId());
-        posionAck.setPoisonCause(new Throwable(cause));
-        session.sendAck(posionAck);
+    private void poisonAck(MessageDispatch md, String cause) throws JMSException {
+        MessageAck poisonAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
+        poisonAck.setFirstMessageId(md.getMessage().getMessageId());
+        poisonAck.setPoisonCause(new Throwable(cause));
+        session.sendAck(poisonAck);
     }
 
     private boolean redeliveryExceeded(MessageDispatch md) {
@@ -1271,7 +1271,7 @@
                     // DLQ.
                     // Acknowledge the last message.
 
-                    MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+                    MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size());
                     ack.setFirstMessageId(firstMsgId);
                     ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter()  + "] exceeds redelivery policy limit:" + redeliveryPolicy
                             + ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
@@ -1422,7 +1422,7 @@
                     if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
                         if (listener != null && unconsumedMessages.isRunning()) {
                             if (redeliveryExceeded(md)) {
-                                posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+                                poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
                                 return;
                             }
                             ActiveMQMessage message = createActiveMQMessage(md);
@@ -1483,7 +1483,7 @@
                             dispatch(md);
                         } else {
                             LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
-                            posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
+                            poisonAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
                         }
                     }
                 }
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 73c1616..35bddf2 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -900,7 +900,7 @@
                 earlyAck.setFirstMessageId(message.getMessageId());
             } else if (connection.isDuplicate(ActiveMQSession.this, message)) {
                 LOG.debug("{} got duplicate: {}", this, message.getMessageId());
-                earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
+                earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
                 earlyAck.setFirstMessageId(md.getMessage().getMessageId());
                 earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
             }
@@ -986,7 +986,7 @@
                                     // sent to the
                                     // DLQ.
                                     // Acknowledge the last message.
-                                    MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
+                                    MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
                                     ack.setFirstMessageId(md.getMessage().getMessageId());
                                     ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
                                     LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
index bb0a72f..dc575d4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
@@ -44,7 +44,7 @@
      * message was not processed and the message was considered a poison
      * message.
      */
-    public static final byte POSION_ACK_TYPE = 1;
+    public static final byte POISON_ACK_TYPE = 1;
 
     /**
      * In case the client want's to explicitly let the broker know that a
@@ -117,7 +117,7 @@
     }
 
     public boolean isPoisonAck() {
-        return ackType == POSION_ACK_TYPE;
+        return ackType == POISON_ACK_TYPE;
     }
 
     public boolean isStandardAck() {
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
index 8624a06..01f6fcc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
@@ -345,20 +345,22 @@
     private NetworkInterface findNetworkInterface() throws SocketException {
         Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
         List<NetworkInterface> possibles = new ArrayList<NetworkInterface>();
-        while (ifcs.hasMoreElements()) {
-            NetworkInterface ni = ifcs.nextElement();
-            try {
-                if (ni.supportsMulticast()
-                        && ni.isUp()) {
-                    for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
-                        if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
-                                && !ia.getAddress().isLoopbackAddress()
-                                && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
-                            possibles.add(ni);
+        if (ifcs != null) {
+            while (ifcs.hasMoreElements()) {
+                NetworkInterface ni = ifcs.nextElement();
+                try {
+                    if (ni.supportsMulticast()
+                            && ni.isUp()) {
+                        for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
+                            if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
+                                    && !ia.getAddress().isLoopbackAddress()
+                                    && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
+                                possibles.add(ni);
+                            }
                         }
                     }
-                }
-            } catch (SocketException ignored) {}
+                } catch (SocketException ignored) {}
+            }
         }
         return possibles.isEmpty() ? null : possibles.get(possibles.size() - 1);
     }
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
index d6a50a1..0a717f4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
@@ -98,18 +98,21 @@
     }
 
     private void checkSecurity(Class clazz) throws ClassNotFoundException {
-        if (!clazz.isPrimitive()) {
-            if (clazz.getPackage() != null && !trustAllPackages()) {
-               boolean found = false;
-               for (String packageName : getTrustedPackages()) {
-                   if (clazz.getPackage().getName().equals(packageName) || clazz.getPackage().getName().startsWith(packageName + ".")) {
-                       found = true;
-                       break;
-                   }
-               }
-               if (!found) {
-                   throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
-               }
+        if (trustAllPackages() || clazz.isPrimitive()) {
+            return;
+        }
+
+        boolean found = false;
+        Package thePackage = clazz.getPackage();
+        if (thePackage != null) {
+            for (String trustedPackage : getTrustedPackages()) {
+                if (thePackage.getName().equals(trustedPackage) || thePackage.getName().startsWith(trustedPackage + ".")) {
+                    found = true;
+                    break;
+                }
+            }
+            if (!found) {
+                throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
             }
         }
     }
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
index a7285da..b7766a2 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
@@ -161,21 +161,6 @@
         return true;
     }
 
-    protected String readRequestBody(HttpServletRequest request) throws IOException {
-        StringBuffer buffer = new StringBuffer();
-        BufferedReader reader = request.getReader();
-        while (true) {
-            String line = reader.readLine();
-            if (line == null) {
-                break;
-            } else {
-                buffer.append(line);
-                buffer.append("\n");
-            }
-        }
-        return buffer.toString();
-    }
-
     protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
         String clientID = request.getHeader("clientID");
         if (clientID == null) {
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
index f66118e..459b433 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
@@ -42,7 +42,7 @@
     JobSchedulerStoreImpl jobSchedulerStore;
     KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
 
-    @Test
+    @Test(timeout = 120000)
     public void testGcDoneAtStop() throws Exception {
 
         brokerService = createBroker(true);
@@ -63,8 +63,14 @@
 
         brokerService.stop();
 
+        while (verifyFilesOnDisk(jobDir) < 1) {
+            Thread.sleep(100);
+        }
         assertTrue("Expected job store data files at least 1", verifyFilesOnDisk(jobDir) >= 1);
-        assertEquals("Expected kahadb data files", 1, verifyFilesOnDisk(kahaDir));
+        while (verifyFilesOnDisk(kahaDir) < 1) {
+            Thread.sleep(100);
+        }
+        assertTrue("Expected kahadb data files at least 1", verifyFilesOnDisk(kahaDir) >= 1);
     }
 
     @Test
diff --git a/activemq-karaf/src/main/resources/features-core.xml b/activemq-karaf/src/main/resources/features-core.xml
index f04f522..79a1ec5 100644
--- a/activemq-karaf/src/main/resources/features-core.xml
+++ b/activemq-karaf/src/main/resources/features-core.xml
@@ -52,6 +52,7 @@
       <feature>http</feature>
       <feature version="${project.version}">activemq-client</feature>
       <bundle>mvn:org.apache.activemq/activemq-karaf/${project.version}</bundle>
+      <bundle dependency="true">mvn:commons-io/commons-io/${commons-io-version}</bundle>
       <bundle dependency="true">mvn:commons-collections/commons-collections/${commons-collections-version}</bundle>
       <bundle dependency='true'>mvn:commons-lang/commons-lang/${commons-lang-version}</bundle>
       <bundle dependency="true">mvn:commons-codec/commons-codec/1.9</bundle>
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 339d9b8..cee87ba 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -74,6 +74,7 @@
       javax.management*,
       javax.transaction*;version="[1,3)",
       javax.naming*;resolution:=optional,
+      org.apache.commons.io*;resolution:=optional,
       org.apache.commons.pool*;resolution:=optional,
       org.apache.commons.net*;resolution:=optional,
       com.sun*;resolution:=optional,
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 95fe986..2f66770 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -238,7 +238,7 @@
         MessageAck ack = new MessageAck();
         ack.setDestination(consumerInfo.getDestination());
         ack.setConsumerId(consumerInfo.getConsumerId());
-        ack.setAckType(MessageAck.POSION_ACK_TYPE);
+        ack.setAckType(MessageAck.POISON_ACK_TYPE);
         ack.setMessageID(msgId);
         if (transactionId != null) {
             transactedMessages.add(ackEntry);
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
index b783a54..eddd22a 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
@@ -119,6 +119,9 @@
     }
 
     public void startBroker() throws Exception {
+        if (brokerService != null) {
+            stopBroker();
+        }
         createBroker(true);
 
         XStreamBrokerContext context = new XStreamBrokerContext();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
new file mode 100644
index 0000000..ef71d1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -0,0 +1,517 @@
+package org.apache.activemq.broker.virtual;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.FilteredDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.spring.ConsumerBean;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
+
+	private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
+
+	protected int total = 10;
+	protected Connection connection;
+	
+
+	@Test
+	public void testSendWhenNotMatched() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.B");
+		Destination destination1 =new ActiveMQQueue("A.B");
+		Destination destination2 = new ActiveMQQueue("A.C");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));
+		
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(0);
+	}
+
+	@Test
+	public void testSendWhenMatched() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.B");
+		Destination destination1 =new ActiveMQQueue("A.B");
+		Destination destination2 = new ActiveMQQueue("A.C");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+		
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	}
+	@Test
+	public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.D");
+		Destination destination1 =new ActiveMQQueue("A.D");
+		Destination destination2 = new ActiveMQQueue("A.E");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tes13"));
+	
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(0);	
+
+	}
+	
+	public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.D");
+		Destination destination1 =new ActiveMQQueue("A.D");
+		Destination destination2 = new ActiveMQQueue("A.E");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+		Thread.sleep(1*1000);
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	} 
+	@Test
+	public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.X");
+		Destination destination1 =new ActiveMQQueue("A.X");
+		Destination destination2 = new ActiveMQQueue("A.Y");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+	
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(1);
+
+	}
+	@Test
+	public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.X");
+		Destination destination1 =new ActiveMQQueue("A.X");
+		Destination destination2 = new ActiveMQQueue("A.Y");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));
+	
+		
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(0);
+
+	}
+	
+	@Test
+	public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.W");
+		Destination destination1 =new ActiveMQQueue("A.W");
+		Destination destination2 = new ActiveMQQueue("A.V");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "test13"));
+	
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	}
+	@Test
+	public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("A.W");
+		Destination destination1 =new ActiveMQQueue("A.W");
+		Destination destination2 = new ActiveMQQueue("A.V");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));	
+		Thread.sleep(2*1000);
+		messageList1.assertMessagesArrived(0);
+		messageList2.assertMessagesArrived(0);
+
+	}
+	
+	@Test
+	public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("X.Y");
+		Destination destination1 =  new ActiveMQQueue("X.Y");
+		Destination destination2 =  new ActiveMQQueue("X.Z");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));
+		messageList2.assertMessagesArrived(1);
+		messageList1.assertMessagesArrived(0);
+
+	}
+	@Test
+	public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
+		if (connection == null) {
+			connection = createConnection();
+		}
+		connection.start();
+
+		ConsumerBean messageList1 = new ConsumerBean();
+		ConsumerBean messageList2 = new ConsumerBean();
+		messageList1.setVerbose(true);
+		messageList2.setVerbose(true);
+		// messageList1.waitForMessagesToArrive(0);
+		// messageList2.waitForMessagesToArrive(1);
+
+		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+		Destination producerDestination = new ActiveMQQueue("R.S");
+		Destination destination1 =  new ActiveMQQueue("R.S");
+		Destination destination2 =  new ActiveMQQueue("R.T");
+
+		LOG.info("Sending to: " + producerDestination);
+		LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+		MessageConsumer c1 = session.createConsumer(destination1);
+		MessageConsumer c2 = session.createConsumer(destination2);
+
+		c1.setMessageListener(messageList1);
+		c2.setMessageListener(messageList2);
+
+		// create topic producer
+		MessageProducer producer = session.createProducer(producerDestination);
+		assertNotNull(producer);
+
+		producer.send(createMessage(session, "tet13"));		
+		messageList1.assertMessagesArrived(1);
+		messageList2.assertMessagesArrived(1);
+	}
+
+	protected Destination getConsumer1Dsetination() {
+		return new ActiveMQQueue("A.B");
+	}
+
+	protected Destination getConsumer2Dsetination() {
+		return new ActiveMQQueue("A.C");
+	}
+
+	protected Destination getProducerDestination() {
+		return new ActiveMQQueue("A.B");
+	}
+
+	protected TextMessage createMessage(Session session, String testid) throws JMSException {
+		TextMessage textMessage = session.createTextMessage("testMessage");
+		textMessage.setStringProperty("testid", testid);
+		return textMessage;
+	}
+
+	protected BrokerService createBroker() throws Exception {
+		BrokerService answer = new BrokerService();
+		answer.setPersistent(isPersistent());
+		answer.getManagementContext().setCreateConnector(false);
+		answer.addConnector(bindAddress);
+		/*
+		 * <destinationInterceptors> <virtualDestinationInterceptor>
+		 * <virtualDestinations> <compositeQueue name="A.B" forwardOnly="false">
+		 * <forwardTo> <filteredDestination selector="testid LIKE 'test%'" queue="A.C"/>
+		 * </forwardTo> </compositeQueue> </virtualDestinations>
+		 * </virtualDestinationInterceptor> </destinationInterceptors>
+		 */
+		/*
+		 * SendWhenNotMatched = true A message will be always forwarded to  if not matched to filtered destination 
+		 * ForwardOnly setting has no impact
+		 */
+		
+		CompositeQueue compositeQueue = new CompositeQueue();
+		compositeQueue.setName("A.B");
+		compositeQueue.setForwardOnly(true); // By default it is true
+		compositeQueue.setSendWhenNotMatched(true);// By default it is false
+		FilteredDestination filteredQueue = new FilteredDestination();
+		filteredQueue.setQueue("A.C");
+		filteredQueue.setSelector("testid LIKE 'test%'");
+		final ArrayList<Object> forwardDestinations = new ArrayList<Object>();
+		forwardDestinations.add(filteredQueue);
+		compositeQueue.setForwardTo(forwardDestinations);
+		
+	
+		CompositeQueue compositeQueue0 = new CompositeQueue();
+		compositeQueue0.setName("A.D");
+		compositeQueue0.setForwardOnly(false); // By default it is true
+		compositeQueue0.setSendWhenNotMatched(true);// By default it is false
+		FilteredDestination filteredQueue0 = new FilteredDestination();
+		filteredQueue0.setQueue("A.E");
+		filteredQueue0.setSelector("testid LIKE 'test%'");
+		final ArrayList<Object> forwardDestinations0 = new ArrayList<Object>();
+		forwardDestinations0.add(filteredQueue0);
+		compositeQueue0.setForwardTo(forwardDestinations0);
+		
+		//Back compatibility test 1
+		CompositeQueue compositeQueue01 = new CompositeQueue();
+		compositeQueue01.setName("A.X");
+		compositeQueue01.setForwardOnly(false); // By default it is true
+		//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+		FilteredDestination filteredQueue01 = new FilteredDestination();
+		filteredQueue01.setQueue("A.Y");
+		filteredQueue01.setSelector("testid LIKE 'test%'");
+		final ArrayList<Object> forwardDestinations01 = new ArrayList<Object>();
+		forwardDestinations01.add(filteredQueue01);
+		compositeQueue01.setForwardTo(forwardDestinations01);
+		
+		//Back compatibility test 2
+				CompositeQueue compositeQueue02 = new CompositeQueue();
+				compositeQueue02.setName("A.W");
+				//compositeQueue02.setForwardOnly(true); // By default it is true
+				//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+				FilteredDestination filteredQueue02 = new FilteredDestination();
+				filteredQueue02.setQueue("A.V");
+				filteredQueue02.setSelector("testid LIKE 'test%'");
+				final ArrayList<Object> forwardDestinations02 = new ArrayList<Object>();
+				forwardDestinations02.add(filteredQueue02);
+				compositeQueue02.setForwardTo(forwardDestinations02);
+		
+		CompositeQueue compositeQueue1 = new CompositeQueue();
+		compositeQueue1.setName("X.Y");
+		compositeQueue1.setForwardOnly(true); // By default it is true
+		ActiveMQQueue forwardQueue1 =new ActiveMQQueue("X.Z");	
+		final ArrayList<Object> forwardDestinations1 = new ArrayList<Object>();		
+		forwardDestinations1.add(forwardQueue1);
+		compositeQueue1.setForwardTo(forwardDestinations1);
+		
+		
+		CompositeQueue compositeQueue2 = new CompositeQueue();
+		compositeQueue2.setName("R.S");
+		compositeQueue2.setForwardOnly(false);
+		ActiveMQQueue forwardQueue2 =new ActiveMQQueue("R.T");	
+		final ArrayList<Object> forwardDestinations2 = new ArrayList<Object>();		
+		forwardDestinations2.add(forwardQueue2);
+		compositeQueue2.setForwardTo(forwardDestinations2);
+
+		VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+		interceptor.setVirtualDestinations(new VirtualDestination[] { compositeQueue,compositeQueue0,compositeQueue01,compositeQueue02,compositeQueue1,compositeQueue2 });
+		answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
+		return answer;
+	}
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
index 8b47b06..d8f3598 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
@@ -71,7 +71,7 @@
          brokerService.stop();
       }
 
-      @Test(timeout = 120000)
+      @Test(timeout = 240000)
       public void testHeapUsage() throws Exception {
          Runtime.getRuntime().gc();
          final long initUsedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
index 774422f..18053e4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
@@ -185,12 +185,18 @@
         if(doCheckpoint) {
             LOG.info("Initiating checkpointUpdate "+ ++checkpointIndex + " ...");
             broker.getPersistenceAdapter().checkpoint(true);
-            TimeUnit.SECONDS.sleep(2);
+            TimeUnit.SECONDS.sleep(4);
             LOG.info("Checkpoint complete.");
         }
         File files[] = dbfiles.listFiles(lff);
         Arrays.sort(files,  new DBFileComparator() );
         logfiles(files);
+
+        while (files.length != expectedCount) {
+            // gives time to checkpoint
+            TimeUnit.SECONDS.sleep(1);
+        }
+
         assertEquals(expectedCount, files.length);
         assertEquals(lastFileName, files[files.length-1].getName());
 
diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml
index 70fa0be..01cacab 100644
--- a/activemq-web/pom.xml
+++ b/activemq-web/pom.xml
@@ -55,6 +55,10 @@
       <artifactId>activemq-pool</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+    <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>activemq-unit-tests</artifactId>
       <scope>test</scope>
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
index 02e2b7a..5a2771b 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
@@ -19,6 +19,7 @@
 
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -34,6 +35,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.io.input.BoundedInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,12 @@
 public abstract class MessageServletSupport extends HttpServlet {
 
     private static final transient Logger LOG = LoggerFactory.getLogger(MessageServletSupport.class);
+    /**
+     * A configuration tag to specify the maximum message size (in bytes) for the servlet. The default
+     * is given by DEFAULT_MAX_MESSAGE_SIZE below.
+     */
+    private static final String MAX_MESSAGE_SIZE_TAG = "maxMessageSize";
+    private static final Long DEFAULT_MAX_MESSAGE_SIZE = 100000L;
 
     private boolean defaultTopicFlag = true;
     private Destination defaultDestination;
@@ -68,6 +76,7 @@
     private int defaultMessagePriority = 5;
     private long defaultMessageTimeToLive;
     private String destinationOptions;
+    private long maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
 
     public void init(ServletConfig servletConfig) throws ServletException {
         super.init(servletConfig);
@@ -91,6 +100,11 @@
             }
         }
 
+        String maxMessageSizeConfigured = servletConfig.getInitParameter(MAX_MESSAGE_SIZE_TAG);
+        if (maxMessageSizeConfigured != null) {
+            maxMessageSize = Long.parseLong(maxMessageSizeConfigured);
+        }
+
         // lets check to see if there's a connection factory set
         WebClient.initContext(getServletContext());
     }
@@ -344,7 +358,8 @@
         if (answer == null && contentType != null) {
             LOG.debug("Content-Type={}", contentType);
             // lets read the message body instead
-            BufferedReader reader = request.getReader();
+            BoundedInputStream boundedInputStream = new BoundedInputStream(request.getInputStream(), maxMessageSize);
+            BufferedReader reader = new BufferedReader(new InputStreamReader(boundedInputStream));
             StringBuilder buffer = new StringBuilder();
             while (true) {
                 String line = reader.readLine();
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index b1067c0..17ac1f9 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -182,6 +182,7 @@
         <include>${pom.groupId}:activeio-core</include>
         <include>commons-beanutils:commons-beanutils</include>
         <include>commons-collections:commons-collections</include>
+        <include>commons-io:commons-io</include>
         <include>org.apache.commons:commons-dbcp2</include>
         <include>org.apache.commons:commons-pool2</include>
         <include>commons-codec:commons-codec</include>
diff --git a/assembly/src/release/bin/env b/assembly/src/release/bin/env
index 947807b..a31d687 100644
--- a/assembly/src/release/bin/env
+++ b/assembly/src/release/bin/env
@@ -1,4 +1,3 @@
-#!/bin/sh
 # ------------------------------------------------------------------------
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
diff --git a/pom.xml b/pom.xml
index 76ff4e8..4c81889 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,6 @@
     <activemq-protobuf-version>1.1</activemq-protobuf-version>
     <activesoap-version>1.3</activesoap-version>
     <annogen-version>0.1.0</annogen-version>
-    <ant-version>1.10.7</ant-version>
     <ant-bundle-version>1.10.7_1</ant-bundle-version>
     <aopalliance-version>1.0</aopalliance-version>
     <aries-version>1.1.0</aries-version>
@@ -52,14 +51,14 @@
     <cglib-version>2.2</cglib-version>
     <commons-beanutils-version>1.9.4</commons-beanutils-version>
     <commons-collections-version>3.2.2</commons-collections-version>
-    <commons-daemon-version>1.2.2</commons-daemon-version>
-    <commons-dbcp2-version>2.7.0</commons-dbcp2-version>
-    <commons-io-version>2.6</commons-io-version>
+    <commons-daemon-version>1.2.3</commons-daemon-version>
+    <commons-dbcp2-version>2.8.0</commons-dbcp2-version>
+    <commons-io-version>2.8.0</commons-io-version>
     <commons-lang-version>2.6</commons-lang-version>
     <commons-logging-version>1.2</commons-logging-version>
     <commons-pool2-version>2.8.0</commons-pool2-version>
     <commons-primitives-version>1.0</commons-primitives-version>
-    <commons-net-version>3.6</commons-net-version>
+    <commons-net-version>3.7.2</commons-net-version>
     <directory-version>2.0.0.AM25</directory-version>
     <ecj.version>3.17.0</ecj.version>
     <ftpserver-version>1.1.1</ftpserver-version>
@@ -74,13 +73,13 @@
     <httpcore-version>4.4.13</httpcore-version>
     <insight-version>1.2.0.Beta4</insight-version>
     <jackson-version>2.9.10</jackson-version>
-    <jackson-databind-version>2.9.10.5</jackson-databind-version>
+    <jackson-databind-version>2.9.10.6</jackson-databind-version>
     <jasypt-version>1.9.3</jasypt-version>
     <jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
-    <jetty9-version>9.4.28.v20200408</jetty9-version>
+    <jetty9-version>9.4.34.v20201102</jetty9-version>
     <jetty-version>${jetty9-version}</jetty-version>
     <jmdns-version>3.4.1</jmdns-version>
-    <tomcat-api-version>9.0.35</tomcat-api-version>
+    <tomcat-api-version>9.0.39</tomcat-api-version>
     <jettison-version>1.4.1</jettison-version>
     <jmock-version>2.5.1</jmock-version>
     <jolokia-version>1.6.2</jolokia-version>
@@ -111,16 +110,16 @@
     <qpid-jms-proton-version>0.33.6</qpid-jms-proton-version>
     <netty-all-version>4.1.51.Final</netty-all-version>
     <regexp-version>1.3</regexp-version>
-    <rome-version>1.12.2</rome-version>
+    <rome-version>1.15.0</rome-version>
     <saxon-version>9.5.1-5</saxon-version>
     <saxon-bundle-version>9.5.1-5_1</saxon-bundle-version>
     <scala-plugin-version>3.1.0</scala-plugin-version>
     <scala-version>2.11.11</scala-version>
-    <shiro-version>1.5.3</shiro-version>
+    <shiro-version>1.7.0</shiro-version>
     <scalatest-version>3.0.8</scalatest-version>
     <slf4j-version>1.7.30</slf4j-version>
     <snappy-version>1.1.2</snappy-version>
-    <spring-version>4.3.26.RELEASE</spring-version>
+    <spring-version>4.3.29.RELEASE</spring-version>
     <taglibs-version>1.2.5</taglibs-version>
     <velocity-version>2.2</velocity-version>
     <xalan-version>2.7.2</xalan-version>
@@ -1077,12 +1076,6 @@
         <version>${commons-io-version}</version>
       </dependency>
 
-      <dependency>
-        <groupId>org.apache.ant</groupId>
-        <artifactId>ant</artifactId>
-        <version>${ant-version}</version>
-      </dependency>
-
       <!-- ACTIVEMQ-WEB Specific Dependencies -->
       <dependency>
         <groupId>com.rometools</groupId>