Merge pull request #571 from dhmgit/durable-sub-message-expiration
Adding an option which allows messages to expire on active durable subscriptions
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..835ee5b
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,164 @@
+#!groovy
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+pipeline {
+
+ agent {
+ node {
+ label 'ubuntu'
+ }
+ }
+
+ environment {
+ // ... setup any environment variables ...
+ MVN_LOCAL_REPO_OPT = '-Dmaven.repo.local=.repository'
+ MVN_TEST_FAIL_IGNORE = '-Dmaven.test.failure.ignore=true'
+ }
+
+ tools {
+ // ... tell Jenkins what java version, maven version or other tools are required ...
+ maven 'maven_3_latest'
+ jdk 'jdk_1.8_latest'
+ }
+
+ options {
+ // Configure an overall timeout for the build of one hour.
+ timeout(time: 10, unit: 'HOURS')
+ // When we have test-fails e.g. we don't need to run the remaining steps
+ skipStagesAfterUnstable()
+ buildDiscarder(logRotator(numToKeepStr: '5', artifactNumToKeepStr: '5'))
+ }
+
+ stages {
+ stage('Initialization') {
+ steps {
+ echo 'Building branch ' + env.BRANCH_NAME
+ echo 'Using PATH ' + env.PATH
+ }
+ }
+
+ stage('Cleanup') {
+ steps {
+ echo 'Cleaning up the workspace'
+ deleteDir()
+ }
+ }
+
+ stage('Checkout') {
+ steps {
+ echo 'Checking out branch ' + env.BRANCH_NAME
+ checkout scm
+ }
+ }
+
+ stage('Build') {
+ steps {
+ echo 'Building'
+ sh 'mvn -U -B -e clean install -DskipTests'
+ }
+ }
+
+ stage('Tests') {
+ steps {
+ echo 'Running tests'
+ // all tests is very very long (10 hours on Apache Jenkins)
+ // sh 'mvn -B -e test -pl activemq-unit-tests -Dactivemq.tests=all'
+ sh 'mvn -B -e test'
+ }
+ post {
+ always {
+ junit(testResults: '**/surefire-reports/*.xml', allowEmptyResults: true)
+ junit(testResults: '**/failsafe-reports/*.xml', allowEmptyResults: true)
+ }
+ }
+ }
+
+ stage('Deploy') {
+ when {
+ expression {
+ env.BRANCH_NAME ==~ /(activemq-5.16.x|activemq-5.15.x|master)/
+ }
+ }
+ steps {
+ echo 'Deploying'
+ sh 'mvn -B -e deploy -Pdeploy -DskipTests'
+ }
+ }
+ }
+
+ // Do any post build stuff ... such as sending emails depending on the overall build result.
+ post {
+ // If this build failed, send an email to the list.
+ failure {
+ script {
+ if(env.BRANCH_NAME == "activemq-5.15.x" || env.BRANCH_NAME == "activemq-5.16.x" || env.BRANCH_NAME == "master") {
+ emailext(
+ subject: "[BUILD-FAILURE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
+ body: """
+BUILD-FAILURE: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]':
+Check console output at "<a href="${env.BUILD_URL}">${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]</a>"
+""",
+ to: "commits@activemq.apache.org",
+ recipientProviders: [[$class: 'DevelopersRecipientProvider']]
+ )
+ }
+ }
+ }
+
+ // If this build didn't fail, but there were failing tests, send an email to the list.
+ unstable {
+ script {
+ if(env.BRANCH_NAME == "activemq-5.15.x" || env.BRANCH_NAME == "activemq-5.16.x" || env.BRANCH_NAME == "master") {
+ emailext(
+ subject: "[BUILD-UNSTABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
+ body: """
+BUILD-UNSTABLE: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]':
+Check console output at "<a href="${env.BUILD_URL}">${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]</a>"
+""",
+ to: "commits@activemq.apache.org",
+ recipientProviders: [[$class: 'DevelopersRecipientProvider']]
+ )
+ }
+ }
+ }
+
+ // Send an email, if the last build was not successful and this one is.
+ success {
+ // Cleanup the build directory if the build was successful
+ // (in this cae we probably don't have to do any post-build analysis)
+ deleteDir()
+ script {
+ if ((env.BRANCH_NAME == "activemq-5.15.x" || env.BRANCH_NAME == "activemq-5.16.x" || env.BRANCH_NAME == "master") && (currentBuild.previousBuild != null) && (currentBuild.previousBuild.result != 'SUCCESS')) {
+ emailext (
+ subject: "[BUILD-STABLE]: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]'",
+ body: """
+BUILD-STABLE: Job '${env.JOB_NAME} [${env.BRANCH_NAME}] [${env.BUILD_NUMBER}]':
+Is back to normal.
+""",
+ to: "commits@activemq.apache.org",
+ recipientProviders: [[$class: 'DevelopersRecipientProvider']]
+ )
+ }
+ }
+ }
+ }
+
+}
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
index ab968b8..e3586e2 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpSender.java
@@ -280,7 +280,7 @@
// the DLQ. If a custom redelivery policy is used on the broker the message
// can still be redelivered based on the configation of that policy.
LOG.trace("onDelivery: Rejected state = {}, message poisoned.", state);
- settle(delivery, MessageAck.POSION_ACK_TYPE);
+ settle(delivery, MessageAck.POISON_ACK_TYPE);
} else if (state instanceof Released) {
LOG.trace("onDelivery: Released state = {}", state);
// re-deliver && don't increment the counter.
@@ -297,7 +297,7 @@
if (undeliverableHere != null && undeliverableHere) {
// receiver does not want the message..
// perhaps we should DLQ it?
- ackType = MessageAck.POSION_ACK_TYPE;
+ ackType = MessageAck.POISON_ACK_TYPE;
}
settle(delivery, ackType);
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 805ef6f..443a8f2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -894,7 +894,7 @@
Throwable cause = new Throwable(DUPLICATE_FROM_STORE_MSG_PREFIX + destination);
message.setRegionDestination(this);
broker.getRoot().sendToDeadLetterQueue(connectionContext, message, null, cause);
- MessageAck messageAck = new MessageAck(message, MessageAck.POSION_ACK_TYPE, 1);
+ MessageAck messageAck = new MessageAck(message, MessageAck.POISON_ACK_TYPE, 1);
messageAck.setPoisonCause(cause);
try {
acknowledge(connectionContext, subscription, messageAck, message);
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index b7761ed..7385e4f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2093,7 +2093,7 @@
dropMessage(ref);
if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
- store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POSION_ACK_TYPE, 1));
+ store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
}
broker.getRoot().sendToDeadLetterQueue(connectionContext, ref.getMessage(), null, new Throwable("duplicate paged in from cursor for " + destination));
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
index bb62541..fb46768 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestination.java
@@ -30,10 +30,11 @@
private Collection forwardTo;
private boolean forwardOnly = true;
private boolean concurrentSend = false;
+ private boolean sendWhenNotMatched = false;
@Override
public Destination intercept(Destination destination) {
- return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(), isConcurrentSend());
+ return new CompositeDestinationFilter(destination, getForwardTo(), isForwardOnly(),isSendWhenNotMatched(), isConcurrentSend());
}
@Override
@@ -192,4 +193,12 @@
return true;
}
+
+ public boolean isSendWhenNotMatched() {
+ return sendWhenNotMatched;
+ }
+
+ public void setSendWhenNotMatched(boolean sendWhenNotMatched) {
+ this.sendWhenNotMatched = sendWhenNotMatched;
+ }
}
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
index 119257d..5bd5716 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/CompositeDestinationFilter.java
@@ -41,12 +41,14 @@
private Collection forwardDestinations;
private boolean forwardOnly;
private boolean concurrentSend = false;
+ private boolean sendWhenNotMatched=false;
- public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly, boolean concurrentSend) {
+ public CompositeDestinationFilter(Destination next, Collection forwardDestinations, boolean forwardOnly,boolean sendWhenNotMatched, boolean concurrentSend) {
super(next);
this.forwardDestinations = forwardDestinations;
this.forwardOnly = forwardOnly;
this.concurrentSend = concurrentSend;
+ this.sendWhenNotMatched = sendWhenNotMatched;
}
@Override
@@ -100,9 +102,17 @@
doForward(context, message, brokerService.getRegionBroker(), destination);
}
}
- if (!forwardOnly) {
- super.send(context, message);
+ if(sendWhenNotMatched)
+ {
+ if(matchingDestinations.size() <=0) {
+ super.send(context, message);
+ }
+ }else {
+ if (!forwardOnly) {
+ super.send(context, message);
+ }
}
+
concurrent.await();
if (exceptionAtomicReference.get() != null) {
throw exceptionAtomicReference.get();
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 602e6eb..a9cbd8f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -1093,11 +1093,11 @@
if (messageDispatch != null) {
LOG.warn("PoisonAck of {} on forwarding error: {}", messageDispatch.getMessage().getMessageId(), error);
try {
- MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POSION_ACK_TYPE, 1);
+ MessageAck poisonAck = new MessageAck(messageDispatch, MessageAck.POISON_ACK_TYPE, 1);
poisonAck.setPoisonCause(error);
localBroker.oneway(poisonAck);
} catch (IOException ioe) {
- LOG.error("Failed to posion ack message following forward failure: ", ioe);
+ LOG.error("Failed to poison ack message following forward failure: ", ioe);
}
fireFailedForwardAdvisory(messageDispatch, error);
} else {
diff --git a/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java b/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
index 5b51656..b40a669 100644
--- a/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/store/PListTestSupport.java
@@ -278,7 +278,7 @@
executor.execute(new B());
executor.shutdown();
- boolean finishedInTime = executor.awaitTermination(5, TimeUnit.MINUTES);
+ boolean finishedInTime = executor.awaitTermination(10, TimeUnit.MINUTES);
LOG.info("Tested completion finished in time? -> {}", finishedInTime ? "YES" : "NO");
assertTrue("no exceptions", exceptions.isEmpty());
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
index a1e7d0a..764f8c9 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
@@ -519,7 +519,7 @@
sendPullCommand(timeout);
} else if (redeliveryExceeded(md)) {
LOG.debug("{} received with excessive redelivered: {}", getConsumerId(), md);
- posionAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+ poisonAck(md, "Dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
if (timeout > 0) {
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
@@ -541,11 +541,11 @@
return isConsumerExpiryCheckEnabled() && dispatch.getMessage().isExpired();
}
- private void posionAck(MessageDispatch md, String cause) throws JMSException {
- MessageAck posionAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
- posionAck.setFirstMessageId(md.getMessage().getMessageId());
- posionAck.setPoisonCause(new Throwable(cause));
- session.sendAck(posionAck);
+ private void poisonAck(MessageDispatch md, String cause) throws JMSException {
+ MessageAck poisonAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
+ poisonAck.setFirstMessageId(md.getMessage().getMessageId());
+ poisonAck.setPoisonCause(new Throwable(cause));
+ session.sendAck(poisonAck);
}
private boolean redeliveryExceeded(MessageDispatch md) {
@@ -1271,7 +1271,7 @@
// DLQ.
// Acknowledge the last message.
- MessageAck ack = new MessageAck(lastMd, MessageAck.POSION_ACK_TYPE, deliveredMessages.size());
+ MessageAck ack = new MessageAck(lastMd, MessageAck.POISON_ACK_TYPE, deliveredMessages.size());
ack.setFirstMessageId(firstMsgId);
ack.setPoisonCause(new Throwable("Delivery[" + lastMd.getMessage().getRedeliveryCounter() + "] exceeds redelivery policy limit:" + redeliveryPolicy
+ ", cause:" + lastMd.getRollbackCause(), lastMd.getRollbackCause()));
@@ -1422,7 +1422,7 @@
if (this.info.isBrowser() || md.getDeliverySequenceId() != 0l || !session.connection.isDuplicate(this, md.getMessage())) {
if (listener != null && unconsumedMessages.isRunning()) {
if (redeliveryExceeded(md)) {
- posionAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
+ poisonAck(md, "listener dispatch[" + md.getRedeliveryCounter() + "] to " + getConsumerId() + " exceeds redelivery policy limit:" + redeliveryPolicy);
return;
}
ActiveMQMessage message = createActiveMQMessage(md);
@@ -1483,7 +1483,7 @@
dispatch(md);
} else {
LOG.warn("{} suppressing duplicate delivery on connection, poison acking: {}", getConsumerId(), md);
- posionAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
+ poisonAck(md, "Suppressing duplicate delivery on connection, consumer " + getConsumerId());
}
}
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 73c1616..35bddf2 100644
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -900,7 +900,7 @@
earlyAck.setFirstMessageId(message.getMessageId());
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
- earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
+ earlyAck = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
earlyAck.setFirstMessageId(md.getMessage().getMessageId());
earlyAck.setPoisonCause(new Throwable("Duplicate delivery to " + this));
}
@@ -986,7 +986,7 @@
// sent to the
// DLQ.
// Acknowledge the last message.
- MessageAck ack = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);
+ MessageAck ack = new MessageAck(md, MessageAck.POISON_ACK_TYPE, 1);
ack.setFirstMessageId(md.getMessage().getMessageId());
ack.setPoisonCause(new Throwable("Exceeded ra redelivery policy limit:" + redeliveryPolicy));
LOG.trace("Exceeded redelivery with count: {}, Ack: {}", redeliveryCounter, ack);
diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
index bb0a72f..dc575d4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
+++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java
@@ -44,7 +44,7 @@
* message was not processed and the message was considered a poison
* message.
*/
- public static final byte POSION_ACK_TYPE = 1;
+ public static final byte POISON_ACK_TYPE = 1;
/**
* In case the client want's to explicitly let the broker know that a
@@ -117,7 +117,7 @@
}
public boolean isPoisonAck() {
- return ackType == POSION_ACK_TYPE;
+ return ackType == POISON_ACK_TYPE;
}
public boolean isStandardAck() {
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
index 8624a06..01f6fcc 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/discovery/multicast/MulticastDiscoveryAgent.java
@@ -345,20 +345,22 @@
private NetworkInterface findNetworkInterface() throws SocketException {
Enumeration<NetworkInterface> ifcs = NetworkInterface.getNetworkInterfaces();
List<NetworkInterface> possibles = new ArrayList<NetworkInterface>();
- while (ifcs.hasMoreElements()) {
- NetworkInterface ni = ifcs.nextElement();
- try {
- if (ni.supportsMulticast()
- && ni.isUp()) {
- for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
- if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
- && !ia.getAddress().isLoopbackAddress()
- && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
- possibles.add(ni);
+ if (ifcs != null) {
+ while (ifcs.hasMoreElements()) {
+ NetworkInterface ni = ifcs.nextElement();
+ try {
+ if (ni.supportsMulticast()
+ && ni.isUp()) {
+ for (InterfaceAddress ia : ni.getInterfaceAddresses()) {
+ if (ia != null && ia.getAddress() instanceof java.net.Inet4Address
+ && !ia.getAddress().isLoopbackAddress()
+ && (ni.getDisplayName()==null || !ni.getDisplayName().startsWith("vnic"))) {
+ possibles.add(ni);
+ }
}
}
- }
- } catch (SocketException ignored) {}
+ } catch (SocketException ignored) {}
+ }
}
return possibles.isEmpty() ? null : possibles.get(possibles.size() - 1);
}
diff --git a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
index d6a50a1..0a717f4 100644
--- a/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
+++ b/activemq-client/src/main/java/org/apache/activemq/util/ClassLoadingAwareObjectInputStream.java
@@ -98,18 +98,21 @@
}
private void checkSecurity(Class clazz) throws ClassNotFoundException {
- if (!clazz.isPrimitive()) {
- if (clazz.getPackage() != null && !trustAllPackages()) {
- boolean found = false;
- for (String packageName : getTrustedPackages()) {
- if (clazz.getPackage().getName().equals(packageName) || clazz.getPackage().getName().startsWith(packageName + ".")) {
- found = true;
- break;
- }
- }
- if (!found) {
- throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
- }
+ if (trustAllPackages() || clazz.isPrimitive()) {
+ return;
+ }
+
+ boolean found = false;
+ Package thePackage = clazz.getPackage();
+ if (thePackage != null) {
+ for (String trustedPackage : getTrustedPackages()) {
+ if (thePackage.getName().equals(trustedPackage) || thePackage.getName().startsWith(trustedPackage + ".")) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new ClassNotFoundException("Forbidden " + clazz + "! This class is not trusted to be serialized as ObjectMessage payload. Please take a look at http://activemq.apache.org/objectmessage.html for more information on how to configure trusted classes.");
}
}
}
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
index a7285da..b7766a2 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTunnelServlet.java
@@ -161,21 +161,6 @@
return true;
}
- protected String readRequestBody(HttpServletRequest request) throws IOException {
- StringBuffer buffer = new StringBuffer();
- BufferedReader reader = request.getReader();
- while (true) {
- String line = reader.readLine();
- if (line == null) {
- break;
- } else {
- buffer.append(line);
- buffer.append("\n");
- }
- }
- return buffer.toString();
- }
-
protected BlockingQueueTransport getTransportChannel(HttpServletRequest request, HttpServletResponse response) throws IOException {
String clientID = request.getHeader("clientID");
if (clientID == null) {
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
index f66118e..459b433 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/scheduler/AMQ7086Test.java
@@ -42,7 +42,7 @@
JobSchedulerStoreImpl jobSchedulerStore;
KahaDBPersistenceAdapter kahaDBPersistenceAdapter;
- @Test
+ @Test(timeout = 120000)
public void testGcDoneAtStop() throws Exception {
brokerService = createBroker(true);
@@ -63,8 +63,14 @@
brokerService.stop();
+ while (verifyFilesOnDisk(jobDir) < 1) {
+ Thread.sleep(100);
+ }
assertTrue("Expected job store data files at least 1", verifyFilesOnDisk(jobDir) >= 1);
- assertEquals("Expected kahadb data files", 1, verifyFilesOnDisk(kahaDir));
+ while (verifyFilesOnDisk(kahaDir) < 1) {
+ Thread.sleep(100);
+ }
+ assertTrue("Expected kahadb data files at least 1", verifyFilesOnDisk(kahaDir) >= 1);
}
@Test
diff --git a/activemq-karaf/src/main/resources/features-core.xml b/activemq-karaf/src/main/resources/features-core.xml
index f04f522..79a1ec5 100644
--- a/activemq-karaf/src/main/resources/features-core.xml
+++ b/activemq-karaf/src/main/resources/features-core.xml
@@ -52,6 +52,7 @@
<feature>http</feature>
<feature version="${project.version}">activemq-client</feature>
<bundle>mvn:org.apache.activemq/activemq-karaf/${project.version}</bundle>
+ <bundle dependency="true">mvn:commons-io/commons-io/${commons-io-version}</bundle>
<bundle dependency="true">mvn:commons-collections/commons-collections/${commons-collections-version}</bundle>
<bundle dependency='true'>mvn:commons-lang/commons-lang/${commons-lang-version}</bundle>
<bundle dependency="true">mvn:commons-codec/commons-codec/1.9</bundle>
diff --git a/activemq-osgi/pom.xml b/activemq-osgi/pom.xml
index 339d9b8..cee87ba 100644
--- a/activemq-osgi/pom.xml
+++ b/activemq-osgi/pom.xml
@@ -74,6 +74,7 @@
javax.management*,
javax.transaction*;version="[1,3)",
javax.naming*;resolution:=optional,
+ org.apache.commons.io*;resolution:=optional,
org.apache.commons.pool*;resolution:=optional,
org.apache.commons.net*;resolution:=optional,
com.sun*;resolution:=optional,
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
index 95fe986..2f66770 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
@@ -238,7 +238,7 @@
MessageAck ack = new MessageAck();
ack.setDestination(consumerInfo.getDestination());
ack.setConsumerId(consumerInfo.getConsumerId());
- ack.setAckType(MessageAck.POSION_ACK_TYPE);
+ ack.setAckType(MessageAck.POISON_ACK_TYPE);
ack.setMessageID(msgId);
if (transactionId != null) {
transactedMessages.add(ackEntry);
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
index b783a54..eddd22a 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java
@@ -119,6 +119,9 @@
}
public void startBroker() throws Exception {
+ if (brokerService != null) {
+ stopBroker();
+ }
createBroker(true);
XStreamBrokerContext context = new XStreamBrokerContext();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
new file mode 100644
index 0000000..ef71d1d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/CompositeDestinationSendWhenNotMatchedTest.java
@@ -0,0 +1,517 @@
+package org.apache.activemq.broker.virtual;
+
+import java.util.ArrayList;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.EmbeddedBrokerTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.FilteredDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.spring.ConsumerBean;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
+
+ protected int total = 10;
+ protected Connection connection;
+
+
+ @Test
+ public void testSendWhenNotMatched() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.B");
+ Destination destination1 =new ActiveMQQueue("A.B");
+ Destination destination2 = new ActiveMQQueue("A.C");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(0);
+ }
+
+ @Test
+ public void testSendWhenMatched() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.B");
+ Destination destination1 =new ActiveMQQueue("A.B");
+ Destination destination2 = new ActiveMQQueue("A.C");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.D");
+ Destination destination1 =new ActiveMQQueue("A.D");
+ Destination destination2 = new ActiveMQQueue("A.E");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tes13"));
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(0);
+
+ }
+
+ public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.D");
+ Destination destination1 =new ActiveMQQueue("A.D");
+ Destination destination2 = new ActiveMQQueue("A.E");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+ Thread.sleep(1*1000);
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.X");
+ Destination destination1 =new ActiveMQQueue("A.X");
+ Destination destination2 = new ActiveMQQueue("A.Y");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(1);
+
+ }
+ @Test
+ public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.X");
+ Destination destination1 =new ActiveMQQueue("A.X");
+ Destination destination2 = new ActiveMQQueue("A.Y");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+
+
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(0);
+
+ }
+
+ @Test
+ public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.W");
+ Destination destination1 =new ActiveMQQueue("A.W");
+ Destination destination2 = new ActiveMQQueue("A.V");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "test13"));
+
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("A.W");
+ Destination destination1 =new ActiveMQQueue("A.W");
+ Destination destination2 = new ActiveMQQueue("A.V");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+ Thread.sleep(2*1000);
+ messageList1.assertMessagesArrived(0);
+ messageList2.assertMessagesArrived(0);
+
+ }
+
+ @Test
+ public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("X.Y");
+ Destination destination1 = new ActiveMQQueue("X.Y");
+ Destination destination2 = new ActiveMQQueue("X.Z");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+ messageList2.assertMessagesArrived(1);
+ messageList1.assertMessagesArrived(0);
+
+ }
+ @Test
+ public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
+ if (connection == null) {
+ connection = createConnection();
+ }
+ connection.start();
+
+ ConsumerBean messageList1 = new ConsumerBean();
+ ConsumerBean messageList2 = new ConsumerBean();
+ messageList1.setVerbose(true);
+ messageList2.setVerbose(true);
+ // messageList1.waitForMessagesToArrive(0);
+ // messageList2.waitForMessagesToArrive(1);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Destination producerDestination = new ActiveMQQueue("R.S");
+ Destination destination1 = new ActiveMQQueue("R.S");
+ Destination destination2 = new ActiveMQQueue("R.T");
+
+ LOG.info("Sending to: " + producerDestination);
+ LOG.info("Consuming from: " + destination1 + " and " + destination2);
+
+ MessageConsumer c1 = session.createConsumer(destination1);
+ MessageConsumer c2 = session.createConsumer(destination2);
+
+ c1.setMessageListener(messageList1);
+ c2.setMessageListener(messageList2);
+
+ // create topic producer
+ MessageProducer producer = session.createProducer(producerDestination);
+ assertNotNull(producer);
+
+ producer.send(createMessage(session, "tet13"));
+ messageList1.assertMessagesArrived(1);
+ messageList2.assertMessagesArrived(1);
+ }
+
+ protected Destination getConsumer1Dsetination() {
+ return new ActiveMQQueue("A.B");
+ }
+
+ protected Destination getConsumer2Dsetination() {
+ return new ActiveMQQueue("A.C");
+ }
+
+ protected Destination getProducerDestination() {
+ return new ActiveMQQueue("A.B");
+ }
+
+ protected TextMessage createMessage(Session session, String testid) throws JMSException {
+ TextMessage textMessage = session.createTextMessage("testMessage");
+ textMessage.setStringProperty("testid", testid);
+ return textMessage;
+ }
+
+ protected BrokerService createBroker() throws Exception {
+ BrokerService answer = new BrokerService();
+ answer.setPersistent(isPersistent());
+ answer.getManagementContext().setCreateConnector(false);
+ answer.addConnector(bindAddress);
+ /*
+ * <destinationInterceptors> <virtualDestinationInterceptor>
+ * <virtualDestinations> <compositeQueue name="A.B" forwardOnly="false">
+ * <forwardTo> <filteredDestination selector="testid LIKE 'test%'" queue="A.C"/>
+ * </forwardTo> </compositeQueue> </virtualDestinations>
+ * </virtualDestinationInterceptor> </destinationInterceptors>
+ */
+ /*
+ * SendWhenNotMatched = true A message will be always forwarded to if not matched to filtered destination
+ * ForwardOnly setting has no impact
+ */
+
+ CompositeQueue compositeQueue = new CompositeQueue();
+ compositeQueue.setName("A.B");
+ compositeQueue.setForwardOnly(true); // By default it is true
+ compositeQueue.setSendWhenNotMatched(true);// By default it is false
+ FilteredDestination filteredQueue = new FilteredDestination();
+ filteredQueue.setQueue("A.C");
+ filteredQueue.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations = new ArrayList<Object>();
+ forwardDestinations.add(filteredQueue);
+ compositeQueue.setForwardTo(forwardDestinations);
+
+
+ CompositeQueue compositeQueue0 = new CompositeQueue();
+ compositeQueue0.setName("A.D");
+ compositeQueue0.setForwardOnly(false); // By default it is true
+ compositeQueue0.setSendWhenNotMatched(true);// By default it is false
+ FilteredDestination filteredQueue0 = new FilteredDestination();
+ filteredQueue0.setQueue("A.E");
+ filteredQueue0.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations0 = new ArrayList<Object>();
+ forwardDestinations0.add(filteredQueue0);
+ compositeQueue0.setForwardTo(forwardDestinations0);
+
+ //Back compatibility test 1
+ CompositeQueue compositeQueue01 = new CompositeQueue();
+ compositeQueue01.setName("A.X");
+ compositeQueue01.setForwardOnly(false); // By default it is true
+ //compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+ FilteredDestination filteredQueue01 = new FilteredDestination();
+ filteredQueue01.setQueue("A.Y");
+ filteredQueue01.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations01 = new ArrayList<Object>();
+ forwardDestinations01.add(filteredQueue01);
+ compositeQueue01.setForwardTo(forwardDestinations01);
+
+ //Back compatibility test 2
+ CompositeQueue compositeQueue02 = new CompositeQueue();
+ compositeQueue02.setName("A.W");
+ //compositeQueue02.setForwardOnly(true); // By default it is true
+ //compositeQueue01.setSendWhenNotMatched(false);// By default it is false
+ FilteredDestination filteredQueue02 = new FilteredDestination();
+ filteredQueue02.setQueue("A.V");
+ filteredQueue02.setSelector("testid LIKE 'test%'");
+ final ArrayList<Object> forwardDestinations02 = new ArrayList<Object>();
+ forwardDestinations02.add(filteredQueue02);
+ compositeQueue02.setForwardTo(forwardDestinations02);
+
+ CompositeQueue compositeQueue1 = new CompositeQueue();
+ compositeQueue1.setName("X.Y");
+ compositeQueue1.setForwardOnly(true); // By default it is true
+ ActiveMQQueue forwardQueue1 =new ActiveMQQueue("X.Z");
+ final ArrayList<Object> forwardDestinations1 = new ArrayList<Object>();
+ forwardDestinations1.add(forwardQueue1);
+ compositeQueue1.setForwardTo(forwardDestinations1);
+
+
+ CompositeQueue compositeQueue2 = new CompositeQueue();
+ compositeQueue2.setName("R.S");
+ compositeQueue2.setForwardOnly(false);
+ ActiveMQQueue forwardQueue2 =new ActiveMQQueue("R.T");
+ final ArrayList<Object> forwardDestinations2 = new ArrayList<Object>();
+ forwardDestinations2.add(forwardQueue2);
+ compositeQueue2.setForwardTo(forwardDestinations2);
+
+ VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
+ interceptor.setVirtualDestinations(new VirtualDestination[] { compositeQueue,compositeQueue0,compositeQueue01,compositeQueue02,compositeQueue1,compositeQueue2 });
+ answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
+ return answer;
+ }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
index 8b47b06..d8f3598 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6815Test.java
@@ -71,7 +71,7 @@
brokerService.stop();
}
- @Test(timeout = 120000)
+ @Test(timeout = 240000)
public void testHeapUsage() throws Exception {
Runtime.getRuntime().gc();
final long initUsedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
index 774422f..18053e4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ7118Test.java
@@ -185,12 +185,18 @@
if(doCheckpoint) {
LOG.info("Initiating checkpointUpdate "+ ++checkpointIndex + " ...");
broker.getPersistenceAdapter().checkpoint(true);
- TimeUnit.SECONDS.sleep(2);
+ TimeUnit.SECONDS.sleep(4);
LOG.info("Checkpoint complete.");
}
File files[] = dbfiles.listFiles(lff);
Arrays.sort(files, new DBFileComparator() );
logfiles(files);
+
+ while (files.length != expectedCount) {
+ // gives time to checkpoint
+ TimeUnit.SECONDS.sleep(1);
+ }
+
assertEquals(expectedCount, files.length);
assertEquals(lastFileName, files[files.length-1].getName());
diff --git a/activemq-web/pom.xml b/activemq-web/pom.xml
index 70fa0be..01cacab 100644
--- a/activemq-web/pom.xml
+++ b/activemq-web/pom.xml
@@ -55,6 +55,10 @@
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+ <dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-unit-tests</artifactId>
<scope>test</scope>
diff --git a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
index 02e2b7a..5a2771b 100644
--- a/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
+++ b/activemq-web/src/main/java/org/apache/activemq/web/MessageServletSupport.java
@@ -19,6 +19,7 @@
import java.io.BufferedReader;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -34,6 +35,7 @@
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.commons.io.input.BoundedInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +60,12 @@
public abstract class MessageServletSupport extends HttpServlet {
private static final transient Logger LOG = LoggerFactory.getLogger(MessageServletSupport.class);
+ /**
+ * A configuration tag to specify the maximum message size (in bytes) for the servlet. The default
+ * is given by DEFAULT_MAX_MESSAGE_SIZE below.
+ */
+ private static final String MAX_MESSAGE_SIZE_TAG = "maxMessageSize";
+ private static final Long DEFAULT_MAX_MESSAGE_SIZE = 100000L;
private boolean defaultTopicFlag = true;
private Destination defaultDestination;
@@ -68,6 +76,7 @@
private int defaultMessagePriority = 5;
private long defaultMessageTimeToLive;
private String destinationOptions;
+ private long maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
public void init(ServletConfig servletConfig) throws ServletException {
super.init(servletConfig);
@@ -91,6 +100,11 @@
}
}
+ String maxMessageSizeConfigured = servletConfig.getInitParameter(MAX_MESSAGE_SIZE_TAG);
+ if (maxMessageSizeConfigured != null) {
+ maxMessageSize = Long.parseLong(maxMessageSizeConfigured);
+ }
+
// lets check to see if there's a connection factory set
WebClient.initContext(getServletContext());
}
@@ -344,7 +358,8 @@
if (answer == null && contentType != null) {
LOG.debug("Content-Type={}", contentType);
// lets read the message body instead
- BufferedReader reader = request.getReader();
+ BoundedInputStream boundedInputStream = new BoundedInputStream(request.getInputStream(), maxMessageSize);
+ BufferedReader reader = new BufferedReader(new InputStreamReader(boundedInputStream));
StringBuilder buffer = new StringBuilder();
while (true) {
String line = reader.readLine();
diff --git a/assembly/src/main/descriptors/common-bin.xml b/assembly/src/main/descriptors/common-bin.xml
index b1067c0..17ac1f9 100644
--- a/assembly/src/main/descriptors/common-bin.xml
+++ b/assembly/src/main/descriptors/common-bin.xml
@@ -182,6 +182,7 @@
<include>${pom.groupId}:activeio-core</include>
<include>commons-beanutils:commons-beanutils</include>
<include>commons-collections:commons-collections</include>
+ <include>commons-io:commons-io</include>
<include>org.apache.commons:commons-dbcp2</include>
<include>org.apache.commons:commons-pool2</include>
<include>commons-codec:commons-codec</include>
diff --git a/assembly/src/release/bin/env b/assembly/src/release/bin/env
index 947807b..a31d687 100644
--- a/assembly/src/release/bin/env
+++ b/assembly/src/release/bin/env
@@ -1,4 +1,3 @@
-#!/bin/sh
# ------------------------------------------------------------------------
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
diff --git a/pom.xml b/pom.xml
index 76ff4e8..4c81889 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,6 @@
<activemq-protobuf-version>1.1</activemq-protobuf-version>
<activesoap-version>1.3</activesoap-version>
<annogen-version>0.1.0</annogen-version>
- <ant-version>1.10.7</ant-version>
<ant-bundle-version>1.10.7_1</ant-bundle-version>
<aopalliance-version>1.0</aopalliance-version>
<aries-version>1.1.0</aries-version>
@@ -52,14 +51,14 @@
<cglib-version>2.2</cglib-version>
<commons-beanutils-version>1.9.4</commons-beanutils-version>
<commons-collections-version>3.2.2</commons-collections-version>
- <commons-daemon-version>1.2.2</commons-daemon-version>
- <commons-dbcp2-version>2.7.0</commons-dbcp2-version>
- <commons-io-version>2.6</commons-io-version>
+ <commons-daemon-version>1.2.3</commons-daemon-version>
+ <commons-dbcp2-version>2.8.0</commons-dbcp2-version>
+ <commons-io-version>2.8.0</commons-io-version>
<commons-lang-version>2.6</commons-lang-version>
<commons-logging-version>1.2</commons-logging-version>
<commons-pool2-version>2.8.0</commons-pool2-version>
<commons-primitives-version>1.0</commons-primitives-version>
- <commons-net-version>3.6</commons-net-version>
+ <commons-net-version>3.7.2</commons-net-version>
<directory-version>2.0.0.AM25</directory-version>
<ecj.version>3.17.0</ecj.version>
<ftpserver-version>1.1.1</ftpserver-version>
@@ -74,13 +73,13 @@
<httpcore-version>4.4.13</httpcore-version>
<insight-version>1.2.0.Beta4</insight-version>
<jackson-version>2.9.10</jackson-version>
- <jackson-databind-version>2.9.10.5</jackson-databind-version>
+ <jackson-databind-version>2.9.10.6</jackson-databind-version>
<jasypt-version>1.9.3</jasypt-version>
<jaxb-bundle-version>2.2.11_1</jaxb-bundle-version>
- <jetty9-version>9.4.28.v20200408</jetty9-version>
+ <jetty9-version>9.4.34.v20201102</jetty9-version>
<jetty-version>${jetty9-version}</jetty-version>
<jmdns-version>3.4.1</jmdns-version>
- <tomcat-api-version>9.0.35</tomcat-api-version>
+ <tomcat-api-version>9.0.39</tomcat-api-version>
<jettison-version>1.4.1</jettison-version>
<jmock-version>2.5.1</jmock-version>
<jolokia-version>1.6.2</jolokia-version>
@@ -111,16 +110,16 @@
<qpid-jms-proton-version>0.33.6</qpid-jms-proton-version>
<netty-all-version>4.1.51.Final</netty-all-version>
<regexp-version>1.3</regexp-version>
- <rome-version>1.12.2</rome-version>
+ <rome-version>1.15.0</rome-version>
<saxon-version>9.5.1-5</saxon-version>
<saxon-bundle-version>9.5.1-5_1</saxon-bundle-version>
<scala-plugin-version>3.1.0</scala-plugin-version>
<scala-version>2.11.11</scala-version>
- <shiro-version>1.5.3</shiro-version>
+ <shiro-version>1.7.0</shiro-version>
<scalatest-version>3.0.8</scalatest-version>
<slf4j-version>1.7.30</slf4j-version>
<snappy-version>1.1.2</snappy-version>
- <spring-version>4.3.26.RELEASE</spring-version>
+ <spring-version>4.3.29.RELEASE</spring-version>
<taglibs-version>1.2.5</taglibs-version>
<velocity-version>2.2</velocity-version>
<xalan-version>2.7.2</xalan-version>
@@ -1077,12 +1076,6 @@
<version>${commons-io-version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.ant</groupId>
- <artifactId>ant</artifactId>
- <version>${ant-version}</version>
- </dependency>
-
<!-- ACTIVEMQ-WEB Specific Dependencies -->
<dependency>
<groupId>com.rometools</groupId>